文章目录
  1. 1. Pipes and Filters
  2. 2. Scatter-Gather Pattern
    1. 2.1. Competing Tasks
  3. 3. Routing Messages
    1. 3.1. RoundRobinRouter
    2. 3.2. SmallestMailboxRouter
    3. 3.3. Configuring a Router
    4. 3.4. Routers and Children
    5. 3.5. Letting the Router Create the Routees
    6. 3.6. Passing the Router Pre-Created Actors
    7. 3.7. The Router and Its Children’s Life Cycles
    8. 3.8. Routers on a Plane
      1. 3.8.1. The Passenger Router
      2. 3.8.2. Using the Passenger Router

One of the immediate implications of Actor based programming is how do we model code that requires collaborators to work together if each unit of work is done in parallel?

Akka allows us to use these design approaches while still making use of its inherent concurrency.

  • integration tools and platforms
  • messaging systems
  • WSO2, and SOA and Web-service based solutions

Pipes and Filters

The concept of piping refers to the ability for one process or thread to pump its results to another processor for additional processing.

In many systems a single event will trigger a sequence of tasks.

It receives the photo and before the event is sent to central processing, a number of checks are done. When no license plate is found in the photo, the system is unable to process the message any further and therefore, it will be discarded. In this example we also discard the message when the speed is below the maximum speed. Which means that only messages that contain the license plate of a speeding vehicle end up getting to the central processor.

Each Filter consists of three parts, the inbound pipe where the message is received, the processor of the message, and finally the outbound pipe where the result of the processing is published.

An important restriction is that each filter must accept and send the same messages, because the outbound pipe of a filter can be the inbound pipe of any other filter in the pattern.

A Pipe with Two Filters Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
case class Photo(license: String, speed: Int)

class SpeedFilter(minSpeed: Int, pipe: ActorRef) extends Actor {

  // Filter all Photos which have a speed lower than the minimal speed
  def receive = {
    case msg: Photo =>
      if (msg.speed > minSpeed) pipe ! msg
  }
}

// Filter all Photos which have an empty license
class LicenseFilter(pipe: ActorRef) extends Actor {
  def receive = {
    case msg: Photo =>
      if (!msg.license.isEmpty) pipe ! msg
  }
}


// Pipe and filter test

val endProbe = TestProbe()
val speedFilterRef = system.actorOf( Props(new SpeedFilter(50, endProbe.ref)))

val licenseFilterRef = system.actorOf(Props(new LicenseFilter(speedFilterRef)))
val msg = new Photo("123xyz", 60)

licenseFilterRef ! msg
endProbe.expectMsg(msg)
licenseFilterRef ! new Photo("", 60)

endProbe.expectNoMsg(1 second)
licenseFilterRef ! new Photo("123xyz", 49)
endProbe.expectNoMsg(1 second)

Scatter-Gather Pattern

The first case is when the tasks are functionally the same, but only one is passed through to the gather component as the chosen result. The second scenario is when work is divided for parallel processing and each processor submits its results which are then combined into a result set by the aggregator.

Competing Tasks

A client buys a product, let's say a book at a web shop, but the shop doesn't have the requested book in stock, so it has to buy the book from a supplier. But the shop is doing business with three different suppliers and wants to pay the lowest price.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
case class PhotoMessage(id: String,
    photo: String,
    creationTime: Option[Date] = None,
    speed: Option[Int] = None)
    
// task  handler
class GetSpeed(pipe:ActorRef) extends Actor {
  def receive = {
    case msg:PhotoMessage => {
      pipe ! msp.copy(speed = ImageProcessing.getSpeed(msg.photo))
    }
  }
}
class GetTime(pipe: ActorRef) extends Actor {
  def receive = {
    case msg: PhotoMessage => {
      pipe ! msg.copy(creationTime = ImageProcessing.getTime(msg.photo))
    }
  }
}

// distribute received message to all processing tasks
class RecipientList(recipientList:Seq[ActorRef]) extends Actor {
  def receive = {
    case msg: AnyRef => recipientList.foreach( _ ! msg)
  }
}

case class TimeoutMessage(msg:PhotoMessage)

// 聚合, 等待两个actor消息, 当一个消息回来时, 接着等待另一个, 加上了定时功能
class Aggregator(timeout:Duration, pipe:ActorRef) extends Actor {
  val messages = new ListBuffer[PhotoMessage]
  implicit val ec = context.system.dispatcher
  // Send all the received messages to our own mailbox

  override def preRestart(reason: Throwable, message: Option[Any]) {
    super.preRestart(reason, message)
    messages.foreach(self ! _)
    messages.clear()
  }
  
  // The first thing when receiving a message,
  // is to check if it is the first message or the second. 
  def receive = {
    case rcvMsg: PhotoMessage => {
      messages.find(_.id == rcvMsg.id) match {
        case Some(alreadyRcvMsg) => {
        
          val newCombinedMsg = new PhotoMessage(
            rcvMsg.id,
            rcvMsg.photo,
            rcvMsg.creationTime.orElse(alreadyRcvMsg.creationTime),
            rcvMsg.speed.orElse(alreadyRcvMsg.speed) )
            
          pipe ! newCombinedMsg
          //cleanup message
          messages -= alreadyRcvMsg
        }
        case None =>{
          messages += rcvMsg
          // 如果在规定的时间内没有收到信息
          context.system.scheduler.scheduleOnce(
            timeout,
            self,
            new TimeoutMessage(rcvMsg))
        }
    }
    case TimeoutMessage(rcvMsg) => {
      messages.find(_.id == rcvMsg.id) match {
        case Some(alreadyRcvMsg) => {
          pipe ! alreadyRcvMsg
          messages -= alreadyRcvMsg
        }
        case None => //message is already processed
    }
    case ex: Exception => throw ex
  }
}

Test Aggregator missing a message

1
2
3
4
5
6
7
8
9
10
11
val endProbe = TestProbe()
val actorRef = system.actorOf(Props(new Aggregator(1 second, endProbe.ref)))
val photoStr = ImageProcessing.createPhotoString(new Date(), 60)
val msg1 = PhotoMessage("id1", photoStr, Some(new Date()), None)
actorRef ! msg1
actorRef ! new IllegalStateException("restart")
val msg2 = PhotoMessage("id1",photoStr, None, Some(60))
actorRef ! msg2

val combinedMsg = PhotoMessage("id1", photoStr, msg1.creationTime, msg2.speed)
endProbe.expectMsg(combinedMsg)

Routing Messages

Akka’s routing feature lets you alter a message’s path from sender to receiver.

Akka provides a set of standard routers that implement routing patterns that frequently show up in those everyday problems.

RoundRobinRouter

The RoundRobinRouter sends messages to the Actors for which it fronts in a round-robin fashion.

SmallestMailboxRouter

When a message comes into the Router, it decides to route the message to the composed Actor whose Mailbox is the smallest.

The SmallestMailboxRouter is a pretty good choice when it comes to balancing load among your composed Actors.

Configuring a Router

The configurations are specified in the akka.actor.deployment block. Be- low is a simple configuration of a RoundRobinRouter, which would appear in your application.conf file:

1
2
3
4
5
6
7
8
9
10
akka {
  actor {
    deployment {
      /DatabaseConnectionRouter {
        router = "round-robin"
        nr-of-instances = 20
      }
    }
  }
}
1
2
3
4
5
import akka.routing.FromConfig
class DBConnection extends Actor { ...
}
val dbRouter = system.actorOf(Props[DBConnection].withRouter(FromConfig(),
                              "DatabaseConnectionRouter"), "DBRouter")

The dbRouter will be an instance of a RoundRobinRouter that will rep- resent the database connection, and it will also be the parent of 20 instances of the DBConnection Actor.

Routers and Children

Routers route to routees. You can create those routees dynamically by the Router, or you can assign them to it from an already created set.

These two different methods of assigning routees have an impact on the relationship and supervision of those routees.

Letting the Router Create the Routees

Advantages

  • The Router handles the Supervision.
  • It works well with configuration.

Disadvantages

  • You’ll have a difficult time constructing anything but a single type of Actor.
  • You can’t name them the way you might like.

Passing the Router Pre-Created Actors

  • You have to have parents for them.
  • It’s not as flexible from a configuration perspective.

The Router and Its Children’s Life Cycles

When the Router creates the routees, they are its children, which means that the Router must manage their life cycles. The Router will assign a supervisorStrategy that always escalates the decision to its parent.

1
2
3
4
5
6
7
8
9
10
11
val dbRouter = system.actorOf(Props.empty.withRouter(RoundRobinRouter(
    nrOfInstances = 5,
    supervisorStrategy = OneForOneStrategy {
    // define your Decider here
    }), "DBRouter")

// you can assign that easily enough:
val dbRouter = system.actorOf(Props.empty.withRouter(RoundRobinRouter(
    nrOfInstances = 5,
    supervisorStrategy = SupervisorStrategy.defaultStrategy
    ), "DBRouter")

Routers on a Plane

A BroadcastRouter would make the perfect component for allowing the passengers to receive important information, such as “Fasten Seat Belts,”.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
zzz.akka.avionics {
  passengers = [
    [ "Kelly Franqui", "01", "A" ],
    [ "Tyrone Dotts", "02", "B" ],
    [ "Malinda Class", "03", "C" ],
    [ "Kenya Jolicoeur", "04", "A" ],
    [ "Christian Piche", "10", "B" ],
    [ "Neva Delapena", "11", "C" ],
    [ "Alana Berrier", "12", "A" ],
    [ "Malinda Heister", "13", "B" ],
    [ "Carlene Heiney", "14", "C" ],
    [ "Erik Dannenberg", "15", "A" ],
    [ "Jamie Karlin", "20", "B" ],
    [ "Julianne Schroth", "21", "C" ],
    [ "Elinor Boris", "22", "A" ],
    [ "Louisa Mikels", "30", "B" ],
    [ "Jessie Pillar", "31", "C" ],
    [ "Darcy Goudreau", "32", "A" ],
    [ "Harriett Isenhour", "33", "B" ],
    [ "Odessa Maury", "34", "C" ],
    [ "Malinda Hiett", "40", "A" ],
    [ "Darcy Syed", "41", "B" ],
    [ "Julio Dismukes", "42", "C" ],
    [ "Jessie Altschuler", "43", "A" ],
    [ "Tyrone Ericsson", "44", "B" ],
    [ "Mallory Dedrick", "50", "C" ],
    [ "Javier Broder", "51", "A" ],
    [ "Alejandra Fritzler", "52", "B" ],
    [ "Rae Mcaleer", "53", "C" ]
  ]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
object Passenger {
  // These are notifications that tell the Passenger
  // to fasten or unfasten their seat belts
  case object FastenSeatbelts
  case object UnfastenSeatbelts
  // Regular expression to extract Name-Row-Seat tuple
  val SeatAssignment = """([\w\s_]+)-(\d+)-([A-Z])""".r
}

// The DrinkRequestProbability trait defines some
// thresholds that we can modify in tests to
// speed things up.
trait DrinkRequestProbability {
  // Limits the decision on whether the passenger
  // actually asks for a drink
  val askThreshold = 0.9f
  
  // The minimum time between drink requests
  val requestMin = 20.minutes
  
  // Some portion of this (0 to 100
  // to requestMin
  val requestUpper = 30.minutes
  // Gives us a 'random' time within the previous
  // two bounds
  def randomishTime(): Duration = {
    requestMin + scala.util.Random.nextInt(requestUpper.toMillis.toInt).millis
  }
}

// The idea behind the PassengerProvider is old news at this point.
// We can use it in other classes to give us the ability to slide
// in different Actor types to ease testing.
trait PassengerProvider {
  def newPassenger(callButton: ActorRef): Actor =
    new Passenger(callButton) with DrinkRequestProbability
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Passenger(callButton: ActorRef) extends Actor with ActorLogging {
  this: DrinkRequestProbability =>
  import Passenger._
  import FlightAttendant.{GetDrink, Drink}
  import scala.collection.JavaConverters._
  
  // We'll be adding some randomness to our Passenger,
  // and this shortcut will make things a little more
  // readable.
  val r = scala.util.Random
  
  // It's about time that someone actually asked for a
  // drink since our Flight Attendants have been coded
  // to serve them up
  case object CallForDrink
  
  // The name of the Passenger can't have spaces in it,
  // since that's not a valid character in the URI
  // spec. We know the name will have underscores in
  // place of spaces, and we'll convert those back
  // here.
  val SeatAssignment(myname, _, _) =
    self.path.name.replaceAllLiterally("_", " ")
    
  // We'll be pulling some drink names from the
  // configuration file as well
  val drinks = context.system.settings.config.getStringList("zzz.akka.avionics.drinks").asScala.toIndexedSeq
  
  // A shortcut for the scheduler to make things look
  // nicer later
  val scheduler = context.system.scheduler
  
  // We've just sat down, so it's time to get a drink
  override def preStart() {
    self ! CallForDrink
  }

  // This method will decide whether or not we actually
  // want to get a drink using some randomness to
  // decide
  def maybeSendDrinkRequest(): Unit = {
    if (r.nextFloat() > askThreshold) {
      val drinkname = drinks(r.nextInt(drinks.length))
      callButton ! GetDrink(drinkname)
    }
    scheduler.scheduleOnce(randomishTime(), self, CallForDrink)
  }

  // Standard message handler
  def receive = {
    case CallForDrink =>
      maybeSendDrinkRequest()
    case Drink(drinkname) =>
      log.info("{} received a {} - Yum", myname, drinkname)
    case FastenSeatbelts =>
      log.info("{} fastening seatbelt", myname)
    case UnfastenSeatbelts =>
      log.info("{} UNfastening seatbelt", myname)
  }
}

Testing and the Event Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
trait TestDrinkRequestProbability extends DrinkRequestProbability {
  override val askThreshold = 0f
  override val requestMin = 0.milliseconds
  override val requestUpper = 2.milliseconds
}

class PassengersSpec extends TestKit(ActorSystem()) with ImplicitSender {
  import akka.event.Logging.Info
  import akka.testkit.TestProbe
  var seatNumber = 9
  
  def newPassenger(): ActorRef = {
    seatNumber += 1
    system.actorOf(Props(new Passenger(testActor) with TestDrinkRequestProbability), s"Pat_Metheny-$seatNumber-B")
  }
  
  "Passengers" should {
    "fasten seatbelts when asked" in {
      val a = newPassenger()
      val p = TestProbe()
      // This says that we want the TestProbe’s ActorRef (p.ref) to be the handle
      // to the subscribed Actor, and that we want it to receive events that match the
      // class akka.event.Logger.Info. 
      system.eventStream.subscribe(p.ref, classOf[Info])
      
      a ! FastenSeatbelts
      p.expectMsgPF() {
        case Info(_, _, m) =>
          m.toString must include ("fastening seatbelt")
      }
    }
  }
}

The Passenger Router

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
object PassengerSupervisor {
  // Allows someone to request the BroadcastRouter
  case object GetPassengerBroadcaster
  // Returns the BroadcastRouter to the requestor
  case class PassengerBroadcaster(broadcaster: ActorRef)
  // Factory method for easy construction
  def apply(callButton: ActorRef) = new PassengerSupervisor(callButton)
        with PassengerProvider
}

class PassengerSupervisor(callButton: ActorRef) extends Actor {
  this: PassengerProvider =>
  import PassengerSupervisor._
  // We'll resume our immediate children instead of restarting them
  // on an Exception
  override val supervisorStrategy = OneForOneStrategy() {
    case _: ActorKilledException => Escalate
    case _: ActorInitializationException => Escalate
    case _ => Resume
  }
  
  // Internal messages we use to communicate between this Actor
  // and its subordinate IsolatedStopSupervisor
  case class GetChildren(forSomeone: ActorRef)

  case class Children(children: Iterable[ActorRef], childrenFor: ActorRef)
  
  // We use preStart() to create our IsolatedStopSupervisor
  override def preStart() {
    context.actorOf(Props(new Actor {
      val config = context.system.settings.config
      
      override val supervisorStrategy = OneForOneStrategy() {
        case _: ActorKilledException => Escalate
        case _: ActorInitializationException => Escalate
        case _ => Stop
      }
      override def preStart() {
        import scala.collection.JavaConverters._
        import com.typesafe.config.ConfigList
        // Get our passenger names from the configuration
        val passengers = config.getList("zzz.akka.avionics.passengers")
        
        // Iterate through them to create the passenger children
        passengers.asScala.foreach { nameWithSeat =>
          val id = nameWithSeat.asInstanceOf[ConfigList].unwrapped().asScala.mkString("-").replaceAllLiterally(" ", "_")
          // Convert spaces to underscores to comply with URI standard
          context.actorOf(Props(newPassenger(callButton)), id)
        }
      }
    
      override def receive = {
        case GetChildren(forSomeone: ActorRef) =>
          sender ! Children(context.children, forSomeone)
      }
    }), "PassengersSupervisor")
  }

  def receive = noRouter

  // TODO: This noRouter method could be made simpler by using a Future.
  // We'll have to refactor this later.
  def noRouter: Receive = {
    case GetPassengerBroadcaster =>
      context.actorFor("PassengersSupervisor") ! GetChildren(sender)
    case Children(passengers, destinedFor) =>
      val router = context.actorOf(Props().withRouter(
        BroadcastRouter(passengers.toSeq)), "Passengers")
        
      destinedFor ! PassengerBroadcaster(router)
      context.become(withRouter(router))
  }
  
  def withRouter(router: ActorRef): Receive = {
    case GetPassengerBroadcaster =>
      sender ! PassengerBroadcaster(router)
  }

}

Using the Passenger Router

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package zzz.akka.avionics

import akka.actor.{ActorSystem, Actor, ActorRef, Props}
import akka.testkit.{TestKit, ImplicitSender}
import scala.concurrent.util.duration._
import com.typesafe.config.ConfigFactory
import org.scalatest.{WordSpec, BeforeAndAfterAll}
import org.scalatest.matchers.MustMatchers


// ActorSystem so we have a known quantity we can test with
object PassengerSupervisorSpec {
  val config = ConfigFactory.parseString("""
    zzz.akka.avionics.passengers = [
      [ "Kelly Franqui", "23", "A" ],
      [ "Tyrone Dotts", "23", "B" ],
      [ "Malinda Class", "23", "C" ],
      [ "Kenya Jolicoeur", "24", "A" ],
      [ "Christian Piche", "24", "B" ]
    ]
  """)
}

// We don't want to work with "real" passengers.
 This mock
// passenger will be much easier to verify things with
trait TestPassengerProvider extends PassengerProvider {
  override def newPassenger(callButton: ActorRef): Actor =
    new Actor {
      def receive = {
        case m => callButton ! m
      }
    }
}

// The Test class injects the configuration into the
// ActorSystem
class PassengerSupervisorSpec extends
    TestKit(ActorSystem("PassengerSupervisorSpec", PassengerSupervisorSpec.config))
        with ImplicitSender
        with WordSpec
        with BeforeAndAfterAll
        with MustMatchers {
  import PassengerSupervisor._
  // Clean up the system when all the tests are done
  override def afterAll() {
    system.shutdown()
  }

  "PassengerSupervisor" should {
    "work" in {
      // Get our SUT
      val a = system.actorOf(Props(new PassengerSupervisor(testActor) with TestPassengerProvider))
      
      // Grab the BroadcastRouter
      a ! GetPassengerBroadcaster
      val broadcaster = expectMsgPF() {
        case PassengerBroadcaster(b) =>
          // Exercise the BroadcastRouter
          b ! "Hithere"
          
          // All 5 passengers should say "Hithere"
          expectMsg("Hithere")
          expectMsg("Hithere")
          expectMsg("Hithere")
          expectMsg("Hithere")
          expectMsg("Hithere")
          
          // And then nothing else!
          expectNoMsg(100.milliseconds)
          // Return the BroadcastRouter
          b
      }
      
      // Ensure that the cache works
      a ! GetPassengerBroadcaster
      expectMsg(PassengerBroadcaster(`broadcaster`))
    }
  }
}

TODO

文章目录
  1. 1. Pipes and Filters
  2. 2. Scatter-Gather Pattern
    1. 2.1. Competing Tasks
  3. 3. Routing Messages
    1. 3.1. RoundRobinRouter
    2. 3.2. SmallestMailboxRouter
    3. 3.3. Configuring a Router
    4. 3.4. Routers and Children
    5. 3.5. Letting the Router Create the Routees
    6. 3.6. Passing the Router Pre-Created Actors
    7. 3.7. The Router and Its Children’s Life Cycles
    8. 3.8. Routers on a Plane
      1. 3.8.1. The Passenger Router
      2. 3.8.2. Using the Passenger Router