|
@@ -11,6 +11,7 @@ import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.IOException;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -37,18 +38,16 @@ public class AuditService implements Closeable {
|
|
|
public AuditService(ClustersProperties clustersProperties,
|
|
|
AdminClientService adminClientService,
|
|
|
ClustersStorage clustersStorage) {
|
|
|
- if (clustersProperties.getClusters() != null) {
|
|
|
- for (var clusterProps : clustersProperties.getClusters()) {
|
|
|
- var cluster = clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow();
|
|
|
- ReactiveAdminClient adminClient = null;
|
|
|
- try {
|
|
|
- adminClient = adminClientService.get(cluster).block();
|
|
|
- } catch (Exception e) {
|
|
|
- printAuditInitError(cluster, "Error connect to cluster", e);
|
|
|
- continue;
|
|
|
- }
|
|
|
- initialize(cluster, adminClient);
|
|
|
+ for (var clusterProps : Optional.ofNullable(clustersProperties.getClusters()).orElse(List.of())) {
|
|
|
+ var cluster = clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow();
|
|
|
+ ReactiveAdminClient adminClient;
|
|
|
+ try {
|
|
|
+ adminClient = adminClientService.get(cluster).block();
|
|
|
+ } catch (Exception e) {
|
|
|
+ printAuditInitError(cluster, "Error connect to cluster", e);
|
|
|
+ continue;
|
|
|
}
|
|
|
+ initialize(cluster, adminClient);
|
|
|
}
|
|
|
}
|
|
|
|