diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java new file mode 100644 index 000000000000..fba2eec99c5c --- /dev/null +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.auto.value.AutoValue; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link PrismExecutor} builds and executes a {@link ProcessBuilder} for use by the {@link + * PrismRunner}. Prism is a {@link org.apache.beam.runners.portability.PortableRunner} maintained at + * sdks/go/cmd/prism. + */ +@AutoValue +abstract class PrismExecutor { + + private static final Logger LOG = LoggerFactory.getLogger(PrismExecutor.class); + + protected @MonotonicNonNull Process process; + protected ExecutorService executorService = Executors.newSingleThreadExecutor(); + protected @MonotonicNonNull Future future = null; + + static Builder builder() { + return new AutoValue_PrismExecutor.Builder(); + } + + /** The command to execute the Prism binary. */ + abstract String getCommand(); + + /** + * Additional arguments to pass when invoking the Prism binary. Defaults to an {@link + * Collections#emptyList()}. + */ + abstract List getArguments(); + + /** Stops the execution of the {@link Process}, created as a result of {@link #execute}. */ + void stop() { + LOG.info("Stopping Prism..."); + if (future != null) { + future.cancel(true); + } + executorService.shutdown(); + try { + boolean ignored = executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + } + if (process == null) { + return; + } + if (!process.isAlive()) { + return; + } + process.destroy(); + try { + process.waitFor(); + } catch (InterruptedException ignored) { + } + } + + /** + * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to STDOUT. + */ + void execute() throws IOException { + execute(createProcessBuilder().inheritIO()); + } + + /** + * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to the + * {@param outputStream}. + */ + void execute(OutputStream outputStream) throws IOException { + execute(createProcessBuilder().redirectErrorStream(true)); + this.future = + executorService.submit( + () -> { + try { + ByteStreams.copy(checkStateNotNull(process).getInputStream(), outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to the + * {@param file}. + */ + void execute(File file) throws IOException { + execute( + createProcessBuilder() + .redirectErrorStream(true) + .redirectOutput(ProcessBuilder.Redirect.appendTo(file))); + } + + private void execute(ProcessBuilder processBuilder) throws IOException { + this.process = processBuilder.start(); + LOG.info("started {}", String.join(" ", getCommandWithArguments())); + } + + private List getCommandWithArguments() { + List commandWithArguments = new ArrayList<>(); + commandWithArguments.add(getCommand()); + commandWithArguments.addAll(getArguments()); + + return commandWithArguments; + } + + private ProcessBuilder createProcessBuilder() { + return new ProcessBuilder(getCommandWithArguments()); + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setCommand(String command); + + abstract Builder setArguments(List arguments); + + abstract Optional> getArguments(); + + abstract PrismExecutor autoBuild(); + + final PrismExecutor build() { + if (!getArguments().isPresent()) { + setArguments(Collections.emptyList()); + } + return autoBuild(); + } + } +} diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java new file mode 100644 index 000000000000..315e585a0c5f --- /dev/null +++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.prism; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Collections; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link PrismExecutor}. */ +@RunWith(JUnit4.class) +public class PrismExecutorTest { + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule public TestName testName = new TestName(); + + @Test + public void executeThenStop() throws IOException { + PrismExecutor executor = underTest().build(); + executor.execute(); + sleep(3000L); + executor.stop(); + } + + @Test + public void executeWithStreamRedirectThenStop() throws IOException { + PrismExecutor executor = underTest().build(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + executor.execute(outputStream); + sleep(3000L); + executor.stop(); + String output = outputStream.toString(StandardCharsets.UTF_8.name()); + assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073"); + } + + @Test + public void executeWithFileOutputThenStop() throws IOException { + PrismExecutor executor = underTest().build(); + File log = temporaryFolder.newFile(testName.getMethodName()); + executor.execute(log); + sleep(3000L); + executor.stop(); + try (Stream stream = Files.lines(log.toPath(), StandardCharsets.UTF_8)) { + String output = stream.collect(Collectors.joining("\n")); + assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073"); + } + } + + @Test + public void executeWithCustomArgumentsThenStop() throws IOException { + PrismExecutor executor = + underTest().setArguments(Collections.singletonList("-job_port=5555")).build(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + executor.execute(outputStream); + sleep(3000L); + executor.stop(); + String output = outputStream.toString(StandardCharsets.UTF_8.name()); + assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:5555"); + } + + private PrismExecutor.Builder underTest() { + return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()); + } + + private void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } +}