From 396341dbf7a52fa8fc9daf7c444893f1a29fa520 Mon Sep 17 00:00:00 2001
From: Ilya Kuramshin
Date: Tue, 14 Jun 2022 15:31:00 +0400
Subject: [PATCH] Backend: topic analysis (#1965)

* Topic Analyzer implementation
* small cleanup
* hourly stats added
* imports fix
* compilation fixes
* PR fixes
* PR fixes (renaming)
* tests compilation fix
* apis improved
* checkstyle fix
* renaming
* node version rollback
* Fix naming

Signed-off-by: Roman Zabaluev
Co-authored-by: iliax
Co-authored-by: Roman Zabaluev
---
 kafka-ui-api/pom.xml                          |   5 +
 .../kafka/ui/controller/TopicsController.java |  28 +++
 .../kafka/ui/exception/ErrorCode.java         |   3 +-
 .../ui/exception/TopicAnalysisException.java  |  13 ++
 .../service/analyze/AnalysisTasksStore.java   | 117 +++++++++++
 .../service/analyze/TopicAnalysisService.java | 156 ++++++++++++++
 .../service/analyze/TopicAnalysisStats.java   | 141 +++++++++++++
 .../ui/service/analyze/TopicIdentity.java     |  17 ++
 .../service/TopicsServicePaginationTest.java  |   4 +-
 .../analyze/TopicAnalysisServiceTest.java     |  62 ++++++
 .../main/resources/swagger/kafka-ui-api.yaml  | 194 ++++++++++++++++++
 pom.xml                                       |   1 +
 12 files changed, 739 insertions(+), 2 deletions(-)
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicIdentity.java
 create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisServiceTest.java

diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 9665aad565..c3d30e9ab6 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -212,6 +212,11 @@
             <artifactId>groovy-json</artifactId>
             <version>${groovy.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.datasketches</groupId>
+            <artifactId>datasketches-java</artifactId>
+            <version>${datasketches-java.version}</version>
+        </dependency>
     </dependencies>
 
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
index e30a540c90..ccfe898fdc 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
@@ -11,6 +11,7 @@ import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
 import com.provectus.kafka.ui.model.SortOrderDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
@@ -19,6 +20,7 @@ import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
 import java.util.Comparator;
 import java.util.List;
 import javax.validation.Valid;
@@ -40,6 +42,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
   private static final Integer DEFAULT_PAGE_SIZE = 25;
 
   private final TopicsService topicsService;
+  private final TopicAnalysisService topicAnalysisService;
   private final ClusterMapper clusterMapper;
 
   @Override
@@ -181,4 +184,29 @@ public class TopicsController extends AbstractController implements TopicsApi {
         topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
         .map(ResponseEntity::ok);
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
+    return topicAnalysisService.analyze(getCluster(clusterName), topicName)
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
+                                                        ServerWebExchange exchange) {
+    topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
+    return Mono.just(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterName,
+                                                                 String topicName,
+                                                                 ServerWebExchange exchange) {
+    return Mono.just(
+        topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
+            .map(ResponseEntity::ok)
+            .orElseGet(() -> ResponseEntity.notFound().build())
+    );
+  }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
index 2c801d3d05..ecd1915316 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
@@ -26,7 +26,8 @@ public enum ErrorCode {
   INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
   RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT),
   INVALID_ENTITY_STATE(4016, HttpStatus.BAD_REQUEST),
-  SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR);
+  SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR),
+  TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST);
 
   static {
     // codes uniqueness check
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java
new file mode 100644
index 0000000000..ecf80febc1
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java
@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class TopicAnalysisException extends CustomBaseException {
+
+  public TopicAnalysisException(String message) {
+    super(message);
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.TOPIC_ANALYSIS_ERROR;
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java
new file mode 100644
index 0000000000..d5ed1a36d6
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java
@@ -0,0 +1,117 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.google.common.base.Throwables;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisProgressDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisResultDTO;
+import java.io.Closeable;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.Value;
+
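+// In-memory registry of analysis tasks: a task sits in 'running' while it is
+// scanning and moves to 'completed' when it finishes or fails; cancellation
+// simply discards the running entry. Nothing is persisted across restarts.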
+class AnalysisTasksStore {
+
+  private final Map<TopicIdentity, RunningAnalysis> running = new ConcurrentHashMap<>();
+  private final Map<TopicIdentity, TopicAnalysisResultDTO> completed = new ConcurrentHashMap<>();
+
+  void setAnalysisError(TopicIdentity topicId,
+                        Instant collectionStartedAt,
+                        Throwable th) {
+    running.remove(topicId);
+    completed.put(
+        topicId,
+        new TopicAnalysisResultDTO()
+            .startedAt(collectionStartedAt.toEpochMilli())
+            .finishedAt(System.currentTimeMillis())
+            .error(Throwables.getStackTraceAsString(th))
+    );
+  }
+
+  void setAnalysisResult(TopicIdentity topicId,
+                         Instant collectionStartedAt,
+                         TopicAnalysisStats totalStats,
+                         Map<Integer, TopicAnalysisStats> partitionStats) {
+    running.remove(topicId);
+    completed.put(topicId,
+        new TopicAnalysisResultDTO()
+            .startedAt(collectionStartedAt.toEpochMilli())
+            .finishedAt(System.currentTimeMillis())
+            .totalStats(totalStats.toDto(null))
+            .partitionStats(
+                partitionStats.entrySet().stream()
+                    .map(e -> e.getValue().toDto(e.getKey()))
+                    .collect(Collectors.toList())
+            ));
+  }
+
+  void updateProgress(TopicIdentity topicId,
+                      long msgsScanned,
+                      long bytesScanned,
+                      Double completeness) {
+    running.computeIfPresent(topicId, (k, state) ->
+        state.toBuilder()
+            .msgsScanned(msgsScanned)
+            .bytesScanned(bytesScanned)
+            .completenessPercent(completeness)
+            .build());
+  }
+
+  void registerNewTask(TopicIdentity topicId, Closeable task) {
+    running.put(topicId, new RunningAnalysis(Instant.now(), 0.0, 0, 0, task));
+  }
+
+  void cancelAnalysis(TopicIdentity topicId) {
+    Optional.ofNullable(running.remove(topicId))
+        .ifPresent(RunningAnalysis::stopTask);
+  }
+
+  boolean isAnalysisInProgress(TopicIdentity id) {
+    return running.containsKey(id);
+  }
+
+  Optional<TopicAnalysisDTO> getTopicAnalysis(TopicIdentity id) {
+    var runningState = running.get(id);
+    var completedState = completed.get(id);
+    if (runningState == null && completedState == null) {
+      return Optional.empty();
+    }
+    return Optional.of(createAnalysisDto(runningState, completedState));
+  }
+
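+  // 'progress' reflects a currently running scan, 'result' the most recently
+  // finished one; after a topic is re-analyzed both can be present at once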
+  private TopicAnalysisDTO createAnalysisDto(@Nullable RunningAnalysis runningState,
+                                             @Nullable TopicAnalysisResultDTO completedState) {
+    return new TopicAnalysisDTO()
+        .progress(runningState != null ? runningState.toDto() : null)
+        .result(completedState);
+  }
+
+  @Value
+  @Builder(toBuilder = true)
+  private static class RunningAnalysis {
+    Instant startedAt;
+    double completenessPercent;
+    long msgsScanned;
+    long bytesScanned;
+    Closeable task;
+
+    TopicAnalysisProgressDTO toDto() {
+      return new TopicAnalysisProgressDTO()
+          .startedAt(startedAt.toEpochMilli())
+          .bytesScanned(bytesScanned)
+          .msgsScanned(msgsScanned)
+          .completenessPercent(BigDecimal.valueOf(completenessPercent));
+    }
+
+    @SneakyThrows
+    void stopTask() {
+      task.close();
+    }
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
new file mode 100644
index 0000000000..757b469a6b
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
@@ -0,0 +1,156 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.provectus.kafka.ui.exception.TopicAnalysisException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
+import com.provectus.kafka.ui.service.ConsumerGroupService;
+import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.util.OffsetsSeek.WaitingOffsets;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Bytes;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class TopicAnalysisService {
+
+  private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
+
+  private final TopicsService topicsService;
+  private final ConsumerGroupService consumerGroupService;
+
+  public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
+    return topicsService.getTopicDetails(cluster, topicName)
+        .doOnNext(topic ->
+            startAnalysis(
+                cluster,
+                topicName,
+                topic.getPartitionCount(),
+                topic.getPartitions().values()
+                    .stream()
+                    .mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
+                    .sum()
+            )
+        ).then();
+  }
+
+  private synchronized void startAnalysis(KafkaCluster cluster,
+                                          String topic,
+                                          int partitionsCnt,
+                                          long approxNumberOfMsgs) {
+    var topicId = new TopicIdentity(cluster, topic);
+    if (analysisTasksStore.isAnalysisInProgress(topicId)) {
+      throw new TopicAnalysisException("Topic analysis is already in progress");
+    }
+    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs);
+    analysisTasksStore.registerNewTask(topicId, task);
+    Schedulers.boundedElastic().schedule(task);
+  }
+
+  public void cancelAnalysis(KafkaCluster cluster, String topicName) {
+    analysisTasksStore.cancelAnalysis(new TopicIdentity(cluster, topicName));
+  }
+
+  public Optional<TopicAnalysisDTO> getTopicAnalysis(KafkaCluster cluster, String topicName) {
+    return analysisTasksStore.getTopicAnalysis(new TopicIdentity(cluster, topicName));
+  }
+
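+  // The scan itself: scheduled onto Reactor's boundedElastic pool and stopped
+  // cooperatively - close() calls consumer.wakeup(), which makes the blocking
+  // poll() throw WakeupException. The poll loop ends when all end offsets are
+  // reached or after 3 consecutive empty polls.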
+  class AnalysisTask implements Runnable, Closeable {
+
+    private final Instant startedAt = Instant.now();
+
+    private final TopicIdentity topicId;
+    private final int partitionsCnt;
+    private final long approxNumberOfMsgs;
+
+    private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
+    private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
+
+    private final KafkaConsumer<Bytes, Bytes> consumer;
+
+    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt, long approxNumberOfMsgs) {
+      this.topicId = topicId;
+      this.approxNumberOfMsgs = approxNumberOfMsgs;
+      this.partitionsCnt = partitionsCnt;
+      this.consumer = consumerGroupService.createConsumer(
+          cluster,
+          // to improve polling throughput
+          Map.of(
+              ConsumerConfig.RECEIVE_BUFFER_CONFIG, "-1", // let the OS tune the buffer size
+              ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
+          )
+      );
+    }
+
+    @Override
+    public void close() {
+      consumer.wakeup();
+    }
+
+    @Override
+    public void run() {
+      try {
+        log.info("Starting {} topic analysis", topicId);
+        var topicPartitions = IntStream.range(0, partitionsCnt)
+            .peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
+            .mapToObj(i -> new TopicPartition(topicId.topicName, i))
+            .collect(Collectors.toList());
+
+        consumer.assign(topicPartitions);
+        consumer.seekToBeginning(topicPartitions);
+
+        var waitingOffsets = new WaitingOffsets(topicId.topicName, consumer, topicPartitions);
+        for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < 3; ) {
+          var polled = consumer.poll(Duration.ofSeconds(3));
+          emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+          polled.forEach(r -> {
+            totalStats.apply(r);
+            partitionStats.get(r.partition()).apply(r);
+            waitingOffsets.markPolled(r);
+          });
+          updateProgress();
+        }
+        analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
+        log.info("{} topic analysis finished", topicId);
+      } catch (WakeupException | InterruptException cancelException) {
+        log.info("{} topic analysis stopped", topicId);
+        // also clear the running entry when the thread was interrupted for some
+        // non-user cancellation reason
+        analysisTasksStore.cancelAnalysis(topicId);
+      } catch (Throwable th) {
+        log.error("Error analyzing topic {}", topicId, th);
+        analysisTasksStore.setAnalysisError(topicId, startedAt, th);
+      } finally {
+        consumer.close();
+      }
+    }
+
+    private void updateProgress() {
+      if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
+        analysisTasksStore.updateProgress(
+            topicId,
+            totalStats.totalMsgs,
+            totalStats.keysSize.sum + totalStats.valuesSize.sum,
+            Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
+        );
+      }
+    }
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java
new file mode 100644
index 0000000000..2d8e0dc38f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisStats.java
@@ -0,0 +1,141 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.provectus.kafka.ui.model.TopicAnalysisSizeStatsDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisStatsDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisStatsHourlyMsgCountsDTO;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.datasketches.hll.HllSketch;
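+import org.apache.datasketches.quantiles.DoublesSketch;
+import org.apache.datasketches.quantiles.UpdateDoublesSketch;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+
+// Streaming statistics with bounded memory per partition: DataSketches HLL
+// sketches approximate distinct key/value counts and a quantiles sketch
+// approximates size percentiles, so polled messages never have to be buffered.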
+class TopicAnalysisStats {
+
+  Long totalMsgs = 0L;
+  Long minOffset;
+  Long maxOffset;
+
+  Long minTimestamp;
+  Long maxTimestamp;
+
+  long nullKeys = 0L;
+  long nullValues = 0L;
+
+  final SizeStats keysSize = new SizeStats();
+  final SizeStats valuesSize = new SizeStats();
+
+  final HllSketch uniqKeys = new HllSketch();
+  final HllSketch uniqValues = new HllSketch();
+
+  final HourlyCounts hourlyCounts = new HourlyCounts();
+
+  static class SizeStats {
+    long sum = 0;
+    Long min;
+    Long max;
+    final UpdateDoublesSketch sizeSketch = DoublesSketch.builder().build();
+
+    void apply(byte[] bytes) {
+      int len = bytes.length;
+      sum += len;
+      min = minNullable(min, len);
+      max = maxNullable(max, len);
+      sizeSketch.update(len);
+    }
+
+    TopicAnalysisSizeStatsDTO toDto() {
+      return new TopicAnalysisSizeStatsDTO()
+          .sum(sum)
+          .min(min)
+          .max(max)
+          .avg((long) (((double) sum) / sizeSketch.getN()))
+          .prctl50((long) sizeSketch.getQuantile(0.5))
+          .prctl75((long) sizeSketch.getQuantile(0.75))
+          .prctl95((long) sizeSketch.getQuantile(0.95))
+          .prctl99((long) sizeSketch.getQuantile(0.99))
+          .prctl999((long) sizeSketch.getQuantile(0.999));
+    }
+  }
+
+  static class HourlyCounts {
+
+    // hour start ms -> count
+    private final Map<Long, Long> hourlyStats = new HashMap<>();
+    private final long minTs = Instant.now().minus(Duration.ofDays(14)).toEpochMilli();
+
+    void apply(ConsumerRecord<Bytes, Bytes> rec) {
+      if (rec.timestamp() > minTs) {
+        var hourStart = rec.timestamp() - rec.timestamp() % (1_000 * 60 * 60);
+        hourlyStats.compute(hourStart, (h, cnt) -> cnt == null ? 1 : cnt + 1);
+      }
+    }
+
+    List<TopicAnalysisStatsHourlyMsgCountsDTO> toDto() {
+      return hourlyStats.entrySet().stream()
+          .sorted(Comparator.comparingLong(Map.Entry::getKey))
+          .map(e -> new TopicAnalysisStatsHourlyMsgCountsDTO()
+              .hourStart(e.getKey())
+              .count(e.getValue()))
+          .collect(Collectors.toList());
+    }
+  }
+
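+  // applied to every polled record; null keys/values are only counted and do
+  // not contribute to the size and uniqueness sketches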
+  void apply(ConsumerRecord<Bytes, Bytes> rec) {
+    totalMsgs++;
+    minTimestamp = minNullable(minTimestamp, rec.timestamp());
+    maxTimestamp = maxNullable(maxTimestamp, rec.timestamp());
+    minOffset = minNullable(minOffset, rec.offset());
+    maxOffset = maxNullable(maxOffset, rec.offset());
+    hourlyCounts.apply(rec);
+
+    if (rec.key() != null) {
+      byte[] keyBytes = rec.key().get();
+      keysSize.apply(keyBytes);
+      uniqKeys.update(keyBytes);
+    } else {
+      nullKeys++;
+    }
+
+    if (rec.value() != null) {
+      byte[] valueBytes = rec.value().get();
+      valuesSize.apply(valueBytes);
+      uniqValues.update(valueBytes);
+    } else {
+      nullValues++;
+    }
+  }
+
+  TopicAnalysisStatsDTO toDto(@Nullable Integer partition) {
+    return new TopicAnalysisStatsDTO()
+        .partition(partition)
+        .totalMsgs(totalMsgs)
+        .minOffset(minOffset)
+        .maxOffset(maxOffset)
+        .minTimestamp(minTimestamp)
+        .maxTimestamp(maxTimestamp)
+        .nullKeys(nullKeys)
+        .nullValues(nullValues)
+        // because of HLL error the estimated count can be greater than the actual msgs count
+        .approxUniqKeys(Math.min(totalMsgs, (long) uniqKeys.getEstimate()))
+        .approxUniqValues(Math.min(totalMsgs, (long) uniqValues.getEstimate()))
+        .keySize(keysSize.toDto())
+        .valueSize(valuesSize.toDto())
+        .hourlyMsgCounts(hourlyCounts.toDto());
+  }
+
+  private static Long maxNullable(@Nullable Long v1, long v2) {
+    return v1 == null ? v2 : Math.max(v1, v2);
+  }
+
+  private static Long minNullable(@Nullable Long v1, long v2) {
+    return v1 == null ? v2 : Math.min(v1, v2);
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicIdentity.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicIdentity.java
new file mode 100644
index 0000000000..bfe75c1772
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicIdentity.java
@@ -0,0 +1,17 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@ToString
+@EqualsAndHashCode
+class TopicIdentity {
+  final String clusterName;
+  final String topicName;
+
+  public TopicIdentity(KafkaCluster cluster, String topic) {
+    this.clusterName = cluster.getName();
+    this.topicName = topic;
+  }
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
index 76b1e8c4cb..42e44b0897 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
@@ -17,6 +17,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
+import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
 import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -41,7 +42,8 @@ class TopicsServicePaginationTest {
 
   private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
   private final ClusterMapper clusterMapper = new ClusterMapperImpl();
-  private final TopicsController topicsController = new TopicsController(topicsService, clusterMapper);
+  private final TopicsController topicsController = new TopicsController(
+      topicsService, mock(TopicAnalysisService.class), clusterMapper);
 
   private void init(Map<String, InternalTopic> topicsInCache) {
 
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisServiceTest.java
new file mode 100644
index 0000000000..7d02219e9c
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisServiceTest.java
@@ -0,0 +1,62 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.service.ClustersStorage;
+import java.time.Duration;
+import java.util.UUID;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+
+class TopicAnalysisServiceTest extends AbstractIntegrationTest {
+
+  @Autowired
+  private ClustersStorage clustersStorage;
+
+  @Autowired
+  private TopicAnalysisService topicAnalysisService;
+
+  @Test
+  void savesResultWhenAnalysisIsCompleted() {
+    String topic = "analyze_test_" + UUID.randomUUID();
+    createTopic(new NewTopic(topic, 2, (short) 1));
+    fillTopic(topic, 1_000);
+
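+    // analyze() only starts the scan - completion is observed below by polling
+    // the analysis state until the result appears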
+    var cluster = clustersStorage.getClusterByName(LOCAL).get();
+    topicAnalysisService.analyze(cluster, topic).block();
+
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(20))
+        .untilAsserted(() -> {
+          assertThat(topicAnalysisService.getTopicAnalysis(cluster, topic))
+              .hasValueSatisfying(state -> {
+                assertThat(state.getProgress()).isNull();
+                assertThat(state.getResult()).isNotNull();
+                var completedAnalyze = state.getResult();
+                assertThat(completedAnalyze.getTotalStats().getTotalMsgs()).isEqualTo(1_000);
+                assertThat(completedAnalyze.getPartitionStats().size()).isEqualTo(2);
+              });
+        });
+  }
+
+  private void fillTopic(String topic, int cnt) {
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < cnt; i++) {
+        producer.send(
+            new ProducerRecord<>(
+                topic,
+                RandomStringUtils.randomAlphabetic(5),
+                RandomStringUtils.randomAlphabetic(10)));
+      }
+    }
+  }
+
+
+}
\ No newline at end of file
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 1416609982..63f313e2ac 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -363,6 +363,76 @@ paths:
         404:
           description: Not found
 
+  /api/clusters/{clusterName}/topics/{topicName}/analysis:
+    get:
+      tags:
+        - Topics
+      summary: getTopicAnalysis
+      operationId: getTopicAnalysis
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TopicAnalysis'
+        404:
+          description: Not found
+    post:
+      tags:
+        - Topics
+      summary: analyzeTopic
+      operationId: analyzeTopic
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: Analysis started
+        404:
+          description: Not found
+    delete:
+      tags:
+        - Topics
+      summary: cancelTopicAnalysis
+      operationId: cancelTopicAnalysis
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: Analysis cancelled
+        404:
+          description: Not found
+
+
   /api/clusters/{clusterName}/topics/{topicName}:
     get:
       tags:
@@ -1911,6 +1981,130 @@ components:
       required:
         - name
 
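+    # Topic analysis DTOs; populated by AnalysisTasksStore on the backend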
+    TopicAnalysis:
+      type: object
+      description: "Represents analysis state. 'progress' is set while the analysis is running, 'result' once it has finished."
+      properties:
+        progress:
+          $ref: '#/components/schemas/TopicAnalysisProgress'
+        result:
+          $ref: '#/components/schemas/TopicAnalysisResult'
+
+    TopicAnalysisProgress:
+      type: object
+      properties:
+        startedAt:
+          type: integer
+          format: int64
+        completenessPercent:
+          type: number
+        msgsScanned:
+          type: integer
+          format: int64
+        bytesScanned:
+          type: integer
+          format: int64
+
+    TopicAnalysisResult:
+      type: object
+      properties:
+        startedAt:
+          type: integer
+          format: int64
+        finishedAt:
+          type: integer
+          format: int64
+        error:
+          type: string
+        totalStats:
+          $ref: '#/components/schemas/TopicAnalysisStats'
+        partitionStats:
+          type: array
+          items:
+            $ref: "#/components/schemas/TopicAnalysisStats"
+
+    TopicAnalysisStats:
+      type: object
+      properties:
+        partition:
+          type: integer
+          format: int32
+          description: "null if this is total stats"
+        totalMsgs:
+          type: integer
+          format: int64
+        minOffset:
+          type: integer
+          format: int64
+        maxOffset:
+          type: integer
+          format: int64
+        minTimestamp:
+          type: integer
+          format: int64
+        maxTimestamp:
+          type: integer
+          format: int64
+        nullKeys:
+          type: integer
+          format: int64
+        nullValues:
+          type: integer
+          format: int64
+        approxUniqKeys:
+          type: integer
+          format: int64
+        approxUniqValues:
+          type: integer
+          format: int64
+        keySize:
+          $ref: "#/components/schemas/TopicAnalysisSizeStats"
+        valueSize:
+          $ref: "#/components/schemas/TopicAnalysisSizeStats"
+        hourlyMsgCounts:
+          type: array
+          items:
+            type: object
+            properties:
+              hourStart:
+                type: integer
+                format: int64
+              count:
+                type: integer
+                format: int64
+
+    TopicAnalysisSizeStats:
+      type: object
+      description: "All sizes in bytes"
+      properties:
+        sum:
+          type: integer
+          format: int64
+        min:
+          type: integer
+          format: int64
+        max:
+          type: integer
+          format: int64
+        avg:
+          type: integer
+          format: int64
+        prctl50:
+          type: integer
+          format: int64
+        prctl75:
+          type: integer
+          format: int64
+        prctl95:
+          type: integer
+          format: int64
+        prctl99:
+          type: integer
+          format: int64
+        prctl999:
+          type: integer
+          format: int64
+
     Replica:
       type: object
       properties:
diff --git a/pom.xml b/pom.xml
index a73aaf0825..924108bd76 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@
         3.19.0
         4.7.1
         <groovy.version>3.0.9</groovy.version>
+        <datasketches-java.version>3.1.0</datasketches-java.version>
         ..//kafka-ui-react-app/src/generated-sources