Update 21/11/2019: Fixed comment about shardId extraction in relation to ShardingEnvelope

Welcome to the fifth part of the Akka Typed series! In this part, we’ll leave the safe harbor of a single JVM and sail into the seas of distributed systems by exploring a key feature of Akka Cluster with the typed API: Cluster Sharding. If you want to get a more in-depth introduction to Akka Cluster, I invite you to check out the article series on this topic.

Before we get started, here’s a quick reminder of what we’ve seen so far: in the first part we had a look at the raison d’être of Akka Typed and what advantages it has over the classic, untyped API. In the second part we introduced the concepts of message adapters, the ask pattern and actor discovery. In the third part, we covered one of the core concepts of Actor systems: supervision and failure recovery. Finally, in the fourth part, we covered one of the most popular use-cases for Akka: event sourcing.

Redesigning the system for scale and resiliency

So far, our payment handling system has been quite linear in the way it works and is not (yet) fit for a higher throughput of requests. When taking a critical look at the initial design, this becomes quite visible:

All happens in an orderly fashion

Indeed, the request handling is completely sequential: in order to be processed, the request (or its derivatives) must flow through PaymentHandling, then Configuration, then CreditCardProcessor, and then back.

Actors handle one message at a time. Our PaymentHandling and CreditCardProcessor actors will deal with one message after the other, each using (at most) one CPU core for this purpose. The same holds true for Configuration, but since this actor should in fact be a persistent actor (it is possible to modify the configuration), there can only be one of its kind. But scaling out isn’t the only issue we need to address in order to build a truly reactive payment system.

Right now, if our PaymentHandling actor crashes while a request is being processed, there’s no mechanism to ensure that it will be started again. In fact, the system won’t even remember that there was a request to handle in the first place. We could of course turn PaymentHandling into a persistent actor, remembering all the in-flight requests - but this would quickly turn into a bottleneck for the entire system.

Instead, let’s explore a slightly different approach for which we’ll need to refactor our current PaymentHandling actor (which won’t hurt anyway, since it has become rather large already). We’ll be making use of a variation of the per-session child actor pattern: for each incoming request, we’ll delegate the request handling to a dedicated actor that itself will be persistent.

From a logical perspective, this is what our system will now look like:

Payment Handling revisited: many more arrows

In order to scale out on as many machines as we require, we will be making use of three Akka Cluster features that we will be exploring in more detail later (in the rest of the article, and in the next one):

Scaling out on many machines: Cluster Sharding in green, Routers in violet and Cluster Singleton in blue

  • PaymentRequestHandler actors will be deployed using Cluster Sharding (in green on the figure above)
  • CreditCardProcessor actors will be scaled out and deployed via Routers (in violet in the picture above)
  • The Configuration actor will be deployed as a Cluster Singleton (in blue next to node 1 on the figure above)

Scaling out request handling with Cluster Sharding

Our existing PaymentHandling actor is taking care of a few things:

  • it tracks the deployment of payment processors (thanks to the receptionist we explored in part 2 of this series)
  • it receives incoming payment requests
  • it queries the Configuration actor
  • it forwards requests to the appropriate Processor

We will now proceed to splitting this actor into two pieces:

  • the PaymentHandling actor which tracks deployed payment processors, sets up cluster sharding and delegates the request to a new PaymentRequestHandler entity
  • the PaymentRequestHandler that handles a request by querying Configuration and then the appropriate Processor

Step 1: splitting PaymentHandling in two

Let’s start by simplifying our protocol. We now only need to deal with two types of messages: the incoming payment requests and the updates to receptionist listings (which are not part of the public protocol). Our protocol will therefore be:

// ~~~ public protocol
sealed trait Command
final case class HandlePayment(orderId: OrderId, amount: Money, merchantId: MerchantId, userId: UserId, sender: ActorRef[PaymentRequestHandler.Response]) extends Command

// ~~~ internal protocol
sealed trait InternalMessage extends Command
private final case class AddProcessorReference(listing: Receptionist.Listing) extends InternalMessage

Now, let’s simplify the actor to a minimum so it can listen to changes in available processors and create dedicated actors per request. We will start without sharding by simply creating new child actors for the moment:

object PaymentHandling {

  def apply(configuration: ActorRef[ConfigurationRequest]): Behavior[Command] =
    Behaviors.setup[Command] { context =>

      // subscribe to the processor reference updates we're interested in
      val listingAdapter: ActorRef[Receptionist.Listing] = context.messageAdapter { listing =>
        AddProcessorReference(listing)
      }
      context.system.receptionist ! Receptionist.Subscribe(CreditCardProcessor.Key, listingAdapter)

      def handleRequest(paymentProcessors: Set[Listing]): Behavior[Command] =
        Behaviors.receiveMessage {
          case AddProcessorReference(listing) =>
            handleRequest(paymentProcessors + listing)
          case paymentRequest: HandlePayment =>
            // spawn one child per request
            val requestHandler = context.spawn(
              PaymentRequestHandler(paymentRequest.sender, configuration, paymentProcessors),
              paymentRequest.orderId.id
            )
            // HandlePaymentRequest also carries the reference to reply to
            requestHandler ! PaymentRequestHandler.HandlePaymentRequest(
              paymentRequest.orderId,
              paymentRequest.amount,
              paymentRequest.merchantId,
              paymentRequest.userId,
              paymentRequest.sender
            )
            Behaviors.same
        }

      // initial behavior
      handleRequest(Set.empty)
    }

  // here comes the protocol definition
  // ...
}

So much for the easy part. Let’s now look into the PaymentRequestHandler. It has to take care of 3 things:

  • accepting payment requests and retrieving the configuration
  • handling the configuration response and calling the processor
  • handling the processor response and relaying the response to the original client

This brings us to the following protocol (public and internal):

// public protocol
sealed trait Command
final case class HandlePaymentRequest(orderId: OrderId, amount: Money, merchantId: MerchantId, userId: UserId, replyTo: ActorRef[Response]) extends Command

sealed trait Response
final case class PaymentAccepted(transactionId: TransactionId) extends Response
final case class PaymentRejected(reason: String) extends Response

// internal protocol
sealed trait InternalMessage extends Command
private final case class AdaptedConfigurationResponse(orderId: OrderId, response: Configuration.ConfigurationResponse) extends InternalMessage
private final case class AdaptedProcessorResponse(orderId: OrderId, response: Processor.ProcessorResponse) extends InternalMessage

Note that as per the style guide, we have made internal messages private.

When it comes to implementing the actor itself, we will use 3 behaviors to reflect the 3 states the actor can be in (waiting for a request, waiting for the configuration, waiting for a processing reply):

object PaymentRequestHandler {

  def apply(client: ActorRef[Response], configuration: ActorRef[Configuration.ConfigurationRequest],
            paymentProcessors: Set[Listing]): Behavior[Command] = Behaviors.setup { context =>

      val configurationAdapter: ActorRef[Configuration.ConfigurationResponse] = // ... 
      val processingAdapter: ActorRef[Processor.ProcessorResponse] = // ...

      def handlePaymentRequest: Behavior[Command] =
        Behaviors.receiveMessage {
          case request: HandlePaymentRequest =>
            // bootstrap request handling by fetching the configuration
            configuration ! Configuration.RetrieveConfiguration(request.merchantId, request.userId, configurationAdapter)
            handleConfigurationResponse(request)
          case _ => Behaviors.unhandled
        }

      def handleConfigurationResponse(request: HandlePaymentRequest): Behavior[Command] =
        Behaviors.receiveMessage {
          case AdaptedConfigurationResponse(_, config: Configuration.ConfigurationFound) =>
            processRequest(config, request.amount)
          // ... 
        }

      def handleProcessorResponse: Behavior[Command] =
        Behaviors.receiveMessage {
          case AdaptedProcessorResponse(_, Processor.RequestProcessed(transaction)) =>
            client ! PaymentAccepted(transaction.id)
            // we've done our job, now shut down
            Behaviors.stopped
          case _ => Behaviors.unhandled
        }

      def processRequest(config: Configuration.ConfigurationFound, amount: Money): Behavior[Command] = // ...

      // initial behavior
      handlePaymentRequest
    }
}

The full implementation can be found in the source code for this article series.

Step 2: making the PaymentRequestHandler persistent

In order to be able to remember that we received a payment request in the face of crashes we need to make the PaymentRequestHandler persistent. We’ll do so by persisting events:

sealed trait Event
case class PaymentRequestReceived(orderId: OrderId, amount: Money, merchantId: MerchantId, userId: UserId, replyTo: ActorRef[Response]) extends Event
case class PaymentRequestProcessed(transactionId: TransactionId) extends Event

(note: we’re persisting an ActorRef[Response] in this example and are exposed to the risk of this reference no longer being valid once the actor is recovered from its journal. For the purpose of this example, we’ll assume that the actor reference of the client is stable and always available to recovered actors - but be aware that this may not always be the case, for example when catering to clients backed by an Akka HTTP endpoint)

Just as we have seen in the previous article on event sourcing with Akka Typed, we also need to define the State for this actor. Conceptually, it functions as a finite state machine capable of handling different types of commands in different states (just as the ones we modeled using different Behavior-s previously):

  • when nothing has happened yet (initial, blank state)
  • when having accepted a request to handle
  • when receiving the processing result

Note that we don’t model an explicit state for waiting for the configuration - we will just fetch it anew should we need to do so. This leads us to the following state definition:

sealed trait State
final case object Empty extends State
final case class ProcessingPayment(client: ActorRef[Response], orderId: OrderId, amount: Money, merchantId: MerchantId, userId: UserId) extends State
final case class PaymentProcessed(client: ActorRef[Response], transactionId: TransactionId, orderId: OrderId, amount: Money, merchantId: MerchantId, userId: UserId) extends State

Note that technically, in the current implementation, we will be stopping the actor once the payment has been processed. That said, the client could send us the same request twice - and in this case, we need to remember that we are done in order to provide an idempotent reply, which is why we explicitly model the PaymentProcessed state.

Our actor now gets the following structure:

object PaymentRequestHandler {

  // orderId is now a part of the factory and serves as persistence identifier
  def apply(orderId: OrderId,
            client: ActorRef[Response],
            configuration: ActorRef[Configuration.ConfigurationRequest],
            paymentProcessors: Set[Listing]): Behavior[Command] = Behaviors.setup { context =>

    val configurationAdapter: ActorRef[Configuration.ConfigurationResponse] = // same as previously
    val processingAdapter: ActorRef[Processor.ProcessorResponse] = // same as previously

    def commandHandler(state: State, command: Command): Effect[Event, State] = ???

    def eventHandler(state: State, event: Event): State = ???

    // ...

    EventSourcedBehavior[Command, Event, State](
      PersistenceId.ofUniqueId(orderId.id),
      Empty,
      commandHandler,
      eventHandler
    )

  }
}

The full implementation of the persistent actor can be found in the sources of this article. Note that the sources are not written in the “handler as part of the state” pattern - this is left as an exercise to the reader.
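To make the state machine a bit more tangible, here is a minimal sketch of what the two handlers left as ??? above might decide, written in plain Scala without any Akka dependency. The domain types are simplified stand-ins (for instance, the client reference, merchant and user are left out), and the Decision type is a hypothetical placeholder for Akka's Effect, so treat this as an illustration of the idea rather than the implementation from the sources:

```scala
object PaymentStateSketch {
  // Simplified stand-ins for the article's domain types (assumptions)
  final case class OrderId(id: String)
  final case class TransactionId(id: String)

  sealed trait Event
  final case class PaymentRequestReceived(orderId: OrderId, amount: BigDecimal) extends Event
  final case class PaymentRequestProcessed(transactionId: TransactionId) extends Event

  sealed trait State
  case object Empty extends State
  final case class ProcessingPayment(orderId: OrderId, amount: BigDecimal) extends State
  final case class PaymentProcessed(orderId: OrderId, amount: BigDecimal, transactionId: TransactionId) extends State

  // Hypothetical placeholder for what a command handler would express with Effect
  sealed trait Decision
  final case class Persist(event: Event) extends Decision
  final case class ReplyAccepted(transactionId: TransactionId) extends Decision
  case object Ignore extends Decision

  // What to do when a payment request arrives, depending on the current state
  def decideOnRequest(state: State, orderId: OrderId, amount: BigDecimal): Decision =
    state match {
      case Empty                      => Persist(PaymentRequestReceived(orderId, amount))
      case _: ProcessingPayment       => Ignore // already being handled
      case PaymentProcessed(_, _, tx) => ReplyAccepted(tx) // idempotent reply
    }

  // Pure state transition, applied both when persisting and when replaying events
  def eventHandler(state: State, event: Event): State =
    (state, event) match {
      case (Empty, PaymentRequestReceived(orderId, amount)) =>
        ProcessingPayment(orderId, amount)
      case (ProcessingPayment(orderId, amount), PaymentRequestProcessed(tx)) =>
        PaymentProcessed(orderId, amount, tx)
      case (other, _) =>
        other // events that do not apply to the current state are ignored
    }

  // Recovery boils down to folding the journal over the empty state
  def replay(events: Seq[Event]): State =
    events.foldLeft[State](Empty)(eventHandler)
}
```

Because the event handler is a pure function, replaying the same events always yields the same state, which is what allows a recovered entity to answer a duplicate request with the transaction it already produced.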

Step 3: setting up sharding

Let’s now get to the interesting part: turning the request handler into a sharded entity.

Shard Regions contain Shards which themselves contain the sharded entities

In Akka Cluster Sharding, the actors to be sharded are referred to as sharded entities. They each have an identifier that is globally unique across the entire cluster. Those sharded entities are themselves part of so-called shards, which you can think of as containers for holding a number of sharded entities. The reason for this design is to cut down on communication and coordination overhead when a node is shut down and its shards need to be re-created someplace else, or when the distribution of entities in the cluster becomes unequal and shards must be re-balanced. Within a cluster node, the shards are managed by a shard region.

In order to take advantage of Cluster Sharding, we will hand over the creation of the PaymentRequestHandler actors - as well as the decision of where they are deployed (i.e. which node in the cluster) and for how long they stay around in memory - to Akka. We won’t have to worry about operational details such as failover or load balancing (where actors are deployed) ourselves - as the Cluster Sharding extension will take care of it for us.

That doesn’t mean we don’t have anything to do. When setting up sharding there is a key decision to be taken (and it should not be taken lightly): how to distribute shards on nodes.

The distribution of shards on nodes is important because it directly influences the performance of the overall system:

  • if there are too few shards in relation to nodes, then the load distribution will be uneven as the shards will get “too big”, i.e. they will contain too many entities while some nodes will not be hosting any shards.
  • if there are too many shards in relation to nodes, the shards will be “too small”, each only containing a few entities. Moving individual shards will be fast, but as there are many, there will be quite some communication overhead caused by having to frequently re-balance shards in order to keep the load distribution equal in the cluster (i.e. there will be a lot of re-balancing, which in turn also impacts latency).

The decision of how many shards to allocate is taken by defining a sharding algorithm based on the entity identifiers - basically, it should be possible to infer from the entity id which shard the message should be routed to. This algorithm can’t, of course, be changed while the cluster is running (i.e. you cannot do a rolling deployment that introduces a new sharding algorithm), therefore it should be chosen carefully.
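To illustrate what “inferring the shard from the entity id” means, here is a small, self-contained sketch in plain Scala. It maps an entity id to one of a fixed number of shards using the id’s hash code and counts how a batch of made-up order ids spreads over 30 shards (the value used later in this article). The function names and the exact formula are assumptions for illustration and are not claimed to match what Akka does internally:

```scala
object ShardIdSketch {
  // Illustrative only: derives a shard id in the range [0, numberOfShards)
  // from the hash code of the entity id
  def shardIdFor(entityId: String, numberOfShards: Int): String =
    Math.floorMod(entityId.hashCode, numberOfShards).toString

  // Counts how many of the given entity ids end up in each shard
  def distribution(entityIds: Seq[String], numberOfShards: Int): Map[String, Int] =
    entityIds
      .groupBy(id => shardIdFor(id, numberOfShards))
      .map { case (shard, ids) => shard -> ids.size }

  def main(args: Array[String]): Unit = {
    // made-up order identifiers, purely for demonstration
    val orderIds = (1 to 10000).map(i => s"order-$i")
    val perShard = distribution(orderIds, numberOfShards = 30)
    println(s"shards used: ${perShard.size}, smallest: ${perShard.values.min}, largest: ${perShard.values.max}")
  }
}
```

The important property is that the mapping is deterministic: every node computes the same shard for the same entity id. Changing numberOfShards changes the result for most ids, which is why the algorithm cannot be altered while the cluster is running.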

Let’s now have a look at how to set up sharding in the PaymentHandling actor, which used to spawn the PaymentRequestHandler children directly.

First, we need to add a new dependency to our project:

libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion

Next, we retrieve the sharding extension:

// initialize the sharding extension
val sharding = ClusterSharding(context.system)

So far, so good. Let us now initialize sharding for our entities. For this, we need mainly 4 things:

  • an EntityTypeKey[MessageProtocol] used to describe which entity is being sharded - including, as its signature suggests, its message protocol
  • a MessageExtractor that knows how to extract the entityId of a message as well as how to turn an entityId into a shardId
  • a factory method for building the behavior of the sharded entities from an EntityContext[MessageProtocol]
  • a stop message sent to the sharded entity before it is shut down (before rebalance, passivation, shutdown, etc.). This is particularly important for persistent actors, as the default is to use a PoisonPill which immediately stops the actor, regardless of whether everything has been written to the journal

With that in mind, let’s get started. The entity type key is rather straightforward:

val PaymentRequestHandlerTypeKey = EntityTypeKey[PaymentRequestHandler.Command]("PaymentRequestHandler")

For the message extractor, we will use the provided HashCodeNoEnvelopeMessageExtractor[MessageProtocol], which lets us specify how to extract the entity id from a message and then takes care of deriving a shard identifier based on the hash code of the entity id:

// define a message extractor that knows how to retrieve the entityId from a message
// we plan on deploying on a 3-node cluster, as a rule of thumb there should be 10 times as many
// shards as there are nodes, hence the numberOfShards value of 30
val messageExtractor =
  new HashCodeNoEnvelopeMessageExtractor[PaymentRequestHandler.Command](numberOfShards = 30) {
    override def entityId(message: PaymentRequestHandler.Command): String = message.orderId.id
  }

Note that we have made orderId a field of the PaymentRequestHandler.Command protocol in order to use this approach.

An alternative to using a property of the message (in most cases, a unique identifier - in our case we use orderId) in order to derive the entityId and shardId “outside” of the messages is to capture these properties as part of the messages, with the caveat that the entity needs to be aware of this sharding-specific protocol.

Finally, let’s assemble everything and start the shard region. We will use a new custom stop message added to the protocol of the PaymentRequestHandler:

val shardRegion: ActorRef[PaymentRequestHandler.Command] =
  sharding.init(
    Entity(PaymentRequestHandlerTypeKey) { context =>
      PaymentRequestHandler(OrderId(context.entityId), PersistenceId(context.entityTypeKey.name, context.entityId), configuration)
    }
    .withMessageExtractor(messageExtractor)
    .withStopMessage(PaymentRequestHandler.GracefulStop)
  )

The behavior factory takes an EntityContext[MessageProtocol] that provides useful context, such as the entityId. As you may have noticed, we’ve had to adjust the behavior factory of the PaymentRequestHandler to allow creating entities: we’re now providing it with:

  • the orderId of the order at hand
  • a PersistenceId useful for our persistent entity
  • a reference to the configuration actor

The PersistenceId is constructed on the basis of the TypeKey and the entityId, which allows entities of different types to have the same identifier (in sharding, the entity identifiers must be globally unique across the cluster).

The rest of the payment request information is provided to the entity as part of the HandlePaymentRequest message.

In order to send messages to the entity, we use the shardRegion obtained previously:

def handleRequest(paymentProcessors: Set[Listing], shardRegion: ActorRef[PaymentRequestHandler.Command]): Behavior[Command] =
  Behaviors.receiveMessage {
    case paymentRequest: HandlePayment =>
      shardRegion ! PaymentRequestHandler.HandlePaymentRequest(
        paymentRequest.orderId,
        paymentRequest.amount,
        paymentRequest.merchantId,
        paymentRequest.userId,
        paymentRequest.sender
      )
      Behaviors.same
  }

A sharded entity is created by its shard when the first message is sent to it. If it is idle (i.e. if it receives no messages) for 120 seconds (by default) then it will be passivated, i.e. it will receive the stop message from its parent shard.

Step 4: resuming processing in case of crash

There is one more thing we need to take care of. If the node which handles the request crashes, or if the entity is passivated while processing the request (for example, while waiting for a reply of the configuration), we need a way to bring it back up to life and to resume processing.

We will be using a feature of Akka Cluster Sharding aptly called remember entities: when the shard that holds the entities is re-allocated (after re-balancing or after a crash), all the entities it holds will be started again automatically. This feature needs to be explicitly enabled in the configuration:

akka.cluster.sharding.remember-entities = on

(note: as you can imagine, there’s a cost associated with using this feature - it should not be used for systems that hold a large number of entities per shard, as it slows down the rebalancing of shards)

What’s now left to do is to figure out where we left off and resume processing. For this, we’ll handle the RecoveryCompleted signal and take appropriate action:

EventSourcedBehavior[Command, Event, State](
  persistenceId = persistenceId,
  emptyState = Empty,
  commandHandler = commandHandler,
  eventHandler = eventHandler
).receiveSignal {
  case (state: ProcessingPayment, RecoveryCompleted) =>
    // request configuration again
    configuration ! Configuration.RetrieveConfiguration(state.merchantId, state.userId, configurationAdapter)
}
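The decision taken in the signal handler above depends only on the recovered state, so it can be sketched (and tested) as a plain function. The following is a minimal illustration with simplified stand-in types. RecoveryAction and its cases are hypothetical names introduced for this sketch, and doing nothing for the Empty and PaymentProcessed states is an assumption rather than what the article’s sources necessarily do:

```scala
object RecoverySketch {
  // Simplified stand-ins for the article's domain types (assumptions)
  final case class MerchantId(id: String)
  final case class UserId(id: String)
  final case class TransactionId(id: String)

  sealed trait State
  case object Empty extends State
  final case class ProcessingPayment(merchantId: MerchantId, userId: UserId) extends State
  final case class PaymentProcessed(transactionId: TransactionId) extends State

  // Hypothetical description of what the entity should do once recovery is complete
  sealed trait RecoveryAction
  case object NothingToResume extends RecoveryAction
  final case class RetrieveConfigurationAgain(merchantId: MerchantId, userId: UserId) extends RecoveryAction

  def onRecoveryCompleted(state: State): RecoveryAction =
    state match {
      // a request was accepted but not finished: restart from the configuration lookup
      case ProcessingPayment(merchantId, userId) => RetrieveConfigurationAgain(merchantId, userId)
      // no request yet, or request already completed: nothing is in flight
      case Empty | _: PaymentProcessed => NothingToResume
    }
}
```

Keeping this decision separate from the actor makes it easy to check that every state is covered, since the compiler warns about missing cases on the sealed State trait.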

This is it for this article! In the next one, we’ll continue looking into the Akka Typed Cluster extensions with routers and Cluster Singleton.

Concept comparison table

As usual in this series, here’s an attempt at comparing concepts in Akka Classic and Akka Typed (see also the official learning guide):

Akka Classic                          Akka Typed
typeName String                       EntityTypeKey
ClusterSharding(system).start(...)    ClusterSharding(system).init(...)

Go to part 6 of this series