Reactive Golf or Iteratees and all that stuff in practice, part 2
Last week, we saw how to get data from a REST API and transform it to our liking in order to send it off to ElasticSearch for indexing. Now, let’s see how to get the data into ElasticSearch in a resource-friendly manner.
Indexing with ElasticSearch and elastic4s
ElasticSearch has a RESTful index API through which data can be sent for indexing, for example like so (from the ElasticSearch documentation):
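A single-document index request looks roughly like this (based on the classic example from the ElasticSearch documentation; index, type and field names are illustrative):

```shell
curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out ElasticSearch"
}'
```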
It also supports bulk requests, which is what we’ll need, since it makes little sense to issue one HTTP request for each document.
In order to send data over to ElasticSearch, we’ll make use of the elastic4s client, which has an asynchronous Scala API and hence makes things very convenient for us:
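A sketch of the indexing side, assuming a locally running ElasticSearch node and an elastic4s 1.x-style DSL (the GolfIndexDocument type, the index/type names and the indexDocuments helper are assumptions for illustration):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._

// hypothetical: a client connected to a local ElasticSearch node
val client = ElasticClient.remote("localhost", 9300)

// send a whole group of documents in a single bulk request
def indexDocuments(documents: List[GolfIndexDocument]): Future[_] =
  client.bulk(
    documents.map(document => index into "golf" -> "course" doc document): _*
  )
```

Since elastic4s wraps the Java client's asynchronous API, the bulk call returns a Future rather than blocking the calling thread.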
In the example above, we make use of a custom com.sksamuel.elastic4s.source.Source which tells elastic4s how to deal with documents. It’s basically just a way of turning GolfIndexDocument instances into JSON; the implementation is pretty straightforward:
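A sketch of such a Source, assuming Play’s JSON library and made-up fields on GolfIndexDocument (the Source trait in elastic4s of that era exposes a single json method):

```scala
import com.sksamuel.elastic4s.source.Source
import play.api.libs.json.Json

// hypothetical document shape - the real fields come from the golf course data
case class GolfIndexDocument(course: String, hole: Int, par: Int)

object GolfIndexDocument {
  implicit val format = Json.format[GolfIndexDocument]
}

// tells elastic4s how to turn a document into its JSON representation
case class GolfIndexDocumentSource(document: GolfIndexDocument) extends Source {
  override def json: String = Json.toJson(document).toString()
}
```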
Now, the only thing left to do is to take the data coming out of our Enumerator and the subsequent Enumeratee chain, group it, and send it off. We’ll do this by passing the stream on to a consuming Iteratee.
Bulk indexing with a grouping Iteratee
I haven’t found any pre-built tooling for grouping chunks (even though I have the nagging feeling that Enumeratee.grouped serves that purpose, I couldn’t find a way to use it with a given chunk size), so I built one based on the provided Iteratee.foldM[E, A] tooling. In our case, what we’d like to do is to accumulate a certain number of chunks and, once a reasonable size has been reached, send a bulk request over to ElasticSearch.
A first approach to achieving this with an Iteratee would look for example like this:
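A sketch of this first approach, using Play’s Iteratee.fold (the chunk size of 100, the timeout and the indexDocuments helper are assumptions):

```scala
import play.api.libs.iteratee.Iteratee
import scala.concurrent.Await
import scala.concurrent.duration._

val indexingIteratee: Iteratee[GolfIndexDocument, List[GolfIndexDocument]] =
  Iteratee.fold[GolfIndexDocument, List[GolfIndexDocument]](List.empty) { (documents, document) =>
    if (documents.length < 100) {
      // keep accumulating until we have a full group
      document :: documents
    } else {
      // group is full: block until it is indexed, then start a new
      // group containing only the current element
      Await.result(indexDocuments(documents), 30.seconds)
      List(document)
    }
  }
```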
What we’re doing here is to fold the incoming stream of GolfIndexDocuments with a function that accumulates elements into a list. At each “iteration” of our incoming stream, i.e. at each step, we either return the list with the current element added, or, when we have reached the desired group size, we send the accumulated documents off to ElasticSearch and continue with a new list containing only the current element.
There’s a catch, however: if we have exactly 10000 elements, this works nicely, but if the total isn’t a multiple of 100, we’ll lose the elements of the last, incomplete group. The way to address this is to call map on our Iteratee (previously called mapDone):
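A sketch of the fix, assuming indexingIteratee is the fold from the first approach: map runs once the stream is finished, with the fold’s final state, so it can flush whatever is left over.

```scala
// flush the last, incomplete group once the stream is done
val flushingIteratee: Iteratee[GolfIndexDocument, Unit] =
  indexingIteratee.map { remaining =>
    if (remaining.nonEmpty) Await.result(indexDocuments(remaining), 30.seconds)
  }
```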
There’s still something a bit annoying with the code above: the Await.result call looks like an intruder in our otherwise reactive code. So instead of using Iteratee.fold, we can use Iteratee.foldM (the M stands for monadic), which has the following signature:
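The signature, as found in Play’s iteratee library (roughly - the implicit execution context parameter may differ slightly between Play versions):

```scala
def foldM[E, A](state: A)(f: (A, E) => Future[A])(implicit ec: ExecutionContext): Iteratee[E, A]
```

The difference from fold is that the folding function returns a Future[A] instead of a plain A, so asynchronous steps compose naturally without blocking.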
So our Iteratee becomes:
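A sketch of the foldM-based version, with the same assumed chunk size and indexDocuments helper - note how the bulk request’s Future is now part of the fold itself, with no blocking:

```scala
import play.api.libs.iteratee.Iteratee
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val indexingIteratee: Iteratee[GolfIndexDocument, List[GolfIndexDocument]] =
  Iteratee.foldM[GolfIndexDocument, List[GolfIndexDocument]](List.empty) { (documents, document) =>
    if (documents.length < 100) {
      // not full yet: just keep accumulating
      Future.successful(document :: documents)
    } else {
      // the bulk request's Future is folded over directly - no Await needed
      indexDocuments(documents).map(_ => List(document))
    }
  }
```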
Now, the method above could be useful in other places as well, so let’s make it a bit more generic:
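A generic sketch along those lines (the exact parameter arrangement is an assumption):

```scala
import play.api.libs.iteratee.Iteratee
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// consumes a stream of E in chunks of chunkSize, handing each chunk
// to the consumer function; onDone is called once the stream is finished
def consumeInChunks[E](chunkSize: Int, onDone: () => Unit = () => ())
                      (consumer: List[E] => Future[_]): Iteratee[E, Unit] =
  Iteratee.foldM[E, List[E]](List.empty) { (chunk, element) =>
    if (chunk.length < chunkSize) {
      Future.successful(element :: chunk)
    } else {
      consumer(chunk).map(_ => List(element))
    }
  }.map { remaining =>
    // consume the last, incomplete chunk and signal completion
    if (remaining.nonEmpty) consumer(remaining)
    onDone()
  }
```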
A few observations:
- the chunkSize is now configurable
- the consumeInChunks iteratee takes a function that, given a list of elements, consumes them and returns a Future; in our case, this is the indexDocuments method
- we also have an optional custom callback to be executed when all is done
- since we prepend elements to the list we group into (prepending being much faster than appending), the order of elements within a chunk isn’t preserved. When indexing into ElasticSearch this hardly matters, but in other cases the order might be important. In that case, it would be more efficient to iterate over the list in reverse order in the consumer function rather than reversing it, since List.reverse has a complexity of O(n).
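As a small illustration of that last point, grouping by prepending builds each chunk in reverse order:

```scala
// building a chunk by prepending is O(1) per element, but reverses the order
val chunk = (1 to 5).foldLeft(List.empty[Int])((acc, element) => element :: acc)
println(chunk)          // List(5, 4, 3, 2, 1)
println(chunk.reverse)  // restoring the order costs an extra O(n) pass
```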
Putting it all together
Let’s see how all of this looks when put together:
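A sketch of the full pipeline - the enumerator and enumeratee names stand in for the ones built in part 1, and |>>> is Play’s operator for running an Iteratee against an Enumerator:

```scala
// hypothetical names for the source and transformation stages from part 1
val indexingResult: Future[Unit] =
  golfCoursesEnumerator &>
    courseToDocumentEnumeratee |>>>
    consumeInChunks[GolfIndexDocument](100)(indexDocuments)
```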
We could instead use the apply method of the Enumerator, which would then yield a future (and this is what we actually do in practice). In that case, we would need to tell the Enumerator that it is finished by adding >>> Enumerator.eof at the end of our Enumeratee chain, which is to say “the stream is done”. Without it, we’d wait for more data indefinitely.
So, that’s all for today. In the last part, we’ll see whether this really works and what the advantages are over non-reactive techniques.