Skip to content

Commit

Permalink
Merge grpc headers and trailers in case of failure (#391)
Browse files Browse the repository at this point in the history
* write trailers directly into headers on failure

* attempt fixing tests

* do not add trailer if none exist

* remove useless generic type

* apply fix to copy-pasted method

* complete headers with empty metadata

* match both the strict case and the chunked case
  • Loading branch information
NavidJalali authored Oct 29, 2024
1 parent 7d5269a commit fa1bfff
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,10 @@ object PekkoHttpServerProviderScala extends PekkoHttpServerProvider with Directi
HttpEntity.LastChunk(last.extension, f(last.trailer))
}))
case _ =>
val origTrailers = response
response
.attribute(AttributeKeys.trailer)
.map(_.headers)
.getOrElse(Vector.empty)
.map(e => RawHeader(e._1, e._2))
response.addAttribute(AttributeKeys.trailer, Trailer(f(origTrailers)))
.map(trailer => f(trailer.headers.map((RawHeader.apply _).tupled)))
.flatMap(headers => if (headers.isEmpty) None else Some(Trailer(headers)))
.fold(response)(response.addAttribute(AttributeKeys.trailer, _))
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.grpc.internal.{ GrpcProtocolNative, GrpcRequestHelpers, Identity }
import pekko.grpc.scaladsl.headers.`Status`
import pekko.http.scaladsl.model.{ AttributeKeys, HttpEntity, HttpRequest, HttpResponse }
import pekko.http.scaladsl.model._
import pekko.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk, Strict }
import pekko.grpc.GrpcProtocol
import pekko.stream.scaladsl.{ Sink, Source }
Expand Down Expand Up @@ -58,7 +58,7 @@ class GrpcExceptionDefaultHandleSpec
case Seq(LastChunk("", List(`Status`("3")))) => // ok
}
case _: Strict =>
response.attribute(AttributeKeys.trailer).get.headers.contains("grpc-status" -> "3")
response.headers.find(_.is("grpc-status")).map(_.value()) shouldBe Some("3")
case other =>
fail(s"Unexpected [$other]")
}
Expand Down Expand Up @@ -131,14 +131,33 @@ class GrpcExceptionDefaultHandleSpec

val reply = GreeterServiceHandler(ExampleImpl).apply(request).futureValue

val lastChunk = reply.entity.asInstanceOf[Chunked].chunks.runWith(Sink.last).futureValue.asInstanceOf[LastChunk]
// Invalid argument is '3' https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
val statusHeader = lastChunk.trailer.find { _.name == "grpc-status" }
statusHeader.map(_.value()) should be(Some("3"))
val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" }
statusMessageHeader.map(_.value()) should be(Some("No name found"))
val trailer = reply.entity match {
case chunked: Chunked =>
val lastChunkWide = chunked.chunks.runWith(Sink.last).futureValue
lastChunkWide shouldBe a[LastChunk]
val lastChunk = lastChunkWide.asInstanceOf[LastChunk]
val trailer = lastChunk.trailer

val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer)
val trailerAttribute = reply.attribute(AttributeKeys.trailer).map(_.headers)
trailerAttribute.isDefined shouldBe true
trailerAttribute.get should contain theSameElementsAs trailer.map { h => (h.name(), h.value()) }

trailer

case strict: Strict =>
strict.contentType.mediaType.toString shouldBe "application/grpc+proto"
strict.data.isEmpty shouldBe true
reply.attribute(AttributeKeys.trailer) shouldBe None

reply.headers

case _ => fail(s"Unexpected entity [$reply]. Should be one of [Chunked, Strict]")
}

trailer.collect { case header if header.is("grpc-status") => header.value() } shouldBe Seq("3")
trailer.collect { case header if header.is("grpc-message") => header.value() } shouldBe Seq("No name found")

val metadata = MetadataBuilder.fromHeaders(trailer)
metadata.getText("test-text") should be(Some("test-text-data"))
metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data")))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ object GrpcEntityHelpers {
TrailerFrame(trailers = statusHeaders(status))

def trailer(status: Status, metadata: Metadata): TrailerFrame =
TrailerFrame(trailers = statusHeaders(status) ++ metadataHeaders(metadata))
TrailerFrame(trailers = trailers(status, metadata))

def trailers(status: Status, metadata: Metadata): List[HttpHeader] =
statusHeaders(status) ++ metadataHeaders(metadata)

def statusHeaders(status: Status): List[HttpHeader] =
List(headers.`Status`(status.getCode.value.toString)) ++ Option(status.getDescription).map(d =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,18 @@ object GrpcResponseHelpers {
response(GrpcEntityHelpers(e, trail, eHandler))
}

private def response[T](entity: Source[ChunkStreamPart, NotUsed])(implicit writer: GrpcProtocolWriter) = {
private def response(entity: Source[ChunkStreamPart, NotUsed])(implicit writer: GrpcProtocolWriter) = {
HttpResponse(
headers = immutable.Seq(headers.`Message-Encoding`(writer.messageEncoding.name)),
entity = HttpEntity.Chunked(writer.contentType, entity))
}

def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse =
response(Source.single(writer.encodeFrame(GrpcEntityHelpers.trailer(trailer.status, trailer.metadata))))
def status(trailer: Trailers)(implicit writer: GrpcProtocolWriter): HttpResponse = {
HttpResponse(
headers =
headers.`Message-Encoding`(writer.messageEncoding.name) ::
GrpcEntityHelpers.trailers(trailer.status, trailer.metadata),
entity = HttpEntity.empty(writer.contentType)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package org.apache.pekko.grpc.internal
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.grpc.GrpcResponseMetadata
import pekko.grpc.{ javadsl, scaladsl, GrpcResponseMetadata }
import pekko.stream
import pekko.stream.{ Attributes => _, _ }
import pekko.stream.stage._
import pekko.util.FutureConverters._
import io.grpc._

import java.util.concurrent.CompletionStage
import scala.concurrent.{ Future, Promise }

@InternalApi
Expand Down Expand Up @@ -92,26 +93,29 @@ private final class PekkoNettyGrpcClientGraphStage[I, O](
var call: ClientCall[I, O] = null

val listener = new ClientCall.Listener[O] {
override def onReady(): Unit =
callback.invoke(ReadyForSending)
override def onHeaders(responseHeaders: Metadata): Unit =
matVal.success(new GrpcResponseMetadata {
private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(responseHeaders)
private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(responseHeaders)
def headers = sMetadata
def getHeaders() = jMetadata
private def makeResponseMetadata(metadata: Metadata) =
new GrpcResponseMetadata {
private lazy val sMetadata = MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadata)
private lazy val jMetadata = MetadataImpl.javaMetadataFromGoogleGrpcMetadata(metadata)
def headers: scaladsl.Metadata = sMetadata
def getHeaders(): javadsl.Metadata = jMetadata

private lazy val sTrailers =
trailerPromise.future.map(MetadataImpl.scalaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
private lazy val jTrailers = trailerPromise.future
.map(MetadataImpl.javaMetadataFromGoogleGrpcMetadata)(ExecutionContexts.parasitic)
.asJava
def trailers = sTrailers
def getTrailers() = jTrailers
})
def trailers: Future[scaladsl.Metadata] = sTrailers
def getTrailers(): CompletionStage[javadsl.Metadata] = jTrailers
}
override def onReady(): Unit =
callback.invoke(ReadyForSending)
override def onHeaders(responseHeaders: Metadata): Unit =
matVal.success(makeResponseMetadata(responseHeaders))
override def onMessage(message: O): Unit =
callback.invoke(message)
override def onClose(status: Status, trailers: Metadata): Unit = {
matVal.trySuccess(makeResponseMetadata(new Metadata()))
trailerPromise.success(trailers)
callback.invoke(Closed(status, trailers))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@ object PekkoHttpServerProviderScala extends PekkoHttpServerProvider {
HttpEntity.LastChunk(last.extension, f(last.trailer))
}))
case _ =>
val origTrailers = response
response
.attribute(AttributeKeys.trailer)
.map(_.headers)
.getOrElse(Vector.empty)
.map(e => RawHeader(e._1, e._2))
response.addAttribute(AttributeKeys.trailer, Trailer(f(origTrailers)))
.map(trailer => f(trailer.headers.map((RawHeader.apply _).tupled)))
.flatMap(headers => if (headers.isEmpty) None else Some(Trailer(headers)))
.fold(response)(response.addAttribute(AttributeKeys.trailer, _))
})
}

Expand Down

0 comments on commit fa1bfff

Please sign in to comment.