Akka Stream, Tcp().bind, 客戶端關閉套接字時的處理 (Akka Stream, Tcp().bind, handle when the client close the socket)


問題描述

Akka Stream, Tcp().bind, 客戶端關閉套接字時的處理 (Akka Stream, Tcp().bind, handle when the client close the socket)

我是 Akka Stream 的新手,我想了解如何為我的項目處理 TCP 套接字。我從 Akka Stream 官方文檔中獲取了這段代碼.

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

如果我使用 netcat 從終端連接,我可以看到 Akka Stream TCP 套接字按預期工作。我還發現如果我需要使用用戶消息關閉連接,我可以使用 takeWhile 如下

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < ‑ ‑ ‑ ‑ ‑ ‑ HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

我找不到的是如何管理由 CMD + C 操作關閉的套接字。Akka Stream 內部使用 Akka.io 來管理 TCP 連接,所以它必鬚髮送一些它的 PeerClose 套接字關閉時的消息。所以,我對 Akka.io 的理解告訴我,我應該收到來自套接字關閉的反饋,但我找不到如何使用 Akka Stream 來做到這一點。有沒有辦法管理它?


參考解法

方法 1:

connection.handleWith(echo) is syntactic sugar for connection.flow.joinMat(echo)(Keep.right).run() which will have the materialized value of echo, which is generally not useful. Flow.via.map.takeWhile has NotUsed as a materialized value, so that's also basically useless. However, you can attach stages to echo which will materialize differently.

One of these is .watchTermination:

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < ‑ ‑ ‑ ‑ ‑ ‑ HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))
    // change the materialized value to a Future[Done]
    .watchTermination()(Keep.right)

  // you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
  //  if you don't already
  connection.handleWith(echo).onComplete {
    case Success(_) => println("stream completed successfully")
    case Failure(e) => println(e.getMessage)
  }
}

This will not distinguish between your side or the remote side closing the connection normally; it will distinguish the stream failing.

(by lucatagliaLevi Ramsey)

參考文件

  1. Akka Stream, Tcp().bind, handle when the client close the socket (CC BY‑SA 2.5/3.0/4.0)

#akka-io #akka-stream #akka #tcpsocket #scala






相關問題

Akka TCP 客戶端:如何使用 akka actor 通過 TCP 發送消息 (Akka TCP client: How can I send a message over TCP using akka actor)

Akka Stream, Tcp().bind, 客戶端關閉套接字時的處理 (Akka Stream, Tcp().bind, handle when the client close the socket)







留言討論