iliax hai 1 ano
pai
achega
7d1d2fbc47

+ 9 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -10,6 +10,7 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
@@ -51,6 +52,13 @@ public class SeekOperations {
     return offsetsInfo.assignedPartitionsFullyPolled();
   }
 
+  //TODO: document
+  public long offsetsProcessedFromSeek() {
+    MutableLong count = new MutableLong();
+    offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
+    return count.getValue();
+  }
+
   // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
   public Map<TopicPartition, Long> getOffsetsForSeek() {
     return offsetsForSeek;
@@ -100,7 +108,7 @@ public class SeekOperations {
   }
 
   private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
-                                                        Map<TopicPartition, Long> timestamps) {
+                                                               Map<TopicPartition, Long> timestamps) {
     timestamps = new HashMap<>(timestamps);
     timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
 

+ 21 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,14 +1,12 @@
 package com.provectus.kafka.ui.service.analyze;
 
-import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
+import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
+
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
-import com.provectus.kafka.ui.emitter.OffsetsInfo;
-import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.emitter.SeekOperations;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.TopicsService;
@@ -18,16 +16,14 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 
@@ -36,6 +32,14 @@ import reactor.core.scheduler.Schedulers;
 @RequiredArgsConstructor
 public class TopicAnalysisService {
 
+  private static final Scheduler SCHEDULER = Schedulers.newBoundedElastic(
+      Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
+      Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+      "topic-analysis-tasks",
+      10, //ttl for idle threads (in sec)
+      true //daemon
+  );
+
   private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
 
   private final TopicsService topicsService;
@@ -64,7 +68,7 @@ public class TopicAnalysisService {
     }
     var task = new AnalysisTask(cluster, topicId, approxNumberOfMsgs);
     analysisTasksStore.registerNewTask(topicId, task);
-    Schedulers.boundedElastic().schedule(task);
+    SCHEDULER.schedule(task);
   }
 
   public void cancelAnalysis(KafkaCluster cluster, String topicName) {
@@ -109,10 +113,10 @@ public class TopicAnalysisService {
     public void run() {
       try {
         log.info("Starting {} topic analysis", topicId);
-        var seekOperations = SeekOperations.create(
-            consumer,
-            new ConsumerPosition(SeekTypeDTO.BEGINNING, topicId.topicName, null)
-        );
+        consumer.partitionsFor(topicId.topicName)
+            .forEach(i -> partitionStats.put(i.partition(), new TopicAnalysisStats()));
+
+        var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
         seekOperations.assignAndSeekNonEmptyPartitions();
 
         while (!seekOperations.assignedPartitionsFullyPolled()) {
@@ -121,7 +125,7 @@ public class TopicAnalysisService {
             totalStats.apply(r);
             partitionStats.get(r.partition()).apply(r);
           });
-          updateProgress();
+          updateProgress(seekOperations);
         }
         analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
         log.info("{} topic analysis finished", topicId);
@@ -137,13 +141,14 @@ public class TopicAnalysisService {
       }
     }
 
-    private void updateProgress() {
-      if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
+    private void updateProgress(SeekOperations seekOperations) {
+      long processedOffsets = seekOperations.offsetsProcessedFromSeek();
+      if (processedOffsets > 0 && approxNumberOfMsgs != 0) {
         analysisTasksStore.updateProgress(
             topicId,
             totalStats.totalMsgs,
             totalStats.keysSize.sum + totalStats.valuesSize.sum,
-            Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
+            Math.min(100.0, (((double) processedOffsets) / approxNumberOfMsgs) * 100)
         );
       }
     }