diff --git a/netty-reactive-streams-http/pom.xml b/netty-reactive-streams-http/pom.xml index 71ed43b..e1f2dea 100644 --- a/netty-reactive-streams-http/pom.xml +++ b/netty-reactive-streams-http/pom.xml @@ -32,9 +32,8 @@ testng - com.typesafe.akka - akka-stream_2.12 - ${akka-stream.version} + org.apache.pekko + pekko-stream_2.12 diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java index 840d6df..92953a8 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/FullStackHttpIdentityProcessorVerificationTest.java @@ -1,10 +1,10 @@ package com.typesafe.netty.http; -import akka.NotUsed; -import akka.actor.ActorSystem; -import akka.japi.function.Function; -import akka.stream.Materializer; -import akka.stream.javadsl.*; +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.japi.function.Function; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.*; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.codec.http.*; @@ -30,11 +30,11 @@ * body back. * * The server uses the {@link HttpStreamsServerHandler}, and then exposes the messages sent/received by - * that using reactive streams. So it effectively uses streams of streams. It then uses Akka streams + * that using reactive streams. So it effectively uses streams of streams. It then uses Pekko streams * to actually handle the requests, echoing the bodies back in the responses as is. * * The client uses the {@link HttpStreamsClientHandler}, and then exposes the messages sent/received by - * that using reactive streams, so it too is effectively a stream of streams. Here Akka streams is used + * that using reactive streams, so it too is effectively a stream of streams. Here Pekko streams is used * to split the String bodies into many chunks, for more interesting verification of the bodies, and then * combines all the chunks together back into a String at the end. */ @@ -72,7 +72,7 @@ public HttpResponse apply(HttpRequest request) throws Exception { serverBindChannel = server.bind(new InetSocketAddress("127.0.0.1", 0), new Callable>() { @Override public Processor call() throws Exception { - return AkkaStreamsUtil.flowToProcessor(flow, materializer); + return PekkoStreamsUtil.flowToProcessor(flow, materializer); } }).await().channel(); } @@ -112,7 +112,7 @@ public HttpRequest apply(String body) throws Exception { } }) // Send the flow via the HTTP client connection - .via(AkkaStreamsUtil.processorToFlow(connection)) + .via(PekkoStreamsUtil.processorToFlow(connection)) // Convert the responses to Strings .mapAsync(4, new Function>() { @Override @@ -121,7 +121,7 @@ public CompletionStage apply(HttpResponse response) throws Exception { } }); - return AkkaStreamsUtil.flowToProcessor(flow, materializer); + return PekkoStreamsUtil.flowToProcessor(flow, materializer); } private Processor getProcessor(ProcessorHttpClient client) { diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java index f12e167..dc6e62a 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpHelper.java @@ -1,10 +1,10 @@ package com.typesafe.netty.http; -import akka.japi.function.Function2; -import akka.stream.Materializer; -import akka.stream.javadsl.AsPublisher; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; +import org.apache.pekko.japi.function.Function2; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.AsPublisher; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.*; diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java index d92bc9f..83a286b 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/HttpStreamsTest.java @@ -1,9 +1,9 @@ package com.typesafe.netty.http; -import akka.actor.ActorSystem; -import akka.japi.function.Function; -import akka.stream.Materializer; -import akka.stream.javadsl.Flow; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.japi.function.Function; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.Flow; import com.typesafe.netty.HandlerPublisher; import com.typesafe.netty.HandlerSubscriber; import io.netty.bootstrap.Bootstrap; @@ -226,7 +226,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { HandlerPublisher publisher = new HandlerPublisher<>(ctx.executor(), HttpRequest.class); HandlerSubscriber subscriber = new HandlerSubscriber<>(ctx.executor()); ctx.pipeline().addLast(publisher, subscriber); - Processor processor = AkkaStreamsUtil.flowToProcessor(Flow.create() + Processor processor = PekkoStreamsUtil.flowToProcessor(Flow.create() .mapAsync(4, new Function>() { public CompletionStage apply(HttpRequest request) throws Exception { return helper.extractBodyAsync(request); diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/AkkaStreamsUtil.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/PekkoStreamsUtil.java similarity index 82% rename from netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/AkkaStreamsUtil.java rename to netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/PekkoStreamsUtil.java index 38487c4..ddbe4ba 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/AkkaStreamsUtil.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/PekkoStreamsUtil.java @@ -1,14 +1,14 @@ package com.typesafe.netty.http; -import akka.japi.Pair; -import akka.japi.function.Creator; -import akka.stream.Materializer; -import akka.stream.javadsl.*; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.japi.function.Creator; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.*; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -public class AkkaStreamsUtil { +public class PekkoStreamsUtil { public static Processor flowToProcessor(Flow flow, Materializer materializer) { Pair, Publisher> pair = diff --git a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java index 2dd4a12..8fb10ee 100644 --- a/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java +++ b/netty-reactive-streams-http/src/test/java/com/typesafe/netty/http/WebSocketsTest.java @@ -1,9 +1,9 @@ package com.typesafe.netty.http; -import akka.actor.ActorSystem; -import akka.japi.function.Function; -import akka.stream.Materializer; -import akka.stream.javadsl.Flow; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.japi.function.Function; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.stream.javadsl.Flow; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; diff --git a/pom.xml b/pom.xml index 8a87378..62e135b 100644 --- a/pom.xml +++ b/pom.xml @@ -75,9 +75,9 @@ test - com.typesafe.akka - akka-stream_2.12 - ${akka-stream.version} + org.apache.pekko + pekko-stream_2.12 + ${pekko-stream.version} test @@ -86,7 +86,7 @@ 4.1.97.Final 1.0.4 - 2.6.21 + 1.0.1 5.1.9 3.3.0