From f3c07b15e55970960b0273828bf0968506989e8f Mon Sep 17 00:00:00 2001 From: Mohamad Choukair <98760197+mchoukairprov@users.noreply.github.com> Date: Thu, 10 Mar 2022 06:13:39 -0500 Subject: [PATCH] Fix zookeeper thread leaking (#1574) * Issue #1472 Added protection to prevent infinite loops when connection is in error with zookeeper * Issue #1472 Added protection to prevent infinite loops when connection is in error with zookeeper * Issue #1472 Added protection to prevent infinite loops when connection is in error with zookeeper * Issue #1472 corrected checkStyle error * Issue #1472 corrected bug * Get rid of a successful test this will always fail since it's not an integration test * Fix test Signed-off-by: Roman Zabaluev Co-authored-by: Roman Zabaluev --- .../kafka/ui/service/ZookeeperService.java | 10 ++++ .../ui/service/ZookeeperServiceTest.java | 52 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java index 03638afc7f..6e8f1a5f51 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java @@ -56,6 +56,7 @@ public class ZookeeperService { zkClient.getChildren("/brokers/ids", null); } catch (KeeperException e) { log.error("A zookeeper exception has occurred", e); + closeZkClientSession(zkClient, e); return false; } catch (InterruptedException e) { log.error("Interrupted: ", e); @@ -64,6 +65,15 @@ public class ZookeeperService { return true; } + private void closeZkClientSession(ZooKeeper zkClient, KeeperException e) { + try { + zkClient.close(); + } catch (InterruptedException ex) { + log.error("Unable to close zkClient session: ", e); + Thread.currentThread().interrupt(); + } + } + @Nullable private ZooKeeper getOrCreateZkClient(KafkaCluster cluster) { final var clusterName = cluster.getName(); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java new file mode 100644 index 0000000000..92da37f27a --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ZookeeperServiceTest.java @@ -0,0 +1,52 @@ +package com.provectus.kafka.ui.service; + +import com.provectus.kafka.ui.AbstractIntegrationTest; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.ServerStatusDTO; +import java.util.Properties; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +class ZookeeperServiceTest extends AbstractIntegrationTest { + private ZookeeperService zookeeperService; + + @BeforeEach + void init() { + AdminClientServiceImpl adminClientService = new AdminClientServiceImpl(); + adminClientService.setClientTimeout(5_000); + zookeeperService = new ZookeeperService(); + } + + @Test + void getZkStatusEmptyConfig() { + KafkaCluster kafkaCluster = + KafkaCluster.builder() + .name(LOCAL) + .bootstrapServers(kafka.getBootstrapServers()) + .properties(new Properties()) + .build(); + + ZookeeperService.ZkStatus zkStatus = new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null); + StepVerifier.create(zookeeperService.getZkStatus(kafkaCluster)) + .expectNext(zkStatus) + .verifyComplete(); + } + + @Test + void getZkStatusWrongConfig() { + KafkaCluster kafkaCluster = + KafkaCluster.builder() + .name(LOCAL) + .bootstrapServers(kafka.getBootstrapServers()) + .zookeeper("localhost:1000") + .properties(new Properties()) + .build(); + + ZookeeperService.ZkStatus zkStatus = new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null); + StepVerifier.create(zookeeperService.getZkStatus(kafkaCluster)) + .expectNext(zkStatus) + .verifyComplete(); + } + +}