in FP

Akka Streams: GraphStage

Stages

Stage 是 Akka Streams 里的核心概念,它代表了在Graph里的单位,是如何定义输入端口(Inlet)  / 输出端口(Outlet) 的;常用的基础形态有:

compose_shapes1

  • Source: 只有一个Outlet
  • Sink: 只有一个Inlet
  • Flow: 只有一个Outlet和一个Inlet
  • Fan-In: 多个Inlet一个Outlet
  • Fan-Out: 一个Inlet多个Outlet
  • BidiFlow: 双向通道,一对Inlet一对Outlet

有了这些基础件,我们可以按需组合出各种拓扑的图来:
runnable_graph1

自定义流处理

之前介绍过,Akka Streams 内置支持 back-pressure 特性:可根据上下游 Stage 的处理能力,进行推/拉 速率的流控。同时,Akka Stream 也允许我们通过自定义 GraphStage 进行流处理细节的控制。

GraphStage 的定义如下:

A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a Shape which describes its input and output ports and a factory function that creates a GraphStageLogic which implements the processing logic that ties the ports together.

可以看出,它需要一个Shape来描述 输入(Inlet) 和 输出(Outlet) 端口,以及一个 GraphStageLogic 来实现串联端口的逻辑(数据是如何在端口间流动)

Shape 即 Stage 的表现形式,常用的有:

  • SourceShape: 只有一个Outlet
  • SinkShape: 只有一个Inlet
  • FlowShape: 只有一个Outlet和一个Inlet
  • ……

GraphStageLogic 是用于管理端口事件和回调的状态机,通常来说,在不同的Shape里,它根据Inlet和Outlet 的数量不同,按需提供 流输入事件的回调 InHandler 和 流输出事件的回调 OutHandler:

OutHandler

输出端口可用的操作:

  • push(out,elem)  发送数据到输出端口(当下游进行拉取时可用)
  • complete(out)  关闭输出端口
  • fail(out,exception) 关闭输出端口,并附上错误信息

在 GraphStageLogic 内使用 setHandler(out,handler)为输出端口注册 OutHandler 实例,可处理如下回调:

  • onPull() 在输出端口准备好发送下一条数据时被调用,push(out,elem) 此时将可被用于此端口进行数据的推送
  • onDownstreamFinish() 在下游处于被取消状态或不再接收消息时被调用。此后将不会再有 onPull() 事件发生。此回调如未被重载,当前Stage将在此时完成

输出端口可用的状态查询方法:

  • isAvailable(out) 端口是否允许发送数据
  • isClosed(out)  端口是否已关闭(此时不能再推送和拉取)

OutHandler 状态机如图:

outport_transitions1

InHandler

输入端口可用的操作:

  • pull(in) 向输入端口请求一条新的数据(当上游推送数据时可用)
  • grab(in) 在 onPush()回调里,获取接收到的数据的内容(不能被重复调用,除非上游再次发送新的数据)
  • cancel(in) 关闭输入端口

在 GraphStageLogic 内使用 setHandler(in,handler)为输出端口注册 InHandler 实例,可处理如下回调:

  • onPush() 在输入端口有一条新数据到达时被调用。此时可使用grab(in) 获取数据内容,使用 pull(in) 请求下一条数据(流控在此实现)。可以不使用grab(in) 获取 而直接 pull(in)  请求新数据,此时旧的数据将从缓冲区被抹掉
  • onUpstreamFinish() 在上游关闭或再无数据可获取时被调用。此后将不再有 onPush() 事件发生。此回调如未被重载,当前Stage将在此时完成
  • onUpstreamFailure() 在上游出错时被调用,回调将带着一个异常参数。此后不再有 onPush() 事件发生。此回调如未被重载,当前Stage将在此时失败

输入端口可用的状态查询方法:

  • isAvailable(in) 端口是否有数据可获取
  • hasBeenPulled(in) 端口是否已经pull过了,如果为true, 再调用 pull(in) 将会出错
  • isClosed(in) 端口是否已关闭

InHandler 状态机如图:

inport_transitions1

GraphStage实例

流过滤器Filter

基于GraphStage,我们可以为FlowShpe通道提供筛选函数,实现一个数据流筛选器:graph_stage_filter1
实现:

输出:

定时器流控阀

TimerGraphStageLogi 继承自 GraphStageLogic,它提供了定时器回调接口:

  • scheduleOnce(key,delay) 下一次定时回调
  • schedulePeriodically(key,period) 周期性的定时回调
  • schedulePeriodicallyWithInitialDelay(key,delay,period) 带初始值的周期性定时回调

定时器会触发onTimer(key)回调,在其中可置入定时处理逻辑

通过使用这些接口,我们可以轻松实现如 RX.Debounce 一般的流控功能。

实例:迭代50000个数,1毫秒一次进行分组,显示分组的长度

结果随机,不列举

打赏作者
您的支持将激励我继续创作!

您的支持将鼓励我们继续创作!

[微信] 扫描二维码打赏

[支付宝] 扫描二维码打赏

Write a Comment

Comment