
Apache Kafka 0.9 Scala Producer/Consumer With Some RxScala Pixie Dust

15 Mar 2016, CPOL, 17 min read
An article on how to use the open source Apache Kafka messaging framework, with a bit of RxScala thrown in for good luck

Introduction

For my job at the moment, I am spending roughly 50% of my time working on .NET and the other 50% working with Scala. As such, a lot of Scala/JVM toys have piqued my interest of late. My latest quest was to learn Apache Kafka well enough that I at least understood the core concepts. I have even read a book or two on Apache Kafka now, so I feel I am at least talking partial sense in this article.

So what is Apache Kafka, exactly?

Here is what the Apache Kafka folks have to say about their own tool.

Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.


Fast
A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.

Scalable
Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers

Durable
Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.

Distributed by Design
Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.

Taken from http://kafka.apache.org/ on 11/03/16

Apache Kafka was designed and built by a team of engineers at LinkedIn, where I am sure you will agree they probably had to deal with quite a bit of data.

In this article I will talk you through some of the core Apache Kafka concepts, and will also show how to create a Scala Apache Kafka Producer and a Scala Apache Kafka Consumer. I will also sprinkle some RxScala pixie dust on top of the Apache Kafka Consumer code, so that Rx operators can be applied to the incoming Apache Kafka messages.

I have actually tried to emulate a popular .NET article I wrote a while back using SignalR and RX wired up to a simple WPF user interface. Where it makes sense in the sections below I will call out to this .NET article as it may be of interest to some of you.

 

Where Is The Code?

I have put the code up on my GitHub account. There are actually 2 branches for this Git repo.  

  1. There is a branch where I got the basic plumbing working: just a simple Producer/Consumer, where the Producer sends JSON to the Consumer. You can grab that code from this branch
  2. There is another branch in which I fleshed things out a lot more, and introduced a more structured/re-usable/generic Consumer API. As I state above, I also introduced RxScala into the mix, so that Rx could piggyback on the Kafka consumer(s)' incoming messages. You can grab the more fleshed-out version (that this article is based on) from this branch

 

Kafka Concepts

This section will talk about core Apache Kafka concepts that will help you understand the code a bit better.

 

General Idea

As already stated, Apache Kafka is a state-of-the-art distributed, fault-tolerant messaging system. It also has some other things up its sleeve, such as partitioning (sharding), publish-subscribe and event sourcing.

Let's talk a bit about the simple operations.

There is a producer that produces data. It would typically connect to an Apache Kafka broker (more than likely one in a cluster) and pump out data on what is known as a "topic". A topic is a name that both producer and consumer agree upon: the producer produces messages on a topic, and a consumer can be set up to read from that topic. In its most basic form, that is it. However, let's talk about some of the nice stuff mentioned above.

 

Partitioning

So Apache Kafka has the concept of partitions for a given topic. This means that you can come up with some way of splitting the messages across multiple partitions. Say you logged messages by IP address: you might decide that all messages from addresses starting with 127.192 go to one partition and all others go somewhere else. Apache Kafka allows this. You would have to write some code for your specific use case, but it's possible.
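To make the IP address example a little more concrete, here is a minimal sketch in plain Scala (no Kafka dependency) of what such a partitioning rule might look like. The object and method names are hypothetical. In a real producer this logic would live inside an implementation of Kafka's Partitioner interface, configured through the partitioner.class producer property. Note also that Kafka's own default partitioner works differently: it hashes the serialized key with murmur2.

```scala
// Hypothetical partitioning rule for the IP address example: keys starting with
// "127.192" always go to partition 0, and every other key is spread over the
// remaining partitions by hashing it.
object IpPrefixPartitioner {
  val reservedPrefix = "127.192"

  def partitionFor(ipAddress: String, numPartitions: Int): Int = {
    require(numPartitions >= 2, "need one reserved partition plus at least one other")
    if (ipAddress.startsWith(reservedPrefix)) 0
    else {
      // Math.floorMod keeps the result non-negative even for negative hash codes
      1 + Math.floorMod(ipAddress.hashCode, numPartitions - 1)
    }
  }
}

object PartitionerDemo extends App {
  val keys = Seq("127.192.0.1", "127.192.10.7", "10.0.0.1", "192.168.1.20")
  keys.foreach { ip =>
    println(s"$ip -> partition ${IpPrefixPartitioner.partitionFor(ip, 4)}")
  }
}
```

The important property is that the rule is deterministic. The same key always lands in the same partition, so ordering is preserved per key.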

 

Pub Sub vs Queues

Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Apache Kafka offers a single consumer abstraction that generalizes both of these—the consumer group.


Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

Perhaps the following diagram will illustrate this a bit better

 

Image 1
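The two delivery models can also be illustrated with a toy, in-memory model (this is not Kafka code). In this model every message goes to exactly one consumer in each subscribing group. For simplicity it hands out individual messages round-robin, whereas real Kafka assigns whole partitions to consumers. The type and object names are hypothetical.

```scala
// Toy model of consumer-group delivery: each message is delivered to exactly one
// member of every group (round-robin within the group, for simplicity).
final case class GroupConsumer(group: String, name: String)

object ConsumerGroupModel {
  def deliver(messages: Seq[String], consumers: Seq[GroupConsumer]): Map[GroupConsumer, Seq[String]] = {
    val byGroup = consumers.groupBy(_.group)
    val deliveries = for {
      (_, members) <- byGroup.toSeq
      (msg, i) <- messages.zipWithIndex
    } yield members(i % members.size) -> msg
    // Collect what each individual consumer received, preserving message order
    deliveries.groupBy(_._1).map { case (c, pairs) => c -> pairs.map(_._2) }
  }
}

object ConsumerGroupDemo extends App {
  val messages = Seq("m1", "m2", "m3", "m4")
  // Same group: the messages are load-balanced, like a queue
  println(ConsumerGroupModel.deliver(messages,
    Seq(GroupConsumer("g1", "a"), GroupConsumer("g1", "b"))))
  // Different groups: every consumer sees every message, like publish-subscribe
  println(ConsumerGroupModel.deliver(messages,
    Seq(GroupConsumer("g1", "a"), GroupConsumer("g2", "b"))))
}
```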

 

Event Sourcing

Now it may not be that obvious that a distributed, clustered, fault-tolerant messaging system can be used for event sourcing. However, Apache Kafka keeps a commit log (per partition) that stores messages on disk for a configurable retention period. Consumers commit their position (offset) in the log as they go, but they may also move back to an earlier offset and consume all messages in the log again from that point.
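The key idea is that messages are never removed when they are read, and each consumer group just tracks an offset. A toy, single-partition commit log in plain Scala makes this visible. It is not Kafka code and it is not thread-safe, and all names are hypothetical. With the real 0.9 KafkaConsumer, the rewind would be done with its seek(...) method.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy single-partition commit log: messages are appended and never modified,
// and each consumer group only remembers the next offset it should read.
class ToyPartitionLog[T] {
  private val entries = ArrayBuffer.empty[T]
  private var committed = Map.empty[String, Long] // group -> next offset to read

  // Returns the offset the message was written at
  def append(message: T): Long = { entries += message; entries.size - 1L }

  // Read everything from the group's committed offset onwards
  def poll(group: String): Seq[(Long, T)] = {
    val from = committed.getOrElse(group, 0L).toInt
    entries.indices.drop(from).map(i => (i.toLong, entries(i)))
  }

  def commit(group: String, nextOffset: Long): Unit = committed += group -> nextOffset

  // "Rewinding" is just committing an earlier offset: the messages are still there
  def seek(group: String, offset: Long): Unit = commit(group, offset)
}

object CommitLogDemo extends App {
  val log = new ToyPartitionLog[String]
  Seq("created", "paid", "shipped").foreach(log.append)

  val firstRead = log.poll("billing")
  log.commit("billing", firstRead.last._1 + 1) // everything consumed
  println(log.poll("billing"))                 // nothing new to read
  log.seek("billing", 0)                       // replay the events from the start
  println(log.poll("billing").map(_._2))
}
```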

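The offset is committed by the consumer. With older Kafka consumers these committed offsets were kept in Apache ZooKeeper, which is a piece of software used to co-ordinate distributed applications. I may do another post on ZooKeeper, but that's another story. The new KafkaConsumer introduced in 0.9, which is the one used in this article, instead commits its offsets to Kafka itself (in an internal topic called __consumer_offsets), while ZooKeeper is still used by the brokers for cluster co-ordination.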

 

Producer(s)

The producer's job is to publish messages to a topic, and that is about all the producer really has to worry about. As stated above, it is the consumer that decides which consumer group it belongs to, which in turn dictates which consumer(s) get to see which messages.

 

Consumer(s)

Consumers are expected to provide several bits of information. Namely:

  • Apache Kafka broker connection details
  • Topic details
  • Consumer group details

It is by supplying these details that the messages are correctly received in the consumer(s).
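These three pieces of information map onto the consumer's configuration in two different ways, as this small sketch shows. The broker and group details are ordinary consumer properties. The topics, by contrast, are passed separately to KafkaConsumer.subscribe(...), as the demo code does later on. The ConsumerSettings case class is a hypothetical helper that only uses the JDK, so it can be run without Kafka on the classpath.

```scala
import java.util.Properties

// Hypothetical holder for the three pieces of information a consumer needs
final case class ConsumerSettings(bootstrapServers: String, topics: Seq[String], groupId: String) {

  // Broker connection and consumer group details are plain consumer properties...
  def toProperties: Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  // ...whereas the topic details are handed to KafkaConsumer.subscribe(...) separately
  def topicList: java.util.List[String] = java.util.Arrays.asList(topics: _*)
}
```

With Kafka on the classpath you would then write something like new KafkaConsumer[String, String](settings.toProperties) followed by consumer.subscribe(settings.topicList).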

 

Partition Offset

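As mentioned above, Apache Kafka has the concept of a commit log. It is in fact slightly more complex, as each partition has its own commit log. Unless your application supplies its own partitioning strategy, the producer's default partitioner decides which partition each message goes to. Either way, for every partition of a topic there is a separate commit log, and each consumer group has its own offset into it. This offset is adjusted by the consumer when it commits. With the 0.9 KafkaConsumer used here, the committed value is stored by Kafka itself (older consumers stored it in ZooKeeper), and Apache Kafka takes care of that part.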

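It should be noted that even though you can make multithreaded consumers, within a consumer group each partition is assigned to exactly one consumer at a time. So even if you had multiple consumer threads doing the same thing, only the one that owns a given partition reads from it and advances its committed offset. The KafkaConsumer class itself is also not thread-safe, so each consumer thread needs its own instance.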
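The "one owner per partition" rule can be sketched as a toy round-robin assignment. This is not Kafka's implementation: the real consumer ships with its own assignment strategies, and the group coordinator re-runs the assignment whenever consumers join or leave. The ToyAssignor name is hypothetical. Note that when a group has more consumers than partitions, the extra consumers simply sit idle.

```scala
// Toy round-robin assignment: every partition gets exactly one owner in the group,
// so only that owner reads the partition and commits its offset.
object ToyAssignor {
  def assign(partitions: Seq[Int], consumers: Seq[String]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "a group needs at least one consumer")
    val owners = partitions.zipWithIndex.map { case (p, i) => consumers(i % consumers.size) -> p }
    // Consumers left without a partition (more consumers than partitions) get nothing
    consumers.map(c => c -> owners.filter(_._1 == c).map(_._2)).toMap
  }
}
```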

Image 2

 

Consumer Groups

Image 3

As stated above, there is a concept of a "consumer group" which the consumers use. If more than one consumer shares a consumer group, then each message will go to only one of those consumers. If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

 

How Consumer(s) Use Zookeeper

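Apache Kafka uses another Apache project internally, namely Apache ZooKeeper, to co-ordinate the brokers in a cluster. The older Scala consumer also used ZooKeeper to store the consumer offsets, whereas the new 0.9 KafkaConsumer used in this article only talks to the brokers and commits its offsets to Kafka. It is outside the scope of this article to get you all familiar with Apache ZooKeeper, but in a nutshell Apache ZooKeeper is used to provide distributed application synchronization.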

 

A Demo App

This section will talk you through a simple Scala based demo app, which does the following:

  • Has a simple Scala producer that pumps out JSON serialized messages to a couple of topics
  • Has a simple Scala consumer that is capable of reading from many topics
  • Has a generic abstraction for a general purpose Apache Kafka client, that also exposes the incoming messages as an RxScala Observable[T] stream
  • Has a generic repository abstraction, such that a single repository may be used for monitoring a particular Apache Kafka topic

 

Setup

If you were running things in a production environment this section would not apply. If however you are like me, and like to try things in isolation in the safety of your own box (where my own box is a Windows 10 one), you should read on.

Essentially, for production you would have clusters of Apache Kafka brokers and more than likely a ZooKeeper cluster too, which would all run on Linux servers.

However, for trying stuff out this is a bit cumbersome. So let's stick with getting Apache Kafka / ZooKeeper working as a single local instance on a Windows machine. How do we do that?

The best place to start is actually following another codeproject tutorial :

www.codeproject.com/Articles/1068998/Running-Apache-Kafka-on-Windows-without-Cygwin

If you follow that guide all the way to the end you will have a pretty good understanding of what you need to do in order to get Apache Kafka / Zookeeper up and running.

 

 

Tasks That Must Be Done Every Time You Restart Your Box

There are a number of tasks you will need to do every time you restart your box. Essentially you MUST do these 2 steps before you can expect the Producer/Consumer code attached to this article to work for you.

Associated with the github code is a file called "Kafka.txt" which contains some useful notes.

You will need to ensure you run these 2 command lines (as Administrator) each time you wish to run the Producer/Consumer code attached to this article. 

NOTE : You may have to change the paths depending on where you installed things

 

1. Running Zookeeper
cd C:\Apache\Apache-zookeeper-3.4.6\bin
zkserver


2. Running Kafka
cd C:\Apache\Apache-Kafka-0.9.0.0
.\bin\windows\kafka-server-start.bat .\config\server.properties

 

 

Creating The Topics (Do this only once)

You will also need to create the topics for the attached Producer/Consumer code. You will only have to do this once, and you must also ensure both Kafka and ZooKeeper are running (see the 2 steps above).

Once you know they are both running we can simply use the following command line


cd C:\Apache\Apache-Kafka-0.9.0.0\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fast-messages
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic order-placed-messages

 

Here we set up certain things for each topic, such as which ZooKeeper instance to use (localhost:2181), and the replication factor and partition count. All good stuff I am sure you will agree.

 

 

 

The Producer

The producer is the easier component to explain, and essentially consists of a couple of classes and a config file. We will now go through each of them in turn

 

Producer Properties

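The producer needs certain properties to be filled in order to run. For this we use Google Guava's Resources class to open the properties file from the classpath, and the standard java.util.Properties class to parse it.

Here is the properties file for the producer. Note that auto.commit.interval.ms is actually a consumer setting, so the producer simply ignores it.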

bootstrap.servers=localhost:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true
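If you want to check what the producer will actually see, the parsing step can be tried on its own. The sketch below uses only java.util.Properties, reading from an in-memory string instead of the producer.props classpath resource so that it runs without a resources folder. The object name is hypothetical, and auto.commit.interval.ms has been left out because it is a consumer setting.

```scala
import java.io.StringReader
import java.util.Properties

// Parse producer settings with java.util.Properties, as the demo does,
// but from a string rather than from the producer.props resource.
object ProducerPropsDemo {
  val producerProps: String =
    """bootstrap.servers=localhost:9092
      |acks=all
      |retries=0
      |batch.size=16384
      |linger.ms=0
      |key.serializer=org.apache.kafka.common.serialization.StringSerializer
      |value.serializer=org.apache.kafka.common.serialization.StringSerializer
      |block.on.buffer.full=true
      |""".stripMargin

  def load(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }
}
```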

 

The Messages

The messages themselves are simple case classes that are serialized across the wire using the Play JSON library. Here are the messages that are included with the Producer.

Scala
package Messages

import play.api.libs.json.Json

//==============================
// serialize to json
//==============================
// val json = Json.toJson(FastMessage("test", 30))
//==============================
// deserialize
//==============================
// var otherFast = json.as[FastMessage]
// val name = otherFast.name
// println(s"The name of the other fast one is $name")
case class FastMessage(name: String, number: Int)

object FastMessageJsonImplicits {
  implicit val fastMessageFmt = Json.format[FastMessage]
  implicit val fastMessageWrites = Json.writes[FastMessage]
  implicit val fastMessageReads = Json.reads[FastMessage]
}



package Messages

import play.api.libs.json.Json

//==============================
// serialize to json
//==============================
// val json = Json.toJson(OrderPlacedMessage(new java.util.Date()))
//==============================
// deserialize
//==============================
// var theOrder = json.as[OrderPlacedMessage]
// val timeStamp = theOrder.timeOfMessage
// println(s"The name of the other is $timeStamp")
case class OrderPlacedMessage(timeOfMessage: java.util.Date)

object OrderPlacedMessageJsonImplicits {
  implicit val orderPlacedMessageFmt = Json.format[OrderPlacedMessage]
  implicit val orderPlacedMessageWrites = Json.writes[OrderPlacedMessage]
  implicit val orderPlacedMessageReads = Json.reads[OrderPlacedMessage]
}

Not much more to say there, these objects are pretty much just property bags to be honest.

 

The Producer Loop

The producer needs to produce the messages. How the producer produces messages is up to your business needs. For me, I simply wanted a loop that would spit out new messages every so often. To do this I have a Runnable (the standard JVM threading abstraction) that runs the message producing logic (for a given message type) at a fixed interval. As there are 2 message types that the demo code supports, there are 2 such Runnable classes. We will look at these in one second.
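Stripped of Kafka, the scheduling pattern looks roughly like the sketch below. The Kafka send is replaced by a counter, and the names are hypothetical. One detail matters in practice: the scheduler's thread is not a daemon thread, so it has to be shut down explicitly or it will keep the JVM alive.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Sketch of "run a Runnable at a fixed rate" with a counter standing in for the Kafka send
object SchedulerSketch {
  // Returns how many times the task ran; at least `ticks` unless the 10 second wait times out
  def runTicks(ticks: Int, periodMillis: Long): Int = {
    val produced = new AtomicInteger(0)
    val done = new CountDownLatch(ticks)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      // In the article, this is where a message is produced and sent
      override def run(): Unit = { produced.incrementAndGet(); done.countDown() }
    }
    try {
      scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS)
      done.await(10, TimeUnit.SECONDS)
    } finally {
      // Cancels the periodic task; without this the non-daemon thread keeps the JVM running
      scheduler.shutdown()
    }
    produced.get()
  }
}
```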

For now, here is the producer's code. As you can see, this class pretty much just hands off the execution of the 2 message runners to two standard JVM schedulers, each created with Executors.newSingleThreadScheduledExecutor.

Scala
import org.apache.kafka.clients.producer.KafkaProducer
import java.util.Properties
import com.google.common.io.Resources
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object ScalaProducer {
  def main(args: Array[String]): Unit = {
    val scalaProducer = new ScalaProducer()
    scalaProducer.run(args)
  }
}

/**
  * This producer sends a FastMessage to the "fast-messages" topic every 3 seconds and an
  * OrderPlacedMessage to the "order-placed-messages" topic every second. This shows how
  * messages can be sent to multiple topics. On the receiving end, the two topics are not
  * synchronized with each other in any way.
  */
class ScalaProducer {

  def run(args: Array[String]) : Unit = {

    println("Press enter to start producer")
    scala.io.StdIn.readLine()

    var closeableKafkaProducer : CloseableKafkaProducer = null
    // Keep hold of the schedulers so they can be shut down on exit, as their
    // non-daemon threads would otherwise keep the JVM running
    var schedulers = List.empty[ScheduledExecutorService]

    try {
      val properties = new Properties()
      val props = Resources.getResource("producer.props").openStream()
      try properties.load(props) finally props.close()
      val producer = new KafkaProducer[String,String](properties)
      closeableKafkaProducer = new CloseableKafkaProducer(producer)

      //"fast-messages" : one message every 3 seconds
      val fastMessageRunnable =
        new FastMessageRunnable("fast-messages", closeableKafkaProducer)
      val fastMessageRunnerScheduler = Executors.newSingleThreadScheduledExecutor()
      schedulers ::= fastMessageRunnerScheduler
      fastMessageRunnerScheduler.scheduleAtFixedRate(fastMessageRunnable, 0, 3, TimeUnit.SECONDS)

      //"order-placed-messages" : one message every second
      val orderPlacedMessageRunnable =
        new OrderPlacedMessageRunnable("order-placed-messages", closeableKafkaProducer)
      val orderPlacedMessageScheduler = Executors.newSingleThreadScheduledExecutor()
      schedulers ::= orderPlacedMessageScheduler
      orderPlacedMessageScheduler.scheduleAtFixedRate(orderPlacedMessageRunnable, 0, 1, TimeUnit.SECONDS)

      println("producing messages, press enter to stop")
      scala.io.StdIn.readLine()
    }
    catch {
      case throwable : Throwable =>
        println(s"Got exception : ${throwable.getMessage}")
        throwable.printStackTrace()
    }
    finally {
      // Stop producing before closing the producer that both runnables share
      schedulers.foreach { scheduler =>
        scheduler.shutdown()
        scheduler.awaitTermination(5, TimeUnit.SECONDS)
      }
      if(closeableKafkaProducer != null) {
        closeableKafkaProducer.closeProducer()
      }
    }
  }
}

 

A Runnable For The Message Production

As stated above, I took the decision to put the producing of each message in its own Runnable, which is scheduled to produce a new message of the required type at a fixed interval. So what do these Runnable classes look like? Well, they start with a simple base class that does the actual publishing of the message using the KafkaProducer[String,String] class, which I wrap like this just so I can close it safely at any time:

Scala
import org.apache.kafka.clients.producer.KafkaProducer

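class CloseableKafkaProducer(val producer : KafkaProducer[String, String]) {
  // Both runnables share this wrapper, so guard the flag against concurrent closes
  @volatile var isClosed : Boolean = false

  def closeProducer() : Unit = synchronized {
    if(!isClosed) {
      isClosed = true
      producer.close()
    }
  }
}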

 

The actual Runnable code looks like this, where we simply use the KafkaProducer[String,String] send(..) method.

Scala
import org.apache.kafka.clients.producer.ProducerRecord

abstract class MessageRunnable extends Runnable {

  val topic : String
  val closeableKafkaProducer : CloseableKafkaProducer

  override def run() : Unit = {
    try {
      val message = produceMessage()
      println("running > " + message)
      closeableKafkaProducer.producer.send(new ProducerRecord[String, String](topic, message))
      closeableKafkaProducer.producer.flush()
      println(s"Sent message on topic '$topic'")
    }
    catch {
      case throwable: Throwable => {
        val errMessage = throwable.getMessage
        println(s"Got exception : $errMessage")
        closeableKafkaProducer.closeProducer()
      }
    }
  }

  def produceMessage() : String
}

Inheritors of this class are used to create the correct JSON format to send across the wire, as shown below for the "fast-messages" topic where we make use of the FastMessage scala case class, which is sent as a JSON payload.

Scala
import java.util.Calendar
import play.api.libs.json.Json
import Messages.{FastMessage}
import Messages.FastMessageJsonImplicits._

class FastMessageRunnable(
    val topic : String,
    val closeableKafkaProducer : CloseableKafkaProducer)
  extends MessageRunnable {

  override def produceMessage(): String = {
    Json.toJson(FastMessage("FastMessage_" +
      Calendar.getInstance().getTime().toString(),1)).toString()
  }
}

 

The Consumer

NOTE : For the consumer I only set up the "fast-messages" topic code, but you would just follow the same pattern for the "order-placed-messages" topic. The setup should be identical.

The consumer is a bit more complicated than the producer, but this is only the case due to the fact that I wanted to carry out certain things, such as :

  • I wanted to use Rx
  • I wanted to come up with a generic approach, such that it would act more like a repository, that you would just use if you wanted to listen to a certain message type. Essentially the messages would be exposed as an Observable[T] such that you could use all the fancy Rx (for scala) goodies.

I will walk through each of the composite parts that make this happen. 

 

Consumer Properties

The consumer needs certain properties to be filled in order to run. As with the producer, we use Google Guava to open the properties file from the classpath and java.util.Properties to read the values.

Here is the properties file for the consumer

bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000

# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.  No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152

 

 

The Underlying Kafka Consumer Bit

As I stated above, I wanted to be able to use Rx and create re-usable bits of architecture. As such I put my thinking cap on, and decided what I needed was a generic Kafka consumer, and a specialization of that which simply dealt with the correct type of JSON message deserialization (like we did for the producer). To this end there is one common base class (there will only ever be this one, see GenericKafkaConsumer) and one consumer class per message type (see FastMessageKafkaConsumer).

Let's start with the common Kafka consumer base class, where the following are the main points of the code

  • That this generic class takes a topic, and will only deal with a message if the incoming record's topic matches the requested topic
  • There is a message reading loop which uses a KafkaConsumer[String, String] to read the messages off the wire
  • That there is an abstract method readTopicJson(..) which is expected to be implemented by inheritors of this class
  • That there is an RxScala Subject being used to expose the incoming messages as an Observable[T]
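Before looking at the real class, it may help to see the bare shape of that reading loop without Kafka or Rx in the way. In this sketch a plain iterator of batches stands in for consumer.poll(...), and a callback stands in for the Subject's onNext. A counter marks the point where the real code would commit. All names here are hypothetical.

```scala
// Kafka-free sketch of the consumer loop's shape: poll a batch, keep only records for
// our topic, hand each one on, then "commit" once the whole batch has been handled.
final case class FakeRecord(topic: String, value: String)

class ToyPollLoop(topic: String, batches: Iterator[Seq[FakeRecord]], onMessage: String => Unit) {
  // Set from another thread to stop the loop, hence volatile
  @volatile private var shouldRun = true
  var committedBatches = 0

  def stop(): Unit = shouldRun = false

  def run(): Unit = {
    while (shouldRun && batches.hasNext) {
      val records = batches.next()                       // stands in for consumer.poll(200)
      records.filter(_.topic == topic).foreach(r => onMessage(r.value))
      if (records.nonEmpty) committedBatches += 1       // stands in for consumer.commitSync()
    }
  }
}
```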

 

GenericKafkaConsumer

Scala
import java.io.Closeable
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
import Messages.FastMessage
import com.google.common.io.Resources
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Arrays
import java.util.Properties
import java.util.Random
import play.api.libs.json.{Reads, Json}
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject

/**
  * Reads messages from a single topic on a background thread.
  * Whenever a message is received it is pumped out using Rx.
  */
abstract class GenericKafkaConsumer[T](topic : String) extends Closeable with Runnable {
  val topicSubject = PublishSubject.apply[T]()
  var consumer : KafkaConsumer[String, String] = null
  val pool : ExecutorService = Executors.newFixedThreadPool(1)
  // Written by close() on another thread, so it must be volatile for the loop to see it
  @volatile var shouldRun : Boolean = true

  def startConsuming() : Unit = {
    pool.execute(this)
  }


  def run() : Unit = {

    try {

      val props = Resources.getResource("consumer.props").openStream()
      val properties = new Properties()
      properties.load(props)
      if (properties.getProperty("group.id") == null) {
        properties.setProperty("group.id", "group-" + new Random().nextInt(100000))
      }
      consumer = new KafkaConsumer[String, String](properties)
      consumer.subscribe(Arrays.asList(topic))
      var timeouts = 0

      println(s"THE TOPIC IS : $topic")

      while (shouldRun) {
        println("consumer loop running, wait for messages")
        // read records with a short timeout. If we time out, we don't really care.
        val records : ConsumerRecords[String, String] = consumer.poll(200)
        val recordCount = records.count()
        if (recordCount == 0) {
          timeouts = timeouts + 1
        } else {
          println(s"Got $recordCount records after $timeouts timeouts\n")
          timeouts = 0
        }
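        val it = records.iterator()
        while(it.hasNext()) {
          val record : ConsumerRecord[String,String] = it.next()
          val recordTopic = record.topic()
          if(recordTopic == topic) {
            val message = readTopicJson(record,topic)
            message.foreach(x => {
              println(s"Message about to be RX published is $x")
              topicSubject.onNext(x)
            })
          }
          else {
            println(s"Unknown message seen for topic '$recordTopic' .....crazy stuff")
          }
        }
        // commitSync() commits the offsets of everything returned by the last poll,
        // so commit once per batch, after every record has been handed on
        if (recordCount > 0) {
          consumer.commitSync()
        }
      }
    }
    catch {
      case throwable : Throwable =>
        shouldRun = false
        throwable.printStackTrace()
        topicSubject.onError(throwable)
    }
    finally {
      // Release the consumer's network resources. Only a non-blocking shutdown() here:
      // awaiting termination from the pool's own thread would block until the timeout
      if (consumer != null) {
        consumer.close()
      }
      pool.shutdown()
    }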
  }

  protected def readJsonResponse[T](
         record: ConsumerRecord[String,String],
         topicDescription : String)(implicit reader: Reads[T]) : Option[T] = {
    try {
      println(s"$topicDescription >")
      Some(Json.parse(record.value()).as[T])
    }
    catch {
      case throwable: Throwable =>
        println(s"Got exception : ${throwable.getMessage}")
        None
    }
  }

  def getMessageStream() : Observable[T]  = {
    topicSubject.asInstanceOf[Observable[T]]
  }

  override def close() : Unit = {
    println(s"GenericKafkaConsumer closed")
    shouldRun = false
    shutdownAndAwaitTermination(pool)
  }

  def shutdownAndAwaitTermination(pool : ExecutorService) : Unit = {
    // Disable new tasks from being submitted
    pool.shutdown()
    try {
      // Wait a while for existing tasks to terminate
      if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        // Cancel currently executing tasks
        pool.shutdownNow()
        // Wait a while for tasks to respond to being cancelled
        if (!pool.awaitTermination(60, TimeUnit.SECONDS))
          println("Pool did not terminate")
      }
    }
    catch {
      case throwable : Throwable =>
        println(s"Got exception : ${throwable.getMessage}")
        // (Re-)Cancel if current thread also interrupted
        pool.shutdownNow()
        // Preserve interrupt status
        Thread.currentThread().interrupt()
    }
  }

  //implemented by inheritors
  def readTopicJson(record : ConsumerRecord[String,String], topic : String) : Option[T]
}

 

So now that we have gone through what this generic base class does, let's look at the specialization.

 

FastMessageKafkaConsumer

IMPORTANT NOTE : You will need one of these per message type, and you will need to add a mapping to the MessageClient class too.

So we have a nice base class that does most of the work for us. The only thing it doesn't do is the deserialization. For that I decided to use a specialization of the base class that would do this work. Here is an example of that for the FastMessage. As with the producer side, I make use of the Play JSON library.

Scala
import Messages.FastMessage
import Messages.FastMessageJsonImplicits._
import org.apache.kafka.clients.consumer.ConsumerRecord


class FastMessageKafkaConsumer
  extends GenericKafkaConsumer[FastMessage](Consumers.fastMessageTopic) {

  def pushOneOut(m : FastMessage) : Unit = {
    topicSubject.onNext(m)
  }

  override def readTopicJson(
          record : ConsumerRecord[String,String],
          topic : String) : Option[FastMessage] = {
    readJsonResponse[FastMessage](record,topic)
  }
}

 

Making A Generic Re-Usable RxScala Message Client

So far we have a generic Kafka consumer that exposes an RxScala Subject as an Observable[T], which gets its onNext called whenever an incoming message is read that matches the topic of the consumer. We have also seen that there are specializations of this generic base class that provide the correct JSON deserialization.

So what else did we want to do?

Well if you recall I said I wanted to achieve the following:

  • I wanted to use Rx
  • I wanted to come up with a generic approach, such that it would act more like a repository, that you would just use if you wanted to listen to a certain message type. Essentially the messages would be exposed as an Observable[T] such that you could use all the fancy Rx (for scala) goodies.

So how do we do that? Well, put quite simply, we want to create a better, more reliable, retryable Observable[T] stream. In the demo code this is the job of the MessageClient class, which looks like this:

Scala
import rx.lang.scala.subscriptions.CompositeSubscription
import rx.lang.scala.{Subscription, Observable}

class MessageClient() {
  val consumerMap = setupMap()

  def setupMap() : Map[String, (() => GenericKafkaConsumer[AnyRef])] = {
    // TODO : you would need to add other consumers (one per message type/topic) to the map here
    Map[String, () => GenericKafkaConsumer[AnyRef]](
      Consumers.fastMessageTopic -> (() =>
        new FastMessageKafkaConsumer().asInstanceOf[GenericKafkaConsumer[AnyRef]])
    )
  }

  def getMessageStreamForTopic[T](topic : String) : Observable[T] = {
    Observable.create[T](observer => {
      consumerMap.get(topic) match {
        case Some(messageFactory) => {
          try {
            val streamSource = messageFactory().asInstanceOf[GenericKafkaConsumer[T]]
            streamSource.startConsuming()
            val sub = streamSource.getMessageStream().subscribe(observer)
            CompositeSubscription(sub, Subscription(streamSource.close()))
          }
          catch {
            case throwable : Throwable =>
              // Tell the subscriber, rather than silently handing back an empty stream
              observer.onError(throwable)
              Subscription()
          }
        }
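        case _ => {
            // An unknown topic is an error: completing here would make repeat re-subscribe forever
            observer.onError(new IllegalArgumentException(s"No consumer registered for topic '$topic'"))
            Subscription()
        }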
      }
    }).publish.refCount
  }
}

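Essentially what is going on here is that we use Observable.create(..) to create an Observable[T] for the topic that is being requested. The matching Kafka consumer is started when someone subscribes and closed when they unsubscribe. We also use publish so that the stream is shared between all of its current subscribers (a late subscriber only sees the messages published after it subscribes), and we use refCount to get automatic disposal when there are no active subscriptions left.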
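To picture what publish.refCount buys us, here is a hand-rolled model of its semantics in plain Scala. It is not RxScala's implementation. The upstream source is started once, for the first subscriber, and is then shared by any later ones. It is stopped when the last subscriber leaves. The RefCountedSource class and its start function are hypothetical.

```scala
// Model of publish + refCount: connect on the first subscriber, share the single
// upstream with everyone, and dispose of it when the last subscriber unsubscribes.
// `start` receives an emit function and returns a function that stops the upstream.
class RefCountedSource[T](start: (T => Unit) => (() => Unit)) {
  private var subscribers = Vector.empty[T => Unit]
  private var stopUpstream: Option[() => Unit] = None

  // Returns an "unsubscribe" function
  def subscribe(onNext: T => Unit): () => Unit = synchronized {
    subscribers :+= onNext
    if (stopUpstream.isEmpty) {
      // First subscriber: connect to the real source, fanning values out to everyone
      stopUpstream = Some(start(value => subscribers.foreach(s => s(value))))
    }
    () => unsubscribe(onNext)
  }

  private def unsubscribe(onNext: T => Unit): Unit = synchronized {
    subscribers = subscribers.filterNot(_ eq onNext)
    if (subscribers.isEmpty) {
      // Last subscriber gone: dispose of the upstream (in the article, close the Kafka consumer)
      stopUpstream.foreach(stop => stop())
      stopUpstream = None
    }
  }
}
```

In the article's terms, "starting the upstream" is startConsuming() on a GenericKafkaConsumer, and "stopping" it is the close() wired up through the Subscription.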

 

 

The Generic Repository

Now that we have a nice MessageClient class that does most of the work for us, we just need to make use of it within the generic repository code. One could argue that the MessageClient and the code below could be squished together into one class, but it just felt a better separation of concerns to me to have separate classes.

The main points below are

  • That we new up the MessageClient to create the mappings, and deal with the stream creation (where it uses the previously discussed consumer classes).
  • That we use repeat to re-subscribe to the stream whenever it completes (note that in Rx it is retry, not repeat, that re-subscribes after an error), and publish/refCount to share the stream and get the auto dispose semantics of refCount
  • We use Observable.defer(..) as a factory, so that the Observable[T] is only created when someone subscribes, with a fresh one created for each subscription (including each re-subscription made by repeat)
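The interplay between defer and re-subscription can be modelled without Rx. Think of a deferred stream as a factory that is invoked afresh on every (re)subscription. The sketch below shows retry-style behaviour: the factory is invoked again after a failure, up to a limit. Rx's repeat would instead invoke it again after a successful completion. This is a simplified, hypothetical helper, not RxScala code.

```scala
import scala.util.Try

// Model of defer + retry: the factory runs on every (re)subscription, so each attempt
// gets a brand new stream. Retry re-invokes it only after a failure.
object DeferRetrySketch {
  def retry[T](maxAttempts: Int)(factory: () => Seq[T]): Try[Seq[T]] = {
    var attempt = 1
    var result = Try(factory())
    while (result.isFailure && attempt < maxAttempts) {
      attempt += 1
      result = Try(factory()) // a fresh "subscription", so a fresh stream
    }
    result
  }
}
```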
Scala
import rx.lang.scala.Observable

object GenericRepository {

  private lazy val messageClient = new MessageClient()

  def GetMessageStream[T](topic: String): Observable[T] = {
    Observable.defer[T](messageClient.getMessageStreamForTopic[T](topic))
      .repeat
      .publish
      .refCount
  }
}

 

Final Usage

With all this in place the final usage ends up looking like this, which I am sure you will agree is pretty simple.

Scala
import Messages.FastMessage

object ScalaConsumer {

  def main(args: Array[String]): Unit = {

    var subs = GenericRepository.GetMessageStream[FastMessage](Consumers.fastMessageTopic)
      .subscribe(x => {
        println(s"RX SUBJECT stuff working, got this FAST MESSAGE : $x")
      })
  }
}

 

The Rx parts of this article are very similar to techniques I have used in the .NET world, so I am pretty ok with their usage. If you are interested in the .NET articles here are the links:

 

Proof That All This Stuff Actually Works

So we have covered a lot of ground, so how about a nice screen shot to prove that all this stuff actually works.

The screen shot below shows the producer and a consumer. Remember that the consumer provided within the demo code at GitHub only consumes the "fast-messages" topic, which is why you NEVER see any consumer output for the "order-placed-messages" topic.

Image 4


 

That's It

And that is all I have to say this time. I hope you have enjoyed it. Comments and votes are always well received, so don't be shy ;-)

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United Kingdom
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog
