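Introduction
Akka Streams provides pipeline-style streaming interfaces for asynchronous programming, but what if we also want the messaging of the actor model? Akka Streams offers the following integration points: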
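- Sink.actorRef: takes an existing ActorRef and, as the downstream end of the stream, sends it every element that flows through the pipeline (plus a configurable message when the stream completes)
- Source.actorRef: materializes an ActorRef; messages sent to it are emitted into the stream and consumed by downstream stages
- Source.actorPublisher: materializes an ActorRef to an actor that implements a Reactive Streams Publisher
- Sink.actorSubscriber: materializes an ActorRef to an actor that implements a Reactive Streams Subscriber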
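Take Sink.actorSubscriber as an example. As a standard downstream Sink, the ActorRef it materializes is necessarily a consumer, and because it brings actor messaging into the stream, it is well suited to providing controllable concurrency.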
ActorSubscriber
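An ActorSubscriber (created via Sink.actorSubscriber) accepts the following message types:
- ActorSubscriberMessage.OnNext: a new element from the stream
- ActorSubscriberMessage.OnComplete: the upstream has completed
- ActorSubscriberMessage.OnError: an error occurred upstream
- any other ordinary message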
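To make this dispatch concrete without pulling in Akka, here is a small stdlib-only Scala model of such a receive block. OnNext, OnComplete, OnError and Response below are hypothetical stand-ins defined locally (named after akka.stream.actor.ActorSubscriberMessage), not the Akka classes; in a real ActorSubscriber you would import ActorSubscriberMessage._ and write the same cases inside receive.

```scala
// Hypothetical stand-ins named after akka.stream.actor.ActorSubscriberMessage,
// so the dispatch logic runs without Akka on the classpath.
sealed trait SubscriberSignal
final case class OnNext(element: Any) extends SubscriberSignal
case object OnComplete extends SubscriberSignal
final case class OnError(cause: Throwable) extends SubscriberSignal

// An "ordinary" message, e.g. a worker's reply
final case class Response(id: Int, result: String)

object ReceiveModel {
  // Same shape as an ActorSubscriber's receive: stream signals first, then plain messages
  def handle(msg: Any): String = msg match {
    case OnNext(element)      => s"element: $element"
    case OnComplete           => "upstream finished"
    case OnError(cause)       => s"stream failed: ${cause.getMessage}"
    case Response(id, result) => s"[$id] reply of length ${result.length}"
    case other                => s"unhandled: $other"
  }

  def main(args: Array[String]): Unit = {
    println(handle(OnNext(42)))
    println(handle(OnComplete))
    println(handle(OnError(new RuntimeException("boom"))))
    println(handle(Response(1, "<html/>")))
  }
}
```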
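Any actor that extends ActorSubscriber must override requestStrategy to supply a request strategy that governs the stream's back pressure. The options are:
- WatermarkRequestStrategy: suited to an actor that processes each element itself
- MaxInFlightRequestStrategy: suited to a subscriber that keeps an internal queue or hands work off to other actors
- a custom RequestStrategy, or ZeroRequestStrategy combined with manual calls to request to pull data from upstream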
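The two built-in strategies differ mainly in what they count. The sketch below is a simplified model, written from my reading of the Akka 2.4/2.5 sources rather than copied from them, of how each computes demand from remainingRequested (elements already requested but not yet received). WatermarkModel and MaxInFlightModel are hypothetical names, and the default batch size of 5 is an assumption meant to mirror MaxInFlightRequestStrategy's batchSize.

```scala
// Simplified, hypothetical models of two Akka request strategies.
// requestDemand(remainingRequested) = how many more elements to request from upstream,
// where remainingRequested = elements already requested but not yet delivered.
trait DemandModel {
  def requestDemand(remainingRequested: Int): Int
}

// Watermark style: once outstanding demand falls below `low`, top it back up to `high`.
final case class WatermarkModel(high: Int, low: Int) extends DemandModel {
  require(low >= 0 && high >= low, "need 0 <= low <= high")
  def requestDemand(remainingRequested: Int): Int =
    if (remainingRequested < low) high - remainingRequested else 0
}

// Max-in-flight style: outstanding demand plus work still held internally
// (e.g. a queue of dispatched tasks) must stay <= max; request in batches.
final class MaxInFlightModel(max: Int, inFlight: () => Int, batchSize: Int = 5) extends DemandModel {
  def requestDemand(remainingRequested: Int): Int = {
    val batch = math.min(batchSize, max)
    val used = remainingRequested + inFlight()
    if (used <= max - batch) max - used else 0
  }
}

object DemandDemo {
  def main(args: Array[String]): Unit = {
    val wm = WatermarkModel(high = 10, low = 5)
    println(wm.requestDemand(3)) // 3 < 5, so request 10 - 3 = 7
    println(wm.requestDemand(6)) // still above the low watermark, so 0

    var queueSize = 0 // plays the role of queue.size in the crawler's WorkerPool
    val mif = new MaxInFlightModel(max = 18, () => queueSize)
    println(mif.requestDemand(0)) // nothing outstanding: request 18
    queueSize = 15
    println(mif.requestDemand(0)) // 15 > 18 - 5: wait until a full batch fits
    queueSize = 10
    println(mif.requestDemand(2)) // 12 <= 13: request 18 - 12 = 6
  }
}
```

Because requested-but-undelivered elements plus queued tasks never exceed max, a subscriber whose inFlight is its queue size can never hold more than max unfinished tasks.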
Load balancing with routing
Akka provides routing: a message can be routed to one or more actors. The available routing logics are:
akka.routing.RoundRobinRoutingLogic
akka.routing.RandomRoutingLogic
akka.routing.SmallestMailboxRoutingLogic
akka.routing.BroadcastRoutingLogic
akka.routing.ScatterGatherFirstCompletedRoutingLogic
akka.routing.TailChoppingRoutingLogic
akka.routing.ConsistentHashingRoutingLogic
Choose one of these logics and give the Router a set of routees (each actor wrapped in an ActorRefRoutee) to load-balance messages across them. The example below uses akka.routing.RoundRobinRoutingLogic.
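Round-robin simply hands each message to the next routee in turn. Below is a minimal, hypothetical model of that selection; RoundRobinModel is not an Akka class. To my understanding Akka's RoundRobinRoutingLogic also keeps a shared counter, which is why this sketch uses an AtomicLong.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model of round-robin selection (not Akka's class): each call returns
// the next routee in turn and wraps around at the end of the vector.
final class RoundRobinModel[R](routees: Vector[R]) {
  require(routees.nonEmpty, "need at least one routee")
  private val next = new AtomicLong(0L) // shared counter, safe if called from several threads

  def select(): R = routees((next.getAndIncrement() % routees.size).toInt)
}

object RoutingDemo {
  def main(args: Array[String]): Unit = {
    val router = new RoundRobinModel(Vector("worker-0", "worker-1", "worker-2"))
    // Seven crawl tasks over three workers: worker-0, 1, 2, 0, 1, 2, 0
    (1 to 7).foreach { id => println(s"task $id -> ${router.select()}") }
  }
}
```

Round-robin ignores how busy each routee is; SmallestMailboxRoutingLogic is the option from the list above that takes mailbox size into account.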
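Crawler example
The producer at the head of the stream supplies the URL to fetch together with a task ID. Based on its actual processing capacity, the downstream subscriber chooses a reasonable size for the queue of concurrently processed tasks and the number of worker actors fetching pages in parallel: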
```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
import akka.stream.ActorMaterializer
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
import akka.stream.actor.ActorSubscriberMessage._
import akka.stream.scaladsl.{Sink, Source}

object StreamingActorCrawler_ extends App {
  implicit val system = ActorSystem()
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  object Messages {
    case class Request(id: Int, url: String)
    case class Response(id: Int, result: String)
    case class Crawl(id: Int, url: String)

    // Blocking download of a page as a UTF-8 string; the Source is closed afterwards
    def get(url: String): String = {
      val src = scala.io.Source.fromURL(url)("utf-8")
      try src.mkString finally src.close()
    }
  }

  // Downloads one page per Crawl message and replies to the sender
  class Worker extends Actor {
    import Messages._
    override def receive: Receive = {
      case Crawl(id, url) =>
        val result = Messages.get(url)
        sender() ! Response(id, result)
    }
  }

  class WorkerPool extends ActorSubscriber {
    import Messages._

    val MaxQueueSize = 18              // max tasks in flight at once (queue size)
    var queue = Map.empty[Int, String] // tasks dispatched but not yet answered
    var upstreamDone = false           // set once OnComplete arrives

    val router = {
      val routees = Vector.fill(36) {  // number of workers
        ActorRefRoutee(context.actorOf(Props[Worker]))
      }
      Router(RoundRobinRoutingLogic(), routees)
    }

    // Request more from upstream only while fewer than MaxQueueSize tasks are in flight
    override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize) {
      override def inFlightInternally: Int = queue.size
    }

    // Shut down only after upstream has finished AND every dispatched task has replied
    private def terminateIfDone(): Unit =
      if (upstreamDone && queue.isEmpty) system.terminate()

    def receive: Receive = {
      case OnNext(Request(id, url)) =>
        assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
        queue += (id -> url)
        router.route(Crawl(id, url), self) // route to a worker; its reply comes back to self
      case OnComplete =>
        upstreamDone = true
        terminateIfDone()
      case OnError(e) =>
        println(s"error: $e")
        system.terminate()
      case Response(id, result) =>
        queue -= id
        println(s"[$id] DONE, HTML content length: ${result.length}")
        terminateIfDone()
    }
  }

  val N = 255 // total number of tasks
  val r = Source(1 to N)
    .map(Messages.Request(_, "http://example.org"))
    .runWith(Sink.actorSubscriber(Props(new WorkerPool)))
}
```
Output (tasks finish out of order because the workers run concurrently):
```
[1] DONE, HTML content length: 1270
[4] DONE, HTML content length: 1270
[3] DONE, HTML content length: 1270
[13] DONE, HTML content length: 1270
[16] DONE, HTML content length: 1270
...
...
...
[255] DONE, HTML content length: 1270
[254] DONE, HTML content length: 1270
[253] DONE, HTML content length: 1270
```