Skip to content

Commit

Permalink
add persistedEndOffsetMap to KafkaQueryProcessor (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
eemhu authored Aug 22, 2024
1 parent ebe1dad commit d9b9608
Showing 1 changed file with 31 additions and 12 deletions.
43 changes: 31 additions & 12 deletions src/main/java/com/teragrep/pth_06/planner/KafkaQueryProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ public class KafkaQueryProcessor implements KafkaQuery {
// NOTE, for Kafka: start is inclusive (GreaterOrEqualThan), end is exclusive
private final Logger LOGGER = LoggerFactory.getLogger(KafkaQueryProcessor.class);

// When true, end offsets are re-fetched from Kafka on every query instead of being cached.
private final boolean continuousProcessing;
// Consumer used to look up partition end offsets; key/value bytes are opaque here.
private final Consumer<byte[], byte[]> kafkaConsumer;
// Partitions this processor queries end offsets for (the consumer's assignment in the test constructor).
private final Set<TopicPartition> topicPartitionSet;
// Cache of the last fetched end offsets, reused when continuousProcessing is false.
// NOTE(review): appears to be single-threaded use (plain HashMap) — confirm no concurrent access.
private final Map<TopicPartition, Long> persistedEndOffsetMap;

public KafkaQueryProcessor(Config config) {
this.persistedEndOffsetMap = new HashMap<>();
this.continuousProcessing = config.kafkaConfig.kafkaContinuousProcessing;
Map<String, Object> executorKafkaProperties = config.kafkaConfig.driverOpts;
this.kafkaConsumer = new KafkaConsumer<>(executorKafkaProperties);

Expand Down Expand Up @@ -187,36 +191,51 @@ private void handleTopicAuthorizationException(String topicsRegexString, TopicAu

@VisibleForTesting
public KafkaQueryProcessor(Consumer<byte[],byte[]> consumer) {
    // Test-only wiring: take the supplied consumer as-is, derive the partition set
    // from its current assignment, and disable continuous processing so cached
    // end offsets are reused across calls.
    this.kafkaConsumer = consumer;
    this.topicPartitionSet = consumer.assignment();
    this.continuousProcessing = false;
    this.persistedEndOffsetMap = new HashMap<>();
    LOGGER.debug("@VisibleForTesting KafkaQueryProcessor");
}

@Override
public Map<TopicPartition, Long> getInitialEndOffsets() {
Map<TopicPartition, Long> topicPartitionEndOffsetMap = new HashMap<>();
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60));
if (persistedEndOffsetMap.isEmpty() || continuousProcessing) {

for (TopicPartition topicPartition : topicPartitionSet) {
long partitionEnd = endOffsets.get(topicPartition);
topicPartitionEndOffsetMap.put(topicPartition, partitionEnd);
Map<TopicPartition, Long> topicPartitionEndOffsetMap = new HashMap<>();
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60));

for (TopicPartition topicPartition : topicPartitionSet) {
long partitionEnd = endOffsets.get(topicPartition);
topicPartitionEndOffsetMap.put(topicPartition, partitionEnd);

}
persistedEndOffsetMap.putAll(topicPartitionEndOffsetMap);
return topicPartitionEndOffsetMap;
}
else {
return persistedEndOffsetMap;
}
return topicPartitionEndOffsetMap;
}

@Override
public Map<TopicPartition, Long> getEndOffsets(KafkaOffset startOffset) {
Map<TopicPartition, Long> topicPartitionEndOffsetMap = new HashMap<>();
if (persistedEndOffsetMap.isEmpty() || continuousProcessing) {
Map<TopicPartition, Long> topicPartitionEndOffsetMap = new HashMap<>();

Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60)); // TODO parametrize
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionSet, Duration.ofSeconds(60)); // TODO parametrize

for (TopicPartition topicPartition : topicPartitionSet) {
long partitionEnd = endOffsets.get(topicPartition); // partition end becomes the current end
for (TopicPartition topicPartition : topicPartitionSet) {
long partitionEnd = endOffsets.get(topicPartition); // partition end becomes the current end

topicPartitionEndOffsetMap.put(topicPartition, partitionEnd);
topicPartitionEndOffsetMap.put(topicPartition, partitionEnd);
}
persistedEndOffsetMap.putAll(topicPartitionEndOffsetMap);
return topicPartitionEndOffsetMap;
}
else {
return persistedEndOffsetMap;
}
return topicPartitionEndOffsetMap;
}

@Override
Expand Down

0 comments on commit d9b9608

Please sign in to comment.