A handful of Akka techniques
I’ve been using Akka for 3 years in a variety of projects and by now, I have a hard time imagining dealing with some parts of my work without it. Sure, Scala provides other powerful paradigms for dealing with concurrency, but I find Actors to be one of the most elegant concepts when it comes to reasoning about it.
There are a few techniques that I’ve used repeatedly in projects and that I would like to share. Make no mistake though - I am by no means an Akka expert, so some of them may turn out to be sub-optimal techniques or even anti-patterns - use them at your own risk, and make sure you understand their limitations. Also, if you haven’t done so yet, I would by all means recommend sitting down with a fresh cup of coffee and reading the excellent Akka documentation.
The cuckoo clock
I’m using this technique mainly in Play framework projects. The first version of the framework had a mechanism to schedule tasks repeatedly, but that mechanism was dropped in version 2 with the recommendation to use Akka’s schedulers instead.
The main idea behind this technique is to encapsulate the logic of a task that needs to be done repeatedly or at a certain time in one actor, and to wake the actor up by sending it a message:
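A minimal sketch of such an actor follows - the actor and message names (`OrderSynchronizer`, `SYNC_ALL_ORDERS`) as well as the delays are illustrative:

```scala
import akka.actor.{ Actor, ActorLogging, Cancellable }
import scala.concurrent.duration._

case object SYNC_ALL_ORDERS

class OrderSynchronizer extends Actor with ActorLogging {

  // the scheduler needs an ExecutionContext to run on
  implicit val executionContext = context.dispatcher

  var scheduler: Cancellable = _

  override def preStart(): Unit = {
    // send ourselves SYNC_ALL_ORDERS every 5 minutes, the first time after 1 minute
    scheduler = context.system.scheduler.schedule(1.minute, 5.minutes, self, SYNC_ALL_ORDERS)
  }

  override def postStop(): Unit = {
    scheduler.cancel()
  }

  def receive = {
    case SYNC_ALL_ORDERS =>
      try {
        // ... synchronize the orders here ...
      } catch {
        case t: Throwable => log.error(t, "Order synchronization failed")
      }
  }
}
```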
Let’s look at how this mechanism works in detail:
- first off, we need to initialize our scheduler. For this purpose, I find it useful to use the actor’s lifecycle methods `preStart` and `postStop`. It would be possible to declare the scheduler someplace else entirely and to send the messages to the actor; however, I find that encapsulating the logic makes for much better maintenance, especially when there are several mechanisms of the sort in the same application.
- we create the scheduler using the ActorSystem’s `scheduler.schedule` method, passing in the initial delay (after which the message will be sent the first time), the interval at which the message should be sent, the receiver (in our case, we want the message to be sent to the actor itself, so we use `self`), and the message to send.
- when the actor is shut down, we also make sure to cancel the scheduler, as we’d otherwise continue sending messages every 5 minutes into the void
- as for any actor, the implementation of what is to be executed repeatedly happens in the `receive` method - in our case, we only handle messages of the kind `SYNC_ALL_ORDERS`
- one important thing at this point is to deal with failure: I would recommend executing any kind of code inside of a try-catch block as demonstrated above - it would be overkill to make use of Akka’s error handling strategies in this case, and we don’t want our scheduler to crash if one of the synchronizations goes wrong.
Now, one last thing we need to do is to set the mechanism in motion. For a Play application, a good place for this is the infamous `Global` object:
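A sketch for a Play 2.x application, assuming the `OrderSynchronizer` actor from the previous snippet:

```scala
import akka.actor.Props
import play.api.{ Application, GlobalSettings }
import play.api.libs.concurrent.Akka

object Global extends GlobalSettings {

  override def onStart(app: Application): Unit = {
    // creating the actor is enough: its preStart sets the schedule in motion
    Akka.system(app).actorOf(Props[OrderSynchronizer], name = "orderSynchronizer")
  }
}
```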
And that’s it! Our cuckoo clock is ready to go off every 5 minutes.
Note: the scheduler API is well-suited for tasks that involve repetition, less so for tasks that need to go off at a certain time or date. It is however possible to achieve this with the API by calculating the duration between the scheduler initialization and the planned time. For these cases, I’m making use of the nscala-time library, which is a wrapper around the excellent Joda Time library.
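For example, a sketch of computing the initial delay inside `preStart` for a task that should first go off at the next midnight and then repeat daily - the planned time is arbitrary, and the snippet uses Joda Time’s API directly (which nscala-time wraps):

```scala
import org.joda.time.DateTime
import scala.concurrent.duration._

// inside preStart: compute the delay between now and the planned time
val nextMidnight = DateTime.now.plusDays(1).withTimeAtStartOfDay
val initialDelay = (nextMidnight.getMillis - DateTime.now.getMillis).millis

scheduler = context.system.scheduler.schedule(initialDelay, 24.hours, self, SYNC_ALL_ORDERS)
```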
Batch processor
One thing I find myself doing rather often with Akka is to massively parallelize a given task, which often involves data crunching of sorts. This can be a simple set-up that only involves one supervisor and many workers of the same kind, or something more complex involving a pipeline of sorts.
In most cases this kind of pattern involves a producer (a database such as MongoDB, MySQL, BaseX, Amazon SimpleDB; a multi-gigabyte XML file, etc.) from which items need to be picked and sent off for processing.
Let’s look at an example of a simple supervisor - workers chain. We’ll make a few assumptions for this example:
- the time for obtaining a new batch of data is negligible, so we’re not interested in optimizing the process to fetch new data ahead of time
- our JVM has a decent amount of memory available so that the workers are able to queue all items in their mailbox
Note: The code is intentionally left abstract, as we’re not interested in how data is being fetched from the source or processed, but in what is done with it.
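The following sketch reconstructs this set-up. All names are illustrative, the worker count is arbitrary, and the data access methods (`totalItems`, `fetchBatch`) as well as the actual `process` implementation are deliberately left abstract:

```scala
import akka.actor._
import akka.routing.RoundRobinRouter

// the message protocol shared by the supervisor and the workers
case class Item(id: Long)
case object ProcessBatch
case class ProcessItem(item: Item)
case object ProcessedOneItem
case class ItemProcessingError(item: Item, cause: Throwable)

class BatchProcessor(dataSetId: Long) extends Actor with ActorLogging {

  val workerCount = 10
  val workers = context.actorOf(
    Props[ItemProcessingWorker].withRouter(RoundRobinRouter(workerCount)),
    name = "itemProcessingWorkers")

  // state of the processing
  var totalItemCount = -1
  var currentBatchSize = 0
  var currentProcessedItemsCount = 0
  var currentProcessingErrors = List.empty[ItemProcessingError]
  var overallProcessedCount = 0

  def receive = {
    case ProcessBatch =>
      if (totalItemCount == -1) {
        totalItemCount = totalItems(dataSetId)
        log.info(s"Starting to process $totalItemCount items")
      }
      processBatch(fetchBatch(dataSetId, offset = overallProcessedCount))
    case ProcessedOneItem =>
      currentProcessedItemsCount += 1
      continueProcessing()
    case error: ItemProcessingError =>
      currentProcessingErrors = error :: currentProcessingErrors
      continueProcessing()
  }

  def processBatch(batch: List[ProcessItem]): Unit =
    if (batch.isEmpty) {
      log.info(s"Done processing all items, ${currentProcessingErrors.size} errors in the last batch")
    } else {
      // reset the state for this batch, then distribute the work
      currentBatchSize = batch.size
      currentProcessedItemsCount = 0
      currentProcessingErrors = List.empty
      batch foreach { item => workers ! item }
    }

  def continueProcessing(): Unit = {
    overallProcessedCount += 1
    val itemsDone = currentProcessedItemsCount + currentProcessingErrors.size
    if (itemsDone > 0 && itemsDone % 100 == 0)
      log.info(s"Processed $itemsDone of $currentBatchSize items of the current batch")
    // once the whole batch is done, ask ourselves for the next one
    if (itemsDone == currentBatchSize) self ! ProcessBatch
  }

  // data access, intentionally left abstract
  def totalItems(dataSetId: Long): Int = ???
  def fetchBatch(dataSetId: Long, offset: Int): List[ProcessItem] = ???
}

class ItemProcessingWorker extends Actor {

  def receive = {
    case ProcessItem(item) =>
      try {
        process(item)
        sender ! ProcessedOneItem
      } catch {
        case t: Throwable => sender ! ItemProcessingError(item, t)
      }
  }

  // the actual work, intentionally left abstract
  def process(item: Item): Unit = ???
}
```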
Let’s go through the example step by step:
- we have a supervisor (`BatchProcessor`), a worker definition (`ItemProcessingWorker`) and a set of messages (`ProcessBatch`, `ProcessItem`, `ProcessedOneItem`, etc.)
- the constructor of the `BatchProcessor` takes as parameter the ID of the data set we want to process (assuming for example that the data lives in a database and has an identifier). A new `BatchProcessor` has to be created for each set we want to handle
- the `BatchProcessor` instantiates a number of child `workers` by using a RoundRobinRouter
- the supervisor has a number of members that keep track of the state of the processing. It also keeps track of whatever `ItemProcessingError`s may have occurred
- the mechanism then works as follows:
  - to get things started, the `BatchProcessor` should receive a `ProcessBatch` message from a client
  - it then fetches a first batch from the source and hands it to the `processBatch` method. If this is the first time it runs, it will also log how many items there are in total
  - the `processBatch` method initialises the state for this batch by resetting counters and setting the batch size, and then goes on to distribute the work to the workers via `batch foreach { item => workers ! item }`
  - when an item is processed, the worker actor replies with the `ProcessedOneItem` message, or with an `ItemProcessingError` in case of failure. We use this information to keep track of the progress of the processing
  - the `continueProcessing` method reports progress for every 100 processed items (in practice, this needs to be adapted to the size of the data at hand). When the current batch has been entirely processed (the sum of successful and failed items equals the size of the current batch), it sends the supervisor itself another `ProcessBatch` message. The unimplemented `fetchBatch` method can make use of the counters in order to know which offset to use to retrieve the next batch of items
  - finally, when there’s no data left to process, the `processBatch` method informs us about this and the processing is over
The method depicted above is of course very generic. In practice, the process often needs to be fine-tuned or can be adapted to be more performant for the specific use-case at hand. The nature of the data source often dictates how the details work. However, this is one of the things I like about Akka - it only provides the bare-bones infrastructure and leaves it up to the developer to choose which mechanism will be used for implementing the pipeline. Some improvements that we’d probably need in a real-life application include:
- error handling using Akka’s supervisor strategies. Right now, our worker is very simple and has a try-catch statement around the `process` method. But we don’t really recover from errors; we just let things fail and report the number of errors we got. This may work well if those errors are fatal, but in practice it may be worth retrying to process some of the items
- informing the client that our work is done. This could be as simple as saving the actor reference when the first `ProcessBatch` message is received, and sending a `WorkDone` message when all items are processed (don’t try sending that message to `sender`, because by the time all work is done, the most likely sender of a message will be one of the child actors)
- having nicer metrics regarding the processing, such as the processing rate. I often use Coda Hale’s metrics library for that purpose.
Poor man’s back-pressure
If you are a member of the Akka core team, skip this section. For the rest of humankind, this topic relates directly to one assumption we made in the previous example: “our JVM has a decent amount of memory available so that the workers are able to queue all items in their mailbox”. In reality, this may not always be the case. However, if you continue to fetch data from a producer and pass it on to the workers, you will eventually run out of memory: by default, Akka uses an UnboundedMailbox which will keep on filling up.
In the example above, this problem is easy to fix, because we only ever move on to the next batch when all items of the current one have been processed, which means that choosing an appropriate batch size is sufficient. However, we may have chosen a different implementation which pulls data out of the source continuously, because waiting for each batch to complete is somewhat too expensive, or because we’re interested in running the system at maximum capacity.
In such a situation, we need a means to tell the supervisor to slow down. This concept is also known as back-pressure, and I invite you to read how to properly implement it. Sometimes, however, you may not be interested in optimally solving the back-pressure problem, and in this case, it may be ok to violate Akka’s prime directive: don’t block inside an actor.
It may not be optimal, nor elegant, but I’ve found this mechanism to work pretty well, provided that you have a rough idea of the memory limitation of the application at hand and that you’re not interested in scaling dynamically.
In order to be able to slow down appropriately, we need to decouple the supervisor and the data producer. This is how an implementation might look:
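A sketch of this set-up, re-using `ProcessItem`, `ProcessedOneItem`, `ItemProcessingError` and `ItemProcessingWorker` from the batch example; the producer-related messages (`Process`, `FetchData`, `GetCurrentLoad`, etc.) and the concrete numbers are illustrative:

```scala
import akka.actor._
import akka.pattern.ask
import akka.routing.RoundRobinRouter
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case object Process
case object FetchData
case object GetCurrentLoad
case class CurrentLoad(count: Int)
case class NewData(items: List[ProcessItem])
case object NoMoreData

object Processor {
  // the maximum number of in-flight items we can afford memory-wise
  val MaxLoad = 50
}

class Processor extends Actor with ActorLogging {
  import Processor.MaxLoad

  val workers = context.actorOf(Props[ItemProcessingWorker].withRouter(RoundRobinRouter(10)))
  val producer = context.actorOf(Props[DataProducer], name = "dataProducer")

  var currentItemCount = 0

  def receive = {
    case Process =>
      // only ask the producer for more data if we are below the maximum load
      if (currentItemCount < MaxLoad) producer ! FetchData
    case GetCurrentLoad =>
      sender ! CurrentLoad(currentItemCount)
    case NewData(items) =>
      currentItemCount += items.size
      items foreach { item => workers ! item }
    case ProcessedOneItem =>
      currentItemCount -= 1
      self ! Process
    case error: ItemProcessingError =>
      log.warning(s"Failed to process item ${error.item.id}")
      currentItemCount -= 1
      self ! Process
    case NoMoreData =>
      log.info("No more data left to fetch")
  }
}

class DataProducer extends Actor {
  import Processor.MaxLoad

  implicit val timeout = Timeout(5.seconds)

  def receive = {
    case FetchData =>
      // poor man's back-pressure: block until the load is acceptable again.
      // This knowingly violates the "don't block inside an actor" rule.
      while (currentLoad > MaxLoad) {
        Thread.sleep(5000)
      }
      fetchData() match {
        case Nil   => context.parent ! NoMoreData
        case items => context.parent ! NewData(items)
      }
  }

  def currentLoad: Int = {
    val load = (context.parent ? GetCurrentLoad).mapTo[CurrentLoad]
    Await.result(load, 5.seconds).count
  }

  // how data is retrieved from the source is intentionally left abstract
  def fetchData(): List[ProcessItem] = ???
}
```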
So how does this work? Let’s have a closer look:
- we no longer retrieve data in batches - or rather, we do retrieve it in batches, but we don’t wait for a batch to be done in order to fetch more data. Hence, we don’t keep track of batch state
- we keep track of the current number of items being processed through the `currentItemCount` counter in the `Processor`, which gets increased each time an item is sent out and decreased each time a success or failure is reported
- the data is no longer fetched directly in the `Processor` but instead in a child thereof called `DataProducer`. Each time an item has been processed, the `Processor` sends itself a `Process` message, which has the consequence of asking the producer for more data - unless our current item count is already above the maximum load we can handle
- now to the interesting part: the `DataProducer` will not retrieve new data right away, but instead check with the supervisor what the current load is. If it is higher than the maximum allowed (50 items in our example, because we’re e.g. dealing with high-resolution images or something silly of the like that takes up a lot of memory), it will wait for 5 seconds and check again, until the load is acceptable again. Only then does it send new data to the `Processor`, which continues to do its job until there’s no more data left.
Some comments about this technique:
- as said, you need to have a good sense of what the maximum should be. This mechanism is anything but dynamic and won’t work if items are highly heterogeneous (in the sense that processing them takes very different amounts of time)
- it may be best to wait for a bunch of items to be processed before requesting a new batch, depending on the case at hand
- the error handling in the example above would need to be improved for a real-life usage
There are many other mechanisms to deal with work distribution that can easily be implemented with Akka. One other mechanism I’d like to try out when I get a chance is work stealing, wherein the workers take an active role in retrieving work instead of having it pushed to them.
Gotchas
During my time with Akka I’ve run into a few gotchas. They mostly amount to not having read the documentation well enough, but I’ve seen others make some of these mistakes too, so I think they might be useful to list here.
Creating too many root actors
Root actors, created directly via the `ActorSystem` instead of the context, should be used with care: creating such actors is expensive, as it requires synchronization at the level of the guardian. Also, in most cases, you don’t need many root actors, but rather one actor for the task at hand that itself has a number of children.
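To illustrate, given an `ActorSystem` called `system` and the actor classes from the earlier examples:

```scala
// expensive: creates a root actor, synchronizing on the user guardian
val processor = system.actorOf(Props[BatchProcessor], name = "batchProcessor")

// cheap: from within an actor, create children via the context instead
val worker = context.actorOf(Props[ItemProcessingWorker])
```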
Closing over a future
This is an easy reasoning mistake to make and is explained in detail in this post. Let’s consider this example:
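A sketch of the mistake, with an illustrative `ComputationActor` whose `computeResult` stands for any asynchronous computation:

```scala
import scala.concurrent.Future
import akka.actor.Actor

case object ComputeResult

class ComputationActor extends Actor {

  implicit val executionContext = context.dispatcher

  def receive = {
    case ComputeResult =>
      // DON'T: the closure captures the actor's `sender` method, which is
      // re-evaluated when the future completes - possibly for another message
      computeResult map { result => sender ! result }
  }

  // some asynchronous computation, left abstract
  def computeResult: Future[Int] = ???
}
```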
So what’s the problem? `computeResult` produces a `Future`, thus it switches to a different execution context and frees the main thread. This means that another `ComputeResult` message may come in, replacing the `sender` reference. When the first future completes, we would then reply to the wrong sender.
An easy fix for this is to capture the sender in a `val`, like so:
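The `receive` block from the sketch above then becomes:

```scala
def receive = {
  case ComputeResult =>
    // capture the current sender before the future is created
    val originalSender = sender
    computeResult map { result => originalSender ! result }
}
```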
Misinterpreting the meaning of Future.andThen
This is mainly related to Futures rather than Actors, but I’ve encountered it while mixing both paradigms. The Future API has a method called `andThen`, “used purely for side-effecting purposes”. In other words, it won’t transform the future and chain itself to it, but rather is executed independently, returning a future with the original result.
I was wrongly assuming that `andThen` would return a new, transformed Future, so I tried doing the following:
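Roughly the following, inside an actor’s `receive` (where `enrich` is a hypothetical transformation of the result):

```scala
import scala.util.Success
import akka.pattern.pipe

// WRONG: andThen returns a Future holding the *original* result,
// so the untransformed value is what gets piped to the sender
computeResult andThen {
  case Success(result) => enrich(result)
} pipeTo sender
```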
`pipeTo` is a quite powerful tool in Akka’s toolkit which allows sending the result of a future to an actor (under the hood, it creates an anonymous actor that waits for the future to complete).
The correct implementation would be:
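Again sketched with the hypothetical `enrich`:

```scala
// map transforms the result of the future, and the transformed
// value is then piped to the sender
computeResult map { result => enrich(result) } pipeTo sender
```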
Conclusion
In conclusion, I would like to re-iterate that Akka is a really useful tool to have in your toolset, and I think it’s going to stay around for a while, so I can only advise anyone who hasn’t done so yet to go and try it out (even if you’re in Java-land, there’s a Java API as well). Another nice paradigm that is getting standardised as we speak is reactive streams, which deals with back-pressure as an integral part of the concept.