diff --git a/src/main/java/com/teragrep/pth_06/planner/KafkaQueryProcessor.java b/src/main/java/com/teragrep/pth_06/planner/KafkaQueryProcessor.java index 3a1474df..6c31d2c3 100644 --- a/src/main/java/com/teragrep/pth_06/planner/KafkaQueryProcessor.java +++ b/src/main/java/com/teragrep/pth_06/planner/KafkaQueryProcessor.java @@ -76,10 +76,14 @@ public class KafkaQueryProcessor implements KafkaQuery { // NOTE, for Kafka: start is inclusive (GreaterOrEqualThan), end is exclusive private final Logger LOGGER = LoggerFactory.getLogger(KafkaQueryProcessor.class); + private final boolean continuousProcessing; private final Consumer kafkaConsumer; private final Set topicPartitionSet; + private final Map persistedEndOffsetMap; public KafkaQueryProcessor(Config config) { + this.persistedEndOffsetMap = new HashMap<>(); + this.continuousProcessing = config.kafkaConfig.kafkaContinuousProcessing; Map executorKafkaProperties = config.kafkaConfig.driverOpts; this.kafkaConsumer = new KafkaConsumer<>(executorKafkaProperties); @@ -187,6 +191,8 @@ private void handleTopicAuthorizationException(String topicsRegexString, TopicAu @VisibleForTesting public KafkaQueryProcessor(Consumer consumer) { + this.persistedEndOffsetMap = new HashMap<>(); + this.continuousProcessing = false; kafkaConsumer = consumer; topicPartitionSet = consumer.assignment(); LOGGER.debug("@VisibleForTesting KafkaQueryProcessor"); @@ -194,29 +200,42 @@ public KafkaQueryProcessor(Consumer consumer) { @Override public Map getInitialEndOffsets() { - Map topicPartitionEndOffsetMap = new HashMap<>(); - Map endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60)); + if (persistedEndOffsetMap.isEmpty() || continuousProcessing) { - for (TopicPartition topicPartition : topicPartitionSet) { - long partitionEnd = endOffsets.get(topicPartition); - topicPartitionEndOffsetMap.put(topicPartition, partitionEnd); + Map topicPartitionEndOffsetMap = new HashMap<>(); + Map endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60)); + + for (TopicPartition topicPartition : topicPartitionSet) { + long partitionEnd = endOffsets.get(topicPartition); + topicPartitionEndOffsetMap.put(topicPartition, partitionEnd); + } + persistedEndOffsetMap.putAll(topicPartitionEndOffsetMap); + return topicPartitionEndOffsetMap; + } + else { + return persistedEndOffsetMap; } - return topicPartitionEndOffsetMap; } @Override public Map getEndOffsets(KafkaOffset startOffset) { - Map topicPartitionEndOffsetMap = new HashMap<>(); + if (persistedEndOffsetMap.isEmpty() || continuousProcessing) { + Map topicPartitionEndOffsetMap = new HashMap<>(); - Map endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60)); // TODO parametrize + Map endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60)); // TODO parametrize - for (TopicPartition topicPartition : topicPartitionSet) { - long partitionEnd = endOffsets.get(topicPartition); // partition end becomes the current end + for (TopicPartition topicPartition : topicPartitionSet) { + long partitionEnd = endOffsets.get(topicPartition); // partition end becomes the current end - topicPartitionEndOffsetMap.put(topicPartition, partitionEnd); + topicPartitionEndOffsetMap.put(topicPartition, partitionEnd); + } + persistedEndOffsetMap.putAll(topicPartitionEndOffsetMap); + return topicPartitionEndOffsetMap; + } + else { + return persistedEndOffsetMap; } - return topicPartitionEndOffsetMap; } @Override