Click here to Skip to main content
15,868,016 members
Articles / Programming Languages / Scala

Madcap Idea Part 8: Intermediate Step, Rest API For Interactive Kafka Stream Ktable Queries

Rate me:
Please Sign up or sign in to vote.
5.00/5 (1 vote)
1 Sep 2017CPOL5 min read 4.5K   1  
This article will focus on a simple REST example in Scala. It's an intermediate post that the next one will build on, I just thought it might be useful to show this one in isolation, before we move on to the real one which is to allow interactive queries over live Kafka streams.

Last Time

So this one has taken a very long time to get up, I have been on a few weeks holiday and a few other things cropped up. This is also an intermediate post that the next one will build on, I just thought it might be useful to show this one in isolation, before we move on to the real one which is to allow interactive queries over live Kafka streams. In order to carry out the live queries, we need to expose a REST endpoint over the Kafka stream nodes, which will be the focus of the next article. This one will focus on a simple REST example in Scala.

PreAmble

Just as a reminder, this is part of my ongoing set of posts which I talk about in this post, where we will be building up to a point where we have a full app using lots of different stuff, such as these:

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

Ok, so now that we have the introductions out of the way, let's crack on with what we want to cover in this post. Which is how to create a simple Client/Server REST API in Scala.

There are several choices available, that I wanted to consider:

I did actually try out all 3 of these, with varying degrees of success. I am using Scala 2.12 and IntelliJ IDEA 17. So even though I settled on Akka Http (as I have used it before and it just works), I thought it may be useful to show a simple hello world of the server examples in the other 2.

Finch Server Example

So this would be a simple REST server written in Finch:

Java
import io.finch._
import com.twitter.finagle.Http

val api: Endpoint[String] = get("hello") { Ok("Hello, World!") }

Http.server.serve(":8080", api.toServiceAs[Text.Plain])

The main points that I liked about using Finch were:

  • It's functional
  • I liked the ScalaZ composable style endpoints
  • It had good JSON support (Circe, Argonaut, Jackson, Json4s, PlayJson)
  • It was very concise

The reasons I chose not to use it are:

  • The endpoint composition seems to get very confused in IntelliJ IDEA and required you to resort to use SBT command line, and IntelliJ IDEA would still complain about the syntax
  • No client library

HTTP4S Server/Client Example

So this would be a simple REST server/client written in Http4s.

SBT

You will need a SBT file something like this, I opted to go for a version that was considered stable, not the current develop version:

Java
name := "SImpleRestApi"
version := "1.0"
scalaVersion := "2.12.1"
val http4sVersion = "0.15.16"

libraryDependencies ++= Seq(
  "org.http4s" %% "http4s-dsl"          % http4sVersion,
  "org.http4s" %% "http4s-blaze-server" % http4sVersion,
  "org.http4s" %% "http4s-blaze-client" % http4sVersion,
  "org.http4s" %% "http4s-circe"        % http4sVersion,
  "io.circe"   %% "circe-generic"       % "0.6.1",
  "io.circe"   %% "circe-literal"       % "0.6.1"
)

Common Entities

Java
case class User(name: String)
case class Hello(greeting: String)

The Server

This is a simple http4s server that will accept a “User” case class JSON payload on the “hello” route, and will return a “Hello” case class as JSON:

Java
import Entities.{Hello, User}
import io.circe.generic.auto._
import io.circe.syntax._
import org.http4s._
import org.http4s.circe._
import org.http4s.dsl._
import org.http4s.server.blaze._

object RestServer extends App {

  run()

  def run() : Unit = {

    val jsonService = HttpService {
      case req @ POST -> Root / "hello" =>
        for {
        // Decode a User request
          user <- req.as(jsonOf[User])
          // Encode a hello response
          resp <- Ok(Hello(user.name).asJson)
        } yield (resp)

      case req @ GET -> Root / "test" =>
        for {

          resp <- Ok(Hello("Monkey").asJson)
        } yield (resp)
    }

    val builder = BlazeBuilder.bindHttp(8080).mountService(jsonService, "/")
    val blazeServer = builder.run

    scala.io.StdIn.readLine()
    ()
  }
}

There are a couple of things to note here:

  • It uses Blaze server
  • The routes are constructed in a pretty intuitive manner
  • The JSON encoding/decoding is done seamlessly and did not require anything other than including circle.generic (this is nice)

The Client

Java
import Entities.{Hello, User}
import org.http4s.client.blaze.PooledHttp1Client
import scalaz.concurrent.Task
import io.circe.generic.auto._
import io.circe.syntax._
import org.http4s.circe._
import org.http4s.dsl._
import org.http4s.client._

object RestClient extends App {

  val httpClient = PooledHttp1Client()

  run()

  def run() : Unit = {

    val hello = helloClient("foo").run
    println(s"Hello ${hello}")
    scala.io.StdIn.readLine()
    ()
  }

  def helloClient(name: String): Task[Hello] = {
    // Encode a User request
    val req = POST(uri("http://localhost:8080/hello"), User(name).asJson)
    // Decode a Hello response
    httpClient.expect(req)(jsonOf[Hello])
  }
}

The client API is quite nice and required very little code, and just like the server portion, the encoding/decoding to/from JSON is seamless. I also like the use of scalaz.concurrent.Task to manage the asynchronisity.

The main points that I liked about using http4s were:

  • It's functional
  • It looked like a nice API to use
  • It had good JSON support (Circe, Argonaut, Json4s)
  • It was very concise
  • It had a server API + client API

The reasons I chose not to use it were:

  • It took me a fair while to get the client to work, due to the documentation being a mish mash of versions where they have not kept documentation for specific versions. So I had to dig around quite a bit to get the version I was working for to work. Once it did work, I was happy with it though.
  • Documentation is clearly a mish mash of different versions, this could be better

So that really only left me one choice, the tried and tested but rather unadventurous Akka-Http, so let's look at that next.

Akka Http Server/Client Example

So this would be a simple REST server/client written in Akka Http.

SBT

This is the SBT file that I am using:

Java
name := "AkkaHttpRESTExample"

version := "0.1"

scalaVersion := "2.12.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.9",
  "com.typesafe.akka" %% "akka-http" % "10.0.9"
)

Common Entities

These are the common entities.

Java
package Entities

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._

final case class Item(name: String, id: Long)
final case class Order(items: List[Item])

object DomainEntitiesJsonFormats {
  implicit val itemFormat = jsonFormat2(Item)
  implicit val orderFormat = jsonFormat1(Order)
}

There are a couple of things to note above.

  • We need to use the JSON marshallers/unmarshallers jsonFormat2 / jsonFormat1 that are available within the spray.json package. These represent JSON marshallers/unmarshallers for case class(es) with 2 and 1 parameters respectively
  • That the actual entities are simple case class(es)

The Server

This is a simple server:

Java
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.DomainEntitiesJsonFormats._
import Entities._

object RestServer extends App {

  run()

  def run() {
    val itemNameRegexPattern =  """\w+""".r
    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
            "<h1>Say hello to akka-http</h1>"))
        }
      }~
      path("order" / itemNameRegexPattern) { itemName =>
        get {
          complete(Order(List[Item](Item(itemName, 1))))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))

    StdIn.readLine() // let it run until user presses return
  }
}

The main points to note above are:

  • How the routing DSL works, and we can build multiple routes which are chained together
  • How we send JSON payload back using the complete routing directive
  • How we bind the server to the port and address

The Client

This is a client that goes hand in hand with the server code above:

Java
import Entities.DomainEntitiesJsonFormats._
import Entities._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import scala.io.StdIn
import akka.http.scaladsl.unmarshalling.Unmarshal
import spray.json._

import scala.util.{Failure, Success}

object RestClient extends App {

  run()

  def run() {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    println(s"Client attempting to fetch from online " +
      "at http://localhost:8080/order/beans\nPress RETURN to stop...")

    val responseFuture: Future[Order] = {
      Http().singleRequest(HttpRequest(uri = "http://localhost:8080/order/beans"))
        .flatMap(response => Unmarshal(response.entity).to[Order])
    }

    responseFuture onComplete {
      case Failure(ex) => System.out.println(s"Failed to perform GET, reason: $ex")
      case Success(response) => System.out.println(s"Server responded with $response")
    }

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      system.terminate() // and shutdown when done
    }))

    StdIn.readLine() // let it run until user presses return
  }
}

There is not too much to note here, the main points are:

  • How the response from the Http methods are Future[T] and as such must be handled with standard Future code
  • How we need to Unmarshal the JSON string response back into a case class. This is thanks to the jsonFormat2/jsonFormat1 (marshallers/unmarshallers) we saw earlier

That’s It For Now

So I have shown a few options here both http4s and Akka Http are fairly easy to use, it’s a shame about Finch and IntelliJ, but at the end of the day, I wanted something with a Client API too.

Next Time

Next time, we will look at the actual Ratings workflow in terms of Kafka Streams, and how we can aggregate incoming rating messages inside a Kafka KStream (which will come from the React from end in the end). We will then look at how we can store these rolling stream values into a Kafka KTable and query the results using Kafka interactive queries.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United Kingdom United Kingdom
I currently hold the following qualifications (amongst others, I also studied Music Technology and Electronics, for my sins)

- MSc (Passed with distinctions), in Information Technology for E-Commerce
- BSc Hons (1st class) in Computer Science & Artificial Intelligence

Both of these at Sussex University UK.

Award(s)

I am lucky enough to have won a few awards for Zany Crazy code articles over the years

  • Microsoft C# MVP 2016
  • Codeproject MVP 2016
  • Microsoft C# MVP 2015
  • Codeproject MVP 2015
  • Microsoft C# MVP 2014
  • Codeproject MVP 2014
  • Microsoft C# MVP 2013
  • Codeproject MVP 2013
  • Microsoft C# MVP 2012
  • Codeproject MVP 2012
  • Microsoft C# MVP 2011
  • Codeproject MVP 2011
  • Microsoft C# MVP 2010
  • Codeproject MVP 2010
  • Microsoft C# MVP 2009
  • Codeproject MVP 2009
  • Microsoft C# MVP 2008
  • Codeproject MVP 2008
  • And numerous codeproject awards which you can see over at my blog

Comments and Discussions

 
-- There are no messages in this forum --