diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java index 03638afc7f..6e8f1a5f51 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java @@ -56,6 +56,7 @@ public class ZookeeperService { zkClient.getChildren("/brokers/ids", null); } catch (KeeperException e) { log.error("A zookeeper exception has occurred", e); + closeZkClientSession(zkClient, e); return false; } catch (InterruptedException e) { log.error("Interrupted: ", e); @@ -64,6 +65,15 @@ public class ZookeeperService { return true; } + private void closeZkClientSession(ZooKeeper zkClient, KeeperException e) { + try { + zkClient.close(); + } catch (InterruptedException ex) { + log.error("Unable to close zkClient session: ", e); + Thread.currentThread().interrupt(); + } + } + @Nullable private ZooKeeper getOrCreateZkClient(KafkaCluster cluster) { final var clusterName = cluster.getName(); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java new file mode 100644 index 0000000000..92da37f27a --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java @@ -0,0 +1,52 @@ +package com.provectus.kafka.ui.service; + +import com.provectus.kafka.ui.AbstractIntegrationTest; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.ServerStatusDTO; +import java.util.Properties; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +class ZookeeperServiceTest extends AbstractIntegrationTest { + private ZookeeperService zookeeperService; + + @BeforeEach + void init() { + AdminClientServiceImpl adminClientService = new AdminClientServiceImpl(); + adminClientService.setClientTimeout(5_000); + zookeeperService = new ZookeeperService(); + } + + @Test + void getZkStatusEmptyConfig() { + KafkaCluster kafkaCluster = + KafkaCluster.builder() + .name(LOCAL) + .bootstrapServers(kafka.getBootstrapServers()) + .properties(new Properties()) + .build(); + + ZookeeperService.ZkStatus zkStatus = new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null); + StepVerifier.create(zookeeperService.getZkStatus(kafkaCluster)) + .expectNext(zkStatus) + .verifyComplete(); + } + + @Test + void getZkStatusWrongConfig() { + KafkaCluster kafkaCluster = + KafkaCluster.builder() + .name(LOCAL) + .bootstrapServers(kafka.getBootstrapServers()) + .zookeeper("localhost:1000") + .properties(new Properties()) + .build(); + + ZookeeperService.ZkStatus zkStatus = new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null); + StepVerifier.create(zookeeperService.getZkStatus(kafkaCluster)) + .expectNext(zkStatus) + .verifyComplete(); + } + +}