Table of Contents
  1. 1. Channel types
    1. 1.1. Point to Point
    2. 1.2. Publish subscribe
    3. 1.3. CUSTOM EVENTBUS
  2. 2. Specialized channels
    1. 2.1. Dead letter
  3. 3. Guaranteed delivery
  4. 4. Using a Finite State Machine
    1. 4.1. Creating an FSM Model
    2. 4.2. TESTING THE FSM
    3. 4.3. Timers within FSM
    4. 4.4. Termination of FSM
  5. 5. Implementing shared state using agents
    1. 5.1. Waiting for the state update

Channel types

Other names often used for this kind of channel are EventQueue or EventBus. Akka has an EventStream which implements a publish-subscribe channel. When this implementation isn't sufficient, Akka also provides a collection of traits which help to implement a custom publish-subscribe channel.

Next we describe two special channels. The first is the dead-letter channel, which contains messages that couldn't be delivered. This channel can help when debugging why some messages aren't processed, or when monitoring where the problems are.

Point to Point

The point-to-point channel sends the message to one receiver.

[Figure: point-to-point channel]

The round-robin Router in section 7.3.1 is an example of this channel with multiple receivers. The messages can be processed concurrently by different receivers, but any one message is consumed by only one receiver.

In Akka, the ActorRef is the implementation of a point-to-point channel, because all messages sent to it are delivered to a single Actor.
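
As a minimal sketch of this (assuming, like the other snippets, that an ActorSystem named system is in scope; the EchoActor here is just an illustration), every message sent through an ActorRef lands in the mailbox of exactly one actor:

import akka.actor.{ Actor, Props }

// a simple actor that consumes every message sent to its ActorRef
class EchoActor extends Actor {
  def receive = {
    case msg => sender ! msg // reply to whoever sent the message
  }
}

// the ActorRef returned by actorOf is the point-to-point channel:
// every message sent through it ends up in this one actor's mailbox
val echo = system.actorOf(Props[EchoActor], "echo")
echo ! "hello" // consumed by exactly one receiver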

Publish subscribe

Sometimes the same message has to be delivered to multiple receivers.

To solve this problem we can use the publish-subscribe channel. This channel is able to send the same message to multiple receivers, without the sender knowing who the receivers are.

When a receiver is interested in the messages of a publisher, it subscribes itself to the channel.

The easiest way to get a publish-subscribe channel is to use the EventStream. The EventStream can be seen as a manager of multiple publish-subscribe channels.

// subscribe to the EventStream to receive Order messages
system.eventStream.subscribe(giftModule, classOf[Order])
// unsubscribe from Order messages
system.eventStream.unsubscribe(giftModule, classOf[Order])

// unsubscribe from all message types
system.eventStream.unsubscribe(giftModule.ref)
// publish a message
system.eventStream.publish(msg)

CUSTOM EVENTBUS

Let's assume that we only want to send a gift when someone orders more than one book.

An EventBus is generalized so that it can be used for all implementations of a publish-subscribe channel. In the generalized form there are three entities.

  • Event: the type of all events published on the bus. The Akka EventStream uses AnyRef as the event type and therefore supports all kinds of messages.
  • Subscriber: the type of subscribers allowed to register on the event bus. In the Akka EventStream the subscribers are ActorRefs.
  • Classifier: defines the classifier to be used in selecting subscribers for dispatching events.

EventBus Interface

package akka.event
trait EventBus {
  type Event
  type Classifier
  type Subscriber
  /**
  * Attempts to register the subscriber to the specified Classifier
  * @return true if successful and false if not (because it was
  * already subscribed to that Classifier, or otherwise)
  */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean
  /**
  * Attempts to deregister the subscriber from the specified Classifier
  * @return true if successful and false if not (because it wasn't
  * subscribed to that Classifier, or otherwise)
  */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean
  /**
  * Attempts to deregister the subscriber from all Classifiers it may
  * be subscribed to
  */
  def unsubscribe(subscriber: Subscriber): Unit
  /**
  * Publishes the specified Event to this bus
  */
  def publish(event: Event): Unit
}
class OrderMessageBus extends EventBus {
  type Event = Order
  // we classify the Order messages on the criterion "is this a multiple-book
  // order?" and therefore use a Boolean as the classifier
  type Classifier = Boolean

  // the Subscriber type and the subscribe/publish machinery are added below
  // by mixing in ActorEventBus and LookupClassification
}

Akka has three composable traits which can help in keeping track of the subscribers.

  • LookupClassification: maintains a set of subscribers for each possible classifier and extracts a classifier from each event.
  • SubchannelClassification: used when the classifiers form a hierarchy and subscriptions should be possible not only at the leaf nodes but also at higher nodes.
  • ScanningClassification: used when classifiers overlap, i.e. one event can belong to several classifiers, for example when we give more gifts for larger orders (see the sketch after this list).
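
A minimal sketch of the ScanningClassification variant (the OrderSizeBus name and the threshold classifier are made up for illustration; it assumes the Order case class from this chapter with its number field): the classifier is "at least n books", so one order can match several subscriptions at once.

import akka.event.{ ActorEventBus, EventBus, ScanningClassification }

class OrderSizeBus extends EventBus with ScanningClassification
    with ActorEventBus { // Subscriber = ActorRef, compareSubscribers comes with it
  type Event = Order
  type Classifier = Int // "at least this many books"

  // defines an ordering over the classifiers
  protected def compareClassifiers(a: Classifier, b: Classifier): Int =
    a compareTo b

  // an event matches every classifier (threshold) that it reaches
  protected def matches(classifier: Classifier, event: Event): Boolean =
    event.number >= classifier

  // deliver the event to a matching subscriber
  protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber ! event
}

// bus.subscribe(probe.ref, 2) would now receive every order of two or more books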

API of LookupClassification

  • classify(event: Event): Classifier is used for extracting the classifier from the incoming events.
  • compareSubscribers(a: Subscriber, b: Subscriber): Int must define an order over the subscribers, analogous to the java.lang.Comparable.compare method.
  • publish(event: Event, subscriber: Subscriber) is invoked once for each subscriber that registered itself for the event's classifier.
  • mapSize: Int returns the expected number of distinct classifiers; it is used as the initial size of an internal data structure.
import akka.event.ActorEventBus
import akka.event.{ LookupClassification, EventBus }

class OrderMessageBus extends EventBus with LookupClassification
    with ActorEventBus {  //  defines that the subscriber is an ActorRef.
  type Event = Order
  type Classifier = Boolean
  def mapSize = 2
  
  protected def classify(event: OrderMessageBus#Event) = {
    event.number > 1
  }
  
  // implement publish by sending the event to the subscriber
  protected def publish(event: OrderMessageBus#Event,
      subscriber: OrderMessageBus#Subscriber) {
    subscriber ! event
  }

}


// Test for the event bus (inside an akka-testkit spec, so an implicit ActorSystem is in scope)
import akka.testkit.TestProbe
import scala.concurrent.duration._ // for the "3 seconds" timeouts
val bus = new OrderMessageBus
val singleBooks = TestProbe()
bus.subscribe(singleBooks.ref, false)

val multiBooks = TestProbe()
bus.subscribe(multiBooks.ref, true)

val msg = new Order("me", "Akka in Action", 1)
bus.publish(msg)

singleBooks.expectMsg(msg)
multiBooks.expectNoMsg(3 seconds)

val msg2 = new Order("me", "Akka in Action", 3)
bus.publish(msg2)
singleBooks.expectNoMsg(3 seconds)
multiBooks.expectMsg(msg2)

Specialized channels

Dead-letter channel: only failed messages are put on this channel. Listening on this channel can help to find problems in your system.

Guaranteed-delivery channel: this channel guarantees that all messages which are sent are also delivered.

Dead letter

By monitoring this channel you know which messages aren't processed and can take corrective actions.

To get these dead letter messages you only need to subscribe your actor to the EventStream with the DeadLetter class as the Classifier.

val deadLetterMonitor: ActorRef

system.eventStream.subscribe(
  deadLetterMonitor,
  classOf[DeadLetter])

// test code
val deadLetterMonitor = TestProbe()
system.eventStream.subscribe(
    deadLetterMonitor.ref,
    classOf[DeadLetter])
    
val actor = system.actorOf(Props[EchoActor], "echo")
actor ! PoisonPill
val msg = new Order("me", "Akka in Action", 1)

actor ! msg

//  the undelivered message is wrapped into a DeadLetter object
val dead = deadLetterMonitor.expectMsgType[DeadLetter]
dead.message must be(msg)
dead.sender must be(testActor)
dead.recipient must be(actor)

Guaranteed delivery

The guaranteed-delivery channel is a point-to-point channel with the guarantee that the message is always delivered to the receiver.

This means that the channel must have all kinds of mechanisms and checks to be able to guarantee delivery; for example, the message has to be saved to disk in case the process crashes.

The general rule of message delivery is that messages are delivered at-most-once: Akka either delivers the message once or fails to deliver it, in which case the message is lost.

Sending local messages is unlikely to fail, because it is like a normal method call. It fails only on catastrophic VM errors, like StackOverflowError, OutOfMemoryError, or a memory access violation. So the guarantees when sending a message to a local actor are pretty good and reliable.

The real risk of losing messages arises when using remote actors, where a message delivery failure is a lot more likely to occur.

Akka's contrib module provides the ReliableProxy for this. The Egress is an Actor which is started by the ReliableProxy, and both Actors implement the checks and resend functionality needed to keep track of which messages have been delivered to the remote receiver.

One restriction of the ReliableProxy is that the tunnel is one-way and for one receiver only. This means that when the receiver replies to the sender, the tunnel is NOT used.

import akka.contrib.pattern.ReliableProxy
import scala.concurrent.duration._ // for 500.millis
val echo = system.actorFor(node(server) / "user" / "echo")
// In the example we create a proxy using the echo reference.
// When failing to send a message it is retried after 500 milliseconds.
val proxy = system.actorOf(Props(new ReliableProxy(echo, 500.millis)), "proxy")

We create a multi-node test with two nodes, a client node and a server node. On the server node we create an EchoActor as the receiver, and on the client node we run the actual test.

import akka.remote.testkit.MultiNodeSpecCallbacks
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
// WordSpec, MustMatchers and BeforeAndAfterAll are ScalaTest traits (imports omitted here)

trait STMultiNodeSpec
    extends MultiNodeSpecCallbacks
    with WordSpec
    with MustMatchers
    with BeforeAndAfterAll {
  override def beforeAll() = multiNodeSpecBeforeAll()
  override def afterAll() = multiNodeSpecAfterAll()
}

object ReliableProxySampleConfig extends MultiNodeConfig {
  val client = role("Client")
  val server = role("Server")
  testTransport(on = true)
}

class ReliableProxySampleSpecMultiJvmNode1 extends ReliableProxySample
class ReliableProxySampleSpecMultiJvmNode2 extends ReliableProxySample

TODO P257

Using a Finite State Machine

A finite-state machine (FSM), also called a state machine, is a common, language-independent modeling technique.

The simplest example of a Finite State Machine is a device whose operation proceeds through several states, transitioning from one to the next as certain events occur.

[Figure: a finite state machine]

Creating an FSM Model

The inventory service gets requests for specific books and sends a reply. When the book is in the inventory, the order system gets a reply that a book has been reserved. But it is possible that there aren't any books left, and the inventory has to ask the publisher for more books before it can service the order.

[Figure: states and transitions of the inventory FSM]

// State, The super type of all state names
sealed trait State
case object WaitForRequests extends State
case object ProcessRequest extends State
case object WaitForPublisher extends State
case object SoldOut extends State
case object ProcessSoldOut extends State

// StateData, The type of the state data which are tracked by the FSM.
case class StateData(nrBooksInStore:Int,pendingRequests:Seq[BookRequest])


import akka.actor.{Actor, ActorRef, FSM}
// the FSM needs a reference to the publisher for the WaitForPublisher transition
class Inventory(publisher: ActorRef) extends Actor with FSM[State, StateData] {
  var reserveId = 0 // id handed out with every successful reservation
  // define the initial state and the initial StateData.
  startWith(WaitForRequests, new StateData(0, Seq()))

  // Declare the transitions for state WaitForRequests
  when(WaitForRequests) {
    // Declare the possible Event when a BookRequest messages occur
    case Event(request:BookRequest, data:StateData) => 
      val newStateData = data.copy(pendingRequests = data.pendingRequests :+ request)
      
      if (newStateData.nrBooksInStore > 0) {
        goto(ProcessRequest) using newStateData
      } else {
        goto(WaitForPublisher) using newStateData
      }
    case Event(PendingRequests, data:StateData) => 
      if (data.pendingRequests.isEmpty) {
        stay
      } else if(data.nrBooksInStore > 0) {
        goto(ProcessRequest)
      } else {
        goto(WaitForPublisher)
      }
  }

  when(WaitForPublisher) {
    case Event(supply:BookSupply, data:StateData) => {
      goto(ProcessRequest) using data.copy(nrBooksInStore = supply.nrBooks)
    }
    case Event(BookSupplySoldOut, _) => {
      goto(ProcessSoldOut)
    }
  }

  when(ProcessRequest) {
    case Event(Done, data:StateData) => {
      goto(WaitForRequests) using data.copy(
          nrBooksInStore = data.nrBooksInStore - 1,
          pendingRequests = data.pendingRequests.tail)
    }
  }

  when(SoldOut) {
    case Event(request:BookRequest, data:StateData) => {
      goto(ProcessSoldOut) using new StateData(0,Seq(request))
    }
  }

  when(ProcessSoldOut) {
    case Event(Done, data:StateData) => {
      goto(SoldOut) using new StateData(0,Seq())
    }
  }

  whenUnhandled {
    // common code for all states
    case Event(request:BookRequest, data:StateData) => {
      // Only update the stateData
      stay using data.copy(pendingRequests = data.pendingRequests :+ request)
    }
    // Log when the event isn't handled
    case Event(e, s) => {
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      stay
    }
  }

  // entry Action of the WaitForRequests state
  onTransition {
    case _ -> WaitForRequests => {
      if (!nextStateData.pendingRequests.isEmpty) {
        // go to next state
        self ! PendingRequests
      }
    }
    
    case _ -> WaitForPublisher => {
      publisher ! PublisherRequest
    }
    
    case _ -> ProcessRequest => {
      val request = nextStateData.pendingRequests.head
      reserveId += 1
      request.target ! new BookReply(request.context, Right(reserveId))
      self ! Done
    }
    
    case _ -> ProcessSoldOut => {
      nextStateData.pendingRequests.foreach(request => {
        request.target ! new BookReply(request.context, Left("SoldOut"))
      })
      self ! Done
    }
  }

  initialize // start the FSM in the initial state declared by startWith
}

TESTING THE FSM

class Publisher(totalNrBooks: Int, nrBooksPerRequest: Int)
    extends Actor {
  var nrLeft = totalNrBooks
  def receive = {
    case PublisherRequest => {
      if (nrLeft == 0) {
        sender ! BookSupplySoldOut
      } else {
        val supply = math.min(nrBooksPerRequest, nrLeft)
        nrLeft -= supply
        sender ! new BookSupply(supply)
      }
    }
  }
}

import akka.actor.FSM.{ CurrentState, SubscribeTransitionCallBack, Transition }

val publisher = system.actorOf(Props(new Publisher(2, 2)))
val inventory = system.actorOf(Props(new Inventory(publisher)))
val stateProbe = TestProbe()
val replyProbe = TestProbe()

// subscribe to the FSM's state transition notifications
inventory ! new SubscribeTransitionCallBack(stateProbe.ref)
stateProbe.expectMsg(new CurrentState(inventory, WaitForRequests))

inventory ! new BookRequest("context1", replyProbe.ref)
stateProbe.expectMsg(new Transition(inventory, WaitForRequests, WaitForPublisher))
stateProbe.expectMsg(new Transition(inventory, WaitForPublisher, ProcessRequest))
stateProbe.expectMsg(new Transition(inventory, ProcessRequest, WaitForRequests))
replyProbe.expectMsg(new BookReply("context1", Right(1)))

Timers within FSM

When the FSM is in the WaitForPublisher state, we don't want to wait forever for the publisher to reply, so we add a state timeout.

when(WaitForPublisher, stateTimeout = 5 seconds) {
  case Event(supply:BookSupply, data:StateData) => {
    goto(ProcessRequest) using data.copy(nrBooksInStore = supply.nrBooks)
  }
  case Event(BookSupplySoldOut, _) => {
    goto(ProcessSoldOut)
  }
  // Define the timeout transition
  case Event(StateTimeout,_) => goto(WaitForRequests)
}

// when testing, the timeout transition can be verified like this:
// stateProbe.expectMsg(6 seconds, new Transition(inventory, WaitForPublisher, WaitForRequests))

A timeout can also be set when specifying the next state, using the forMax method.

goto(WaitForPublisher) using (newData) forMax (5 seconds)

Termination of FSM

The FSM has a specific handler for these cases: onTermination. This handler is also a partial function and takes a StopEvent as an argument.

There are three possible reasons this can be received.

  • Normal: received when the FSM terminates normally.
  • Shutdown: received when the FSM is stopped due to a shutdown.
  • Failure(cause: Any): received when the termination was caused by a failure.
onTermination {
  case StopEvent(FSM.Normal, state, data)         => // clean up after a normal stop
  case StopEvent(FSM.Shutdown, state, data)       => // clean up after a shutdown
  case StopEvent(FSM.Failure(cause), state, data) => // handle the failure, e.g. log the cause
}

Implementing shared state using agents

Akka accomplishes this by sending actions to the agent for each operation; the messaging infrastructure applies these actions one at a time, which precludes race conditions.
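
A minimal sketch of that guarantee, assuming the akka-agent module and a plain Int counter instead of the book's example: updates sent concurrently as functions of the old value are applied one at a time, so none are lost.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.agent.Agent

// a simple counter agent; the agent applies queued updates sequentially
val counter = Agent(0)

// a thousand increments sent concurrently from multiple threads,
// each expressed as a function of the previous value
(1 to 1000).par.foreach { _ => counter send (_ + 1) }

// counter.future completes once all currently queued updates have been applied
val finalCount = Await.result(counter.future, 5 seconds)
// finalCount is 1000: no increment was lost to a race condition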

For our example, we need to share the number of copies sold for each book, so we will create an Agent that contains this value.

case class BookStatics(val nameBook: String, nrSold: Int)
// a BookStatics instance is created which is put into a map using the title as the key
case class StateBookStatics(val sequence: Long, books: Map[String, BookStatics])

The state object contained by the agent must be immutable.

import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent
// create the Agent with its initial (immutable) state
val stateAgent = Agent(new StateBookStatics(0, Map()))

val currentBookStatics = stateAgent.get // using stateAgent() has the same effect

// replacing the state with a completely new value:
val newState = StateBookStatics(1, Map(book -> bookStat))
stateAgent send newState

// what if the agent's new value depends on the previous state? Then send a function:
val book = "Akka in Action"
val nrSold = 1
stateAgent send (oldState => {
  val bookStat = oldState.books.get(book) match {
    case Some(bookState) =>
      bookState.copy(nrSold = bookState.nrSold + nrSold)
    case None => new BookStatics(book, nrSold)
  }
  oldState.copy(oldState.sequence+1, oldState.books + (book -> bookStat ))
})

Waiting for the state update

In some cases, we need to update shared state and use the new state. For example, we need to know which book is selling the most, and when a book becomes popular, we want to notify the authors.

import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val timeout = Timeout(1000)

// It works exactly as the send method only it returns a Future,
// which can be used to wait for the new state.
val future = stateAgent alter( oldState => {
  val bookStat = oldState.books.get(book) match {
    case Some(bookState) =>
      bookState.copy(nrSold = bookState.nrSold + nrSold)
    case None =>
      new BookStatics(book, nrSold)
  }
  oldState.copy(oldState.sequence+1,oldState.books + (book -> bookStat ))
})

val newState = Await.result(future, 1 second)

It is also possible that multiple changes happen at nearly the same time and we want the final state, or that another thread needs the final state and only knows that the process before it may have updated the state. For this the agent provides a future, which completes once all currently queued updates have been applied.

val future = stateAgent.future
val newState = Await.result(future, 1 second)
import scala.concurrent.ExecutionContext.Implicits.global
val agent1 = Agent(3)
// When using this notation, agent2 is a newly created Agent that contains the
// value 4 and agent1 is just the same as before (it still contains the value 3).
val agent2 = agent1 map (_ + 1)