|
@@ -264,7 +264,11 @@ public class KafkaService {
|
|
|
|
|
|
@SneakyThrows
|
|
|
private Mono<KafkaFuture<Void>> createTopic(AdminClient adminClient, NewTopic newTopic) {
|
|
|
- return Mono.just(adminClient.createTopics(Collections.singletonList(newTopic)).all());
|
|
|
+ return Mono.just(adminClient.createTopics(Collections.singletonList(newTopic)).values().get(newTopic.name()))
|
|
|
+ .onErrorResume(e -> {
|
|
|
+ log.error(new Exception(e));
|
|
|
+ return Mono.empty();
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private Mono<Void> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {
|