|
@@ -290,12 +290,7 @@ public class KafkaService {
|
|
|
topicData ->
|
|
|
getTopicsData(adminClient, Collections.singleton(topicData.getName()))
|
|
|
.next()
|
|
|
- ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
|
|
|
- .flatMap(t ->
|
|
|
- loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
|
|
|
- .map(c -> mergeWithConfigs(Collections.singletonList(t), c))
|
|
|
- .map(m -> m.values().iterator().next())
|
|
|
- );
|
|
|
+ ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")));
|
|
|
}
|
|
|
|
|
|
public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreation> topicCreation) {
|
|
@@ -306,8 +301,9 @@ public class KafkaService {
|
|
|
public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
|
|
|
return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.map(ExtendedAdminClient::getAdminClient)
|
|
|
- .map(adminClient -> adminClient.deleteTopics(List.of(topicName)))
|
|
|
- .then();
|
|
|
+ .flatMap(adminClient ->
|
|
|
+ ClusterUtil.toMono(adminClient.deleteTopics(List.of(topicName)).all())
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@@ -667,7 +663,9 @@ public class KafkaService {
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
|
return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.map(ExtendedAdminClient::getAdminClient)
|
|
|
- .map(ac -> ac.deleteRecords(records)).then();
|
|
|
+ .flatMap(ac ->
|
|
|
+ ClusterUtil.toMono(ac.deleteRecords(records).all())
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
|