In the previous article of this series we explored the basics of the Akka Typed API: why it was created and what its benefits are in comparison to the classic Actor API, how to build typed actor systems via protocols and behaviors, and how to create typed actors. In this article we’re going to go further down the route of building typed actor systems by looking at the fundamental concepts necessary for the interaction between actors.

Let us start by defining the centerpiece actor of this system: PaymentHandling. This is the actor responsible for handling the requests received by the API: it retrieves the necessary configuration and then orchestrates the handling through the appropriate components.

The Payment request flow

This time around we will be using the functional style of the typed API to create the actor.

object PaymentHandling {

  def apply(configuration: ActorRef[ConfigurationMessage]): Behavior[PaymentHandlingMessage] = {
    Behaviors.setup[PaymentHandlingMessage] { context =>
      // TODO request the configuration and handle the payment request
      Behaviors.empty
    }
  }

  // ~~~ actor protocol

  sealed trait PaymentHandlingMessage
  case class HandlePayment(amount: Money, merchantId: MerchantId, userId: UserId) extends PaymentHandlingMessage

}

The API will send us a HandlePayment message upon which we can retrieve the necessary configuration from our Configuration actor:

Message flow for a payment request

If you still have the first article of this series in mind, you may have anticipated that at this point there will be an issue with our protocol definition and the use of the typed actor API. Let’s have a quick look at the message definitions:

// ~~~ Configuration responses
sealed trait ConfigurationResponse
final case class ConfigurationFound(merchantId: MerchantId, merchantConfiguration: MerchantConfiguration) extends ConfigurationResponse
final case class ConfigurationNotFound(merchantId: MerchantId) extends ConfigurationResponse

// ~~~ PaymentHandling protocol
sealed trait PaymentHandlingMessage
case class HandlePayment(paymentRequestId: PaymentRequestId, amount: Money, merchantId: MerchantId, userId: UserId) extends PaymentHandlingMessage

Indeed, the PaymentHandling actor has no knowledge of the response messages of Configuration, and in all fairness, it shouldn’t. I’ve always found this aspect of using the classic Akka API with Request-Response / Command-Event message patterns to be a bit cumbersome. It is perfectly legitimate to define the response message types (or events, when you’re using that semantic) close to the actor that emits them, but this means that an actor receiving such a “foreign” message will not have it as part of its own protocol. As a result, protocol definitions in classic Akka API systems are incomplete, or rather scattered: the message protocol of one actor always misses some of the messages sent to it by other actors.

So how do we work with this in the Akka Typed API? The answer is that those cases need to be made explicit with the use of Adapted Responses.

Adapted Responses

In order for PaymentHandling to consume responses of the Configuration actor we’ll need two things:

  • to define a message in the protocol of PaymentHandling reflecting those responses
  • to translate the responses from one protocol to the other, which we’ll do using a message adapter

Since in our case there are several possible response messages from Configuration, it would not be very practical to redefine all of them as part of the payment handling protocol. A simple mechanism is to use a wrapper like so:

case class WrappedConfigurationResponse(response: Configuration.ConfigurationResponse) extends PaymentHandlingMessage

Let’s now go ahead and implement the behavior of PaymentHandling using adapted responses:

object PaymentHandling {

  def apply(configuration: ActorRef[ConfigurationMessage]): Behavior[PaymentHandlingMessage] =
    Behaviors.setup[PaymentHandlingMessage] { context =>

      // define an adapter that translates between the external protocol (the response from Configuration)
      // and the protocol of this actor
      val configurationResponseAdapter: ActorRef[Configuration.ConfigurationResponse] =
        context.messageAdapter { response => WrappedConfigurationResponse(response) }

      def handle(requests: Map[MerchantId, HandlePayment]): Behavior[PaymentHandlingMessage] =
        Behaviors.receiveMessage {
          case paymentRequest: HandlePayment =>
            configuration ! Configuration.RetrieveConfiguration(paymentRequest.merchantId, configurationResponseAdapter)
            // note: we use merchant IDs to retrieve the request state in this example to keep things simple
            // in reality, this isn't a working solution as there might be more requests per merchant ID that could be received
            // in a short time-frame and thus the state would be lost before the configuration was retrieved
            handle(requests.updated(paymentRequest.merchantId, paymentRequest))
          case wrapped: WrappedConfigurationResponse =>
            // handle the response from Configuration, which we understand since it was wrapped in a message that is part of
            // the protocol of this actor
            wrapped.response match {
              case Configuration.ConfigurationNotFound(merchantId) =>
                context.log.warning("Cannot handle request since no configuration was found for merchant {}", merchantId.id)
                Behaviors.same
              case Configuration.ConfigurationFound(merchantId, merchantConfiguration) =>
                requests.get(merchantId) match {
                  case Some(request) =>
                    // TODO relay the request to the proper payment processor
                    Behaviors.same
                  case None =>
                    context.log.warning("Could not find payment request for merchant id {}", merchantId.id)
                    Behaviors.same
                }
            }

        }

      handle(requests = Map.empty)
    }
  // ...
}

The part that’s most interesting to us here is the configurationResponseAdapter:

val configurationResponseAdapter: ActorRef[Configuration.ConfigurationResponse] =
  context.messageAdapter { response => WrappedConfigurationResponse(response) }

This is the mechanism by which we can turn an ActorRef[PaymentHandlingMessage] into an ActorRef[ConfigurationResponse], thus allowing the Configuration actor to reply to the PaymentHandling actor and to translate the messages transparently. As shown here, using a wrapper to this effect makes sense when there are several possible responses to translate - for simpler cases, a direct mapping without a wrapper may be enough.
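
To illustrate the simpler case of a direct mapping without a wrapper, here is a minimal sketch in plain Scala, without any Akka dependency. The CurrencyResolved and CurrencyAvailable messages are hypothetical and not part of the example project; the point is only that the function handed to context.messageAdapter can translate a single foreign response straight into a message of the actor's own protocol.

```scala
object DirectMappingSketch {
  final case class MerchantId(id: String)

  // Hypothetical foreign protocol with only one possible response
  final case class CurrencyResolved(merchantId: MerchantId, currencyCode: String)

  // Protocol of the receiving actor
  sealed trait PaymentHandlingMessage
  final case class CurrencyAvailable(merchantId: MerchantId, currencyCode: String)
      extends PaymentHandlingMessage

  // The kind of function that would be passed to context.messageAdapter:
  // no wrapper message, just a field-by-field translation
  val toOwnProtocol: CurrencyResolved => PaymentHandlingMessage =
    response => CurrencyAvailable(response.merchantId, response.currencyCode)
}
```

With several possible responses, such a function would need one branch and one own-protocol message per response type, which is where the wrapper becomes the more practical choice.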

Since we are using the functional style of the Typed Actor API here, the state of the actor is not kept in a mutable data structure but instead passed down from one behavior to the next by virtue of the function definition:

def handle(requests: Map[MerchantId, HandlePayment]): Behavior[PaymentHandlingMessage] = ...

When returning this behavior, the state can be altered by calling handle with a different value for the requests map.
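
The idea of carrying state in a function parameter can be shown in isolation. The following is a toy model in plain Scala, not the Akka API: ToyBehavior is a hypothetical stand-in for Behavior[T], and the Track and Report messages are made up for the illustration.

```scala
object StatePassingSketch {
  // Hypothetical stand-in for Behavior[T]: handling one message
  // returns the behavior that will handle the next one
  trait ToyBehavior[T] {
    def receive(message: T): ToyBehavior[T]
  }

  sealed trait Message
  final case class Track(merchantId: String) extends Message
  final case class Report(sink: Set[String] => Unit) extends Message

  // The state lives only in the `seen` parameter; a "state change"
  // is simply a call to handle with a new value
  def handle(seen: Set[String]): ToyBehavior[Message] = new ToyBehavior[Message] {
    def receive(message: Message): ToyBehavior[Message] = message match {
      case Track(merchantId) =>
        handle(seen + merchantId)
      case Report(sink) =>
        sink(seen)
        this // state unchanged, analogous to Behaviors.same
    }
  }
}
```

No variable is ever reassigned inside handle: each message either yields a new behavior built from a new set, or keeps the current one.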

Note that the choice of a Map[MerchantId, HandlePayment] is a pretty poor one which wouldn’t work in real life (and which I only made to keep the example simple): as soon as a subsequent HandlePayment message with the same merchantId is received before the configuration response for the first one has arrived, the first entry is overwritten. There are several correct solutions for this:

  • using a multi-valued map
  • extending the protocol so that each incoming HandlePayment message contains a unique request identifier, and modifying the Configuration protocol to include that request identifier in order to be able to correlate the configuration responses
  • using the ask pattern, which works well for this type of request-response situation where the protocol does not carry the required context to allow for correlation
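
As an illustration of the second option, here is a minimal sketch in plain Scala of keeping the in-flight requests keyed by a request identifier. It assumes that the Configuration response echoes the identifier back, which is a hypothetical change to the protocol; the message types are simplified versions of the ones in this article, and the track and correlate helpers are made up for the example.

```scala
object CorrelationSketch {
  final case class PaymentRequestId(id: String)
  final case class MerchantId(id: String)

  // Simplified: amount and user are left out
  final case class HandlePayment(paymentRequestId: PaymentRequestId, merchantId: MerchantId)

  // Hypothetical: the response carries the identifier of the request it answers
  final case class ConfigurationFound(paymentRequestId: PaymentRequestId, merchantId: MerchantId)

  type InFlight = Map[PaymentRequestId, HandlePayment]

  // Remember a request until its configuration response arrives
  def track(inFlight: InFlight, request: HandlePayment): InFlight =
    inFlight.updated(request.paymentRequestId, request)

  // Returns the matching request, if any, and the remaining in-flight requests
  def correlate(inFlight: InFlight, response: ConfigurationFound): (Option[HandlePayment], InFlight) =
    (inFlight.get(response.paymentRequestId), inFlight - response.paymentRequestId)
}
```

Two requests for the same merchant now occupy two distinct entries, so neither overwrites the other.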

Let’s have a more detailed look at the ask pattern, which also allows us to map the response.

The ask pattern

The ask pattern allows two actors to interact in such a way that there’s a 1 to 1 mapping between the request and the response. Since there’s no guarantee that the responding actor will ever respond, the ask pattern requires a timeout after which the interaction is considered to have failed. When that happens, the interaction fails with a TimeoutException, which reaches the requesting actor as a Failure.

In the following implementation of the PaymentHandling we no longer keep track of the in-flight payment requests as there’s now a direct mapping for the interactions with the configuration service. Instead we extend the internal protocol of the PaymentHandling actor to carry the information we require:

  // ~~~ internal protocol
  case class AdaptedConfigurationResponse(response: Configuration.ConfigurationResponse, request: HandlePayment) extends PaymentHandlingMessage
  case class ConfigurationFailure(exception: Throwable) extends PaymentHandlingMessage

Our actor implementation now becomes:

object PaymentHandling {

  def apply(configuration: ActorRef[ConfigurationMessage]): Behavior[PaymentHandlingMessage] =
    Behaviors.setup[PaymentHandlingMessage] { context =>
      Behaviors.receiveMessage {
        case paymentRequest: HandlePayment =>
          // define the timeout after which the ask request has failed
          implicit val timeout: Timeout = 1.second

          def buildConfigurationRequest(ref: ActorRef[Configuration.ConfigurationResponse]) =
            Configuration.RetrieveConfiguration(paymentRequest.merchantId, ref)

          context.ask(configuration)(buildConfigurationRequest) {
            case Success(response: Configuration.ConfigurationResponse) => AdaptedConfigurationResponse(response, paymentRequest)
            case Failure(exception) => ConfigurationFailure(exception)
          }

          Behaviors.same
        case AdaptedConfigurationResponse(Configuration.ConfigurationNotFound(merchantId), _) =>
          context.log.warning("Cannot handle request since no configuration was found for merchant {}", merchantId.id)
          Behaviors.same
        case AdaptedConfigurationResponse(Configuration.ConfigurationFound(merchantId, merchantConfiguration), request) =>
          // TODO relay the request to the proper payment processor
          Behaviors.same
        case ConfigurationFailure(exception) =>
          context.log.warning(exception, "Could not retrieve configuration")
          Behaviors.same
      }
    }
    // ...
}

Again the really interesting bit of code here is the invocation of ask, which for the Scala API is a function with 4 parameter lists (3 explicit and one implicit):

context.ask(configuration)(buildConfigurationRequest) {
  case Success(response: Configuration.ConfigurationResponse) => AdaptedConfigurationResponse(response, paymentRequest)
  case Failure(exception) => ConfigurationFailure(exception)
}

  • the first parameter list takes the target to which we want to send the request, in our case an ActorRef[ConfigurationMessage]
  • the second parameter list takes a function that constructs the request to be sent given an actor reference to reply to. We could have defined the function inline as well, but for the clarity of the example it is defined separately first and then passed as a function reference
  • the third parameter list takes a function that evaluates the result of a Try and turns it into a message that will be sent to the requesting actor. As such, it needs to be part of the protocol that the actor understands
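
The function passed in the third parameter list can be looked at on its own, since it is just a mapping from a Try to a protocol message. Below is a minimal sketch in plain Scala with simplified message types (merchant IDs are plain strings here). The mapResponse helper is a name chosen for this illustration and is not part of the example project.

```scala
import scala.util.{Failure, Success, Try}

object AskMappingSketch {
  // Simplified responses of the Configuration actor
  sealed trait ConfigurationResponse
  final case class ConfigurationFound(merchantId: String) extends ConfigurationResponse
  final case class ConfigurationNotFound(merchantId: String) extends ConfigurationResponse

  // Simplified protocol of the PaymentHandling actor
  sealed trait PaymentHandlingMessage
  final case class HandlePayment(merchantId: String) extends PaymentHandlingMessage
  final case class AdaptedConfigurationResponse(response: ConfigurationResponse, request: HandlePayment)
      extends PaymentHandlingMessage
  final case class ConfigurationFailure(exception: Throwable) extends PaymentHandlingMessage

  // Every outcome of the ask, including a timeout, becomes a message of the
  // actor's own protocol; the original request travels along with the response
  def mapResponse(request: HandlePayment)(result: Try[ConfigurationResponse]): PaymentHandlingMessage =
    result match {
      case Success(response)  => AdaptedConfigurationResponse(response, request)
      case Failure(exception) => ConfigurationFailure(exception)
    }
}
```

Because the original request is captured in the resulting message, the actor no longer needs to keep a map of in-flight requests.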

Whilst this method signature might look a bit cumbersome at first, I think that it is a really good move on the part of the Akka team as it entirely eliminates a source of mistakes related to ask returning a Future in the classic Actor API, which made it possible to mistakenly close over mutable state of the actor.

Now that we have retrieved the configuration, we can proceed to contact the right payment processor and ask it to perform the payment. For this purpose, let’s have a look at actor discovery.

Discovering actors with the Receptionist

With a name that could have been given to a Matrix character, The Receptionist allows you to get typed actor references given a key. If you’re familiar with the classic Actor API then this is what replaces the ActorSelection.

Edgar Poe in the series Altered Carbon sure is a committed Receptionist

The way in which the discovery mechanism works is pretty straightforward: it simply acts as a registry.

  • each actor that wants to become discoverable needs to register itself by sending a Register message to the Receptionist actor and specifying a ServiceKey
  • any actor that requires a reference to a discoverable actor can query the Receptionist using a Find message or it can subscribe to updates using a Subscribe message

In our example, we don’t want to have to query the whereabouts of our processor actors for every request, therefore we’ll be using the subscription mechanism instead.

Let’s start by fleshing out a really simple processor and registering it with the Receptionist when it starts up. To make the processors even more pluggable, they will share the same protocol:

object Processor {
  sealed trait ProcessorRequest
  case class Process(amount: Money, merchantConfiguration: MerchantConfiguration, userId: UserId) extends ProcessorRequest
}

We can now start with scaffolding the first payment processor for credit cards:

object CreditCardProcessor {

  def process: Behavior[ProcessorRequest] = Behaviors.setup { context =>
    // register with the Receptionist which makes this actor discoverable
    context.system.receptionist ! Receptionist.Register(Key, context.self)
    // TODO implement the actual behaviour
    Behaviors.empty
  }

  val Key: ServiceKey[ProcessorRequest] = ServiceKey("creditCardProcessor")
}

As described earlier on, this process is quite simple: whenever the CreditCardProcessor is started it will register its actor reference with the Receptionist.

Note that the ServiceKey takes a type parameter, which is supposed to be the type of the protocol understood by the registered actor.

Next, we need to subscribe the PaymentHandling actor to updates of the Receptionist so that we are made aware of all the available processors:

object PaymentHandling {

  def apply(configuration: ActorRef[ConfigurationMessage]): Behavior[PaymentHandlingMessage] =
    Behaviors.setup[PaymentHandlingMessage] { context =>

      // subscribe to the processor reference updates we're interested in
      val listingAdapter: ActorRef[Receptionist.Listing] = context.messageAdapter { listing =>
        AddProcessorReference(listing)
      }
      context.system.receptionist ! Receptionist.Subscribe(CreditCardProcessor.Key, listingAdapter)

      // the known listings are passed from one behavior to the next; recursing through handle
      // (rather than apply) keeps the setup block, and thus the subscription, from running again
      def handle(paymentProcessors: Set[Receptionist.Listing]): Behavior[PaymentHandlingMessage] =
        Behaviors.receiveMessage {
          case AddProcessorReference(listing) =>
            handle(paymentProcessors + listing)
          // ...
        }

      handle(paymentProcessors = Set.empty)
    }

  // ...

  // ~~~ internal protocol
  // ...
  case class AddProcessorReference(listing: Receptionist.Listing) extends PaymentHandlingMessage
}

Note that since the Receptionist will return a Listing message, we need to use a message adapter coupled with an internal protocol message (AddProcessorReference) to be able to understand the update.

At this point we have a set of Listings at our disposal which we can use to send off the message to the right processor using the configuration (this step isn’t shown in the example, but you can imagine that given the right configuration this should be rather simple).

Concept comparison table

As usual in this series, here’s an attempt at comparing concepts in Akka Classic and Akka Typed (see also the official learning guide):

Akka Classic   | Akka Typed
-              | context.messageAdapter
ask / ?        | context.ask
ActorSelection | receptionist

And this is it for the second article of this series! You can find the source code of this article here.

Go to part 3 of this series