Stages
Stage 是 Akka Streams 里的核心概念,它代表了在Graph里的单位,是如何定义输入端口(Inlet) / 输出端口(Outlet) 的;常用的基础形态有:
- Source: 只有一个Outlet
- Sink: 只有一个Inlet
- Flow: 只有一个Outlet和一个Inlet
- Fan-In: 多个Inlet一个Outlet
- Fan-Out: 一个Inlet多个Outlet
- BidiFlow: 双向通道,一对Inlet一对Outlet
有了这些基础件,我们可以按需组合出各种拓扑的图来:
自定义流处理
之前介绍过,Akka Streams 内置支持 back-pressure 特性:可根据上下游 Stage 的处理能力,进行推/拉 速率的流控。同时,Akka Stream 也允许我们通过自定义 GraphStage 进行流处理细节的控制。
GraphStage 的定义如下:
A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a Shape which describes its input and output ports and a factory function that creates a GraphStageLogic which implements the processing logic that ties the ports together.
可以看出,它需要一个Shape来描述 输入(Inlet) 和 输出(Outlet) 端口,以及一个 GraphStageLogic 来实现串联端口的逻辑(数据是如何在端口间流动)
Shape 即 Stage 的表现形式,常用的有:
- SourceShape: 只有一个Outlet
- SinkShape: 只有一个Inlet
- FlowShape: 只有一个Outlet和一个Inlet
- ……
GraphStageLogic 是用于管理端口事件和回调的状态机,通常来说,在不同的Shape里,它根据Inlet和Outlet 的数量不同,按需提供 流输入事件的回调 InHandler 和 流输出事件的回调 OutHandler:
OutHandler
输出端口可用的操作:
push(out,elem)
发送数据到输出端口(当下游进行拉取时可用)complete(out)
关闭输出端口fail(out,exception)
关闭输出端口,并附上错误信息
在 GraphStageLogic 内使用 setHandler(out,handler)
为输出端口注册 OutHandler
实例,可处理如下回调:
onPull()
在输出端口准备好发送下一条数据时被调用,push(out,elem)
此时将可被用于此端口进行数据的推送onDownstreamFinish()
在下游处于被取消状态或不再接收消息时被调用。此后将不会再有onPull()
事件发生。此回调如未被重载,当前Stage将在此时完成
输出端口可用的状态查询方法:
isAvailable(out)
端口是否允许发送数据isClosed(out)
端口是否已关闭(此时不能再推送和拉取)
OutHandler 状态机如图:
InHandler
输入端口可用的操作:
pull(in)
向输入端口请求一条新的数据(当上游推送数据时可用)grab(in)
在onPush()
回调里,获取接收到的数据的内容(不能被重复调用,除非上游再次发送新的数据)cancel(in)
关闭输入端口
在 GraphStageLogic 内使用 setHandler(in,handler)
为输出端口注册 InHandler
实例,可处理如下回调:
onPush()
在输入端口有一条新数据到达时被调用。此时可使用grab(in)
获取数据内容,使用pull(in)
请求下一条数据(流控在此实现)。可以不使用grab(in)
获取 而直接pull(in)
请求新数据,此时旧的数据将从缓冲区被抹掉onUpstreamFinish()
在上游关闭或再无数据可获取时被调用。此后将不再有onPush()
事件发生。此回调如未被重载,当前Stage将在此时完成onUpstreamFailure()
在上游出错时被调用,回调将带着一个异常参数。此后不再有onPush()
事件发生。此回调如未被重载,当前Stage将在此时失败
输入端口可用的状态查询方法:
isAvailable(in)
端口是否有数据可获取hasBeenPulled(in)
端口是否已经pull过了,如果为true, 再调用pull(in)
将会出错isClosed(in)
端口是否已关闭
InHandler 状态机如图:
GraphStage实例
流过滤器Filter
基于GraphStage,我们可以为FlowShpe通道提供筛选函数,实现一个数据流筛选器:
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl.{Sink, Source} import akka.NotUsed import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} object FilterStage_ extends App { // 过滤器实现 class Filter[A, B](f: A => Boolean) extends GraphStage[FlowShape[A, B]] { val in = Inlet[A]("Filter.in") val out = Outlet[B]("Filter.out") override val shape = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { setHandler(in, new InHandler { @scala.throws[Exception](classOf[Exception]) override def onPush(): Unit = { val elem = grab(in) if(f(elem)) { push(out, elem.asInstanceOf[B]) } } }) setHandler(out, new OutHandler { @scala.throws[Exception](classOf[Exception]) override def onPull(): Unit = { pull(in) } }) } } // 调用部分 implicit val system = ActorSystem() implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() val flowGraph: Graph[FlowShape[Int, Int], NotUsed] = new Filter[Int, Int]((a:Int) => a < 100) val result = Source.fromIterator(() => Iterator from 5) .via(flowGraph) .to(Sink.foreach(println)) result.run() Thread.sleep(300) system.terminate() } |
输出:
1 2 3 4 5 |
5 6 ... 98 99 |
定时器流控阀
TimerGraphStageLogi
继承自 GraphStageLogic
,它提供了定时器回调接口:
scheduleOnce(key,delay)
下一次定时回调schedulePeriodically(key,period)
周期性的定时回调schedulePeriodicallyWithInitialDelay(key,delay,period)
带初始值的周期性定时回调
定时器会触发onTimer(key)
回调,在其中可置入定时处理逻辑
通过使用这些接口,我们可以轻松实现如 RX.Debounce 一般的流控功能。
实例:迭代50000个数,1毫秒一次进行分组,显示分组的长度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl.{Sink, Source} import akka.stream.stage._ import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration._ object TimedGate_ extends App { // 定时器流控 class TimedGate[A](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[A, List[A]]] { val in = Inlet[A]("TimedGate.In") val out = Outlet[List[A]]("TimedGate.Out") override val shape = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { var open = false var buffer = ListBuffer[A]() setHandler(in, new InHandler { @scala.throws[Exception](classOf[Exception]) override def onPush(): Unit = { val elem = grab(in) buffer += elem if (open) { pull(in) } else { push(out, buffer.toList) buffer = ListBuffer[A]() open = true scheduleOnce(None, silencePeriod) } } override def onUpstreamFinish(): Unit = { if (buffer.nonEmpty) { emit(out, buffer.toList) } complete(out) } }) setHandler(out, new OutHandler { @scala.throws[Exception](classOf[Exception]) override def onPull(): Unit = { pull(in) } }) override def onTimer(timerKey: Any): Unit = { open = false } } } // 调用部分 implicit val system = ActorSystem() implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() val result = Source.fromIterator(() => Iterator from 1) .take(50000) .via(new TimedGate[Int](1 millis)) .runWith(Sink.seq) val seqs = Await.result(result, 10 seconds) seqs.foreach(seq => println(seq.length)) system.terminate() } |
结果随机,不列举
蒸汽时代~