No way … you’re seriously telling me you use JSON for your messaging protocol! Come on!
Each to their own I guess, but we’ll be using Apache Avro today. As mentioned in previous posts, we’ll be looking to use Apache Kafka later in the series (if that’s what this is) as our message bus between services … or distributed streaming platform, as Kafka describes itself. And Avro works nicely with Kafka you see … after the setup.
To be fair, it is actually pretty natural to kick off with a “simple” JSON message protocol; it gets systems up and running fast. But ultimately there will be pain down the road, as the solution doesn’t tend to scale once many services are plugged into the infrastructure: unwieldy defensive code in the best case, brittle broken code in the worst. Or the rather misguided “Let’s add a dictionary for forwards/backwards compatibility” – ouch!
These blogs from Confluent, covering the topic in a little more detail, are definitely worth a read: Building a Streaming Platform and, more specifically, Why Avro For Kafka Data.
Utensils Required
As usual you can check out the associated code in this blog’s GitHub project.
Project Setup
Set up a project structure like …
project/build.properties
should contain …
sbt.version=0.13.15
and project/plugins.sbt
like …
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "0.15.1")
For more details on the excellent sbt-avrohugger see its GitHub page. sbt-avrohugger makes the generated classes more idiomatic for Scala users, specifically generating case classes, which work well from a functional programming perspective, as described in Scala Case Classes In Depth. It is no coincidence that Akka encourages their use for actor messaging.
Lastly, build.sbt should look something like …

name := """messages"""

organization := "com.brownian-motion-driven-dev.avro-messages"

version := "0.0.1"

scalaVersion := "2.11.11"

sbtavrohugger.SbtAvrohugger.specificAvroSettings

resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies ++= Seq(
  "org.apache.avro" % "avro" % "1.8.1",
  "org.scalatest" %% "scalatest" % "2.2.4" % "test"
)
Let’s Design Messages
Okay, so we want to send some kind of greeting message to another service. To give our pal a wee heads-up as to how we’re feeling, we’ll ensure the message has an emotion indicator. Robert Plutchik – unknown to me until now – suggested that we human types have eight basic emotions. Sounds like an enum to me!
We’ll also have a string to represent the greeting message itself.
Create a file called Greeting.avsc under src/main/avro which in Avro speak looks like …
{ "type": "record", "name": "Greeting", "namespace": "com.brownianmotiondrivendev.avromessages", "fields": [ { "name": "greetingEmotion", "type": { "name": "GreetingEmotion", "type": "enum", "symbols": [ "FEAR", "ANGER", "SADNESS", "JOY", "DISGUST", "TRUST", "ANTICIPATION" ] } }, {"name": "greetingDetail", "type": "string"} ] }
Anyway, back to our schema. If we now do sbt compile
and then do an ls target/scala-2.11/src_managed/main/compiled_avro/com/brownianmotiondrivendev/avromessages
we should see two generated files.
GreetingEmotion.java
Greeting.scala
Where Greeting.scala is the case class generated from the Avro schema, and the enum comes out as plain Java.
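Because it is a bona fide case class, the generated Greeting behaves like any other Scala message type. A quick sketch of the niceties you get for free (assuming the generated classes above are on the classpath; the greetings themselves are made up) …

// Construction reads naturally ...
val g = Greeting(GreetingEmotion.JOY, "Top of the morning to you!")

// ... structural equality and copy come for free, handy for immutable-style handling ...
val calmer = g.copy(greetingDetail = "Morning.")
assert(calmer != g)

// ... as does pattern matching, which is exactly why Akka likes case classes as actor messages.
g match {
  case Greeting(GreetingEmotion.JOY, detail) => println(s"Happy days: $detail")
  case Greeting(emotion, _)                  => println(s"Feeling $emotion")
}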
Simple Test
So let’s do a literally happy 🙂 test! i.e. see if we can use the object … simple enough, we’ll create a ScalaTest suite in src/test/scala/com/brownianmotiondrivendev/avromessages/ called GreetingTest.scala, which should look like …
package com.brownianmotiondrivendev.avromessages

import org.scalatest.FunSuite

class GreetingTest extends FunSuite {

  test("Basic, literal 'Happy Path' Greeting test") {
    val happyGreeting = Greeting(greetingEmotion = GreetingEmotion.JOY, greetingDetail = "Top of the morning to you!")
    println(s"I have a feeling of ${happyGreeting.greetingEmotion.toString} so I say '${happyGreeting.greetingDetail}'")
    assert(happyGreeting.greetingEmotion == GreetingEmotion.JOY)
  }
}
And sbt test should be happy times … if somewhat useless. We want to use this as a message between services, not just as a painful-to-set-up object!
Testing Avro Class With Serialization and Deserialization
Here we are going to use the file system rather than Kafka … it’s easier 🙂 but the concept is obviously the same. In both cases we need to supply the code to serialize the message and later deserialize it – it doesn’t matter whether the intermediary is the file system, a message bus or an avocado.

So, file-system testing wise, just add the following test …
test("Test using file (de)serialization") { val sadGreeting = Greeting(greetingEmotion = GreetingEmotion.SADNESS, greetingDetail = "Boo hoo! Hello.") val GreetingFileName = "greetingTest.avro" val GreetingAvroFile = new File(GreetingFileName) GreetingAvroFile.delete() // serialize val speculativeScreenplayDatumWriter: DatumWriter[Greeting] = new SpecificDatumWriter[Greeting](Greeting.SCHEMA$) val dataFileWriter: DataFileWriter[Greeting] = new DataFileWriter[Greeting](speculativeScreenplayDatumWriter) dataFileWriter.create(Greeting.SCHEMA$, GreetingAvroFile) dataFileWriter.append(sadGreeting) dataFileWriter.close() // Deserialize from disk val speculativeScreenplayDatumReader: DatumReader[Greeting] = new SpecificDatumReader[Greeting](Greeting.SCHEMA$) val dataFileReader: DataFileReader[Greeting] = new DataFileReader[Greeting](new File(GreetingFileName), speculativeScreenplayDatumReader) var deserialisedGreeting: Greeting = null // a var ... OMG, ahhhhhhh - avert your eyes!!!! while (dataFileReader.hasNext) { deserialisedGreeting = dataFileReader.next(deserialisedGreeting) println("okay: " + deserialisedGreeting + "; dokies") } GreetingAvroFile.delete() assert(sadGreeting.greetingEmotion == GreetingEmotion.SADNESS) }
and sbt test should once again be happy … or sad this time, I suppose.
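Another wee aside: if you comment out the final delete() you can eyeball what the test wrote using Avro’s command-line avro-tools jar (grab the version matching the library; the jar name here assumes 1.8.1) …

java -jar avro-tools-1.8.1.jar tojson greetingTest.avro

… which dumps each record as a line of JSON. A handy sanity check that the bytes on disk are what you think they are.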
Hints on using Avro with Kafka please?
Fair enough. You would need Kafka dependencies in your project, e.g.

"com.typesafe.akka" %% "akka-stream-kafka" % "0.13"

(or whatever is latest these days) if you are using Akka Streams. Then extend both Serializer and Deserializer from org.apache.kafka.common.serialization, and rather than wrapping your org.apache.avro.specific.SpecificDatumWriter in an org.apache.avro.file.DataFileWriter as we did above, you would use the SpecificDatumWriter directly with a binary encoder over a java.io.ByteArrayOutputStream.
Sounds a bit messy, but it is actually simpler to show than to describe. With imports etc omitted, the Serializer version looks like …
class GreetingSerializer extends Serializer[Greeting] {

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  override def serialize(topic: String, data: Greeting): Array[Byte] = {
    val writer: DatumWriter[Greeting] = new SpecificDatumWriter[Greeting](Greeting.SCHEMA$)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(data, encoder)
    encoder.flush()
    out.toByteArray
  }

  override def close(): Unit = {}
}
… which you just plug into akka.kafka.ProducerSettings from akka-stream-kafka.
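The wiring looks something like this sketch – the broker address and topic name are made up, and I am assuming the GreetingSerializer above with a plain byte-array key …

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer

implicit val system = ActorSystem("greetings")
implicit val materializer = ActorMaterializer()

// Our custom serializer handles the value side, a stock ByteArraySerializer the (unused) key side
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new GreetingSerializer)
  .withBootstrapServers("localhost:9092") // made-up broker address

// Push a single Greeting onto a hypothetical "greetings" topic
Source.single(Greeting(GreetingEmotion.JOY, "Hello over Kafka!"))
  .map(g => new ProducerRecord[Array[Byte], Greeting]("greetings", g))
  .runWith(Producer.plainSink(producerSettings))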
The Deserializer is analogous.
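For completeness, a sketch of what I mean – again imports omitted, and assuming the Greeting class from earlier …

class GreetingDeserializer extends Deserializer[Greeting] {

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  override def deserialize(topic: String, data: Array[Byte]): Greeting = {
    // Mirror image of the serializer: a SpecificDatumReader plus a binary decoder over the raw bytes
    val reader: DatumReader[Greeting] = new SpecificDatumReader[Greeting](Greeting.SCHEMA$)
    val decoder = DecoderFactory.get().binaryDecoder(data, null)
    reader.read(null.asInstanceOf[Greeting], decoder)
  }

  override def close(): Unit = {}
}

… which plugs into akka.kafka.ConsumerSettings in the same way.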
Wrap Up
Yeah, so that was a little prescriptive. It took me a wee while to get all that bashed together when I first came across it, so frankly this is a nice wee reminder for me if nothing else.

It seems a bit of a detour from the general journey, but I wanted to get this one down and may return to annotate it a little more at a later date.