I’ve been using Akka for 3 years in a variety of projects, and by now I have a hard time imagining dealing with some parts of my work without it. Sure, Scala provides other powerful paradigms for dealing with concurrency, but I find Actors to be one of the most elegant concepts when it comes to reasoning about it.

There are a few techniques that I’ve used repeatedly in projects and that I would like to share. Make no mistake though: I am by no means an Akka expert, so some of them may turn out to be sub-optimal techniques or even anti-patterns. Use them at your own risk, and make sure you understand their limitations. Also, if you haven’t done so yet, I would by all means recommend sitting down with a fresh cup of coffee and reading the excellent Akka documentation.

The cuckoo clock

I’m using this technique mainly in Play framework projects. The first version of the framework had a mechanism to schedule tasks repeatedly, and it dropped that mechanism in version 2 with the recommendation of using Akka’s schedulers.

The main idea behind this technique is to encapsulate the logic of a task that needs to be done repeatedly or at a certain time in one actor, and to wake the actor up by sending it a message:

import akka.actor.{Actor, Cancellable}
import scala.concurrent.duration._

class ScheduledOrderSynchronizer extends Actor {

  // the scheduler needs an implicit ExecutionContext; we use the actor's dispatcher
  import context.dispatcher

  private val SYNC_ALL_ORDERS = "SYNC_ALL_ORDERS"

  private var scheduler: Cancellable = _

  override def preStart(): Unit = {
    scheduler = context.system.scheduler.schedule(
      initialDelay = 10.seconds,
      interval = 5.minutes,
      receiver = self,
      message = SYNC_ALL_ORDERS
    )
  }

  override def postStop(): Unit = {
    scheduler.cancel()
  }

  def receive = {
    case SYNC_ALL_ORDERS =>
      try {
        // synchronize all the orders
      } catch {
        case t: Throwable =>
        // report errors
      }
  }

}

Let’s look at how this mechanism works in detail:

  • first off, we need to initialize our scheduler. For this purpose, I find it useful to use the actor’s lifecycle methods preStart and postStop. It would be possible to declare the scheduler someplace else entirely and to send the messages to the actor, but I find that encapsulating the logic makes for much better maintenance, especially when there are several mechanisms of the sort in the same application.
  • we create the scheduler using the ActorSystem’s scheduler.schedule method, passing in the initial delay (after which the message will be sent for the first time), the interval at which the message should be sent, the receiver (in our case, we want it to be sent to the actor itself, so we use self), and the message to send.
  • when the actor is shut down, we also make sure to cancel the scheduler, as we’d otherwise continue sending messages every 5 minutes into the void
  • as for any actor, the implementation of what is to be executed repeatedly happens in the receive method - in our case, we only handle messages of the kind SYNC_ALL_ORDERS
  • one important thing at this point is to deal with failure: I recommend executing any kind of code inside of a try-catch block as demonstrated above - it would be overkill to make use of Akka’s error handling strategies in this case, and we don’t want our scheduler to crash if one of the synchronizations goes wrong.

Now, one last thing we need to do is to set the mechanism in motion. For a Play application, a good place for this would be the infamous Global:

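import akka.actor.Props
import play.api.{Application, GlobalSettings}
import play.api.libs.concurrent.Akka

object Global extends GlobalSettings {

  override def onStart(app: Application): Unit = {
    Akka.system(app).actorOf(Props(new ScheduledOrderSynchronizer), name = "orderSynchronizer")
  }

}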

And that’s it! Our cuckoo clock is ready to go off every 5 minutes.

Note: the scheduler API is well suited for tasks that involve repetition, less so for tasks that need to go off at a certain time or date. It is however possible to achieve this with the API by calculating the duration between the scheduler initialization and the planned time, and using it as the initial delay. For these cases, I use the nscala-time library, which is a wrapper around the excellent Joda Time library.
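
To illustrate the calculation, here is a minimal sketch of how the initial delay until a given time of day could be computed. It uses java.time from the JDK rather than nscala-time, purely to keep the example free of dependencies. The names DelayUntil and delayUntil are made up for this example, and the sketch ignores time zones and daylight saving changes.

```scala
import java.time.{Duration => JDuration, LocalDateTime, LocalTime}
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka or nscala-time
object DelayUntil {

  /** Time from `now` until the next occurrence of `target`:
    * later today, or tomorrow if that time has already passed (or is exactly now). */
  def delayUntil(target: LocalTime, now: LocalDateTime): FiniteDuration = {
    val todayAtTarget = now.toLocalDate.atTime(target)
    val next =
      if (todayAtTarget.isAfter(now)) todayAtTarget
      else todayAtTarget.plusDays(1)
    JDuration.between(now, next).toMillis.millis
  }

}
```

The result could then be passed as the initialDelay of the scheduler, with an interval of 24.hours for a task that should run once a day at that time.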

Batch processor

One thing I find myself doing rather often with Akka is to massively parallelize a given task, which often involves data crunching of sorts. This can be a simple set-up that only involves one supervisor and many workers of the same kind, or something more complex involving a pipeline of sorts.

[Figure: One supervisor and its army of workers]

[Figure: Multi-step structure]

In most cases this kind of pattern involves a producer (a database such as MongoDB, MySQL, BaseX, Amazon SimpleDB; a multi-gigabyte XML file, etc.) from which items need to be picked and sent off for processing.

Let’s look at an example of a simple supervisor - workers chain. We’ll make a few assumptions for this example:

  • the time for obtaining a new batch of data is negligible, so we’re not interested in optimizing the process to fetch new data ahead of time
  • our JVM has a decent amount of memory available so that the workers are able to queue all items in their mailbox

Note: The code is intentionally left abstract, as we’re not interested in how data is being fetched from the source or processed, but about what is done with it.

import akka.actor.{Props, Actor}
import akka.event.Logging
import akka.routing.RoundRobinRouter

abstract class BatchProcessor(dataSetId: Long) extends Actor {

  val log = Logging(context.system, "application")

  val workers = context.actorOf(Props[ItemProcessingWorker].withRouter(RoundRobinRouter(100)))

  var totalItemCount = -1
  var currentBatchSize: Int = 0
  var currentProcessedItemsCount: Int = 0
  var currentProcessingErrors: List[ItemProcessingError] = List.empty

  var allProcessedItemsCount = 0
  var allProcessingErrors: List[ItemProcessingError] = List.empty

  def receive = {

    case ProcessBatch =>
      if (totalItemCount == -1) {
        totalItemCount = totalItems
        log.info(s"Starting to process set with ID $dataSetId, we have $totalItemCount items to go through")
      }
      val batch = fetchBatch
      processBatch(batch)

    case ProcessedOneItem =>
      currentProcessedItemsCount = currentProcessedItemsCount + 1
      continueProcessing()

    case error@ItemProcessingError(_, _, _) =>
      currentProcessingErrors = error :: currentProcessingErrors
      continueProcessing()

  }

  def processBatch(batch: List[BatchItem]) = {

    // fold the results of the previous batch into the totals first,
    // so that the final report also includes the last batch
    allProcessedItemsCount = currentProcessedItemsCount + allProcessedItemsCount
    currentProcessedItemsCount = 0
    allProcessingErrors = currentProcessingErrors ::: allProcessingErrors
    currentProcessingErrors = List.empty
    currentBatchSize = batch.size

    if (batch.isEmpty) {
      log.info(s"Done migrating all items for data set $dataSetId. $totalItems processed items, we had ${allProcessingErrors.size} errors in total")
    } else {
      // distribute the work; the workers only handle ProcessItem messages
      batch foreach { item =>
        workers ! ProcessItem(item)
      }
    }

  }

  def continueProcessing() = {

    val itemsProcessed = currentProcessedItemsCount + currentProcessingErrors.size

    if (itemsProcessed > 0 && itemsProcessed % 100 == 0) {
      log.info(s"Processed $itemsProcessed out of $currentBatchSize with ${currentProcessingErrors.size} errors")
    }

    if (itemsProcessed == currentBatchSize) {
      self ! ProcessBatch
    }

  }

  def totalItems: Int

  def fetchBatch: List[BatchItem]

}

abstract class ItemProcessingWorker extends Actor {

  def receive = {
    case ProcessItem(item) =>
      try {
        process(item) match {
          case None => sender ! ProcessedOneItem
          case Some(error) => sender ! error
        }
      } catch {
        case t: Throwable =>
          sender ! ItemProcessingError(item.id, "Unhandled error", Some(t))
      }
  }

  def process(item: BatchItem): Option[ItemProcessingError]

}

case object ProcessBatch

trait BatchItem {
  val id: Int
}

case class ProcessItem(item: BatchItem)

case object ProcessedOneItem

case class ItemProcessingError(itemId: Int, message: String, error: Option[Throwable])

Let’s go through the example step by step:

  • we have a supervisor (BatchProcessor), a worker definition (ItemProcessingWorker) and a set of messages (ProcessBatch, ProcessItem, ProcessedOneItem, etc.)
  • the constructor of the BatchProcessor takes as parameter the ID of the data set we want to process (assuming for example that the data lives in a database and has an identifier). A new BatchProcessor has to be created for each set we want to handle
  • the BatchProcessor instantiates a number of child workers by using a RoundRobinRouter
  • the supervisor has a number of members that keep track of the state of the processing. It also keeps track of whatever ItemProcessingError may have occurred
  • the mechanism then works as follows:
    • to get things started, the BatchProcessor should receive a ProcessBatch message from a client
    • it then fetches a first batch from the source and hands it to the processBatch method. If this is the first time it runs it will also show how many items there are in total
    • the processBatch method initialises the state for this batch by resetting counters and setting the batch size, and then goes on to distribute the work by sending each item of the batch to the workers.
    • when an item is processed, the worker actor replies with the ProcessedOneItem message, or with an ItemProcessingError in case of failure. We use this information to keep track of the progress of the processing.
    • the continueProcessing method reports progress for every 100 processed items (in practice, this needs to be adapted to the size of the data at hand). When the current batch has been entirely processed (the sum of successful and failed items equals the size of the current batch), it sends the supervisor itself another ProcessBatch message. The unimplemented fetchBatch method can make use of the counters in order to know which offset to use to retrieve the next batch of items.
    • finally, when there’s no data left to process, the processBatch method informs us about this and the processing is over.

The method depicted above is of course very generic. In practice, the process often needs to be fine-tuned or can be adapted to be more performant for the specific use-case at hand. The nature of the data source often dictates how the details work. However, this is one of the things I like about Akka - it only provides the bare-bones infrastructure and leaves it up to the developer to choose which mechanism will be used for implementing the pipeline. Some improvements that we’d probably need in a real-life application include:

  • error handling using Akka’s supervisor strategies. Right now, our worker is very simple, and has a try - catch statement around the process method. But we don’t really recover from errors, we just let things fail and report the amount of errors we have. This may work well if those errors are fatal, but in practice it may be worth re-trying to process some of the items.
  • inform the client that our work is done. This could be as simple as saving the actor reference when the first ProcessBatch message is received, and sending a WorkDone message when all items are processed (don’t try sending that message to sender because by the time all work is done, the most likely sender of a message will be one of the child actors)
  • have nicer metrics regarding the processing, such as processing rate. I often use Coda Hale’s metrics library for that purpose.
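
As an illustration of the second point, here is a minimal sketch of how the client reference could be saved and notified. It is a deliberately stripped-down stand-in for the BatchProcessor above, not a drop-in change: the class name NotifyingProcessor is made up, the batches are a plain in-memory list, and the workers are replaced by messages the actor sends to itself.

```scala
import akka.actor.{Actor, ActorRef}

case object ProcessBatch
case object ProcessedOneItem
case object WorkDone

// Hypothetical, simplified processor: only the client notification matters here
class NotifyingProcessor(batches: List[List[String]]) extends Actor {

  private var client: Option[ActorRef] = None
  private var remaining = batches
  private var pending = 0

  def receive = {
    case ProcessBatch =>
      // only the very first ProcessBatch comes from the client,
      // all later ones are sent by the actor to itself
      if (client.isEmpty) client = Some(sender())
      remaining match {
        case Nil =>
          client.foreach(_ ! WorkDone)
        case batch :: rest =>
          remaining = rest
          pending = batch.size
          // stand-in for handing the items over to the workers
          batch.foreach(_ => self ! ProcessedOneItem)
          if (batch.isEmpty) self ! ProcessBatch
      }

    case ProcessedOneItem =>
      pending -= 1
      if (pending == 0) self ! ProcessBatch
  }

}
```

A client that sends the first ProcessBatch, for example through the ask pattern, would then receive WorkDone once the list of batches is exhausted.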

Poor man’s back-pressure

If you are a member of the Akka core team, skip this section. For the rest of humankind, this topic relates directly to one assumption we made in the previous example: “our JVM has a decent amount of memory available so that the workers are able to queue all items in their mailbox”. In reality, this may not always be the case: if you continue to fetch data from a producer and pass it on to the workers faster than they can process it, you will eventually run out of memory, because by default Akka uses an UnboundedMailbox which will keep on filling up.

In the example above, this problem is easy to fix, because we only ever move on to the next batch when all items of the current one have been processed, which means that choosing an appropriate batch size is sufficient. However, we may have chosen a different implementation which pulls data out of the source continuously, because fetching it is somewhat more expensive, or because we’re interested in running the system at maximum capacity.

In such a situation, we need a means to tell the supervisor to slow down. This concept is also known as back-pressure, and I invite you to read how to properly implement it. Sometimes, however, you may not be interested in optimally solving the back-pressure problem, and in this case, it may be ok to violate Akka’s prime directive: don’t block inside an actor.

It may not be optimal, nor elegant, but I’ve found this mechanism to work pretty well, provided that you have a rough idea of the memory limitation of the application at hand and that you’re not interested in scaling dynamically.

In order to be able to slow down appropriately, we need to decouple the supervisor and the data producer. This is what an implementation may look like:

import akka.actor.{Props, Actor}
import akka.event.Logging
import akka.routing.RoundRobinRouter
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

abstract class Processor(dataSetId: Long) extends Actor {

  val log = Logging(context.system, "application")

  val workers = context.actorOf(Props[ItemProcessingWorker].withRouter(RoundRobinRouter(100)))
  val producer = context.actorOf(Props[DataProducer], name = "dataProducer")

  var totalItemCount = -1
  var currentItemCount = 0

  var allProcessedItemsCount = 0
  var allProcessingErrors: List[ItemProcessingError] = List.empty

  val MAX_LOAD = 50

  def receive = {

    case Process =>
      if (totalItemCount == -1) {
        totalItemCount = totalItems
        log.info(s"Starting to process set with ID $dataSetId, we have $totalItemCount items to go through")
      }
      val index = allProcessedItemsCount + allProcessingErrors.size
      if (currentItemCount < MAX_LOAD) {
        producer ! FetchData(index)
      }

    case Data(items) =>
      processBatch(items)

    case GetLoad =>
      sender ! currentItemCount

    case ProcessedOneItem =>
      allProcessedItemsCount = allProcessedItemsCount + 1
      currentItemCount = currentItemCount - 1
      continueProcessing()

    case error@ItemProcessingError(_, _, _) =>
      allProcessingErrors = error :: allProcessingErrors
      currentItemCount = currentItemCount - 1
      continueProcessing()

  }

  def processBatch(batch: List[Item]) = {

    if (batch.isEmpty) {
      log.info(s"Done migrating all items for data set $dataSetId. $totalItems processed items, we had ${allProcessingErrors.size} errors in total")
    } else {
      // distribute the work
      batch foreach { item =>
        // the workers only handle ProcessItem messages
        workers ! ProcessItem(item)
        currentItemCount = currentItemCount + 1
      }
    }

  }

  def continueProcessing() = {

    val itemsProcessed = allProcessedItemsCount + allProcessingErrors.size

    if (itemsProcessed > 0 && itemsProcessed % 100 == 0) {
      log.info(s"Processed $itemsProcessed out of $totalItems with ${allProcessingErrors.size} errors")
    }

    self ! Process

  }

  def totalItems: Int

}

abstract class DataProducer extends Actor {

  private val MAX_LOAD = 50

  def receive = {

    case FetchData(currentIndex) =>
      throttleDown()
      sender ! Data(fetchBatch(currentIndex))

  }

  def throttleDown(): Unit = {
    implicit val timeout = Timeout(5.seconds)
    val eventuallyLoad = context.parent ? GetLoad
    try {
      val load = Await.result(eventuallyLoad, 5.seconds)

      if (load.asInstanceOf[Int] > MAX_LOAD) {
        Thread.sleep(5000)
        throttleDown()
      }

    } catch {
      case t: Throwable =>
        // we most likely have timed out - wait a bit longer
        throttleDown()
    }
  }

  def fetchBatch(currentIndex: Int): List[Item]

}

abstract class ItemProcessingWorker extends Actor {

  def receive = {
    case ProcessItem(item) =>
      try {
        process(item) match {
          case None => sender ! ProcessedOneItem
          case Some(error) => sender ! error
        }
      } catch {
        case t: Throwable =>
          sender ! ItemProcessingError(item.id, "Unhandled error", Some(t))
      }
  }

  def process(item: Item): Option[ItemProcessingError]

}

case object Process

trait Item {
  val id: Int
}

case class FetchData(currentIndex: Int)

case class Data(items: List[Item])

case object GetLoad

case class ProcessItem(item: Item)

case object ProcessedOneItem

case class ItemProcessingError(itemId: Int, message: String, error: Option[Throwable])

So how does this work? Let’s have a closer look:

  • we no longer retrieve data in batches - or rather, we do retrieve it in batches, but we don’t wait for a batch to be done in order to fetch more data. Hence, we don’t keep track of batch state.
  • we keep track of the current amount of items that are being processed through the currentItemCount counter in the Processor which gets increased each time an item is sent out and decreased each time a success or failure is reported
  • the data is no longer fetched directly in the Processor but instead in a child thereof called DataProducer. Each time an item has been processed, the Processor sends itself a Process message which has as consequence to ask the producer for more data - unless our current item count is already above the maximum load we can handle
  • now to the interesting part: the DataProducer will not retrieve new data right away, but instead check with the supervisor what the current load is. If it is higher than the maximum allowed (50 items in our example, because we’re e.g. dealing with high-resolution images or something silly of the like that takes up a lot of memory) it will wait for 5 seconds and check again, until the load is again acceptable. Only then does it send new data to the Processor which continues to do its job until there’s no more data left.

Some comments about this technique:

  • as said, you need to have a good sense of what the maximum should be. This mechanism is anything but dynamic and won’t work well if items are highly heterogeneous (in the sense that processing them takes very different amounts of time)
  • it may be best to wait for a bunch of items to be processed before requesting a new batch, depending on the case at hand
  • the error handling in the example above would need to be improved for real-life usage

There are many other mechanisms to deal with work distribution that can easily be implemented with Akka. One other mechanism I’d like to try out when I get a chance is work stealing, wherein the workers take an active role of retrieving work instead of having it pushed to them.

Gotchas

During my time with Akka I’ve run into a few gotchas. They mostly amount to not having read the documentation well enough, but I’ve seen others make some of these mistakes, so I think they might be useful to list here.

Creating too many root actors

Root actors, created directly via the ActorSystem instead of the context, should be used with care: creating such actors is expensive as it requires synchronization at the level of the guardian. Also, in most cases, you don’t need to have many root actors, but rather one actor for the task at hand that itself has a number of children.

Closing over a future

This is an easy reasoning mistake to make and is explained in detail in this post. Let’s consider this example:

def receive = {
  case ComputeResult(itemId: Long) =>
    computeResult(itemId).map { result =>
      // gotcha!
      sender ! result
    }
}

def computeResult(itemId: Long): Future[Int] = ???

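So what’s the problem? computeResult produces a Future, so the callback passed to map runs later, on a thread of the execution context, while the actor is free to process its next message. sender is not a fixed value but a method that returns the sender of the message currently being processed. If another ComputeResult message comes in before the first future completes, the callback would answer the wrong sender (and if the actor is idle by then, the reply may end up in dead letters).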

An easy fix for this is to capture the sender in a val like so:

def receive = {
  case ComputeResult(itemId: Long) =>
    // capture the sender while we are still processing the message
    val originalSender = sender()
    computeResult(itemId).map { result =>
      originalSender ! result
    }
}

Misinterpreting the meaning of Future.andThen

This is mainly related to Futures rather than Actors, but I’ve encountered it while mixing both paradigms. The Future API has a method called andThen “used purely for side-effecting purposes”. In other words, it won’t transform the result of a future: whatever the supplied function returns is discarded, and the future returned by andThen completes with the same result as the original one.

I was wrongly assuming that andThen would return a new, transformed, Future, so I tried doing the following:

upload.andThen {
  case Success(s) =>
    FileUploadSuccess(item, s)
  case Failure(t) =>
    FileUploadFailure(item, Some(t))
} pipeTo originalSender

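pipeTo is quite a powerful tool in Akka’s toolkit which allows sending the result of a future to an actor (under the hood it registers a callback on the future that sends the result, or a Status.Failure if the future failed, to the actor once the future completes). In the snippet above, the actor would therefore receive the raw result of upload instead of a FileUploadSuccess or FileUploadFailure.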

The correct implementation would be:

upload.map { s =>
  FileUploadSuccess(item, s)
} recover { case t =>
  FileUploadFailure(item, Some(t))
} pipeTo originalSender
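
The difference between the two can also be observed without any actors. The following is a small sketch using only the standard library. The object and method names are made up for the example, and an Int stands in for the result of the upload.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical demo object, only here to contrast the two methods
object AndThenVsMap {

  // andThen: the strings produced by the partial function are thrown away,
  // the returned future still carries the original Int
  def sideEffectOnly(upload: Future[Int]): Future[Int] =
    upload.andThen {
      case Success(size) => s"uploaded $size bytes"
      case Failure(t)    => s"failed: ${t.getMessage}"
    }

  // map + recover: the returned future carries the transformed value
  def transformed(upload: Future[Int]): Future[String] =
    upload
      .map(size => s"uploaded $size bytes")
      .recover { case t => s"failed: ${t.getMessage}" }

}
```

The return types already tell the story: sideEffectOnly is still a Future[Int], whereas transformed is a Future[String].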

Conclusion

In conclusion, I would like to reiterate that Akka is a really useful tool to have in your toolset, and I think it’s going to stay around for a bit longer, so I can only advise anyone who hasn’t done so yet to go and try it out (even if you’re in Java-land, there’s a Java API as well). Another nice paradigm that is getting standardised as we speak is Reactive Streams, which deals with back-pressure as an integral part of the concept.