I’m currently working on a project that involves synchronizing large amounts of data from backend systems in order to make it searchable. Amongst other things, one domain the system deals with is golf and the booking of tee times (i.e. a time at which to start playing). If I understood things correctly, tee times exist so that you don’t get hit by a player who starts later (I haven’t played golf yet).

The portal is written on top of the Play! Framework and the search engine in use is ElasticSearch.

The data flow

There are two cases of data synchronization:

  • a full synchronization run, when initially connecting a new tenant or regularly at night to make sure both systems really are in sync; this works as a pull from the portal
  • the modification of a single tee time (e.g. when it gets booked); this works as a push from the backend systems

The case we’re going to explore in more depth is the full synchronization, as this is the one that will need to scale as the number of tenants and the variety of data to synchronize grow.

Both the backend and ElasticSearch use JSON to encode the data, and the portal has to do some enhancements to turn the raw tee time into a document suitable for indexing.

Fetching the data

In order to fetch the data from a tenant, we’re using Play’s play.api.libs.ws.WS library, like so:

WS.
  url(syncUrl).
  withQueryString(
    "authToken" -> Play.configuration.getString("backendAuthToken").getOrElse("")
  ).
  withRequestTimeout(-1)

The infinite request timeout is there because we may potentially get a fair amount of data, which in turn may take quite a while to fully reach us.

Reactive streaming

The idea behind “reactive” streaming is that we only use resources (CPU, memory, etc.) when there’s a reason for it, i.e. when there’s data. Instead of retrieving all of the data in memory and then sending it off to ElasticSearch, our portal should be able to pass it on directly to ElasticSearch. Otherwise, handling several streams that come in at the same time may require quite a bit of memory.

The tooling that the Play framework offers for this purpose consists of the so-called Iteratees, Enumerators and Enumeratees. There’s an excellent introduction by Pascal Voitot on the topic; I hope that this practical example can help to give a bit more background as to how this tooling can be used.

Basic piping: connecting input and output

So on the one hand we are making a WS call to retrieve our data, and on the other hand we’d like to send it out again after modifying it a bit. Play 2.2 offers the Concurrent.joined[A] method to do just that. There are three things that are good to know prior to looking into this method:

  • an iteratee is the data consumer, i.e. the “pipe” into which data will flow
  • an enumerator is the data producer, i.e. where the data flows from
  • these streams always deal with a certain type of data. In our case, the type of data is going to be an array of bytes, since we get raw data over the web from a call to a REST API.
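
To make these two roles concrete, here is a minimal sketch (my own example, not code from the project) that runs a simple producer into a simple consumer:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.libs.iteratee._

// a producer emitting two chunks of bytes
val producer: Enumerator[Array[Byte]] =
  Enumerator("foo".getBytes("UTF-8"), "bar".getBytes("UTF-8"))

// a consumer folding the incoming chunks into a total byte count
val consumer: Iteratee[Array[Byte], Int] =
  Iteratee.fold[Array[Byte], Int](0)((count, bytes) => count + bytes.length)

// |>>> applies the enumerator to the iteratee and runs it: eventually yields 6
val total: Future[Int] = producer |>>> consumer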

The scaladoc of the Concurrent.joined[A] method says:

/**
 * Create a joined iteratee enumerator pair.
 *
 * When the enumerator is applied to an iteratee, the iteratee subsequently consumes whatever the iteratee in the pair
 * is applied to.  Consequently the enumerator is "one shot", applying it to subsequent iteratees will throw an
 * exception.
 */
def joined[A]: (Iteratee[A, Unit], Enumerator[A]) = {  }

So, we need to apply the iteratee in the pair to something (for the “pipe” to get some “water”), and subsequently we can run the whole machinery by applying the enumerator to it. Let’s see what that would look like on one end, with our previous WS call:

import scala.concurrent.ExecutionContext.Implicits.global
import play.api.Play
import play.api.Play.current
import play.api.libs.iteratee._
import play.api.libs.ws.WS

val (iteratee, enumerator) = Concurrent.joined[Array[Byte]]

WS.
  url(syncUrl).
  withQueryString(
    "authToken" -> Play.configuration.getString("backendAuthToken").getOrElse("")
  ).
  withRequestTimeout(-1).
  get(_ => iteratee).
  flatMap(_.run)

This may look a bit spooky, but it’s quite simple. The get method performs a GET request; it takes a consumer and eventually returns the connected iteratee. From the scaladoc:

/**
 * performs a get with supplied body
 * @param consumer that's handling the response
 */
def get[A](consumer: ResponseHeaders => Iteratee[Array[Byte], A]): Future[Iteratee[Array[Byte], A]] =

With the flatMap statement, we’re taking whatever is inside of that Future “box” and running it, then unwrapping the result of the run (which is itself a future, because that’s what the run function returns).
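
Condensed, and with the types written out (same imports and vals as above; the query string is omitted for brevity), the snippet amounts to:

val futureIteratee: Future[Iteratee[Array[Byte], Unit]] =
  WS.url(syncUrl).withRequestTimeout(-1).get(_ => iteratee)

// run drains the iteratee and yields its final result as a Future[Unit];
// flatMap flattens the nested Future[Future[Unit]] into a single Future[Unit]
val done: Future[Unit] = futureIteratee.flatMap(_.run)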

Ok, now, we have some water coming in on one end of the pipe. Let’s see what to do with it.

Enumeratees or how to transform streams on the fly

An enumerator is a data producer, and an enumeratee is a nifty tool that lets us work on a reactive data stream on the fly. We can compose enumeratees with an enumerator, like so:

enumerator &>
  Enumeratee.map[Array[Byte]](Json.parse) &>
  Enumeratee.map[JsValue](format.reads) &>
  Enumeratee.collect[JsResult[TeeTime]] { case JsSuccess(value, _) => value } &>
  Enumeratee.map[TeeTime](GolfIndexDocument(golfClub, issuer, _, GolfCourseDetails()))

Ok, that’s quite a few enumeratees at once. Let’s look into this in detail:

  • the &> sign just means “compose”, i.e. chain things together
  • the first map enumeratee will take the input (which is of type Array[Byte]) and parse it as JSON
  • the second map enumeratee will take the resulting JsValue-s from the previous enumeratee and read them into something with the format.reads function (see the sketch after this list for what TeeTime and format might look like)
  • the third enumeratee collects all values of a certain kind, i.e. acts as a filter. It gets as input a JsResult[TeeTime], which may either be JsSuccess or JsError (this is part of the standard Play JSON library)
  • finally, the last enumeratee will pick out the TeeTime-s returned by the previous step and put them into a GolfIndexDocument ready to be sent off to ElasticSearch
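
The post doesn’t show TeeTime, format or GolfIndexDocument. Purely as a hypothetical sketch to make the pipe-line easier to follow (the real model in the project certainly differs), they could look something like this:

import play.api.libs.json._

// hypothetical model of a tee time; the actual fields are unknown
case class TeeTime(id: String, start: String, booked: Boolean)

// format.reads turns a JsValue into a JsResult[TeeTime]:
// JsSuccess(teeTime, path) on success, JsError(errors) otherwise
val format: Format[TeeTime] = Json.format[TeeTime]

// hypothetical document wrapper enriching the tee time for indexing
case class GolfCourseDetails()
case class GolfIndexDocument(golfClub: String, issuer: String,
                             teeTime: TeeTime, details: GolfCourseDetails)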

This is all pretty great. There’s just one gotcha: the pipe-line above assumes that we’ll always be able to turn our first chunk of Array[Byte] into a JsValue, i.e. that each chunk is a complete JSON token that doesn’t get cut in the middle (e.g. {"foo": "ba and then r"}). That may actually hold true for smaller objects. However, it would also mean that we’d expect to get a stream of individually serialized JSON objects from our backend. But that’s not the case… we get one JSON array containing all of those objects.
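
You can see the problem by feeding Json.parse one half of such a split chunk; neither half is valid JSON on its own:

import play.api.libs.json.Json

// a chunk boundary can split a JSON token in two; parsing either half fails
Json.parse("""{"foo": "ba""")  // throws an exception (unexpected end of input)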

So at this point, we would need an enumeratee that is capable of dealing with incomplete chunks of JSON and reassembling them, reactively, taking into account all kinds of edge cases… which all sounds pretty complex. Luckily for us, the universe created James Roper, and James Roper created the JSON iteratees and enumeratees.

So armed with this new tool, let’s rewrite our pipe-line:

enumerator &>
  play.extras.iteratees.JsonEnumeratees.jsArray &>
  Enumeratee.map[JsValue](format.reads) &>
  Enumeratee.collect[JsResult[TeeTime]] { case JsSuccess(value, _) => value } &>
  Enumeratee.map[TeeTime](GolfIndexDocument(golfClub, issuer, _, GolfCourseDetails()))

Now that’s pretty cool, isn’t it? There’s just one last catch: the enumerator provides data of type Array[Byte] and the JsonEnumeratees.jsArray enumeratee expects data of type… Array[Char].

We could fix this by being rather blunt and inserting the following enumeratee:

Enumeratee.map[Array[Byte]](a => a.map(_.toChar))

However, that would then only work for ASCII-encoded input, and we’ll likely get something a bit richer, such as UTF-8. Luckily enough, James also wrote a tool for this, so our complete chain now looks as follows:

enumerator &>
  play.extras.iteratees.Encoding.decode() &>
  play.extras.iteratees.JsonEnumeratees.jsArray &>
  Enumeratee.map[JsValue](format.reads) &>
  Enumeratee.collect[JsResult[TeeTime]] { case JsSuccess(value, _) => value } &>
  Enumeratee.map[TeeTime](GolfIndexDocument(golfClub, issuer, _, GolfCourseDetails()))

Voilà, we now have a pipe-line that turns a raw array of bytes into full-fledged GolfIndexDocument-s ready to be sent off to ElasticSearch!
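
Note that the composed chain is itself just an enumerator of GolfIndexDocument-s, so whatever consumer we end up writing will attach to it like any other iteratee. A tiny hypothetical example (reusing the imports and the sketched GolfIndexDocument from above) that merely counts the documents flowing through:

// `documents` stands for the composed enumerator chain built above
def countAll(documents: Enumerator[GolfIndexDocument]): Future[Int] =
  documents |>>> Iteratee.fold[GolfIndexDocument, Int](0)((count, _) => count + 1)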

We’ll see how to do this in part 2, so stay tuned!