diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java new file mode 100644 index 0000000000..49db34b8de --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java @@ -0,0 +1,29 @@ +package com.provectus.kafka.ui.model; + +import com.provectus.kafka.ui.exception.IllegalEntityStateException; +import java.util.Arrays; + +public enum CleanupPolicy { + DELETE("delete"), + COMPACT("compact"), + COMPACT_DELETE("compact, delete"), + UNKNOWN("unknown"); + + private final String cleanUpPolicy; + + CleanupPolicy(String cleanUpPolicy) { + this.cleanUpPolicy = cleanUpPolicy; + } + + public String getCleanUpPolicy() { + return cleanUpPolicy; + } + + public static CleanupPolicy fromString(String string) { + return Arrays.stream(CleanupPolicy.values()) + .filter(v -> v.cleanUpPolicy.equals(string)) + .findFirst() + .orElseThrow(() -> + new IllegalEntityStateException("Unknown cleanup policy value: " + string)); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java index 879c9cdac0..4ac44407f4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java @@ -14,6 +14,7 @@ public class InternalTopic { private final Map partitions; private final List topicConfigs; + private final CleanupPolicy cleanUpPolicy; private final int replicas; private final int partitionCount; private final int inSyncReplicas; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index b3a2d92d56..e914a16a1e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.exception.TopicMetadataException; import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.CleanupPolicy; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; @@ -207,12 +208,18 @@ public class KafkaService { private Map mergeWithConfigs( List topics, Map> configs) { - return topics.stream().map( - t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build() - ).collect(Collectors.toMap( - InternalTopic::getName, - e -> e - )); + return topics.stream() + .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()) + .map(t -> t.toBuilder().cleanUpPolicy( + CleanupPolicy.fromString(t.getTopicConfigs().stream() + .filter(config -> config.getName().equals("cleanup.policy")) + .findFirst() + .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build()) + .getValue())).build()) + .collect(Collectors.toMap( + InternalTopic::getName, + e -> e + )); } @SneakyThrows @@ -225,11 +232,12 @@ public class KafkaService { final Mono>> configsMono = loadTopicsConfig(adminClient, topics); - return ClusterUtil.toMono(adminClient.describeTopics(topics).all()).map( - m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()) - ).flatMap(internalTopics -> configsMono.map(configs -> - mergeWithConfigs(internalTopics, configs).values() - )).flatMapMany(Flux::fromIterable); + return ClusterUtil.toMono(adminClient.describeTopics(topics).all()) + .map(m -> m.values().stream() + .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())) + .flatMap(internalTopics -> configsMono + .map(configs -> mergeWithConfigs(internalTopics, configs).values())) + .flatMapMany(Flux::fromIterable); } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 2bb431ac6e..6643282ffc 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1486,6 +1486,13 @@ components: type: integer underReplicatedPartitions: type: integer + cleanUpPolicy: + type: string + enum: + - DELETE + - COMPACT + - COMPACT_DELETE + - UNKNOWN partitions: type: array items: