diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 9665aad565..c3d30e9ab6 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -212,6 +212,11 @@
<artifactId>groovy-json</artifactId>
<version>${groovy.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+ <artifactId>datasketches-java</artifactId>
+ <version>${datasketches-java.version}</version>
+ </dependency>
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
index e30a540c90..ccfe898fdc 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
@@ -11,6 +11,7 @@ import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
import com.provectus.kafka.ui.model.SortOrderDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicCreationDTO;
@@ -19,6 +20,7 @@ import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import java.util.Comparator;
import java.util.List;
import javax.validation.Valid;
@@ -40,6 +42,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
private static final Integer DEFAULT_PAGE_SIZE = 25;
private final TopicsService topicsService;
+ private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;
@Override
@@ -181,4 +184,29 @@ public class TopicsController extends AbstractController implements TopicsApi {
topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
.map(ResponseEntity::ok);
}
+
+ @Override
+ public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
+ return topicAnalysisService.analyze(getCluster(clusterName), topicName)
+ .thenReturn(ResponseEntity.ok().build());
+ }
+
+ @Override
+ public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
+ ServerWebExchange exchange) {
+ topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
+ return Mono.just(ResponseEntity.ok().build());
+ }
+
+ @Override
+ public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterName,
+ String topicName,
+ ServerWebExchange exchange) {
+ return Mono.just(
+ topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
+ .map(ResponseEntity::ok)
+ .orElseGet(() -> ResponseEntity.notFound().build())
+ );
+ }
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
index 2c801d3d05..ecd1915316 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
@@ -26,7 +26,8 @@ public enum ErrorCode {
INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT),
INVALID_ENTITY_STATE(4016, HttpStatus.BAD_REQUEST),
- SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR);
+ SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR),
+ TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST);
static {
// codes uniqueness check
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java
new file mode 100644
index 0000000000..ecf80febc1
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java
@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class TopicAnalysisException extends CustomBaseException {
+
+ public TopicAnalysisException(String message) {
+ super(message);
+ }
+
+ @Override
+ public ErrorCode getErrorCode() {
+ return ErrorCode.TOPIC_ANALYSIS_ERROR;
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java
new file mode 100644
index 0000000000..d5ed1a36d6
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java
@@ -0,0 +1,117 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.google.common.base.Throwables;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisProgressDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisResultDTO;
+import java.io.Closeable;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.Value;
+
+class AnalysisTasksStore {
+
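+ // a task lives in 'running' while in progress and moves to 'completed' on success or error;
+ // cancellation just removes it from 'running'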
+ private final Map<TopicIdentity, RunningAnalysis> running = new ConcurrentHashMap<>();
+ private final Map<TopicIdentity, TopicAnalysisResultDTO> completed = new ConcurrentHashMap<>();
+
+ void setAnalysisError(TopicIdentity topicId,
+ Instant collectionStartedAt,
+ Throwable th) {
+ running.remove(topicId);
+ completed.put(
+ topicId,
+ new TopicAnalysisResultDTO()
+ .startedAt(collectionStartedAt.toEpochMilli())
+ .finishedAt(System.currentTimeMillis())
+ .error(Throwables.getStackTraceAsString(th))
+ );
+ }
+
+ void setAnalysisResult(TopicIdentity topicId,
+ Instant collectionStartedAt,
+ TopicAnalysisStats totalStats,
+ Map<Integer, TopicAnalysisStats> partitionStats) {
+ running.remove(topicId);
+ completed.put(topicId,
+ new TopicAnalysisResultDTO()
+ .startedAt(collectionStartedAt.toEpochMilli())
+ .finishedAt(System.currentTimeMillis())
+ .totalStats(totalStats.toDto(null))
+ .partitionStats(
+ partitionStats.entrySet().stream()
+ .map(e -> e.getValue().toDto(e.getKey()))
+ .collect(Collectors.toList())
+ ));
+ }
+
+ void updateProgress(TopicIdentity topicId,
+ long msgsScanned,
+ long bytesScanned,
+ Double completeness) {
+ running.computeIfPresent(topicId, (k, state) ->
+ state.toBuilder()
+ .msgsScanned(msgsScanned)
+ .bytesScanned(bytesScanned)
+ .completenessPercent(completeness)
+ .build());
+ }
+
+ void registerNewTask(TopicIdentity topicId, Closeable task) {
+ running.put(topicId, new RunningAnalysis(Instant.now(), 0.0, 0, 0, task));
+ }
+
+ void cancelAnalysis(TopicIdentity topicId) {
+ Optional.ofNullable(running.remove(topicId))
+ .ifPresent(RunningAnalysis::stopTask);
+ }
+
+ boolean isAnalysisInProgress(TopicIdentity id) {
+ return running.containsKey(id);
+ }
+
+ Optional<TopicAnalysisDTO> getTopicAnalysis(TopicIdentity id) {
+ var runningState = running.get(id);
+ var completedState = completed.get(id);
+ if (runningState == null && completedState == null) {
+ return Optional.empty();
+ }
+ return Optional.of(createAnalysisDto(runningState, completedState));
+ }
+
+ private TopicAnalysisDTO createAnalysisDto(@Nullable RunningAnalysis runningState,
+ @Nullable TopicAnalysisResultDTO completedState) {
+ return new TopicAnalysisDTO()
+ .progress(runningState != null ? runningState.toDto() : null)
+ .result(completedState);
+ }
+
+ @Value
+ @Builder(toBuilder = true)
+ private static class RunningAnalysis {
+ Instant startedAt;
+ double completenessPercent;
+ long msgsScanned;
+ long bytesScanned;
+ Closeable task;
+
+ TopicAnalysisProgressDTO toDto() {
+ return new TopicAnalysisProgressDTO()
+ .startedAt(startedAt.toEpochMilli())
+ .bytesScanned(bytesScanned)
+ .msgsScanned(msgsScanned)
+ .completenessPercent(BigDecimal.valueOf(completenessPercent));
+ }
+
+ @SneakyThrows
+ void stopTask() {
+ task.close();
+ }
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
new file mode 100644
index 0000000000..757b469a6b
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
@@ -0,0 +1,156 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.provectus.kafka.ui.exception.TopicAnalysisException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
+import com.provectus.kafka.ui.service.ConsumerGroupService;
+import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.util.OffsetsSeek.WaitingOffsets;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Bytes;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class TopicAnalysisService {
+
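+ // in-memory store: analysis progress and results are lost on application restart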
+ private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
+
+ private final TopicsService topicsService;
+ private final ConsumerGroupService consumerGroupService;
+
+ public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
+ return topicsService.getTopicDetails(cluster, topicName)
+ .doOnNext(topic ->
+ startAnalysis(
+ cluster,
+ topicName,
+ topic.getPartitionCount(),
+ topic.getPartitions().values()
+ .stream()
+ .mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
+ .sum()
+ )
+ ).then();
+ }
+
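+ // synchronized so that the in-progress check and task registration happen atomically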
+ private synchronized void startAnalysis(KafkaCluster cluster,
+ String topic,
+ int partitionsCnt,
+ long approxNumberOfMsgs) {
+ var topicId = new TopicIdentity(cluster, topic);
+ if (analysisTasksStore.isAnalysisInProgress(topicId)) {
+ throw new TopicAnalysisException("Topic analysis is already in progress");
+ }
+ var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs);
+ analysisTasksStore.registerNewTask(topicId, task);
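+ // the analysis loop does blocking consumer polls, so it runs on the boundedElastic scheduler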
+ Schedulers.boundedElastic().schedule(task);
+ }
+
+ public void cancelAnalysis(KafkaCluster cluster, String topicName) {
+ analysisTasksStore.cancelAnalysis(new TopicIdentity(cluster, topicName));
+ }
+
+ public Optional<TopicAnalysisDTO> getTopicAnalysis(KafkaCluster cluster, String topicName) {
+ return analysisTasksStore.getTopicAnalysis(new TopicIdentity(cluster, topicName));
+ }
+
+ class AnalysisTask implements Runnable, Closeable {
+
+ private final Instant startedAt = Instant.now();
+
+ private final TopicIdentity topicId;
+ private final int partitionsCnt;
+ private final long approxNumberOfMsgs;
+
+ private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
+ private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
+
+ private final KafkaConsumer<Bytes, Bytes> consumer;
+
+ AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt, long approxNumberOfMsgs) {
+ this.topicId = topicId;
+ this.approxNumberOfMsgs = approxNumberOfMsgs;
+ this.partitionsCnt = partitionsCnt;
+ this.consumer = consumerGroupService.createConsumer(
+ cluster,
+ // to improve polling throughput
+ Map.of(
+ ConsumerConfig.RECEIVE_BUFFER_CONFIG, "-1", // let the OS tune the buffer size
+ ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
+ )
+ );
+ }
+
+ @Override
+ public void close() {
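+ // wakeup() makes a blocked poll() throw WakeupException, which stops the task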
+ consumer.wakeup();
+ }
+
+ @Override
+ public void run() {
+ try {
+ log.info("Starting {} topic analysis", topicId);
+ var topicPartitions = IntStream.range(0, partitionsCnt)
+ .peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
+ .mapToObj(i -> new TopicPartition(topicId.topicName, i))
+ .collect(Collectors.toList());
+
+ consumer.assign(topicPartitions);
+ consumer.seekToBeginning(topicPartitions);
+
+ var waitingOffsets = new WaitingOffsets(topicId.topicName, consumer, topicPartitions);
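+ // poll until the end offsets captured at start are reached, or 3 consecutive polls return nothing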
+ for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < 3; ) {
+ var polled = consumer.poll(Duration.ofSeconds(3));
+ emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+ polled.forEach(r -> {
+ totalStats.apply(r);
+ partitionStats.get(r.partition()).apply(r);
+ waitingOffsets.markPolled(r);
+ });
+ updateProgress();
+ }
+ analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
+ log.info("{} topic analysis finished", topicId);
+ } catch (WakeupException | InterruptException cancelException) {
+ log.info("{} topic analysis stopped", topicId);
+ // call cancel to clean up the running-task entry in case the interruption was not user-initiated
+ analysisTasksStore.cancelAnalysis(topicId);
+ } catch (Throwable th) {
+ log.error("Error analyzing topic {}", topicId, th);
+ analysisTasksStore.setAnalysisError(topicId, startedAt, th);
+ } finally {
+ consumer.close();
+ }
+ }
+
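+ // completeness is estimated against the summed offset ranges, which may exceed the real
+ // message count (e.g. for compacted topics) - hence the cap at 100%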
+ private void updateProgress() {
+ if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
+ analysisTasksStore.updateProgress(
+ topicId,
+ totalStats.totalMsgs,
+ totalStats.keysSize.sum + totalStats.valuesSize.sum,
+ Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
+ );
+ }
+ }
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java
new file mode 100644
index 0000000000..2d8e0dc38f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java
@@ -0,0 +1,141 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.provectus.kafka.ui.model.TopicAnalysisSizeStatsDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisStatsDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisStatsHourlyMsgCountsDTO;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.quantiles.UpdateDoublesSketch;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+
+class TopicAnalysisStats {
+
+ Long totalMsgs = 0L;
+ Long minOffset;
+ Long maxOffset;
+
+ Long minTimestamp;
+ Long maxTimestamp;
+
+ long nullKeys = 0L;
+ long nullValues = 0L;
+
+ final SizeStats keysSize = new SizeStats();
+ final SizeStats valuesSize = new SizeStats();
+
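+ // HyperLogLog sketches: approximate distinct counts in fixed memory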
+ final HllSketch uniqKeys = new HllSketch();
+ final HllSketch uniqValues = new HllSketch();
+
+ final HourlyCounts hourlyCounts = new HourlyCounts();
+
+ static class SizeStats {
+ long sum = 0;
+ Long min;
+ Long max;
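+ // quantiles sketch used to estimate size percentiles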
+ final UpdateDoublesSketch sizeSketch = DoublesSketch.builder().build();
+
+ void apply(byte[] bytes) {
+ int len = bytes.length;
+ sum += len;
+ min = minNullable(min, len);
+ max = maxNullable(max, len);
+ sizeSketch.update(len);
+ }
+
+ TopicAnalysisSizeStatsDTO toDto() {
+ return new TopicAnalysisSizeStatsDTO()
+ .sum(sum)
+ .min(min)
+ .max(max)
+ .avg((long) (((double) sum) / sizeSketch.getN()))
+ .prctl50((long) sizeSketch.getQuantile(0.5))
+ .prctl75((long) sizeSketch.getQuantile(0.75))
+ .prctl95((long) sizeSketch.getQuantile(0.95))
+ .prctl99((long) sizeSketch.getQuantile(0.99))
+ .prctl999((long) sizeSketch.getQuantile(0.999));
+ }
+ }
+
+ static class HourlyCounts {
+
+ // hour start ms -> count
+ private final Map<Long, Long> hourlyStats = new HashMap<>();
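+ // only records from the last 14 days are counted, keeping the map bounded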
+ private final long minTs = Instant.now().minus(Duration.ofDays(14)).toEpochMilli();
+
+ void apply(ConsumerRecord<?, ?> rec) {
+ if (rec.timestamp() > minTs) {
+ var hourStart = rec.timestamp() - rec.timestamp() % (1_000 * 60 * 60);
+ hourlyStats.compute(hourStart, (h, cnt) -> cnt == null ? 1 : cnt + 1);
+ }
+ }
+
+ List toDto() {
+ return hourlyStats.entrySet().stream()
+ .sorted(Comparator.comparingLong(Map.Entry::getKey))
+ .map(e -> new TopicAnalysisStatsHourlyMsgCountsDTO()
+ .hourStart(e.getKey())
+ .count(e.getValue()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ void apply(ConsumerRecord<Bytes, Bytes> rec) {
+ totalMsgs++;
+ minTimestamp = minNullable(minTimestamp, rec.timestamp());
+ maxTimestamp = maxNullable(maxTimestamp, rec.timestamp());
+ minOffset = minNullable(minOffset, rec.offset());
+ maxOffset = maxNullable(maxOffset, rec.offset());
+ hourlyCounts.apply(rec);
+
+ if (rec.key() != null) {
+ byte[] keyBytes = rec.key().get();
+ keysSize.apply(keyBytes);
+ uniqKeys.update(keyBytes);
+ } else {
+ nullKeys++;
+ }
+
+ if (rec.value() != null) {
+ byte[] valueBytes = rec.value().get();
+ valuesSize.apply(valueBytes);
+ uniqValues.update(valueBytes);
+ } else {
+ nullValues++;
+ }
+ }
+
+ TopicAnalysisStatsDTO toDto(@Nullable Integer partition) {
+ return new TopicAnalysisStatsDTO()
+ .partition(partition)
+ .totalMsgs(totalMsgs)
+ .minOffset(minOffset)
+ .maxOffset(maxOffset)
+ .minTimestamp(minTimestamp)
+ .maxTimestamp(maxTimestamp)
+ .nullKeys(nullKeys)
+ .nullValues(nullValues)
+ // due to HLL estimation error, the estimate can be greater than the actual message count
+ .approxUniqKeys(Math.min(totalMsgs, (long) uniqKeys.getEstimate()))
+ .approxUniqValues(Math.min(totalMsgs, (long) uniqValues.getEstimate()))
+ .keySize(keysSize.toDto())
+ .valueSize(valuesSize.toDto())
+ .hourlyMsgCounts(hourlyCounts.toDto());
+ }
+
+ private static Long maxNullable(@Nullable Long v1, long v2) {
+ return v1 == null ? v2 : Math.max(v1, v2);
+ }
+
+ private static Long minNullable(@Nullable Long v1, long v2) {
+ return v1 == null ? v2 : Math.min(v1, v2);
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicIdentity.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicIdentity.java
new file mode 100644
index 0000000000..bfe75c1772
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicIdentity.java
@@ -0,0 +1,17 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@ToString
+@EqualsAndHashCode
+class TopicIdentity {
+ final String clusterName;
+ final String topicName;
+
+ public TopicIdentity(KafkaCluster cluster, String topic) {
+ this.clusterName = cluster.getName();
+ this.topicName = topic;
+ }
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
index 76b1e8c4cb..42e44b0897 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
@@ -17,6 +17,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicDTO;
+import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.util.ArrayList;
import java.util.Comparator;
@@ -41,7 +42,8 @@ class TopicsServicePaginationTest {
private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
- private final TopicsController topicsController = new TopicsController(topicsService, clusterMapper);
+ private final TopicsController topicsController = new TopicsController(
+ topicsService, mock(TopicAnalysisService.class), clusterMapper);
private void init(Map<String, InternalTopic> topicsInCache) {
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisServiceTest.java
new file mode 100644
index 0000000000..7d02219e9c
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisServiceTest.java
@@ -0,0 +1,62 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.service.ClustersStorage;
+import java.time.Duration;
+import java.util.UUID;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+
+class TopicAnalysisServiceTest extends AbstractIntegrationTest {
+
+ @Autowired
+ private ClustersStorage clustersStorage;
+
+ @Autowired
+ private TopicAnalysisService topicAnalysisService;
+
+ @Test
+ void savesResultWhenAnalysisIsCompleted() {
+ String topic = "analyze_test_" + UUID.randomUUID();
+ createTopic(new NewTopic(topic, 2, (short) 1));
+ fillTopic(topic, 1_000);
+
+ var cluster = clustersStorage.getClusterByName(LOCAL).get();
+ topicAnalysisService.analyze(cluster, topic).block();
+
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(20))
+ .untilAsserted(() -> {
+ assertThat(topicAnalysisService.getTopicAnalysis(cluster, topic))
+ .hasValueSatisfying(state -> {
+ assertThat(state.getProgress()).isNull();
+ assertThat(state.getResult()).isNotNull();
+ var completedAnalysis = state.getResult();
+ assertThat(completedAnalysis.getTotalStats().getTotalMsgs()).isEqualTo(1_000);
+ assertThat(completedAnalysis.getPartitionStats().size()).isEqualTo(2);
+ });
+ });
+ }
+
+ private void fillTopic(String topic, int cnt) {
+ try (var producer = KafkaTestProducer.forKafka(kafka)) {
+ for (int i = 0; i < cnt; i++) {
+ producer.send(
+ new ProducerRecord<>(
+ topic,
+ RandomStringUtils.randomAlphabetic(5),
+ RandomStringUtils.randomAlphabetic(10)));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 1416609982..63f313e2ac 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -363,6 +363,76 @@ paths:
404:
description: Not found
+ /api/clusters/{clusterName}/topics/{topicName}/analysis:
+ get:
+ tags:
+ - Topics
+ summary: getTopicAnalysis
+ operationId: getTopicAnalysis
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: topicName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/TopicAnalysis'
+ 404:
+ description: Not found
+ post:
+ tags:
+ - Topics
+ summary: analyzeTopic
+ operationId: analyzeTopic
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: topicName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: Analysis started
+ 404:
+ description: Not found
+ delete:
+ tags:
+ - Topics
+ summary: cancelTopicAnalysis
+ operationId: cancelTopicAnalysis
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: topicName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: Analysis cancelled
+ 404:
+ description: Not found
+
/api/clusters/{clusterName}/topics/{topicName}:
get:
tags:
@@ -1911,6 +1981,130 @@ components:
required:
- name
+ TopicAnalysis:
+ type: object
+ description: "Represents analysis state. Note: 'progress' and 'result' fields are set exclusively depending on analysis state."
+ properties:
+ progress:
+ $ref: '#/components/schemas/TopicAnalysisProgress'
+ result:
+ $ref: '#/components/schemas/TopicAnalysisResult'
+
+ TopicAnalysisProgress:
+ type: object
+ properties:
+ startedAt:
+ type: integer
+ format: int64
+ completenessPercent:
+ type: number
+ msgsScanned:
+ type: integer
+ format: int64
+ bytesScanned:
+ type: integer
+ format: int64
+
+ TopicAnalysisResult:
+ type: object
+ properties:
+ startedAt:
+ type: integer
+ format: int64
+ finishedAt:
+ type: integer
+ format: int64
+ error:
+ type: string
+ totalStats:
+ $ref: '#/components/schemas/TopicAnalysisStats'
+ partitionStats:
+ type: array
+ items:
+ $ref: "#/components/schemas/TopicAnalysisStats"
+
+ TopicAnalysisStats:
+ type: object
+ properties:
+ partition:
+ type: integer
+ format: int32
+ description: "null if this is total stats"
+ totalMsgs:
+ type: integer
+ format: int64
+ minOffset:
+ type: integer
+ format: int64
+ maxOffset:
+ type: integer
+ format: int64
+ minTimestamp:
+ type: integer
+ format: int64
+ maxTimestamp:
+ type: integer
+ format: int64
+ nullKeys:
+ type: integer
+ format: int64
+ nullValues:
+ type: integer
+ format: int64
+ approxUniqKeys:
+ type: integer
+ format: int64
+ approxUniqValues:
+ type: integer
+ format: int64
+ keySize:
+ $ref: "#/components/schemas/TopicAnalysisSizeStats"
+ valueSize:
+ $ref: "#/components/schemas/TopicAnalysisSizeStats"
+ hourlyMsgCounts:
+ type: array
+ items:
+ type: object
+ properties:
+ hourStart:
+ type: integer
+ format: int64
+ count:
+ type: integer
+ format: int64
+
+ TopicAnalysisSizeStats:
+ type: object
+ description: "All sizes in bytes"
+ properties:
+ sum:
+ type: integer
+ format: int64
+ min:
+ type: integer
+ format: int64
+ max:
+ type: integer
+ format: int64
+ avg:
+ type: integer
+ format: int64
+ prctl50:
+ type: integer
+ format: int64
+ prctl75:
+ type: integer
+ format: int64
+ prctl95:
+ type: integer
+ format: int64
+ prctl99:
+ type: integer
+ format: int64
+ prctl999:
+ type: integer
+ format: int64
+
Replica:
type: object
properties:
diff --git a/pom.xml b/pom.xml
index a73aaf0825..924108bd76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@
<assertj.version>3.19.0</assertj.version>
<antlr4-maven-plugin.version>4.7.1</antlr4-maven-plugin.version>
<groovy.version>3.0.9</groovy.version>
+ <datasketches-java.version>3.1.0</datasketches-java.version>
<frontend-generated-sources-directory>..//kafka-ui-react-app/src/generated-sources</frontend-generated-sources-directory>