From fa1bfff67464f8015bfc1cc0e5af07e8e4f11b5d Mon Sep 17 00:00:00 2001 From: navid Date: Tue, 29 Oct 2024 15:17:13 +0100 Subject: [PATCH] Merge grpc headers and trailers in case of failure (#391) * write trailers directly into headers on failure * attempt fixing tests * do not add trailer if none exist * remove useless generic type * apply fix to copy-pasted method * complete headers with empty metadata * match both the strict case and the chunked case --- .../PekkoHttpServerProviderScala.scala | 9 ++--- .../GrpcExceptionDefaultHandleSpec.scala | 37 ++++++++++++++----- .../grpc/internal/GrpcEntityHelpers.scala | 5 ++- .../grpc/internal/GrpcResponseHelpers.scala | 12 ++++-- .../PekkoNettyGrpcClientGraphStage.scala | 28 ++++++++------ .../apache/pekko/grpc/GrpcInteropSpec.scala | 9 ++--- 6 files changed, 65 insertions(+), 35 deletions(-) diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala index af3b3221..367bb43d 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/interop/PekkoHttpServerProviderScala.scala @@ -118,11 +118,10 @@ object PekkoHttpServerProviderScala extends PekkoHttpServerProvider with Directi HttpEntity.LastChunk(last.extension, f(last.trailer)) })) case _ => - val origTrailers = response + response .attribute(AttributeKeys.trailer) - .map(_.headers) - .getOrElse(Vector.empty) - .map(e => RawHeader(e._1, e._2)) - response.addAttribute(AttributeKeys.trailer, Trailer(f(origTrailers))) + .map(trailer => f(trailer.headers.map((RawHeader.apply _).tupled))) + .flatMap(headers => if (headers.isEmpty) None else Some(Trailer(headers))) + .fold(response)(response.addAttribute(AttributeKeys.trailer, _)) }) } diff --git a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala index 2d84893f..26ceff02 100644 --- a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala +++ b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionDefaultHandleSpec.scala @@ -17,7 +17,7 @@ import org.apache.pekko import pekko.actor.ActorSystem import pekko.grpc.internal.{ GrpcProtocolNative, GrpcRequestHelpers, Identity } import pekko.grpc.scaladsl.headers.`Status` -import pekko.http.scaladsl.model.{ AttributeKeys, HttpEntity, HttpRequest, HttpResponse } +import pekko.http.scaladsl.model._ import pekko.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk, Strict } import pekko.grpc.GrpcProtocol import pekko.stream.scaladsl.{ Sink, Source } @@ -58,7 +58,7 @@ class GrpcExceptionDefaultHandleSpec case Seq(LastChunk("", List(`Status`("3")))) => // ok } case _: Strict => - response.attribute(AttributeKeys.trailer).get.headers.contains("grpc-status" -> "3") + response.headers.find(_.is("grpc-status")).map(_.value()) shouldBe Some("3") case other => fail(s"Unexpected [$other]") } @@ -131,14 +131,33 @@ class GrpcExceptionDefaultHandleSpec val reply = GreeterServiceHandler(ExampleImpl).apply(request).futureValue - val lastChunk = reply.entity.asInstanceOf[Chunked].chunks.runWith(Sink.last).futureValue.asInstanceOf[LastChunk] - // Invalid argument is '3' https://github.com/grpc/grpc/blob/master/doc/statuscodes.md - val statusHeader = lastChunk.trailer.find { _.name == "grpc-status" } - statusHeader.map(_.value()) should be(Some("3")) - val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" } - statusMessageHeader.map(_.value()) should be(Some("No name found")) + val trailer = reply.entity match { + case chunked: Chunked => + val lastChunkWide = chunked.chunks.runWith(Sink.last).futureValue + lastChunkWide shouldBe a[LastChunk] + val lastChunk = lastChunkWide.asInstanceOf[LastChunk] + val trailer = lastChunk.trailer - val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer) + val trailerAttribute = reply.attribute(AttributeKeys.trailer).map(_.headers) + trailerAttribute.isDefined shouldBe true + trailerAttribute.get should contain theSameElementsAs trailer.map { h => (h.name(), h.value()) } + + trailer + + case strict: Strict => + strict.contentType.mediaType.toString shouldBe "application/grpc+proto" + strict.data.isEmpty shouldBe true + reply.attribute(AttributeKeys.trailer) shouldBe None + + reply.headers + + case _ => fail(s"Unexpected entity [$reply]. Should be one of [Chunked, Strict]") + } + + trailer.collect { case header if header.is("grpc-status") => header.value() } shouldBe Seq("3") + trailer.collect { case header if header.is("grpc-message") => header.value() } shouldBe Seq("No name found") + + val metadata = MetadataBuilder.fromHeaders(trailer) metadata.getText("test-text") should be(Some("test-text-data")) metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data"))) } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala index 75a7da03..128b53f8 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcEntityHelpers.scala @@ -74,7 +74,10 @@ object GrpcEntityHelpers { TrailerFrame(trailers = statusHeaders(status)) def trailer(status: Status, metadata: Metadata): TrailerFrame = - TrailerFrame(trailers = statusHeaders(status) ++ metadataHeaders(metadata)) + TrailerFrame(trailers = trailers(status, metadata)) + + def trailers(status: Status, metadata: Metadata): List[HttpHeader] = + statusHeaders(status) ++ metadataHeaders(metadata) def statusHeaders(status: Status): List[HttpHeader] = List(headers.`Status`(status.getCode.value.toString)) ++ Option(status.getDescription).map(d => diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala index bb41a680..75cb3e2b 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/GrpcResponseHelpers.scala @@ -101,12 +101,18 @@ object GrpcResponseHelpers { response(GrpcEntityHelpers(e, trail, eHandler)) } - private def response[T](entity: Source[ChunkStreamPart, NotUsed])(implicit writer: GrpcProtocolWriter) = { + private def response(entity: Source[ChunkStreamPart, NotUsed])(implicit writer: GrpcProtocolWriter) = { HttpResponse( headers = immutable.Seq(headers.`Message-Encoding`(writer.messageEncoding.name)), entity = HttpEntity.Chunked(writer.contentType, entity)) } - def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse = - response(Source.single(writer.encodeFrame(GrpcEntityHelpers.trailer(trailer.status, trailer.metadata)))) + def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse = { + HttpResponse( + headers = + headers.`Message-Encoding`(writer.messageEncoding.name) :: + GrpcEntityHelpers.trailers(trailer.status, trailer.metadata), + entity = HttpEntity.empty(writer.contentType) + ) + } } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala index e129b5b3..5d0e37f3 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/PekkoNettyGrpcClientGraphStage.scala @@ -16,13 +16,14 @@ package org.apache.pekko.grpc.internal import org.apache.pekko import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts -import pekko.grpc.GrpcResponseMetadata +import pekko.grpc.{ javadsl, scaladsl, GrpcResponseMetadata } import pekko.stream import pekko.stream.{ Attributes => _, _ } import pekko.stream.stage._ import pekko.util.FutureConverters._ import io.grpc._ +import java.util.concurrent.CompletionStage import scala.concurrent.{ Future, Promise } @InternalApi @@ -92,26 +93,29 @@ private final class PekkoNettyGrpcClientGraphStage[I, O]( var call: ClientCall[I, O] = null val listener = new ClientCall.Listener[O] { - override def onReady(): Unit = - callback.invoke(ReadyForSending) - override def onHeaders(responseHeaders: Metadata): Unit = - matVal.success(new GrpcResponseMetadata { - private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(responseHeaders) - private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(responseHeaders) - def headers = sMetadata - def getHeaders() = jMetadata + private def makeResponseMetadata(metadata: Metadata) = + new GrpcResponseMetadata { + private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadata) + private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(metadata) + def headers: scaladsl.Metadata = sMetadata + def getHeaders(): javadsl.Metadata = jMetadata private lazy val sTrailers = trailerPromise.future.map(MetadataImpl.scalaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic) private lazy val jTrailers = trailerPromise.future .map(MetadataImpl.javaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic) .asJava - def trailers = sTrailers - def getTrailers() = jTrailers - }) + def trailers: Future[scaladsl.Metadata] = sTrailers + def getTrailers(): CompletionStage[javadsl.Metadata] = jTrailers + } + override def onReady(): Unit = + callback.invoke(ReadyForSending) + override def onHeaders(responseHeaders: Metadata): Unit = + matVal.success(makeResponseMetadata(responseHeaders)) override def onMessage(message: O): Unit = callback.invoke(message) override def onClose(status: Status, trailers: Metadata): Unit = { + matVal.trySuccess(makeResponseMetadata(new Metadata())) trailerPromise.success(trailers) callback.invoke(Closed(status, trailers)) } diff --git a/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala b/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala index f2052823..d7686830 100644 --- a/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala +++ b/sbt-plugin/src/sbt-test/gen-scala-server/00-interop/src/test/scala/org/apache/pekko/grpc/GrpcInteropSpec.scala @@ -74,12 +74,11 @@ object PekkoHttpServerProviderScala extends PekkoHttpServerProvider { HttpEntity.LastChunk(last.extension, f(last.trailer)) })) case _ => - val origTrailers = response + response .attribute(AttributeKeys.trailer) - .map(_.headers) - .getOrElse(Vector.empty) - .map(e => RawHeader(e._1, e._2)) - response.addAttribute(AttributeKeys.trailer, Trailer(f(origTrailers))) + .map(trailer => f(trailer.headers.map((RawHeader.apply _).tupled))) + .flatMap(headers => if (headers.isEmpty) None else Some(Trailer(headers))) + .fold(response)(response.addAttribute(AttributeKeys.trailer, _)) }) }