引子
准确来说,IO API 不算是 Akka Streams的内容,但它却是 Streaming IO (TCP) 的基础。
IO API 编程的第一步,是获取一个管理对象的引用。比如我们想针对TCP编程,可以这么获取 TCP Manager:
1 2 3 |
import akka.io.{ IO, Tcp } val manager = IO(Tcp) |
manager 是一个 actor 对象,用于处理底层 I/O资源和操作(诸如channels/selectors,或是workers的初始化),同时对于上层提供基于actor 模型的抽象
IO TCP
IO TCP 编程也是 CS 模型的,好处是不需要写繁琐的 SOCKET 逻辑——当然,如果你乐意,也可以掺合这些 SOCKET 事务,比如:在客户端进行连接时,指定SOCKET的SO_NODELAY
( windows里叫TCP_NODELAY
)选项,以启用 Nagle’s algorithm(Akka里默认是禁用):
1 2 |
IO(Tcp) ! Connect(remote, options = Vector(SO.TcpNoDelay(false))) |
在 IO TCP编程中,无论是客户端还是服务端,都是一个actor对象,通过它与manger对象及对端的连接进行消息上的交互:
客户端
客户端流程:
- 连接到服务器,即向 manger 发送
Connect
消息 - 接收消息:
CommandFailed
意味着连接失败;Connected
代表了一个成功的连接 Connected
消息的sender()
就代表了与服务器建立TCP连接的代理对象,它亦是一个 ActorRef,我们可以向它Register(handler)
消息,以指定处理它消息的handler(还是一个ActorRef)- 利用hander,可以向对端的服务器发送消息,即写入
ByteString
到SOCKET以抵达服务器,写入消息,是特定的Tcp.Write
、Tcp.WriteFile
、Tcp.CompoundWrite
消息,分别对应了:写入ByteString
、写入文件以及混合写入多种消息 - 同时,如果在
Tcp.Write
消息中指定了ACK事件参数,那么在发送成功后,handler将会接受到ACK
消息
服务端
服务端流程:
- 绑定到服务器,即向 manger 发送
Bind
消息 - 接收消息:
CommandFailed
代表绑定失败;Bound
代表绑定成功;Connected
代表了一个从客户端发来的连接 Connected
消息的sender()
就代表了与客户端建立TCP连接的代理对象,它亦是一个 ActorRef,我们可以向它Register(handler)
消息,以指定处理它消息的handler……- 利用hander,可以向对端的服务器发送消息,即写入
ByteString
到SOCKET以抵达服务器,写入消息,是特定的Tcp.Write
、Tcp.WriteFile
、Tcp.CompoundWrite
消息,分别对应了:写入ByteString
、写入文件以及混合写入多种消息 - 同时,如果在
Tcp.Write
消息中指定了ACK事件参数,那么在发送成功后,handler将会接受到ACK
消息
除了绑定等前置步骤,代理对象的处理和客户端是一样的……
TCP CS 模型实例
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
import java.net.InetSocketAddress import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.io.Tcp._ import akka.io.{IO, Tcp} import akka.stream.ActorMaterializer import akka.util.ByteString object TcpClient_ extends App { class Client(remote: InetSocketAddress, handler: ActorRef) extends Actor { override def preStart(): Unit = { IO(Tcp) ! Connect(remote, options = Vector(SO.TcpNoDelay(false))) } def receive: Receive = { case CommandFailed(_: Connect) => handler ! "connect failed" context stop self case c@Connected(remote, local) => handler ! c val connection = sender() connection ! Register(self) context become { case data: ByteString => connection ! Write(data) case CommandFailed(w: Write) => handler ! "write failed" case Received(data) => handler ! data case "close" => connection ! Close case _: ConnectionClosed => handler ! "connection closed" context stop self } } } class ClientHandler extends Actor { def receive: Receive = { case c@Connected(remote, local) => println(s"Connnected -> remote: $remote, local: $local") sender() ! ByteString("hello") case b: ByteString => println(b.utf8String) case f@("connect failed" | "connection closed" | "write failed") => println(f) } } object Client { def generate = system.actorOf(Props(classOf[Client], serverAddress, system.actorOf(Props[ClientHandler]))) } // 调用部分 implicit val system = ActorSystem() implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() val serverAddress = new InetSocketAddress("localhost", 8000) val client = Client.generate Thread.sleep(1000) val client2 = Client.generate } |
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
import java.net.InetSocketAddress import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.io.{IO, Tcp} import Tcp._ import akka.stream.ActorMaterializer import akka.util.ByteString object TcpServer_ extends App { case object Ack extends Event var connections = Map.empty[String, ActorRef] def send(msg: String): Unit = { for (c <- connections) { c._2 ! msg } } class Server(serverAddress: InetSocketAddress) extends Actor { override def preStart(): Unit = { IO(Tcp) ! Bind(self, serverAddress) } override def postRestart(thr: Throwable): Unit = context stop self def receive: Receive = { case Bound(localAddress) => println(s"Server is bound to ${localAddress}") case CommandFailed(_: Bind) => context stop self case Connected(remote, local) => val connection = sender() val handler = ConnectionHandler.instanceOf(connection) connection ! Register(handler) connections += connection.path.toSerializationFormat -> handler } } object ConnectionHandler { def instanceOf(connection: ActorRef) = system.actorOf(Props(classOf[ConnectionHandler], connection)) } class ConnectionHandler(connection: ActorRef) extends Actor { def receive: Receive = { case Received(data) => println(data.utf8String) case PeerClosed => context stop self case msg: String => connection ! Write(ByteString( s"${msg} to: ${connection.path.toSerializationFormat}"), Ack) case Ack => println("ACK") } } // 调用部分 implicit val system = ActorSystem() implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() val serverAddress = new InetSocketAddress("localhost", 8000) val server = system.actorOf(Props(classOf[Server], serverAddress)) Thread.sleep(5000) send("hi") Thread.sleep(5000) send("holy") } |
运行结果
客户端
1 2 3 4 5 6 |
Connnected -> remote: localhost/127.0.0.1:8000, local: /127.0.0.1:64775 Connnected -> remote: localhost/127.0.0.1:8000, local: /127.0.0.1:64776 hi to: akka://default/system/IO-TCP/selectors/$a/2#1599711018 hi to: akka://default/system/IO-TCP/selectors/$a/1#629943617 holy to: akka://default/system/IO-TCP/selectors/$a/2#1599711018 holy to: akka://default/system/IO-TCP/selectors/$a/1#629943617 |
服务端
1 2 3 4 5 6 7 |
Server is bound to /127.0.0.1:8000 hello hello ACK ACK ACK ACK |
打赏作者
您的支持将激励我继续创作!