Reactive Golf or Iteratees and all that stuff in practice, part 1

I’m currently working on a project that involves synchronizing large amounts of data from backend systems for the purpose of making it searchable. Amongst other things, one domain the system deals with is Golf and the booking of tee times (i.e. a time at which to start playing). If I understood things correctly, tee times exist to space players out on the course, so that you don’t get hit by a player who starts later on (I haven’t played golf yet).

The portal is written on top of the Play! Framework and the search engine in use is ElasticSearch.

The data flow

There are two cases of data synchronization:

  • a full synchronization run, when initially connecting a new tenant or regularly at night to make sure both systems really are in sync, as pull from the portal
  • when a single tee time is modified (e.g. when it gets booked), as push from the backend systems

The case we’re going to explore in more depth is the full synchronization, as this is the case that will need to scale up as the number of tenants and the variety of data to synchronize grow.

Both the backend and ElasticSearch use JSON to encode the data, and the portal has to do some enhancements to turn the raw tee time into a document suitable for indexing.

Fetching the data

In order to fetch the data from a tenant, we’re using Play’s play.api.libs.WS library, like so:
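A minimal sketch of that call — the endpoint URL and the `tenantId` parameter are assumptions standing in for the project’s own configuration:

```scala
import play.api.libs.ws.WS

// Hypothetical endpoint; the real URL comes from the tenant's configuration.
// Whether -1 disables the timeout entirely depends on the underlying
// AsyncHttpClient version, but it was the common idiom in Play 2.2.
val holder = WS.url(s"http://backend.example.com/tenants/$tenantId/teetimes")
  .withRequestTimeout(-1)
```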

The infinite request timeout is there because we may potentially get a fair amount of data, which in turn may take quite a while to reach us all.

Reactive streaming

The idea behind “reactive” streaming is that we only use resources (CPU, memory, etc.) when there’s a reason for it, i.e. when there’s data. Instead of retrieving all of the data in memory and then sending it off to ElasticSearch, our portal should be able to pass it on directly to ElasticSearch. Otherwise, handling several streams that come in at the same time may require quite a bit of memory.

The tooling that the Play framework offers for this purpose is the so-called Iteratees, Enumerators and Enumeratees. There’s an excellent introduction by Pascal Voitot on the topic; I hope that this practical example can help to give a bit more background as to how this tooling can be used.

Basic piping: connecting input and output

So on the one hand, we are doing a WS call to retrieve our data, and on the other hand, we’d like to send it out after modifying it a bit. Play 2.2 offers the Concurrent.joined[A] method to do just that. There are three things that are good to know prior to looking into this method:

  • an iteratee is the data consumer, i.e. the “pipe” into which data will flow
  • an enumerator is the data producer, i.e. where the data flows from
  • these streams always deal with a certain type of data. In our case, the type of data is going to be an array of bytes, since we get raw data over the web from a call to a REST API.

The scaladoc of the Concurrent.joined[A] method describes it as creating a joined iteratee and enumerator pair: whatever the iteratee in the pair is applied to will subsequently be produced by the enumerator.
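In code, obtaining the pair looks like this (a sketch; Array[Byte] is the stream type for our use case, since we receive raw bytes over the wire):

```scala
import play.api.libs.iteratee.{ Concurrent, Enumerator, Iteratee }

// Both ends of the "pipe": whatever the iteratee consumes
// is subsequently produced by the enumerator.
val (iteratee, enumerator): (Iteratee[Array[Byte], Unit], Enumerator[Array[Byte]]) =
  Concurrent.joined[Array[Byte]]
```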

So, we need to apply the iteratee in the pair to something (for the “pipe” to get some “water”), and subsequently we can run the whole machinery by applying the enumerator to it. Let’s see what that would look like on the one end with our previous WS call:
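A sketch, reusing the request holder from the WS call above (named `holder` here, an assumption) and the joined pair:

```scala
import play.api.libs.iteratee.Concurrent
import scala.concurrent.ExecutionContext.Implicits.global

val (iteratee, enumerator) = Concurrent.joined[Array[Byte]]

// Feed the response body into our end of the pipe as it arrives,
// then run the connected iteratee that get() eventually returns.
val pushed = holder.get { responseHeaders => iteratee }.flatMap(_.run)
```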

This may look a bit spooky, but it’s quite simple. The get method performs a GET request; it takes a consumer for the response body and eventually returns the connected iteratee. From the scaladoc:
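The relevant signature in Play 2.2 is roughly the following (reconstructed, so treat the exact shape as approximate):

```scala
def get[A](consumer: ResponseHeaders => Iteratee[Array[Byte], A]): Future[Iteratee[Array[Byte], A]]
```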

With the flatMap statement, we’re taking whatever is inside of that Future “box”, running it, and then unwrapping the result of the run (which is itself a future, because that’s what the run function returns).

Ok, now, we have some water coming in on one end of the pipe. Let’s see what to do with it.

Enumeratees or how to transform streams on the fly

An enumerator is a data producer, and an enumeratee is a nifty tool that lets us work on a reactive data stream on the fly. We can compose our enumeratee with an enumerator, like so:
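A sketch of the chain; `enumerator` is the byte producer from our joined pair, while `TeeTime`, `GolfIndexDocument` and the `format` JSON reader are assumptions standing in for the project’s own types:

```scala
import play.api.libs.iteratee.Enumeratee
import play.api.libs.json.{ JsResult, JsSuccess, JsValue, Json }

val documents = enumerator &>
  Enumeratee.map[Array[Byte]](bytes => Json.parse(bytes)) &>
  Enumeratee.map[JsValue](json => format.reads(json)) &>
  Enumeratee.collect[JsResult[TeeTime]] { case JsSuccess(teeTime, _) => teeTime } &>
  Enumeratee.map[TeeTime](teeTime => GolfIndexDocument(teeTime))
```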

Ok, that’s quite a bit of enumeratees there at once. Let’s look into this in detail:

  • the &> sign just means “compose”, i.e. chain things together
  • the first map enumeratee will take the input (which is of type Array[Byte]) and parse it as JSON
  • the second map enumeratee will take the resulting JsValue-s from the previous enumeratee and read them into something with the format.reads function
  • the third enumeratee collects all values of a certain kind, i.e. acts as a filter. It gets as input a JsResult[TeeTime], which may either be JsSuccess or JsError (this is part of the standard Play JSON library)
  • finally, the last enumeratee will pick out the TeeTime-s returned by the previous step and put them into a GolfIndexDocument ready to be sent off to ElasticSearch

This is all pretty great. There’s just one gotcha: the pipe-line above assumes that we’ll always be able to turn each chunk of Array[Byte] into a JsValue, i.e. that each chunk is a complete JSON token that doesn’t get cut in the middle (e.g. {"foo": "ba and then r"}). That may actually hold true for objects that aren’t too large. However, it would also mean that we’d expect to get a stream of serialized JSON objects from our backend. But that’s not the case… we get a JSON array containing all of those objects.

So at this point, we would need an enumeratee that is capable of dealing with incomplete chunks of JSON, and reassemble them, reactively, taking into account all kind of edge cases… which all sounds pretty complex. Luckily for us, the universe created James Roper, and James Roper created the JSON iteratees and enumeratees.

So armed with this new tool, let’s rewrite our pipe-line:
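A sketch of the rewritten chain — JsonEnumeratees comes from James Roper’s iteratees-extras library, and the surrounding types are the same assumptions as before (as noted just below, the input types don’t quite line up yet):

```scala
import play.api.libs.iteratee.Enumeratee
import play.api.libs.json.{ JsResult, JsSuccess, JsValue }
import play.extras.iteratees.JsonEnumeratees

// jsArray emits one JsValue per element of the incoming JSON array,
// reassembling chunks that got cut in the middle of a token.
val documents = enumerator &>
  JsonEnumeratees.jsArray &>
  Enumeratee.map[JsValue](json => format.reads(json)) &>
  Enumeratee.collect[JsResult[TeeTime]] { case JsSuccess(teeTime, _) => teeTime } &>
  Enumeratee.map[TeeTime](teeTime => GolfIndexDocument(teeTime))
```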

Now that’s pretty cool, isn’t it? There’s just one last catch: the enumerator provides data of type Array[Byte] and the JsonEnumeratees.jsArray enumeratee expects data of type… Array[Char].

We could fix this by being rather blunt and inserting the following enumeratee:
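Something along these lines:

```scala
import play.api.libs.iteratee.Enumeratee

// Blunt byte-to-char conversion: only safe for single-byte encodings such as ASCII.
Enumeratee.map[Array[Byte]](bytes => bytes.map(_.toChar))
```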

However, that would then only work for ASCII encoding, and we likely will get something a bit richer, such as UTF-8. Luckily enough, James also wrote a tool for this, so our complete chain now looks as follows:
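A sketch of the full chain, assuming the `Combinators.decode` enumeratee from iteratees-extras handles the character decoding (the exact name and types may differ between library versions); the surrounding names are the same assumptions as before:

```scala
import play.api.libs.iteratee.Enumeratee
import play.api.libs.json.{ JsResult, JsSuccess, JsValue }
import play.extras.iteratees.{ Combinators, JsonEnumeratees }

val documents = enumerator &>
  Combinators.decode &>          // UTF-8 aware Array[Byte] => Array[Char] decoding
  JsonEnumeratees.jsArray &>
  Enumeratee.map[JsValue](json => format.reads(json)) &>
  Enumeratee.collect[JsResult[TeeTime]] { case JsSuccess(teeTime, _) => teeTime } &>
  Enumeratee.map[TeeTime](teeTime => GolfIndexDocument(teeTime))
```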

Voilà, we now have a pipe-line that turns a raw array of bytes into full-fledged GolfIndexDocument-s ready to be sent off to ElasticSearch!

We’ll see how to do this in part 2, so stay tuned!

8 Comments

  1. Hi Manuel,

    Thanks for a very useful article. Timely as well as you posted it on the very day I am looking to do exactly this!

    One question, what is the best way to get jroper’s play extras library into my play project? There doesn’t seem to be an sbt archive for it and I’m not familiar yet with pulling libs directly from github.


    1. Post author

      Hi Doug,

      thanks! James’ iteratees-extras are available from the typesafe release repository with the following parameters:

      "" %% "iteratees-extras" % "1.1.0"

  2. Hi,
    thanks for the really interesting series.
    You are saying: “An enumeratee is a data producer”. Shouldn’t that be:
    “An enumerator is a data producer”?


  3. Nice article,

    If I understand correctly, you’re trading a huge memory consumption for a slower elastic search call? (as the 1-n parsed JS objects are sent individually and in sequence to the elastic service)

    1. Post author

      It’s not so much an elastic call (the computation isn’t distributed on several nodes) as it is a non-blocking implementation, i.e. the data is parsed on the fly, rather than at-once. This is especially useful when the memory consumption would grow really high (because of a lot of data & a lot of data transfers at once).

  4. I guess I am late commenting on this dated article – but from an architectural perspective, in the case of the full sync that happens at night as mentioned in the article, unless I am missing something, why didn’t you consider batch (a file transfer) as an architectural pattern? Couldn’t your web service drop a file at night so that you can parse the stuff in the batch and send it over to ElasticSearch?

    Awesome stuff from a technology perspective! Please keep up the good stuff.

    1. Post author

      Doing the whole transfer based on files would certainly have been a possibility, provided the generation and parsing of the files had been done using a streaming mechanism, since they wouldn’t really fit into memory (Jackson comes to mind for streaming JSON parsing, although I am not sure how well it would interact with Scala case classes). However, this would have meant storing files on disk at each end instead of handling the whole process on the fly, which would have been more costly in resources. The other advantage of the streaming approach is that the same pipeline can be used for updating discrete tee times during the day when they are changed (for example when a golf course becomes unavailable).
