問題描述
Akka Stream, Tcp().bind, 客戶端關閉套接字時的處理 (Akka Stream, Tcp().bind, handle when the client close the socket)
我是 Akka Stream 的新手,我想了解如何為我的項目處理 TCP 套接字。我從 Akka Stream 官方文檔中獲取了這段代碼.
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
如果我使用 netcat 從終端連接,我可以看到 Akka Stream TCP 套接字按預期工作。我還發現如果我需要使用用戶消息關閉連接,我可以使用 takeWhile
如下
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWhile(_.toLowerCase.trim != "exit") // < ‑ ‑ ‑ ‑ ‑ ‑ HERE
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
我找不到的是如何管理由 CMD + C
操作關閉的套接字。Akka Stream 內部使用 Akka.io 來管理 TCP 連接,所以它必鬚髮送一些它的 PeerClose
套接字關閉時的消息。所以,我對 Akka.io 的理解告訴我,我應該收到來自套接字關閉的反饋,但我找不到如何使用 Akka Stream 來做到這一點。有沒有辦法管理它?
參考解法
方法 1:
connection.handleWith(echo)
is syntactic sugar for connection.flow.joinMat(echo)(Keep.right).run()
which will have the materialized value of echo
, which is generally not useful. Flow.via.map.takeWhile
has NotUsed
as a materialized value, so that's also basically useless. However, you can attach stages to echo
which will materialize differently.
One of these is .watchTermination
:
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.takeWhile(_.toLowerCase.trim != "exit") // < ‑ ‑ ‑ ‑ ‑ ‑ HERE
.map(_ + "!!!\n")
.map(ByteString(_))
// change the materialized value to a Future[Done]
.watchTermination()(Keep.right)
// you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
// if you don't already
connection.handleWith(echo).onComplete {
case Success(_) => println("stream completed successfully")
case Failure(e) => println(e.getMessage)
}
}
This will not distinguish between your side or the remote side closing the connection normally; it will distinguish the stream failing.