ZookeeperService.java 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package com.provectus.kafka.ui.zookeeper;
  2. import com.provectus.kafka.ui.cluster.model.KafkaCluster;
  3. import lombok.RequiredArgsConstructor;
  4. import lombok.extern.log4j.Log4j2;
  5. import org.I0Itec.zkclient.ZkClient;
  6. import org.springframework.scheduling.annotation.Async;
  7. import org.springframework.stereotype.Service;
  8. @Service
  9. @RequiredArgsConstructor
  10. @Log4j2
  11. public class ZookeeperService {
  12. @Async
  13. public void checkZookeeperStatus(KafkaCluster kafkaCluster) {
  14. log.debug("Start getting Zookeeper metrics for kafkaCluster: " + kafkaCluster.getName());
  15. boolean isConnected = false;
  16. if (kafkaCluster.getZkClient() != null) {
  17. isConnected = isZkClientConnected(kafkaCluster);
  18. }
  19. if (kafkaCluster.getZkClient() == null || !isConnected) {
  20. isConnected = createZookeeperConnection(kafkaCluster);
  21. }
  22. if (!isConnected) {
  23. kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.OFFLINE);
  24. return;
  25. }
  26. kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE);
  27. }
  28. private boolean createZookeeperConnection(KafkaCluster kafkaCluster) {
  29. try {
  30. kafkaCluster.setZkClient(new ZkClient(kafkaCluster.getZookeeper(), 1000));
  31. return true;
  32. } catch (Exception e) {
  33. log.error(e);
  34. kafkaCluster.setLastZookeeperException(e);
  35. return false;
  36. }
  37. }
  38. private boolean isZkClientConnected(KafkaCluster kafkaCluster) {
  39. try {
  40. kafkaCluster.getZkClient().getChildren("/brokers/ids");
  41. return true;
  42. } catch (Exception e) {
  43. log.error(e);
  44. kafkaCluster.setLastZookeeperException(e);
  45. return false;
  46. }
  47. }
  48. }