Last week, we saw how to get data from a REST API and modify it to our liking in order to send it off for indexing to ElasticSearch. Now, let’s see how to get the data into ElasticSearch in a resource-friendly manner.

Indexing with ElasticSearch and elastic4s

ElasticSearch has a RESTful index API through which to send data for indexing, for example like so (from the ElasticSearch documentation):

$ curl -XPUT 'http://localhost:9200/twitter/tweet/1' -d '{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elastic Search"
}'

It also supports bulk requests, which is what we’ll need, since it makes little sense to issue one HTTP request for each document.

In order to send data over to ElasticSearch, we’ll make use of the elastic4s client, which has an asynchronous Scala API and hence makes things very convenient for us:

def indexDocuments(documents: Iterable[GolfIndexDocument]): Future[Boolean] = {
  // one index request per document, all targeting the golf/teeTime type
  val requests: Iterable[BulkCompatibleRequest] = documents.map { doc =>
    index into "golf/teeTime" source CaseClassSource(doc)
  }
  // send everything as a single bulk request; the resulting Boolean indicates
  // whether any of the individual operations failed
  client.execute(requests.toSeq: _*).map(_.hasFailures)
}
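
For completeness, the method above assumes that an ElasticClient and the elastic4s DSL are in scope. A minimal sketch of that setup, assuming a node reachable on localhost (the host, port and exact factory method may vary between elastic4s versions):

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._

// connect to a remote ElasticSearch node over the transport protocol
val client = ElasticClient.remote("localhost", 9300)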

In the example above, we make use of a custom com.sksamuel.elastic4s.source.Source which tells elastic4s how to deal with documents. It’s basically just a way of turning GolfIndexDocuments into JSON; the implementation is pretty straightforward:

import com.sksamuel.elastic4s.source.Source
import play.api.libs.json.{Json, Writes}

case class CaseClassSource[T](document: T)(implicit writes: Writes[T]) extends Source {
  def json: String = {
    val js = Json.toJson(document)
    Json.stringify(js)
  }
}
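
For this to work, an implicit Writes[GolfIndexDocument] needs to be in scope. Assuming GolfIndexDocument is a plain case class (with Writes instances available for any nested types), a minimal sketch using Play’s JSON macros would be:

import play.api.libs.json.{Json, Writes}

// derive a Writes instance from the case class definition
implicit val golfIndexDocumentWrites: Writes[GolfIndexDocument] = Json.writes[GolfIndexDocument]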

Now, the only thing left to do is to get the data from our Enumerator and the subsequent Enumeratee chain, group it, and send it off. We’ll do this by passing the stream on to a consuming Iteratee.

Bulk indexing with a grouping Iteratee

I haven’t found any pre-built tooling for grouping chunks (even though I have the nagging feeling that Enumeratee.grouped serves that purpose, I couldn’t find a way to use it with a given chunk size), so I built my own based on the provided Iteratee.foldM[E, A] tooling. In our case, what we’d like to do is to accumulate a certain number of elements and, once a reasonable group size has been reached, send them as a bulk request over to ElasticSearch.

A first approach to achieving this with an Iteratee could, for example, look like this:

def sendInChunks = Iteratee.fold[GolfIndexDocument, List[GolfIndexDocument]](List.empty) { (list, elem) =>
  if (list.size < 100) {
    elem :: list
  } else {
    Await.result(indexDocuments(list), 2 seconds)
    List(elem)
  }
}

What we’re doing here is to fold the incoming stream of GolfIndexDocuments with a function that accumulates elements into a list. At each step of the incoming stream, we either return the list with the current element prepended, or, once the desired group size of 100 has been reached, send the accumulated list off to ElasticSearch and continue with a fresh List containing only the current element.

There’s a catch, however: whatever is still sitting in the accumulator when the stream ends never gets sent, so we’d lose the trailing elements (the last chunk, or whatever is left over when the total isn’t a multiple of 100). The way to address this is to call map on our Iteratee (previously called mapDone):

def sendInChunks = Iteratee.fold[GolfIndexDocument, List[GolfIndexDocument]](List.empty) { (list, elem) =>
  if (list.size < 100) {
    elem :: list
  } else {
    Await.result(indexDocuments(list), 2 seconds)
    List(elem)
  }
} map { remaining =>
  Await.result(indexDocuments(remaining), 2 seconds)
  logger.info("Yay, we're done!")
}

There’s still something annoying about the code above: the Await.result call looks like an intruder in our otherwise reactive code. So, instead of using Iteratee.fold, we can use Iteratee.foldM (M stands for Monadic), which has the following signature:

/**
 * Create an [[play.api.libs.iteratee.Iteratee]] which folds the content of the Input using a given function and an initial state
 *
 * M stands for Monadic which in this case means returning a [[scala.concurrent.Future]] for the function argument f,
 * so that promises are combined in a complete reactive flow of logic.
 *
 *
 * @param state initial state
 * @param f a function folding the previous state and an input to a new promise of state
 * $paramEcSingle
 */
def foldM[E, A](state: A)(f: (A, E) => Future[A])(implicit ec: ExecutionContext): Iteratee[E, A]

So our Iteratee becomes:

def sendInChunks = Iteratee.foldM[GolfIndexDocument, List[GolfIndexDocument]](List.empty) { (list, elem) =>
  if (list.size < 100) {
    Future.successful { elem :: list }
  } else {
    indexDocuments(list).map { _ => List(elem) }
  }
} map { remaining =>
  indexDocuments(remaining).map { result =>
    logger.info("Yay, we're done!")
  }
}

Now, the method above could be useful in other places as well, so let’s make it a bit more generic:

/**
 * An Iteratee that consumes input in chunks of a given size
 *
 * @param chunkSize the chunk size
 * @param f the consumer function
 * @param whenDone an optional callback called when the iteratee is done
 * @tparam E the type of input elements to expect
 * @tparam A the result type of the consumer function
 * @return a chunking Iteratee
 */
def consumeInChunks[E, A](chunkSize: Int, f: List[E] => Future[A], whenDone: Option[(() => Unit)] = None): Iteratee[E, Unit] = Iteratee.foldM[E, List[E]](List.empty) { (list, elem) =>

  if (list.size < chunkSize) {
    Future.successful {
      elem :: list
    }
  } else {
    f(list).map { r =>
      List(elem)
    }
  }
} map { remaining =>
  // flush whatever is still in the accumulator and invoke the optional callback
  f(remaining)
  whenDone.foreach(callback => callback())
}

A few observations:

  • the chunkSize is now configurable
  • the consumeInChunks Iteratee takes a function that, given a list of elements, consumes them and returns a Future; in our case, this is the indexDocuments method
  • we also have an optional custom callback to be executed when all is done
  • since we prepend elements to the list we accumulate into (prepending to a List is much faster than appending), the elements within each chunk end up in reverse order. For indexing things in ElasticSearch this hardly matters, but in other cases the order might be important; the consumer function can then simply reverse each chunk before processing it (List.reverse is O(n), paid once per chunk), as sketched below
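
As an illustration of that last point, here is a sketch of a consumer that restores the original order before indexing, reusing consumeInChunks and indexDocuments from above (the val name is just for illustration):

// each chunk arrives newest-first because we prepend while accumulating,
// so reverse it once before handing it over to ElasticSearch
val orderPreservingConsumer = consumeInChunks[GolfIndexDocument, Boolean](100, { docs =>
  indexDocuments(docs.reverse)
})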

Putting it all together

Let’s see how all of this looks when put together:

// the basic piping
val (iteratee, enumerator) = Concurrent.joined[Array[Byte]]

// fetching the data and feeding it to the pipe-line
WS.
  url(syncUrl).
  withQueryString(
    "authToken" -> Play.configuration.getString("backendAuthToken").getOrElse("")
  ).
  withRequestTimeout(-1).
  get(_ => iteratee).
  flatMap(_.run)

// transforming the data stream to be ready for ElasticSearch
val transformedStream = enumerator &>
  play.extras.iteratees.Encoding.decode() &>
  play.extras.iteratees.JsonEnumeratees.jsArray &>
  Enumeratee.map[JsValue](format.reads) &>
  Enumeratee.collect[JsResult[SuiteTeeTime]] { case JsSuccess(value, _) => value } &>
  Enumeratee.map[TeeTime](GolfIndexDocument(golfClub, issuer, _, GolfCourseDetails()))

// consumer Iteratee that sends things in chunks to ElasticSearch
val elasticConsumer = ExtraIteratees.consumeInChunks(100, { docs =>
  indexDocuments(docs)
}, Some({ () => logger.info("Yay, we're done!") }))

// finally, get it all to run by running the Iteratee against the Enumerator
transformedStream run elasticConsumer

We could instead use the apply method of the Enumerator, which would then yield a Future (this is what we actually do in practice). In that case, we need to signal that the stream is finished by appending >>> Enumerator.eof at the end of our Enumerator chain, which is to say “the stream is done”. Without it, we’d wait for more data indefinitely.
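
A sketch of what that variant could look like, reusing transformedStream and elasticConsumer from above (the val name is illustrative):

// append an EOF marker so the consuming Iteratee knows the stream has ended,
// then extract its result, which gives us a Future we can use to tell when indexing is over
val indexingDone: Future[Unit] =
  (transformedStream >>> Enumerator.eof)(elasticConsumer).flatMap(_.run)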

So, that’s all for today. In the last part, we’ll see if this really works and what the advantages are over using non-reactive techniques.