Stages
Stage 是 Akka Streams 里的核心概念,它代表了在Graph里的单位,是如何定义输入端口(Inlet) / 输出端口(Outlet) 的;常用的基础形态有:
- Source: 只有一个Outlet
- Sink: 只有一个Inlet
- Flow: 只有一个Outlet和一个Inlet
- Fan-In: 多个Inlet一个Outlet
- Fan-Out: 一个Inlet多个Outlet
- BidiFlow: 双向通道,一对Inlet一对Outlet
有了这些基础件,我们可以按需组合出各种拓扑的图来:
自定义流处理
之前介绍过,Akka Streams 内置支持 back-pressure 特性:可根据上下游 Stage 的处理能力,进行推/拉 速率的流控。同时,Akka Stream 也允许我们通过自定义 GraphStage 进行流处理细节的控制。
GraphStage 的定义如下:
A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a Shape which describes its input and output ports and a factory function that creates a GraphStageLogic which implements the processing logic that ties the ports together.
可以看出,它需要一个Shape来描述 输入(Inlet) 和 输出(Outlet) 端口,以及一个 GraphStageLogic 来实现串联端口的逻辑(数据是如何在端口间流动)
Shape 即 Stage 的表现形式,常用的有:
- SourceShape: 只有一个Outlet
- SinkShape: 只有一个Inlet
- FlowShape: 只有一个Outlet和一个Inlet
- ……
GraphStageLogic 是用于管理端口事件和回调的状态机,通常来说,在不同的Shape里,它根据Inlet和Outlet 的数量不同,按需提供 流输入事件的回调 InHandler 和 流输出事件的回调 OutHandler:
OutHandler
输出端口可用的操作:
push(out,elem)
发送数据到输出端口(当下游进行拉取时可用)complete(out)
关闭输出端口fail(out,exception)
关闭输出端口,并附上错误信息
在 GraphStageLogic 内使用 setHandler(out,handler)
为输出端口注册 OutHandler
实例,可处理如下回调:
onPull()
在输出端口准备好发送下一条数据时被调用,push(out,elem)
此时将可被用于此端口进行数据的推送onDownstreamFinish()
在下游处于被取消状态或不再接收消息时被调用。此后将不会再有onPull()
事件发生。此回调如未被重载,当前Stage将在此时完成
输出端口可用的状态查询方法:
isAvailable(out)
端口是否允许发送数据isClosed(out)
端口是否已关闭(此时不能再推送和拉取)
OutHandler 状态机如图:
InHandler
输入端口可用的操作:
pull(in)
向输入端口请求一条新的数据(当上游推送数据时可用)grab(in)
在onPush()
回调里,获取接收到的数据的内容(不能被重复调用,除非上游再次发送新的数据)cancel(in)
关闭输入端口
在 GraphStageLogic 内使用 setHandler(in,handler)
为输出端口注册 InHandler
实例,可处理如下回调:
onPush()
在输入端口有一条新数据到达时被调用。此时可使用grab(in)
获取数据内容,使用pull(in)
请求下一条数据(流控在此实现)。可以不使用grab(in)
获取 而直接pull(in)
请求新数据,此时旧的数据将从缓冲区被抹掉onUpstreamFinish()
在上游关闭或再无数据可获取时被调用。此后将不再有onPush()
事件发生。此回调如未被重载,当前Stage将在此时完成onUpstreamFailure()
在上游出错时被调用,回调将带着一个异常参数。此后不再有onPush()
事件发生。此回调如未被重载,当前Stage将在此时失败
输入端口可用的状态查询方法:
isAvailable(in)
端口是否有数据可获取hasBeenPulled(in)
端口是否已经pull过了,如果为true, 再调用pull(in)
将会出错isClosed(in)
端口是否已关闭
InHandler 状态机如图:
GraphStage实例
流过滤器Filter
基于GraphStage,我们可以为FlowShpe通道提供筛选函数,实现一个数据流筛选器:
实现:
输出:
定时器流控阀
TimerGraphStageLogi
继承自 GraphStageLogic
,它提供了定时器回调接口:
scheduleOnce(key,delay)
下一次定时回调schedulePeriodically(key,period)
周期性的定时回调schedulePeriodicallyWithInitialDelay(key,delay,period)
带初始值的周期性定时回调
定时器会触发onTimer(key)
回调,在其中可置入定时处理逻辑
通过使用这些接口,我们可以轻松实现如 RX.Debounce 一般的流控功能。
实例:迭代50000个数,1毫秒一次进行分组,显示分组的长度
结果随机,不列举
蒸汽时代~