Skip to content

Commit

Permalink
fix: fail if conditions not met or launch failed
Browse files Browse the repository at this point in the history
Do not fail on resource cleanup error but log warning.
  • Loading branch information
fbiville committed Dec 16, 2024
1 parent 891bb58 commit 3fe399c
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions src/main/java/org/neo4j/dataflow/LocalRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.PipelineOperator.Config;
import org.apache.beam.it.common.PipelineOperator.Result;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils;
import org.apache.beam.it.gcp.dataflow.DirectRunnerClient;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.it.neo4j.Neo4jResourceManager;
import org.apache.beam.it.neo4j.conditions.Neo4jQueryCheck;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.ITypeConverter;
Expand All @@ -33,6 +39,8 @@
@Command(name = "local-dataflow", version = "0.0.1", mixinStandardHelpOptions = true)
public class LocalRunner implements Runnable, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(LocalRunner.class);

private GcsResourceManager gcs;
private Neo4jResourceManager neo4j;
private PipelineLauncher launcher;
Expand Down Expand Up @@ -70,13 +78,24 @@ public void run() {

uploadTemplateInputs();
execution = startPipeline(credentials);
waitUntilDone();
Result result = waitUntilDone();
boolean conditionsSet = !countQueryChecks.isEmpty();
if (conditionsSet && result != Result.CONDITION_MET) {
throw new RuntimeException("Conditions not met. Please check the above logs");
}
if (!conditionsSet && result != Result.LAUNCH_FINISHED) {
throw new RuntimeException("Execution failed. Please check the above logs");
}
}

@Override
public void close() {
stopPipeline();
ResourceManagerUtils.cleanResources(gcs, neo4j);
try {
ResourceManagerUtils.cleanResources(gcs, neo4j);
} catch (Exception e) {
LOGGER.warn("Could not properly clean up resources: {}", e.getMessage());
}
}

private void uploadTemplateInputs() {
Expand Down Expand Up @@ -131,7 +150,7 @@ private static String executionId() {
return String.format("local-runner-%d-%s", System.currentTimeMillis(), suffix);
}

private void waitUntilDone() {
private Result waitUntilDone() {
Config config = Config.builder()
.setJobId(execution.jobId())
.setProject(project)
Expand All @@ -141,9 +160,9 @@ private void waitUntilDone() {
PipelineOperator operator = new PipelineOperator(launcher);
if (countQueryChecks.isEmpty()) {
operator.waitUntilDone(config);
return;
return null;
}
operator.waitForCondition(
return operator.waitForCondition(
config,
countQueryChecks.stream()
.map(check -> check.asRunnableCondition(neo4j))
Expand Down

0 comments on commit 3fe399c

Please sign in to comment.