引子
最近在学习 Akka,因为Scala的新版本已将Actors迁移到Akka [1](教材就是那本烂到一定程度的 Akka in action,排版字体代码示例一如既往的Manning式的糟糕)——至于为什么要学习Actors,大概是因为 红宝书 里提出了一些对异步编程组合子的设计要求。
Akka
官方定义:
Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM.
做为一个消息驱动的响应式工具集,Akka具有如下特性:
- 高性能:单机吞吐可达 5千万条消息/秒;1G堆内存可容纳250万个Actor
- 简单:通过Actors/Future/EventStream一系列高度抽象,向开发者提供了简单的并发/分布式编程接口
- 系统富于弹性:Akka对于错误的处理哲学是“Let it crash”,Actor生灭并不影响整体
- 集群富于弹性:基于Gossip消息协议,可去中心化运转,CAP方面强调的是最终一致性
- 易于扩展:可与play、slick、spray、Camel、spark 等各路框架无缝集成
同时,借助 Actor model,Akka 为上层屏蔽了线程相关细节,提供了简洁的编程接口。
Actor model
Actor model 最早是由Carl Hewitt在1973定义,由Erlang OTP (Open Telecom Platform) 发扬光大,其消息传递更加符合面向对象的原始意图。Actor model属于并发计算模型 ,通过对Actors并发原语的定义和使用,来避免使用者直接接触多线程并发或线程池等基础概念。
传统的并发模型是基于多线程之间的共享内存,使用同步(锁)方法防止写争夺;而Actors使用的是消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actors,保证了单写原则,从而避免了多线程写争夺。
关于这个Actors,InfoQ上有一个很生动的解释:
你可以将Actors当作是一群人,他们互相之间不会面对面地交流,而只是通过邮件的方式进行沟通。
Actor URI
Actors之间进行通信时,它们通过Actor URI(上图的mailbox) 来进行定位,格式如下:
如akka.tcp://backend@0.0.0.0:2551/user/tooSimple
就是一个合法的Actors URI,它代表的意思是:这个Actor 运行在backend系统,走的是akka.tcp协议管道,本机2551端口,路径为根结点下的tooSimple结点
消息接收决策
Actors在收到消息时,它可以做出以下决策(可多选):
- 发送消息
- 对发送方进行响应
- 进行其他核心操作
这个决策模型,在代码中看起来是这样的(根据消息的不同,会有不同的决策):
1 2 3 4 5 6 7 8 9 10 |
def receive = { case Withdraw(amount) => { if(this.balance > amount) { this.balance -= this.balance } } case Deposit(amount) => { this.balance += amount } } |
Actors 核心操作
- CREATE: 一个Actor可以创建另一个Actor,由于Actors是分层的结构,被创建的Actor将成为创建者的子节点
- SEND: 发送消息,形如
actor ! message
- BECOME:Actor在运行时的行为可以动态的改变(即指定下一次接收消息时,Actor的消息接收决策)
- SUPERVISE: Actor可以监督他的子节点,并决定子结点出错时的处理策略
Actors 的生命周期
一个Actor的生命周期由如下事件组成(重启与否,由Actor的监督者/父结点来决定:
来看一个演示Actors生命周期的代码片断(LifeCycleHook是用于输出生命周期事件的Actor,TestActor是与它交互的Actor):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
import akka.actor.{ActorSystem, Props, ActorLogging, Actor} class LifeCycleHook extends Actor with ActorLogging { println("Constructor") override def preStart() { println("preStart") } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { println("preRestart") super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { println("postRestart") super.postRestart(reason) } override def postStop() { println("postStop") } def receive = { case "restart" => throw new IllegalStateException("force restart") case msg: AnyRef => println("Receive") } } class TestActor extends Actor { def receive = { case _ => println("Received") } } object app extends App { implicit val system = ActorSystem("my-system") val testActor = system.actorOf(Props[TestActor], "Sender") val testActorRef = system.actorOf(Props[LifeCycleHook], "LifeCycleHook") testActorRef ! "restart" Thread.sleep(2000) testActorRef.tell("msg", testActor) system.stop(testActorRef) Thread.sleep(1000) system.stop(testActor) system.shutdown() } |
它的输出:
小结
Actor model里只有消息,只关注和传递消息,没有共享数据结构,Locking-free,这也是Akka能提供高性能计算模型的原因
PS: Akka也有.NET平台的实现 => { http://getakka.net/ } 🙂
Ak47~
@_@
清爽的博客,很喜欢。
最近也在学习akka和scala。有没有好的教材和学习路线。看了半天感觉有点懵
学习路线,你可以联系这个repo的作者:
https://github.com/ReactivePlatform/TheAkkaWay