
Madcap Idea 10: Play Framework Reactive Kafka Producer

Play Framework Reactive Kafka producer

Last Time

So last time, we walked through the Rating Kafka Streams architecture and also showed how we can query the local state stores. I also stated that the standard KafkaProducer used in the last post was more for demonstration purposes, and that long term we would like to swap it out for a Play Framework REST endpoint that allows us to publish a message straight from our app to the Kafka rating stream processing topology.

Preamble

Just as a reminder, this is part of my ongoing set of posts, which I talk about here, where we will be building up to a point where we have a full app using lots of different pieces, such as these:

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

Ok, so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.

Where is the Code?

As usual, the code is on GitHub at https://github.com/sachabarber/MadCapIdea.

What Is This Post All About?

As stated in the last post, “Kafka interactive queries”, we used a standard KafkaProducer to simulate what would have come from the end user via the Play Framework API. This time, we will build out the Play Framework side of things to include the ability to produce “rating” objects that are consumed by the rating Kafka Streams processing topology introduced in the last post.

SBT

So we already had an SBT file inside of the PlayBackEndApi project, but we need to expand that to include support for a couple of things:

  • Reactive Kafka
  • Jackson JSON (Play already comes with its own JSON support, but for the Kafka serialization/deserialization (Serdes), I wanted to make sure it was the same as in the Kafka Streams project)

This means these additions to the build.sbt file:

val configVersion = "1.0.1"

libraryDependencies ++= Seq(
  "org.reactivemongo" %% "play2-reactivemongo" % "0.11.12",
  "com.typesafe.akka" % "akka-stream-kafka_2.11" % "0.17",
  "org.skinny-framework.com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.4",
  "com.typesafe"        % "config" % configVersion
)

Topics

We also want to ensure that we are using the same topics as the stream processing topology, so I have just replicated that class (in reality, I should have made this stuff a common JAR, but meh).

package kafka.topics

object RatingsTopics {
  val RATING_SUBMIT_TOPIC = "rating-submit-topic"
  val RATING_OUTPUT_TOPIC = "rating-output-topic"
}

Routing

As this is essentially a new route that will be called by the front-end React app when a new Rating is given, we obviously need a new route/controller. The route is fairly simple and is as follows:

# Rating page
POST  /rating/submit/new                       controllers.RatingController.submitNewRating()
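Once the app is running, this route can be exercised with something as simple as curl (an illustrative payload; the field names come from the Rating JSON handling shown in the next section):

curl -X POST http://localhost:9000/rating/submit/new \
  -H "Content-Type: application/json" \
  -d '{"fromEmail":"sacha@here.com","toEmail":"henry@there.com","score":4.5}'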

JSON

The new Rating route expects a Rating object to be provided as a POST in JSON. Here is the actual Rating object and the Play JSON handling for it:

package Entities

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class Rating(fromEmail: String, toEmail: String, score: Float)

object Rating {
  implicit val formatter = Json.format[Rating]
}

object RatingJsonFormatters {

  implicit val ratingWrites = new Writes[Rating] {
    def writes(rating: Rating) = Json.obj(
      "fromEmail" -> rating.fromEmail,
      "toEmail" -> rating.toEmail,
      "score" -> rating.score
    )
  }

  implicit val ratingReads: Reads[Rating] = (
    (JsPath \ "fromEmail").read[String] and
    (JsPath \ "toEmail").read[String] and
    (JsPath \ "score").read[Float]
  )(Rating.apply _)
}
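As a quick sanity check, here is a minimal sketch (assuming the Rating class and the formatters above are in scope) showing a Rating being round-tripped through JSON, which is essentially what the controller below does with the request body:

import play.api.libs.json._
import Entities._
import Entities.RatingJsonFormatters._

// Write a Rating out to JSON...
val rating = Rating("sacha@here.com", "henry@there.com", 4.5f)
val json: JsValue = Json.toJson(rating)
// ...giving {"fromEmail":"sacha@here.com","toEmail":"henry@there.com","score":4.5}

// ...and read it back again, mirroring the controller's handling
json.validate[Rating] match {
  case JsSuccess(roundTripped, _) => println(s"Parsed: $roundTripped")
  case JsError(errors)            => println(s"Bad JSON: $errors")
}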

Controller

So now that we have a route, let's turn our attention to the new RatingController, which, right now, just accepts a new Rating and looks like this:

Scala
package controllers

import javax.inject.Inject

import Actors.Rating.RatingProducerActor
import Entities.RatingJsonFormatters._
import Entities._
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.libs.json._
import play.api.mvc.{Action, Controller}
import utils.Errors

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class RatingController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
{

  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _                      => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
  val childRatingActorProps = Props(classOf[RatingProducerActor], mat, ec)
  val rand = new Random()
  val ratingSupervisorProps = BackoffSupervisor.props(
    Backoff.onStop(
      childRatingActorProps,
      childName = s"RatingProducerActor_${rand.nextInt()}",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ).withSupervisorStrategy(
      OneForOneStrategy() {
        case _ => SupervisorStrategy.Restart
      })
    )

  val ratingSupervisorActorRef = 
    actorSystem.actorOf(
      ratingSupervisorProps, 
      name = "ratingSupervisor"
    )

  def submitNewRating = Action.async(parse.json) { request =>
    Json.fromJson[Rating](request.body) match {
      case JsSuccess(newRating, _) => {
        ratingSupervisorActorRef ! newRating
        Future.successful(Ok(Json.toJson(
          newRating.copy(toEmail = newRating.toEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest(
          "Could not build a Rating from the json provided. " +
          Errors.show(errors)))
    }
  }
}

The main points from the code above are:

  • We use the standard Play Framework JSON handling for the unmarshalling/marshalling of JSON
  • That the controller route is async (see how it returns a Future[T])
  • That we will not process anything if the JSON is invalid
  • That the RatingController creates a supervisor actor that will supervise the creation of another actor, namely a RatingProducerActor. It may look like this happens each time the RatingController is instantiated, which is true. However, this only happens once, as there is only one router in Play, and controllers are created by the router. You can read more about this here. The short story is that the supervisor is created once, and the actor it supervises will be created using a BackoffSupervisor, where the creation of the actor will be retried using an exponential back-off strategy. We also use the OneForOneStrategy to ensure only the single failed child actor is affected by the supervisor.
  • That this controller is also responsible for creating an ActorMaterializer with a supervision strategy (more on this in the next section). The ActorMaterializer is used to create the actors that run Akka Streams workflows.
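To make the materializer's supervision strategy a little more concrete, here is a minimal, self-contained sketch (not code from the project) of what the decider buys us. Without the supervision strategy, the exception would fail the whole stream; with it, the failing element is dropped and the stream carries on:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.Source

object DeciderDemo extends App {

  implicit val system = ActorSystem("decider-demo")

  // Restart drops the failing element and restarts the stage, clearing any
  // state it had accumulated (Supervision.Resume would keep that state)
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(system).withSupervisionStrategy(decider))

  // 10 / 0 throws on the third element; with the decider in place, the
  // stream prints 10, 5, 2 rather than failing after 10, 5
  Source(List(1, 2, 0, 5))
    .map(n => 10 / n)
    .runForeach(println)
}

This is the same pattern the RatingController uses when it builds the ActorMaterializer that the RatingProducerActor's stream (shown next) will run on.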

RatingProducerActor

The final part of the pipeline for this post is obviously to be able to write a Rating to a Kafka topic, via a Kafka producer. As already stated, I chose to use Reactive Kafka (the Akka Streams Kafka producer), which builds upon the Akka Streams ideas, where we have Sources/Flows/Sinks/RunnableGraphs and all that good stuff. So here is the full code for the actor:

Scala
package Actors.Rating

import Entities.Rating
import Serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import kafka.topics.RatingsTopics
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}


class RatingProducerActor(
    implicit materializer: ActorMaterializer,
    ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Rating]
  val ratingProducerSettings = ProducerSettings(
      context.system,
      new StringSerializer,
      new ByteArraySerializer)
    .withBootstrapServers(Settings.bootStrapServers)
    .withProperty(Settings.ACKS_CONFIG, "all")

  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Rating](perProducerBufferSize = 16)
      .map(rating => {
        val ratingBytes = jSONSerde.serializer().serialize("", rating)
        (rating, ratingBytes)
      })
      .map { ratingWithBytes =>
        val (rating, ratingBytes) = ratingWithBytes
        new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, rating.toEmail, ratingBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(ratingProducerSettings))(Keep.both)
      .run()

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => self ! PoisonPill
  }

  override def postStop(): Unit = {
    println("RatingProducerActor stopping, shutting down the hosted stream")
    killswitch.shutdown()
    super.postStop()
  }

  override def receive: Receive = {
    case (rating: Rating) => {
      println(s"RatingProducerActor seen ${rating}")
      Source.single(rating).runWith(mergeHubSink)
    }
    case Done => {
      println(s"RatingProducerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
  }
}

I’ll be honest, there is a fair bit going on in that small chunk of code above, so let's dissect it. What exactly is happening?

  • The most important point is that we simply use the actor as a vessel to host a Reactive Kafka Akka Streams RunnableGraph, representing a graph of MergeHub -> Reactive Kafka producer sink. This is completely fine and a normal thing to do. Discussing Akka Streams is out of scope for this post, but if you want to know more, you can read a previous post I did here.
  • So we now know this actor hosts a stream, but the stream could fail, or the actor could fail. So what we want is: if the actor fails, the stream is stopped, and if the stream fails, the actor is stopped. To do that, we need to do a couple of things:
    • STREAM FAILING: Since the RunnableGraph can return a Future[T], we can hook a Success/Failure callback onto that, and send a PoisonPill to the hosting actor. The supervisor actor we saw above would then kick in and try to create a new instance of this actor. Another thing to note is that the stream hosted in this actor uses the ActorMaterializer that was supplied by the RatingController, where we provided a restart supervision strategy for the stream.
    • ACTOR FAILING: If the actor itself fails, the Akka framework will call the postStop() method, at which point we want to shut down the stream hosted within this actor. So how can we do that? Well, in the middle of the stream setup there is this line: .viaMat(KillSwitches.single)(Keep.both) - this allows us to get a KillSwitch from the materialized values of the stream. Once we have a KillSwitch, we can simply call its shutdown() method.
    • BELTS AND BRACES: I have also provided a way for the outside world to shut down this actor and its hosted stream, by sending this actor a Done message. Nothing sends this yet, but the hook is there to demonstrate how you could do it (see the sketch after this list).
  • We can see that there is a MergeHub source, which allows external code to push stuff through the MergeHub via the Sink value materialized within the actor
  • We can also see that the Rating object that the actor receives is indeed pushed into the MergeHub materialized Sink by this actor, and then some transformation is done on it to grab its raw bytes
  • We can see that the final stage in the RunnableGraph is the Reactive Kafka Producer.plainSink, which results in a message being pushed out to a Kafka topic for each Rating object this actor pushes into the stream
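As an aside on that belts and braces point: since the BackoffSupervisor forwards messages to its child while the child is running, the outside world could trigger the clean shutdown with something like this (a hypothetical snippet; nothing in the project sends this yet):

import akka.Done

// Done gets forwarded by the supervisor to the RatingProducerActor, which
// shuts the KillSwitch down and then poisons itself
ratingSupervisorActorRef ! Done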

And I think that is the main set of points about how this actor works.

The End Result

Just to prove that this is all working, here is a screenshot of the new RatingController http://localhost:9000/rating/submit/new endpoint being called with a JSON payload representing the Rating.

[Screenshot: POSTing a Rating JSON payload to http://localhost:9000/rating/submit/new]

And here is the Akka HTTP endpoint that queries the Kafka Streams state store(s).

http://localhost:8080/ratingByEmail?email=sacha@here.com gives an empty list as we have NOT sent any Rating through for email “sacha@here.com” yet.

[Screenshot: empty result list for sacha@here.com]

http://localhost:8080/ratingByEmail?email=henry@there.com gives 1 result, which reflects the number of Rating(s) I had created at that point.

[Screenshot: a single rating returned for henry@there.com]

http://localhost:8080/ratingByEmail?email=henry@there.com gives 3 results, which reflects the number of Rating(s) I had created by that point.

[Screenshot: three ratings returned for henry@there.com]

So that’s cool. This means we have successfully integrated the publishing of a JSON payload Rating object through Kafka to the Kafka Streams rating processor… happy days!

Conclusion

Straight after the last article, I decided to Dockerize everything (a decision I have now reversed, due to the flaky nature of Docker Compose's depends_on, which does not truly wait for the item depended on, even when using “condition: service_healthy” and a “healthcheck: test”, etc.), and some code must have become corrupt, as stuff from the last post stopped working.

An example from the Docker-Compose docs being:

version: '2.1'
services:
  web:
    build: .
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
  redis:
    image: redis
  db:
    image: redis
    healthcheck:
      test: "exit 0"

I love Docker, but for highly complex setups, I think you are better off using a Docker Compose file and just not trying to bring it all up in one go. I may bring the Docker bits back into the fold for anyone reading this that wants to play around, but I will have to think about that closely.
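For example, with the Compose file above, you could stage the startup yourself rather than trusting depends_on to sequence everything (illustrative commands, reusing the service names from the docs example):

# Bring the dependencies up first and give them time to become healthy...
docker-compose up -d db redis
# ...then start the dependent service once you are happy they are ready
docker-compose up -d web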

Once I realized that my Docker campaign was doing more harm than good, and I reverted to my extremely hand-coded but deterministic PowerShell startup script, I found that getting the Play Framework and Reactive Kafka (the Akka Streams Kafka producer) up and running was quite simple, and it kind of worked like a charm first time. Yay!

Next Time

Next time, we should be able to make the entire rating view page work, as we now have the following things:

  • A Play Framework route that lets the React front end submit new Rating objects into the Kafka rating stream processing topology
  • An Akka HTTP endpoint that can query the Kafka Streams state store(s) for the ratings for a given email

So we should quite easily be able to turn that data into a simple Bootstrap table in the React portion of this application.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

