Akka Stream
提供了使用流来处理文件IO和TCP链接的方法. 虽然方法大体上和使用AKKA IO
来以Actor
的方式处理TCP相似, 使用Akka Stream
可以帮你从手动应对back-pressure
信号中解放出来, 因为库使得它对用户来说是透明的.
##1.11.1 TCP的流化(Streaming TCP)
###接受链接: 回声服务器(Echo Server)
要创建一个简单的回声服务器我们需要绑定到一个给定的地址, 它会返回一个每当有服务器需要处理链接时就会输出IncomingConnection
元素的源Source[IncomingConnection, Future[ServerBinding]]
:
val binding: Future[ServerBinding] =
Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run()
binding.map { b=>
b.unbind() onComplete {
case _=> // ...
}
}
下一步, 我们只需用一个Flow
简单的处理每一个到来的链接, 这个Flow
将被作为一个stage
, 负责处理TCP套接字到来的数据并且输出ByteStrings
到TCP套接字. 由于一个ByteString
并不一定确切的对应到一行文本(客户端可能以块的方式发送一行), 我们采用一个名为Framing.delimiter
的Flow
工具类来把输入块合并成实际的一行文本. 例子中最后一个布尔类型参数指示我们需要接受一个显示的行结束符, 甚至连链接关闭前最后的消息也需要. 在这个例子中我们简单的为每一个到来的文本消息添加了感叹号并且把它们输出到流:
import akka.stream.io.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
注意到虽然在Akka Stream
中创建的代码块大部分是可以复用和可以随意共享的, 但是对于InCommingConnection
的Flow
并不是这样,因为它对应了一个存在并且已经接受的链接,它的处理只能被具象化一次.
关闭连接可以通过在服务器逻辑中取消这个incoming connection Flow
(例如 把它的下游链接到Sink.cancelled
并且上游链接到一个Source.empty
). 也可以通过取消IncomingConnection
源的链接来关闭服务器的套接字.
我们可以使用netcat
来发送数据到TCP套接字, 以此来测试TCP服务器:
$ echo -n "Hello World" | netcat 127.0.0.1 8888
Hello World!!!
###链接:REPL 客户端
在这个例子中我们在TCP上实现一个比较原生的REPL客户端.我们假设已经知道某一个服务通过TCP暴露了一个简单的命令行接口,并且倾向于在TCP上用Akka Stream
来和它交互. 我们调用outgoingConnection
方法来开启一个对外的链接:
val connection = Tcp().outgoingConnection("127.0.0.1", 8888)
val replParser = new PushStage[String, ByteString] {
override def onPush(elem: String, ctx: Context[ByteString]): SyncDirective = {
elem match {
case "q" ⇒ ctx.pushAndFinish(ByteString("BYE\n"))
case _ ⇒ ctx.push(ByteString(s"$elem\n"))
}
}
}
val repl = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
.transform(() => replParser)
connection.join(repl).run()
我们用于处理服务器交互的这个REPL FLOW
, 首先打印了服务器的回复, 然后等待命令行的输入(在这里的阻塞调用仅仅是为了简化情况)并把它转换成一个ByteString
, 还将把它输出给远端服务器. 接着我们只需简单的把TCP
通道和这个处理步骤连接起来, 这个操作将具象化整个过程, 一旦服务器回复一个启动消息
,它将开始处理数据.
一个具有弹性的REPL
客户端会比这个例子复杂的多, 例如会把读取输入的过程拆解到一个单独的mapAsync
过程去, 且会让服务器在任何给定时刻输出更多的数据而不仅仅是一个ByteString
, 这些改进留给读者来实现.
###在具有back-pressure
的循环中避免死锁和活性问题
当写这样的端对端具有back-pressure
的系统时, 你有可能以一个循环的情况来实现, 在这种情况下任一方都在等待另一方来启动这个会话. 没有必要过多的去寻找这样的back-pressure
循环的例子.其实在前面展示的两个例子里, 我们总是假设我们连接的另一方会启动会话, 实际意味着双方都是具有back-pressure
特性, 且会话不会被开始. 在1.5.9图循环、活跃性以及死锁深度讨论了很多方法来处理这种情况, 然而在客户端-服务端的场景中, 方法往往只需要简化成某一端简单的发送一个初始化消息即可.
注意: 在具有
back-pressure
的循环中(即使是不同系统之间也会出现), 你有时候需要决定哪一方来启动会话来使得循环运转起来. 通常可以在某一方(会话启动方)注入一个启动消息来实现.
来打破这个back-pressure
循环我们需要注入一些启动消息, 作为一个"会话启动者". 首先, 我们需要决定哪一方是被动哪一方是主动. 还好在大部分的情况下找到正确的一方启动会话是相当简单的, 因为这通常由我们用流来实现的协议所固有. 在一个类聊天的应用中, 和我们的例子相仿, 通常是由服务端发送一个hello
消息来启动会话:
connections runForeach { connection =>
val serverLogic = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// 服务器逻辑 解析到来的命令
val commandParser = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]): SyncDirective = {
elem match {
case "BYE" ⇒ ctx.finish()
case _ ⇒ ctx.push(elem + "!")
}
}
}
import connection._
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!\n"
val welcome = Source.single(ByteString(welcomeMsg))
val echo = b.add(Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ commandParser)
.map(_ + "\n")
.map(ByteString(_)))
val concat = b.add(Concat[ByteString]())
// 首先我们输出欢迎消息,
welcome ~> concat.in(0)
// 接着我们使用 echo-logic Flow
echo.outlet ~> concat.in(1)
FlowShape(echo.in, concat.out)
})
connection.handleWith(serverLogic)
}
我们使用GraphDSL创建一个Flow
的方法已经在Constructing Sources, Sinks and Flows from Partial Graphs
中详细介绍了, 基础的概念十分简单, 我们可以将任意复杂的逻辑封装成一个Flow
只要它暴露出相同的接口, 这意味着它需要暴露仅一个Outlet
和仅一个Inlet
接口, 这两者将会连接到TCP通道. 在这个例子中我们使用一个Concat
的图处理步骤来注入一个启动消息, 接着用echo
逻辑继续处理其他到来的数据. 你应该使用这种模式来把复杂的业务封装成多个Flow
并将它们附着到IO流
来实现你的自定义且有可能复杂的TCP服务器.
在这个例子中客户端和服务端都可能需要通过一个已解析的命令来结束流, 服务器端是通过输出BYE
, 而客户端是通过输出q
. 这是通过使用自定义的PushStage
来实现的, 一旦它遇到这样的命令它将完成流.
##1.11.2 文件IO流化
Akka Stream
提供了简单的作用在ByteString
实例上的Source
和Sink
, 来实现在文件上的IO操作.
注意: 由于当前版本的
Akka
(2.3.x)需要兼容JDK6, 当前的文件IO实现不能使用在JDK7(或者更新版本)中引入的异步文件IO操作. 一旦Akka
能够使用JDK8(从2.4.x开始), 这些实现将会被更新,采用新的NIO API
(即AsynchronousFileChannel
)
把文件中的数据流化十分简单, 只需创建一个FileIO.fromFile
, 给定一个目标文件和一个可选的chunkSize
, 这个chunkSize
用来决定多大的buff视为一个在流中的"元素"":
import akka.stream.io._
val file = new File("example.csv")
val foreach: Future[Long] = FileIO.fromFile(file)
.to(Sink.ignore)
.run()
请注意这些处理步骤是由Actor
实现的并且默认是配置为运行在一个预定义的基于线程池的分发器上, 该分发器只为文件IO使用. 这一点非常重要, 因为这样一来把阻塞的文件IO操作从ActorSystem
中隔离了出来, 使得每个分发器能够以最有效的方式来使用. 如果你希望为文件IO操作配置一个全局自定义的分发器, 你可以这样做:改变akka.stream.blocking-io-dispatcher
的配置,或者像以下代码中在某个特定的步骤中指定使用一个自定义的分发器:
FileIO.fromFile(file)
.withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"))