From 3fa2b995c6dbbd949e0367b5366f4bc743d04ab0 Mon Sep 17 00:00:00 2001 From: German Osin Date: Tue, 29 Jun 2021 19:00:31 +0300 Subject: [PATCH] #122 Fixed npe in backward emitter (#601) --- .../ui/emitter/BackwardRecordEmitter.java | 52 ++++++++++--------- .../src/main/resources/application-sdp.yml | 2 +- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index ee407a167f..088366c238 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -40,42 +40,44 @@ public class BackwardRecordEmitter ) { final Map partitionsOffsets = offsetsSeek.getPartitionsOffsets(consumer); - log.info("partition offsets: {}", partitionsOffsets); + log.debug("partition offsets: {}", partitionsOffsets); var waitingOffsets = offsetsSeek.waitingOffsets(consumer, partitionsOffsets.keySet()); - log.info("waittin offsets {} {}", + log.debug("waittin offsets {} {}", waitingOffsets.getBeginOffsets(), waitingOffsets.getEndOffsets() ); while (!sink.isCancelled() && !waitingOffsets.beginReached()) { for (Map.Entry entry : partitionsOffsets.entrySet()) { final Long lowest = waitingOffsets.getBeginOffsets().get(entry.getKey().partition()); - consumer.assign(Collections.singleton(entry.getKey())); - final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition); - log.info("Polling {} from {}", entry.getKey(), offset); - consumer.seek(entry.getKey(), offset); - ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); - final List> partitionRecords = - records.records(entry.getKey()).stream() - .filter(r -> r.offset() < partitionsOffsets.get(entry.getKey())) - .collect(Collectors.toList()); - Collections.reverse(partitionRecords); + if (lowest != null) { + consumer.assign(Collections.singleton(entry.getKey())); + final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition); + log.debug("Polling {} from {}", entry.getKey(), offset); + consumer.seek(entry.getKey(), offset); + ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); + final List> partitionRecords = + records.records(entry.getKey()).stream() + .filter(r -> r.offset() < partitionsOffsets.get(entry.getKey())) + .collect(Collectors.toList()); + Collections.reverse(partitionRecords); - log.info("{} records polled", records.count()); - log.info("{} records sent", partitionRecords.size()); - for (ConsumerRecord msg : partitionRecords) { - if (!sink.isCancelled() && !waitingOffsets.beginReached()) { - sink.next(msg); - waitingOffsets.markPolled(msg); - } else { - log.info("Begin reached"); - break; + log.debug("{} records polled", records.count()); + log.debug("{} records sent", partitionRecords.size()); + for (ConsumerRecord msg : partitionRecords) { + if (!sink.isCancelled() && !waitingOffsets.beginReached()) { + sink.next(msg); + waitingOffsets.markPolled(msg); + } else { + log.info("Begin reached"); + break; + } } + partitionsOffsets.put( + entry.getKey(), + Math.max(offset, entry.getValue() - msgsPerPartition) + ); } - partitionsOffsets.put( - entry.getKey(), - Math.max(offset, entry.getValue() - msgsPerPartition) - ); } if (waitingOffsets.beginReached()) { log.info("begin reached after partitions"); diff --git a/kafka-ui-api/src/main/resources/application-sdp.yml b/kafka-ui-api/src/main/resources/application-sdp.yml index cde2ab2e59..71af1078f9 100644 --- a/kafka-ui-api/src/main/resources/application-sdp.yml +++ b/kafka-ui-api/src/main/resources/application-sdp.yml @@ -3,7 +3,7 @@ kafka: - name: local bootstrapServers: b-1.kad-msk.uxahxx.c6.kafka.eu-west-1.amazonaws.com:9092 # zookeeper: localhost:2181 -# schemaRegistry: http://localhost:8083 + schemaRegistry: http://kad-ecs-application-lb-857515197.eu-west-1.elb.amazonaws.com:9000/api/schema-registry # - # name: secondLocal # zookeeper: zookeeper1:2181