diff --git a/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpConnection.kt b/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpConnection.kt index 3ad8ac7d..b9250358 100644 --- a/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpConnection.kt +++ b/rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpConnection.kt @@ -91,18 +91,21 @@ private class KtorTcpConnection( } } -@OptIn(InternalAPI::class) // TODO? +@OptIn(InternalAPI::class) private fun ByteWriteChannel.writeFrame(frame: Buffer) { writeBuffer.writeInt24(frame.size.toInt()) writeBuffer.transferFrom(frame) } -@OptIn(InternalAPI::class) // TODO? +@OptIn(InternalAPI::class) private suspend fun ByteReadChannel.readFrame(): Buffer? { - while (availableForRead < 3 && awaitContent(3)) yield() - if (availableForRead == 0) return null - - val length = readBuffer.readInt24() +// TODO: replace after ktor 3.0.2: https://youtrack.jetbrains.com/issue/KTOR-7746 + val length = readBuffer(3).also { + if (it.exhausted()) return null + }.readInt24() +// while (availableForRead < 3 && awaitContent(3)) yield() +// if (availableForRead == 0) return null +// val length = readBuffer.readInt24() return readBuffer(length).also { it.require(length.toLong()) }