소스 검색

Fixed Zookeeper connecion leak (#185)

German Osin 4 년 전
부모
커밋
b9e92114e6

+ 36 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
 import com.provectus.kafka.ui.model.CompatibilityLevel;
 import com.provectus.kafka.ui.model.NewSchemaSubject;
 import com.provectus.kafka.ui.model.SchemaSubject;
+import java.util.Formatter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.http.HttpStatus;
@@ -51,9 +52,12 @@ public class SchemaRegistryService {
                 .map(cluster -> webClient.get()
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
                         .retrieve()
-                        .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
-                        .bodyToFlux(Integer.class))
-                .orElse(Flux.error(new NotFoundException("No such cluster")));
+                        .onStatus(HttpStatus.NOT_FOUND::equals,
+                            resp -> Mono.error(
+                                new NotFoundException(formatted("No such schema %s"))
+                            )
+                        ).bodyToFlux(Integer.class)
+                ).orElse(Flux.error(new NotFoundException("No such cluster")));
     }
 
     public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
@@ -70,8 +74,12 @@ public class SchemaRegistryService {
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
                         .retrieve()
                         .onStatus(HttpStatus.NOT_FOUND::equals,
-                                resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
-                        .bodyToMono(SchemaSubject.class)
+                                resp -> Mono.error(
+                                    new NotFoundException(
+                                        formatted("No such schema %s with version %s", schemaName, version)
+                                    )
+                                )
+                        ).bodyToMono(SchemaSubject.class)
                         .zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
                         .map(tuple -> {
                             SchemaSubject schema = tuple.getT1();
@@ -97,9 +105,13 @@ public class SchemaRegistryService {
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
                         .retrieve()
                         .onStatus(HttpStatus.NOT_FOUND::equals,
-                                resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
-                        .toBodilessEntity())
-                .orElse(Mono.error(new NotFoundException("No such cluster")));
+                                resp -> Mono.error(
+                                    new NotFoundException(
+                                        formatted("No such schema %s with version %s", schemaName, version)
+                                    )
+                                )
+                        ).toBodilessEntity()
+                ).orElse(Mono.error(new NotFoundException("No such cluster")));
     }
 
     public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
@@ -107,7 +119,13 @@ public class SchemaRegistryService {
                 .map(cluster -> webClient.delete()
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
                         .retrieve()
-                        .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
+                        .onStatus(HttpStatus.NOT_FOUND::equals,
+                            resp -> Mono.error(
+                                new NotFoundException(
+                                    formatted("No such schema %s", schemaName)
+                                )
+                            )
+                        )
                         .toBodilessEntity())
                 .orElse(Mono.error(new NotFoundException("No such cluster")));
     }
@@ -120,7 +138,9 @@ public class SchemaRegistryService {
                         .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
                         .retrieve()
                         .onStatus(HttpStatus.NOT_FOUND::equals,
-                                resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
+                                resp -> Mono.error(
+                                    new NotFoundException(formatted("No such schema %s", schemaName)))
+                        )
                         .toEntity(SchemaSubject.class)
                         .log())
                 .orElse(Mono.error(new NotFoundException("No such cluster")));
@@ -142,7 +162,7 @@ public class SchemaRegistryService {
                             .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
                             .retrieve()
                             .onStatus(HttpStatus.NOT_FOUND::equals,
-                                    resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
+                                    resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
                             .bodyToMono(Void.class);
                 }).orElse(Mono.error(new NotFoundException("No such cluster")));
     }
@@ -181,10 +201,14 @@ public class SchemaRegistryService {
                         .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
                         .retrieve()
                         .onStatus(HttpStatus.NOT_FOUND::equals,
-                                resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
+                                resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
                         .bodyToMono(InternalCompatibilityCheck.class)
                         .map(mapper::toCompatibilityCheckResponse)
                         .log()
                 ).orElse(Mono.error(new NotFoundException("No such cluster")));
     }
+
+    public String formatted(String str, Object... args) {
+        return new Formatter().format(str, args).toString();
+    }
 }

+ 6 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.zookeeper;
 
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.I0Itec.zkclient.ZkClient;
@@ -14,7 +15,7 @@ import java.util.Map;
 @Log4j2
 public class ZookeeperService {
 
-    private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
+    private final Map<String, ZkClient> cachedZkClient = new ConcurrentHashMap<>();
 
     public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
         var isConnected = false;
@@ -33,7 +34,10 @@ public class ZookeeperService {
 
     private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
         try {
-            return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
+            return cachedZkClient.computeIfAbsent(
+                cluster.getName(),
+                (n) -> new ZkClient(cluster.getZookeeper(), 1000)
+            );
         } catch (Exception e) {
             log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
             return null;