Skip to content

Commit

Permalink
Improve callMethod logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ravinperera00 committed Nov 22, 2024
1 parent f5e06e7 commit a187c3c
Showing 1 changed file with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.ballerina.lib.data.xmldata.utils.DiagnosticErrorCode;
import io.ballerina.lib.data.xmldata.utils.DiagnosticLog;
import io.ballerina.runtime.api.Environment;
import io.ballerina.runtime.api.concurrent.StrandMetadata;
import io.ballerina.runtime.api.creators.ErrorCreator;
import io.ballerina.runtime.api.types.MethodType;
import io.ballerina.runtime.api.values.BArray;
import io.ballerina.runtime.api.values.BError;
Expand All @@ -31,6 +33,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -83,7 +87,9 @@ public int read() {
public void close() throws IOException {
super.close();
if (closeMethod != null) {
env.getRuntime().callMethod(iterator, closeMethod.getName(), null);
Thread.startVirtualThread(() -> {
env.getRuntime().callMethod(iterator, closeMethod.getName(), new StrandMetadata(true, null));
});
}
}

Expand All @@ -92,25 +98,34 @@ private boolean hasBytesInCurrentChunk() {
}

private boolean readNextChunk() throws InterruptedException {
try {
Object result = env.getRuntime().callMethod(iterator, nextMethodName, null);
if (result == null) {
done.set(true);
return true;
}
if (result instanceof BMap<?, ?>) {
BMap<BString, Object> valueRecord = (BMap<BString, Object>) result;
final BString value = Arrays.stream(valueRecord.getKeys()).findFirst().get();
final BArray arrayValue = valueRecord.getArrayValue(value);
currentChunk = arrayValue.getByteArray();
} else {
CompletableFuture<Boolean> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
try {
Object result = env.getRuntime().callMethod(iterator, nextMethodName, new StrandMetadata(true, null));
if (result == null) {
done.set(true);
balFuture.complete(!done.get());
return;
}
if (result instanceof BMap<?, ?>) {
BMap<BString, Object> valueRecord = (BMap<BString, Object>) result;
final BString value = Arrays.stream(valueRecord.getKeys()).findFirst().get();
final BArray arrayValue = valueRecord.getArrayValue(value);
currentChunk = arrayValue.getByteArray();
} else {
done.set(true);
}
} catch (BError bError) {
done.set(true);
currentChunk = new byte[0];
}
} catch (BError bError) {
done.set(true);
currentChunk = new byte[0];
balFuture.complete(!done.get());
});
try {
return balFuture.get();
} catch (BError | ExecutionException bError) {
throw ErrorCreator.createError(bError);
}
return !done.get();
}

public BError getError() {
Expand Down

0 comments on commit a187c3c

Please sign in to comment.