Reactive Golf or Iteratees and all that stuff in practice, part 2

Last week, we saw how to get data from a REST API and modify it to our liking in order to send it off for indexing to ElasticSearch. Now, let’s see how to get the data into ElasticSearch in a resource-friendly manner.

Indexing with ElasticSearch and elastic4s

ElasticSearch has a RESTful index API for sending data to be indexed, for example (from the ElasticSearch documentation):

It also supports bulk requests, which is what we’ll need, since it makes little sense to issue one HTTP request for each document.
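To make the idea of a bulk request a bit more concrete, here is a minimal sketch of how such a request body could be assembled by hand. It assumes the newline-delimited format of the bulk API (one action line followed by one source line per document, with a trailing newline). `BulkBody`, `build` and the index name are hypothetical names used only for illustration; in practice elastic4s takes care of this for us.

```scala
object BulkBody {
  // Hypothetical helper: builds the newline-delimited body of a bulk request.
  // Each document is an (id, json) pair; the JSON source is assumed to be
  // already serialized on a single line.
  def build(index: String, docs: Seq[(String, String)]): String =
    docs.map { case (id, source) =>
      // action line, then the document itself, each terminated by a newline
      s"""{"index":{"_index":"$index","_id":"$id"}}""" + "\n" + source + "\n"
    }.mkString
}
```

All documents end up in a single string, so they can be sent with one HTTP request instead of one request per document.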

In order to send data over to ElasticSearch, we’ll make use of the elastic4s client, which has an asynchronous Scala API and hence makes things very convenient for us:

In the example above, we make use of a custom com.sksamuel.elastic4s.source.Source which tells elastic4s how to deal with documents. It’s basically just a way of turning the GolfIndexDocument-s into JSON; the implementation is pretty straightforward:

Now, the only thing left to do is to get the data from our Enumerator and the subsequent Enumerator chain, group it, and send it off. We’ll do this by passing on the stream to a consuming Iteratee.

Bulk indexing with a grouping Iteratee

I haven’t found any pre-built tooling for grouping chunks (I have the nagging feeling that Enumeratee.grouped serves that purpose, but I couldn’t find a way to use it with a given chunk size), so I built one based on the provided Iteratee.foldM[E, A] tooling. In our case, we’d like to accumulate a certain number of chunks and, once a reasonable size has been reached, send a bulk request over to ElasticSearch.

A first approach to achieving this with an Iteratee would look for example like this:

What we’re doing here is folding the incoming stream of GolfIndexDocument with a function that accumulates elements into a list. At each “iteration” of our incoming stream, i.e. at each step, we either return the list with the current element added or, once the list has reached 100 elements, send it off to ElasticSearch and continue with a new List that contains only the current element.

There’s a catch, however: if we have exactly 10000 elements, this may work nicely, but if we have anything that’s not a multiple of 100, we’ll lose some elements. The way to address this is to call map on our Iteratee (previously called mapDone):
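The same idea can be tried out without Play at all. The following is a rough sketch that uses a plain foldLeft over a Seq as a stand-in for the Iteratee, and a caller-supplied `send` function as a stand-in for the bulk-indexing call. `ChunkedFold`, `run` and `send` are hypothetical names, and the elements are plain Ints rather than GolfIndexDocument-s.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ChunkedFold {
  // Folds over the input, sending a group off whenever it is full.
  // `send` stands in for the call that indexes a group of documents.
  def run(input: Seq[Int], chunkSize: Int, send: List[Int] => Future[Unit]): Unit = {
    val remaining = input.foldLeft(List.empty[Int]) { (group, elem) =>
      if (group.size < chunkSize) {
        elem :: group // keep accumulating
      } else {
        Await.result(send(group), 5.seconds) // block until the group is sent
        List(elem) // start a new group with the current element
      }
    }
    // Equivalent of the final map step: without it, whatever is left in the
    // last, partially filled group would never be sent.
    if (remaining.nonEmpty) Await.result(send(remaining), 5.seconds)
  }
}
```

With 250 elements and a chunk size of 100, `send` is called with two full groups during the fold, and the final step takes care of the remaining 50 elements.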

There’s still something a bit annoying with the code above: the Await.result call looks a bit like an intruder in our otherwise reactive code. So, instead of using Iteratee.fold, we can use Iteratee.foldM (M stands for Monadic) which has the following signature:
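To get a feeling for what a monadic fold does, here is a small sketch of the same concept over an ordinary Seq, using only the standard library. This is not Play's implementation: `MonadicFold.foldM` is a hypothetical helper that mirrors the shape of the idea (an initial state plus a step function that returns a Future of the next state).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object MonadicFold {
  // Like foldLeft, but each step returns a Future of the next state.
  // Steps are chained with flatMap, so a step only starts once the
  // previous one has completed, and nothing blocks in between.
  def foldM[E, A](input: Seq[E])(state: A)(f: (A, E) => Future[A]): Future[A] =
    input.foldLeft(Future.successful(state)) { (acc, elem) =>
      acc.flatMap(current => f(current, elem))
    }
}
```

For example, `MonadicFold.foldM(Seq(1, 2, 3))(0)((sum, n) => Future(sum + n))` yields a Future of the total. The caller decides if and where to wait for it, which is what lets us get rid of Await.result inside the fold.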

So our Iteratee becomes:

Now, the method above could be useful in other places as well, so let’s make it a bit more generic:

A few observations:

  • the chunkSize is now configurable
  • the consumeInChunks iteratee takes a function that, given a list of elements, consumes them and returns a Future; in our case, this is the indexDocuments method
  • we also have an optional custom callback to be executed when all is done
  • since we prepend elements to the list we group into, because that is much faster than appending, the elements within each chunk end up in reverse order. In the example of indexing things in ElasticSearch, this hardly matters, but in other cases, the order might be of importance. In that case, it would be more efficient to iterate over the list in reverse order in the consumer function than to reverse the list, because List.reverse has a complexity of O(n).
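The observations above can be tried out with a stand-in that folds over a Seq instead of a stream. This is only a sketch under that assumption, not the Iteratee version: the `Chunking` object and the `input` parameter are hypothetical, while `chunkSize`, `f` and `whenDone` play the roles described in the list.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object Chunking {
  // Hypothetical stand-in for consumeInChunks that works on a Seq.
  def consumeInChunks[E](input: Seq[E],
                         chunkSize: Int,
                         f: List[E] => Future[Unit],
                         whenDone: Option[() => Unit] = None): Future[Unit] = {
    val folded = input.foldLeft(Future.successful(List.empty[E])) { (acc, elem) =>
      acc.flatMap { chunk =>
        if (chunk.size < chunkSize) Future.successful(elem :: chunk) // prepending is O(1)
        else f(chunk).map(_ => List(elem)) // consume the full chunk, start a new one
      }
    }
    folded.flatMap { remaining =>
      // consume what is left over, then run the optional callback
      val flushed = if (remaining.nonEmpty) f(remaining) else Future.successful(())
      flushed.map(_ => whenDone.foreach(callback => callback()))
    }
  }
}
```

Feeding the numbers 1 to 5 through this with a chunk size of 2 hands the consumer the chunks List(2, 1), List(4, 3) and List(5): the chunks arrive in order, but the elements inside each chunk are reversed because of the prepending.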

Putting it all together

Let’s see what all of this looks like when put together:

We could instead use the apply method of the Enumerator which would then yield a future (which is what we actually do in practice). In that case, we would need to tell the Enumerator that it is finished by adding >>> Enumerator.eof at the end of our Enumeratee chain, which is to say “the stream is done”. Without it, we’d wait for more data indefinitely.

So, that’s all for today. In the last part, we’ll see whether this really works and what its advantages are over non-reactive techniques.

Comments 5


      Hi Tiger,

      sorry, the project isn’t an open-source one, so I can’t post the whole code on Github. If I find the time I’ll see if I can post the parts outlined in the post in a coherent manner.

  2. Thanks for sharing. This post helped me a lot. Would be worthwhile to make the consumeInChunks function even more generic by returning Iteratee[E, A] instead of Iteratee[E, Unit]:

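    /**
     * An Iteratee that consumes input in chunks of a given size
     *
     * @param chunkSize the chunk size
     * @param f the consumer function
     * @param whenDone an optional callback called when the iteratee is done
     * @tparam E the type of input elements to expect
     * @tparam A the type of output elements to expect
     * @return a chunking Iteratee
     */
    def consumeInChunks[E, A](chunkSize: Int, f: List[E] => Future[A], whenDone: Option[(() => Unit)] = None): Iteratee[E, A] = Iteratee.foldM[E, List[E]](List.empty) { (list, elem) =>
      if (list.size < chunkSize) {
        // keep accumulating until the chunk is full
        Future.successful(elem :: list)
      } else {
        // consume the full chunk, then start a new one with the current element
        f(list).map(_ => List(elem))
      }
    } mapM { remaining =>
      // consume what is left, run the optional callback and return the result
      f(remaining).map { result =>
        whenDone.foreach(callback => callback())
        result
      }
    }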
