|
@@ -208,12 +208,13 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
|
|
|
- Properties properties = new Properties();
|
|
|
- properties.putAll(kafkaCluster.getProperties());
|
|
|
- properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
|
|
|
- properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
|
|
- AdminClient adminClient = AdminClient.create(properties);
|
|
|
- return ExtendedAdminClient.extendedAdminClient(adminClient);
|
|
|
+ return Mono.fromSupplier(() -> {
|
|
|
+ Properties properties = new Properties();
|
|
|
+ properties.putAll(kafkaCluster.getProperties());
|
|
|
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
|
|
|
+ properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
|
|
+ return AdminClient.create(properties);
|
|
|
+ }).flatMap(ExtendedAdminClient::extendedAdminClient);
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|