in FP

Akka Streams: Integrating with Actors

引子

Akka Streams 为异步编程提供了管道和流式接口,但如果我们希望结合 actor 模型的消息机制呢?Akka Streams提供了如下解决方案:

  • Sink.actorRef 产生 ActorRef,做为下游结点接收管道里的流数据
  • Source.actorRef 产生ActorRef,能被下游结点消费
  • Source.ActorPublisher 产生ActorRef,用于实现Reactive Streams中的Publisher
  • Sink.ActorSubscriber 产生ActorRef,用于实现Reactive Streams中的Subscriber

Sink.ActorSubscriber 为例,做为标准下游环节的Sink,它生成的ActorRef 必然是一个消费者,因为结合了消息机制,它非常适合提供可控的并发性

ActorSubscriber

Sink.ActorSubscriber 能够接受的消息类型:

  • ActorSubscriberMessage.OnNext 管道来的新消息
  • ActorSubscriberMessage.OnComplete 管道中的上游已结束
  • ActorSubscriberMessage.OnError 发生错误
  • 以及其他普通消息

任何继承自 Sink.ActorSubscriber 的子类,必须重写 requestStrategy 以提供请求策略去控制流的back pressure。有如下备选策略:

  • WatermarkRequestStrategy 适用于单 actor 处理任务
  • MaxInFlightRequestStrategy 适用于subscriber 内部需要维护一个队列,或需要与其他 actor 交互
  • 还可以自定义 RequestStrategy ,或者使用  ZeroRequestStrategy 策略手动调用request 方法拉取上游数据。

基于 Routing 的负载

Akka 提供路由功能,即 消息能被路由到一到多个actor结点。可选的路由策略有:

  • akka.routing.RoundRobinRoutingLogic
  • akka.routing.RandomRoutingLogic
  • akka.routing.SmallestMailboxRoutingLogic
  • akka.routing.BroadcastRoutingLogic
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic
  • akka.routing.TailChoppingRoutingLogic
  • akka.routing.ConsistentHashingRoutingLogic

选择一个策略,并提供一组 Actor 的引用ActorRefRouteeRouter,从而实现在针对消息的负载均衡(下例将给出基于akka.routing.RoundRobinRoutingLogic的实现

爬虫实例

管道中的生产者指定要抓取的网页地址和任务ID,下游Subscriber将根据自身处理能力的现实情况,制定出一个合理的同时处理任务的队列大小,和同时进行抓取的Actor worker数量:

运行结果

打赏作者
您的支持将激励我继续创作!

您的支持将鼓励我们继续创作!

[微信] 扫描二维码打赏

[支付宝] 扫描二维码打赏

Write a Comment

Comment