|
@@ -42,19 +42,48 @@ public class KafkaService {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- kafkaCluster.setId(kafkaCluster.getAdminClient().describeCluster().clusterId().get());
|
|
|
- kafkaCluster.getCluster().setId(kafkaCluster.getAdminClient().describeCluster().clusterId().get());
|
|
|
+ kafkaCluster.getCluster().setId(kafkaCluster.getId());
|
|
|
kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
|
|
|
loadMetrics(kafkaCluster);
|
|
|
loadTopicsData(kafkaCluster);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ @SneakyThrows
|
|
|
+ public Mono<ResponseEntity<Topic>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
|
|
|
+ return topicFormData.flatMap(
|
|
|
+ topicData -> {
|
|
|
+ AdminClient adminClient = cluster.getAdminClient();
|
|
|
+ NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
|
|
|
+ newTopic.configs(topicData.getConfigs());
|
|
|
+
|
|
|
+ createTopic(adminClient, newTopic);
|
|
|
+
|
|
|
+ DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
|
|
|
+ Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
|
+ var entry = topicDescriptionFuturesMap.entrySet().iterator().next();
|
|
|
+ var topicDescription = getTopicDescription(entry);
|
|
|
+ if (topicDescription == null) return Mono.error(new RuntimeException("Can't find created topic"));
|
|
|
+
|
|
|
+ Topic topic = collectTopicData(cluster, topicDescription);
|
|
|
+ cluster.getTopics().add(topic);
|
|
|
+ return Mono.just(new ResponseEntity<>(topic, HttpStatus.CREATED));
|
|
|
+ }
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ @SneakyThrows
|
|
|
+ private String getClusterId(KafkaCluster kafkaCluster) {
|
|
|
+ return kafkaCluster.getAdminClient().describeCluster().clusterId().get();
|
|
|
+ }
|
|
|
+
|
|
|
private boolean createAdminClient(KafkaCluster kafkaCluster) {
|
|
|
try {
|
|
|
Properties properties = new Properties();
|
|
|
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
|
|
|
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
|
|
|
kafkaCluster.setAdminClient(AdminClient.create(properties));
|
|
|
+ kafkaCluster.setId(getClusterId(kafkaCluster));
|
|
|
|
|
|
return true;
|
|
|
} catch (Exception e) {
|
|
@@ -67,7 +96,7 @@ public class KafkaService {
|
|
|
|
|
|
private boolean isAdminClientConnected(KafkaCluster kafkaCluster) {
|
|
|
try {
|
|
|
- kafkaCluster.getAdminClient().listTopics();
|
|
|
+ getClusterId(kafkaCluster);
|
|
|
|
|
|
return true;
|
|
|
} catch (Exception e) {
|
|
@@ -221,29 +250,6 @@ public class KafkaService {
|
|
|
kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
|
|
|
}
|
|
|
|
|
|
- @SneakyThrows
|
|
|
- public Mono<ResponseEntity<Topic>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
|
|
|
- return topicFormData.flatMap(
|
|
|
- topicData -> {
|
|
|
- AdminClient adminClient = cluster.getAdminClient();
|
|
|
- NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
|
|
|
- newTopic.configs(topicData.getConfigs());
|
|
|
-
|
|
|
- createTopic(adminClient, newTopic);
|
|
|
-
|
|
|
- DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
|
|
|
- Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
|
|
|
- var entry = topicDescriptionFuturesMap.entrySet().iterator().next();
|
|
|
- var topicDescription = getTopicDescription(entry);
|
|
|
- if (topicDescription == null) return Mono.error(new RuntimeException("Can't find created topic"));
|
|
|
-
|
|
|
- Topic topic = collectTopicData(cluster, topicDescription);
|
|
|
- cluster.getTopics().add(topic);
|
|
|
- return Mono.just(new ResponseEntity<>(topic, HttpStatus.CREATED));
|
|
|
- }
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
@SneakyThrows
|
|
|
private void createTopic(AdminClient adminClient, NewTopic newTopic) {
|
|
|
adminClient.createTopics(Collections.singletonList(newTopic))
|