فهرست منبع

Fix zookeeper thread leaking (#1574)

* Issue #1472 Added protection to prevent infinite loops when connection is in error with zookeeper

* Issue #1472 Added protection to prevent infinite loops when connection is in error with zookeeper

* Issue #1472 Added protection to prevent infinite loops when connection is in error with zookeeper

* Issue #1472 corrected checkStyle error

* Issue #1472 corrected bug

* Get rid of a successful test

this will always fail since it's not an integration test

* Fix test

Signed-off-by: Roman Zabaluev <rzabaluev@provectus.com>

Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Mohamad Choukair 3 سال پیش
والد
کامیت
f3c07b15e5

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java

@@ -56,6 +56,7 @@ public class ZookeeperService {
       zkClient.getChildren("/brokers/ids", null);
     } catch (KeeperException e) {
       log.error("A zookeeper exception has occurred", e);
+      closeZkClientSession(zkClient, e);
       return false;
     } catch (InterruptedException e) {
       log.error("Interrupted: ", e);
@@ -64,6 +65,15 @@ public class ZookeeperService {
     return true;
   }
 
+  private void closeZkClientSession(ZooKeeper zkClient, KeeperException e) {
+    try {
+      zkClient.close();
+    } catch (InterruptedException ex) {
+      log.error("Unable to close zkClient session: ", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
   @Nullable
   private ZooKeeper getOrCreateZkClient(KafkaCluster cluster) {
     final var clusterName = cluster.getName();

+ 52 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java

@@ -0,0 +1,52 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.ServerStatusDTO;
+import java.util.Properties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.test.StepVerifier;
+
+class ZookeeperServiceTest extends AbstractIntegrationTest {
+  private ZookeeperService zookeeperService;
+
+  @BeforeEach
+  void init() {
+    AdminClientServiceImpl adminClientService = new AdminClientServiceImpl();
+    adminClientService.setClientTimeout(5_000);
+    zookeeperService = new ZookeeperService();
+  }
+
+  @Test
+  void getZkStatusEmptyConfig() {
+    KafkaCluster kafkaCluster =
+        KafkaCluster.builder()
+            .name(LOCAL)
+            .bootstrapServers(kafka.getBootstrapServers())
+            .properties(new Properties())
+            .build();
+
+    ZookeeperService.ZkStatus zkStatus = new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null);
+    StepVerifier.create(zookeeperService.getZkStatus(kafkaCluster))
+        .expectNext(zkStatus)
+        .verifyComplete();
+  }
+
+  @Test
+  void getZkStatusWrongConfig() {
+    KafkaCluster kafkaCluster =
+        KafkaCluster.builder()
+            .name(LOCAL)
+            .bootstrapServers(kafka.getBootstrapServers())
+            .zookeeper("localhost:1000")
+            .properties(new Properties())
+            .build();
+
+    ZookeeperService.ZkStatus zkStatus = new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null);
+    StepVerifier.create(zookeeperService.getZkStatus(kafkaCluster))
+        .expectNext(zkStatus)
+        .verifyComplete();
+  }
+
+}