Skip to content

Commit

Permalink
[test] use AtomicBoolean
Browse files Browse the repository at this point in the history
  • Loading branch information
mas-chen committed Jan 12, 2024
1 parent 65e4c74 commit 53447ff
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -93,7 +94,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
private MultipleFuturesAvailabilityHelper availabilityHelper;
private boolean isActivelyConsumingSplits;
private boolean isNoMoreSplits;
private boolean restartingReaders;
private AtomicBoolean restartingReaders;

public DynamicKafkaSourceReader(
SourceReaderContext readerContext,
Expand All @@ -113,7 +114,7 @@ public DynamicKafkaSourceReader(
this.availabilityHelper = new MultipleFuturesAvailabilityHelper(0);
this.isNoMoreSplits = false;
this.isActivelyConsumingSplits = false;
this.restartingReaders = false;
this.restartingReaders = new AtomicBoolean();
this.clustersProperties = new HashMap<>();
}

Expand All @@ -136,7 +137,7 @@ public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
}

if (restartingReaders) {
if (restartingReaders.get()) {
logger.info("Poll next invoked while restarting readers");
return logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
}
Expand Down Expand Up @@ -282,7 +283,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
// comes at the cost of complexity and it is an optimization for a corner case. We can
// revisit if necessary.
if (!newClustersAndTopics.equals(currentMetadataFromState)) {
restartingReaders = true;
restartingReaders.set(true);
closeAllReadersAndClearState();

clustersProperties.putAll(newClustersProperties);
Expand Down Expand Up @@ -485,7 +486,7 @@ private void completeAndResetAvailabilityHelper() {
.getAvailableFuture()
.whenComplete(
(ignore, t) -> {
restartingReaders = false;
restartingReaders.set(false);
cachedPreviousFuture.complete(null);
});
}
Expand Down

0 comments on commit 53447ff

Please sign in to comment.