Reactive Streams
reactive streams 是一个异步流标准,Akka项目是这个标准最早的实现者之一。
标准定义了两个通道:
- publisher为上游生产者通道
- subscriber为下游消费者通道
标准要求这两个通道间的通信是异步非阻塞的,并且是基于反压模式(back pressure)的——数据是从publisher流向subscriber的,但数据一次发送多少个elements是由subscriber通过request(n)来告诉上游的,这使得流可控。同时,这也使得流在处理过程中的合并与拆分成为可能:
akka-streams 内置了对 back-pressure 特性的支持,根据上下游的快慢不同,对数据流进行动态的 推/拉 策略调节(解决了生产者快消费者慢问题)。
一些概念
- stream:数据的移动和转换的一系列活动过程
- element: 流里的最小数据计量单位,也是akka streams里通用单位;诸如缓冲区大小就是指的element的个数
- graph: 数据流处理的拓扑,定义了在流运行时elements流动的路径
- processing stage: 所有用来构建graph的单元块的统称。比如流式api 中的
map(), filter()
…
一些抽象
linear pipelines有这么一些关于拓扑的抽象:
- Source: 只有一个出口的processing stage,在下游processing stage准备好接收数据时进行数据发送
- Sink: 只有一个入口的processing stage,它可以控制上游的速率
- Flow: 可以接入一个入口和一个出口的processing stage,可对流过的数据进行转换处理
- RunnableGraph: 一个已经附加Source和Sink的Flow,它有一个
run()
方法可以执行管道处理,并获取Sink输出(一个关于类型T的具体化实例)
linear pipelines?
来看一个 从Source 通过 Flow 最终到 Sink 一条龙的管道:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Flow, Sink, Source} object WireUp extends App { implicit val system = ActorSystem("actor-system") implicit val materializer = ActorMaterializer() Source(0 to 25) .via(Flow[Int].map(i => (i + 'A').asInstanceOf[Char])) .to(Sink.foreach(println(_))).run() system.terminate() } |
很简单的一段代码,通过流的方式将数字序列,转换成字母序列(隐式的 ActorMaterializer
为整个流程run()
起来所必需,最后的termiate()
相当于这个例子的teardown
一个实例
来看另一个略为复杂些的例子:有100个编号随机的顾客涌入,我们用管道来接待他们
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Flow, Sink, Source} import scala.util.Random object InputCustomer { def random(): InputCustomer = { InputCustomer(s"FirstName${Random.nextInt(1000)} " + s"LastName${Random.nextInt(1000)}") } } case class InputCustomer(name: String) case class OutputCustomer(firstName: String, lastName: String) object CustomersExample extends App { implicit val actorSystem = ActorSystem() import actorSystem.dispatcher implicit val materializer = ActorMaterializer() val inputCustomers = Source((1 to 100).map(_ => InputCustomer.random())) val normalize = Flow[InputCustomer] .map(c => c.name.split(" ").toList) .collect { case firstName :: lastName :: Nil => OutputCustomer(firstName, lastName) } val writeCustomers = Sink.foreach { println} // η-conversion inputCustomers.via(normalize).runWith(writeCustomers).andThen { case _ => actorSystem.terminate() } } |
可以看到,顾客进入时,via
了一个normalize 中间件进行编号处理,然后runWith
传递给println打印(一个η-规约,今天现学的 🙂 ),最终进到teardown
打赏作者
您的支持将激励我继续创作!
η-变换
前两条规则之后,还可以加入第三条规则,eta-变换,来形成一个新的等价关系。Eta-变换表达的是外延性的概念,在这里外延性指的是,两个函数对于所有的参数得到的结果都一致,当且仅当它们是同一个函数。Eta-变换可以令λx .f x和f相互转换,只要x不是f中的自由出现。下面说明了为何这条规则和外延性是等价的:
若f与g外延地等价,即,f a == g a对所有的lambda表达式a成立,则当取a为在f中不是自由出现的变量x时,我们有f x == g x,因此λx .f x == λx .g x,由eta-变换f == g。所以只要eta-变换是有效的,会得到外延性也是有效的。
相反地,若外延性是有效的,则由beta-归约,对所有的y有(λx .f x) y == f y,可得λx .f x == f,即eta-变换也是有效的
Reffer: https://zh.wikipedia.org/wiki/%CE%9B%E6%BC%94%E7%AE%97
最近在看这本:Lambda-Calculus and Combinators
https://book.douban.com/subject/4323391/