|
@@ -12,7 +12,8 @@ import com.provectus.kafka.ui.model.InternalTopicConfig;
|
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
|
import com.provectus.kafka.ui.model.Metric;
|
|
|
import com.provectus.kafka.ui.model.ServerStatus;
|
|
|
-import com.provectus.kafka.ui.model.TopicFormData;
|
|
|
+import com.provectus.kafka.ui.model.TopicCreation;
|
|
|
+import com.provectus.kafka.ui.model.TopicUpdate;
|
|
|
import com.provectus.kafka.ui.util.ClusterUtil;
|
|
|
import com.provectus.kafka.ui.util.JmxClusterUtil;
|
|
|
import com.provectus.kafka.ui.util.JmxMetricsName;
|
|
@@ -223,8 +224,8 @@ public class KafkaService {
|
|
|
|
|
|
@SneakyThrows
|
|
|
public Mono<InternalTopic> createTopic(AdminClient adminClient,
|
|
|
- Mono<TopicFormData> topicFormData) {
|
|
|
- return topicFormData.flatMap(
|
|
|
+ Mono<TopicCreation> topicCreation) {
|
|
|
+ return topicCreation.flatMap(
|
|
|
topicData -> {
|
|
|
NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(),
|
|
|
topicData.getReplicationFactor().shortValue());
|
|
@@ -242,9 +243,9 @@ public class KafkaService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
|
|
|
+ public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreation> topicCreation) {
|
|
|
return getOrCreateAdminClient(cluster)
|
|
|
- .flatMap(ac -> createTopic(ac.getAdminClient(), topicFormData));
|
|
|
+ .flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation));
|
|
|
}
|
|
|
|
|
|
public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
|
|
@@ -320,16 +321,16 @@ public class KafkaService {
|
|
|
|
|
|
@SneakyThrows
|
|
|
public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName,
|
|
|
- TopicFormData topicFormData) {
|
|
|
+ TopicUpdate topicUpdate) {
|
|
|
ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
|
|
|
return getOrCreateAdminClient(cluster)
|
|
|
.flatMap(ac -> {
|
|
|
if (ac.getSupportedFeatures()
|
|
|
.contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
|
|
|
- return incrementalAlterConfig(topicFormData, topicCr, ac)
|
|
|
+ return incrementalAlterConfig(topicUpdate, topicCr, ac)
|
|
|
.flatMap(c -> getUpdatedTopic(ac, topicName));
|
|
|
} else {
|
|
|
- return alterConfig(topicFormData, topicCr, ac)
|
|
|
+ return alterConfig(topicUpdate, topicCr, ac)
|
|
|
.flatMap(c -> getUpdatedTopic(ac, topicName));
|
|
|
}
|
|
|
});
|
|
@@ -341,9 +342,9 @@ public class KafkaService {
|
|
|
.filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
|
|
|
}
|
|
|
|
|
|
- private Mono<String> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCr,
|
|
|
+ private Mono<String> incrementalAlterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
|
|
|
ExtendedAdminClient ac) {
|
|
|
- List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ List<AlterConfigOp> listOp = topicUpdate.getConfigs().entrySet().stream()
|
|
|
.flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()),
|
|
|
AlterConfigOp.OpType.SET))).collect(Collectors.toList());
|
|
|
return ClusterUtil.toMono(
|
|
@@ -352,9 +353,9 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("deprecation")
|
|
|
- private Mono<String> alterConfig(TopicFormData topicFormData, ConfigResource topicCr,
|
|
|
+ private Mono<String> alterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
|
|
|
ExtendedAdminClient ac) {
|
|
|
- List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ List<ConfigEntry> configEntries = topicUpdate.getConfigs().entrySet().stream()
|
|
|
.flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
|
|
|
.collect(Collectors.toList());
|
|
|
Config config = new Config(configEntries);
|