Issue#634 clean up policy in topic list (#665)
* adding cleanup policy to InternalTopic * adding CleanupPolicy enum Co-authored-by: marselakhmetov <makhmetov@provectus.com>
This commit is contained in:
parent
32a0ece0a3
commit
cac9069774
4 changed files with 56 additions and 11 deletions
|
@ -0,0 +1,29 @@
|
||||||
|
package com.provectus.kafka.ui.model;
|
||||||
|
|
||||||
|
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
public enum CleanupPolicy {
|
||||||
|
DELETE("delete"),
|
||||||
|
COMPACT("compact"),
|
||||||
|
COMPACT_DELETE("compact, delete"),
|
||||||
|
UNKNOWN("unknown");
|
||||||
|
|
||||||
|
private final String cleanUpPolicy;
|
||||||
|
|
||||||
|
CleanupPolicy(String cleanUpPolicy) {
|
||||||
|
this.cleanUpPolicy = cleanUpPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getCleanUpPolicy() {
|
||||||
|
return cleanUpPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CleanupPolicy fromString(String string) {
|
||||||
|
return Arrays.stream(CleanupPolicy.values())
|
||||||
|
.filter(v -> v.cleanUpPolicy.equals(string))
|
||||||
|
.findFirst()
|
||||||
|
.orElseThrow(() ->
|
||||||
|
new IllegalEntityStateException("Unknown cleanup policy value: " + string));
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,6 +14,7 @@ public class InternalTopic {
|
||||||
private final Map<Integer, InternalPartition> partitions;
|
private final Map<Integer, InternalPartition> partitions;
|
||||||
private final List<InternalTopicConfig> topicConfigs;
|
private final List<InternalTopicConfig> topicConfigs;
|
||||||
|
|
||||||
|
private final CleanupPolicy cleanUpPolicy;
|
||||||
private final int replicas;
|
private final int replicas;
|
||||||
private final int partitionCount;
|
private final int partitionCount;
|
||||||
private final int inSyncReplicas;
|
private final int inSyncReplicas;
|
||||||
|
|
|
@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.exception.TopicMetadataException;
|
import com.provectus.kafka.ui.exception.TopicMetadataException;
|
||||||
import com.provectus.kafka.ui.exception.ValidationException;
|
import com.provectus.kafka.ui.exception.ValidationException;
|
||||||
|
import com.provectus.kafka.ui.model.CleanupPolicy;
|
||||||
import com.provectus.kafka.ui.model.CreateTopicMessage;
|
import com.provectus.kafka.ui.model.CreateTopicMessage;
|
||||||
import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
||||||
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
|
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
|
||||||
|
@ -207,9 +208,15 @@ public class KafkaService {
|
||||||
|
|
||||||
private Map<String, InternalTopic> mergeWithConfigs(
|
private Map<String, InternalTopic> mergeWithConfigs(
|
||||||
List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
|
List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
|
||||||
return topics.stream().map(
|
return topics.stream()
|
||||||
t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()
|
.map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build())
|
||||||
).collect(Collectors.toMap(
|
.map(t -> t.toBuilder().cleanUpPolicy(
|
||||||
|
CleanupPolicy.fromString(t.getTopicConfigs().stream()
|
||||||
|
.filter(config -> config.getName().equals("cleanup.policy"))
|
||||||
|
.findFirst()
|
||||||
|
.orElseGet(() -> InternalTopicConfig.builder().value("unknown").build())
|
||||||
|
.getValue())).build())
|
||||||
|
.collect(Collectors.toMap(
|
||||||
InternalTopic::getName,
|
InternalTopic::getName,
|
||||||
e -> e
|
e -> e
|
||||||
));
|
));
|
||||||
|
@ -225,11 +232,12 @@ public class KafkaService {
|
||||||
final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
|
final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
|
||||||
loadTopicsConfig(adminClient, topics);
|
loadTopicsConfig(adminClient, topics);
|
||||||
|
|
||||||
return ClusterUtil.toMono(adminClient.describeTopics(topics).all()).map(
|
return ClusterUtil.toMono(adminClient.describeTopics(topics).all())
|
||||||
m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())
|
.map(m -> m.values().stream()
|
||||||
).flatMap(internalTopics -> configsMono.map(configs ->
|
.map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
|
||||||
mergeWithConfigs(internalTopics, configs).values()
|
.flatMap(internalTopics -> configsMono
|
||||||
)).flatMapMany(Flux::fromIterable);
|
.map(configs -> mergeWithConfigs(internalTopics, configs).values()))
|
||||||
|
.flatMapMany(Flux::fromIterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1486,6 +1486,13 @@ components:
|
||||||
type: integer
|
type: integer
|
||||||
underReplicatedPartitions:
|
underReplicatedPartitions:
|
||||||
type: integer
|
type: integer
|
||||||
|
cleanUpPolicy:
|
||||||
|
type: string
|
||||||
|
enum:
|
||||||
|
- DELETE
|
||||||
|
- COMPACT
|
||||||
|
- COMPACT_DELETE
|
||||||
|
- UNKNOWN
|
||||||
partitions:
|
partitions:
|
||||||
type: array
|
type: array
|
||||||
items:
|
items:
|
||||||
|
|
Loading…
Add table
Reference in a new issue