Browse Source

small pom updates //
continue review fixes

Zhenya Taran 5 years ago
parent
commit
b5d6f78962

+ 32 - 4
docker/kafka-clusters-only.yaml

@@ -20,13 +20,27 @@ services:
     environment:
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
       KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29091,PLAINTEXT_HOST://localhost:9091
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29091,PLAINTEXT_HOST://localhost:9091,PLAIN://kafka0:29090
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9997
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
 
+  kafka-init-topics0:
+    image: confluentinc/cp-kafka:5.1.0
+    depends_on:
+      - kafka0
+    command: "bash -c 'echo Waiting for Kafka to be ready... && \
+                cub kafka-ready -b kafka0:29090 1 20 && \
+                kafka-topics --create --topic users --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
+                kafka-topics --create --topic messages --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181'"
+    environment:
+      KAFKA_BROKER_ID: ignored
+      KAFKA_ZOOKEEPER_CONNECT: ignored
+    networks:
+      - default
+
   zookeeper1:
   zookeeper1:
     image: confluentinc/cp-zookeeper:5.1.0
     image: confluentinc/cp-zookeeper:5.1.0
     environment:
     environment:
@@ -45,9 +59,23 @@ services:
     environment:
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
       KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092,PLAIN://kafka1:29090
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9998
       JMX_PORT: 9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
+
+  kafka-init-topics1:
+    image: confluentinc/cp-kafka:5.1.0
+    depends_on:
+      - kafka1
+    command: "bash -c 'echo Waiting for Kafka to be ready... && \
+                cub kafka-ready -b kafka1:29090 1 20 && \
+                kafka-topics --create --topic users --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+                kafka-topics --create --topic messages --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181'"
+    environment:
+      KAFKA_BROKER_ID: ignored
+      KAFKA_ZOOKEEPER_CONNECT: ignored
+    networks:
+      - default

+ 11 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -6,7 +6,10 @@ import com.provectus.kafka.ui.model.TopicDetails;
 import lombok.AccessLevel;
 import lombok.AccessLevel;
 import lombok.Data;
 import lombok.Data;
 import lombok.experimental.FieldDefaults;
 import lombok.experimental.FieldDefaults;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.admin.AdminClient;
 
 
+import javax.management.MBeanServerConnection;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -28,6 +31,14 @@ public class KafkaCluster {
     List<Topic> topics = new ArrayList<>();
     List<Topic> topics = new ArrayList<>();
     List<TopicDetails> topicDetails = new ArrayList<>();
     List<TopicDetails> topicDetails = new ArrayList<>();
 
 
+    MBeanServerConnection mBeanServerConnection;
+    ZkClient zkClient;
+    AdminClient adminClient;
+
+    Exception kafkaException;
+    Exception jmxException;
+    Exception zookeeperException;
+
     public void putMetric(String metricKey, String metricValue) {
     public void putMetric(String metricKey, String metricValue) {
         metricsMap.put(metricKey, metricValue);
         metricsMap.put(metricKey, metricValue);
     }
     }

+ 13 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/jmx/JmxService.java

@@ -20,15 +20,23 @@ public class JmxService {
 
 
     @SneakyThrows
     @SneakyThrows
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
-        String url = "service:jmx:rmi:///jndi/rmi://" + kafkaCluster.getJmxHost() + ":" + kafkaCluster.getJmxPort() + "/jmxrmi";
-        JMXServiceURL serviceUrl = new JMXServiceURL(url);
-        try (JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, null)) {
-            MBeanServerConnection connection = jmxConnector.getMBeanServerConnection();
+        // check before getting something
+        try {
+            if (kafkaCluster.getMBeanServerConnection() == null) {
+                String url = "service:jmx:rmi:///jndi/rmi://" + kafkaCluster.getJmxHost() + ":" + kafkaCluster.getJmxPort() + "/jmxrmi";
+                JMXServiceURL serviceUrl = new JMXServiceURL(url);
+                JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceUrl, null);
+                kafkaCluster.setMBeanServerConnection(jmxConnector.getMBeanServerConnection());
+            }
             for (Map.Entry<MBeanInfo, String> mbeanToMetric : JmxConstants.mbeanToAttributeMap.entrySet()) {
             for (Map.Entry<MBeanInfo, String> mbeanToMetric : JmxConstants.mbeanToAttributeMap.entrySet()) {
                 MBeanInfo mBeanInfo = mbeanToMetric.getKey();
                 MBeanInfo mBeanInfo = mbeanToMetric.getKey();
-                Object attributeValue = connection.getAttribute(new ObjectName(mBeanInfo.getName()), mBeanInfo.getAttribute());
+                Object attributeValue = kafkaCluster.getMBeanServerConnection().getAttribute(new ObjectName(mBeanInfo.getName()), mBeanInfo.getAttribute());
                 kafkaCluster.putMetric(mbeanToMetric.getValue(), attributeValue.toString());
                 kafkaCluster.putMetric(mbeanToMetric.getValue(), attributeValue.toString());
             }
             }
+        } catch (Exception e) {
+            log.error(e);
+            kafkaCluster.setMBeanServerConnection(null);
         }
         }
+
     }
     }
 }
 }

+ 10 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.kafka;
 package com.provectus.kafka.ui.kafka;
 
 
-import com.provectus.kafka.ui.cluster.model.MetricsConstants;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.MetricsConstants;
 import com.provectus.kafka.ui.model.Partition;
 import com.provectus.kafka.ui.model.Partition;
 import com.provectus.kafka.ui.model.Replica;
 import com.provectus.kafka.ui.model.Replica;
 import com.provectus.kafka.ui.model.Topic;
 import com.provectus.kafka.ui.model.Topic;
@@ -25,7 +25,7 @@ public class KafkaService {
 
 
     @SneakyThrows
     @SneakyThrows
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
-        isZookeeperRunning(kafkaCluster);
+        checkZookeperConnection(kafkaCluster);
 
 
         Properties properties = new Properties();
         Properties properties = new Properties();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
@@ -97,15 +97,18 @@ public class KafkaService {
         kafkaCluster.putMetric(MetricsConstants.PARTITIONS_COUNT, String.valueOf(partitionsNum));
         kafkaCluster.putMetric(MetricsConstants.PARTITIONS_COUNT, String.valueOf(partitionsNum));
     }
     }
 
 
-    public static void isZookeeperRunning(KafkaCluster kafkaCluster){
-        //Because kafka connector waits for 2 minutes with retries before telling that there is no connection
-        //ZKClient is used to not wait 2 minutes for response. If there is no connection, exception will be thrown
+    public static void checkZookeperConnection(KafkaCluster kafkaCluster){
         try {
         try {
-            ZkClient zkClient = new ZkClient(kafkaCluster.getZookeeper(), 1000);
+            if (kafkaCluster.getZkClient() == null) {
+
+            } else {
+            }
+            kafkaCluster.setZkClient(new ZkClient(kafkaCluster.getZookeeper(), 1000));
             kafkaCluster.putMetric(ZOOKEEPER_STATUS, "1");
             kafkaCluster.putMetric(ZOOKEEPER_STATUS, "1");
-            zkClient.close();
         } catch (Exception e) {
         } catch (Exception e) {
+            kafkaCluster.setZkClient(null);
             kafkaCluster.putMetric(ZOOKEEPER_STATUS, "0");
             kafkaCluster.putMetric(ZOOKEEPER_STATUS, "0");
+            kafkaCluster.setZookeeperException(e);
             throw e;
             throw e;
         }
         }
     }
     }

+ 3 - 39
kafka-ui-contract/pom.xml

@@ -11,17 +11,6 @@
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>kafka-ui-contract</artifactId>
     <artifactId>kafka-ui-contract</artifactId>
 
 
-    <dependencies>
-        <!-- needed for running test in idea, without this fix, tests don't compile -->
-        <!-- https://mvnrepository.com/artifact/junit/junit -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.13</version>
-            <scope>compile</scope>
-        </dependency>
-    </dependencies>
-
     <profiles>
     <profiles>
         <!--        SPRING WebFlux API          -->
         <!--        SPRING WebFlux API          -->
         <profile>
         <profile>
@@ -37,9 +26,9 @@
                     <version>${spring-boot.version}</version>
                     <version>${spring-boot.version}</version>
                 </dependency>
                 </dependency>
                 <dependency>
                 <dependency>
-                    <groupId>io.springfox</groupId>
-                    <artifactId>springfox-swagger2</artifactId>
-                    <version>${springfox-swagger2.version}</version>
+                    <groupId>io.swagger</groupId>
+                    <artifactId>swagger-annotations</artifactId>
+                    <version>1.6.0</version>
                 </dependency>
                 </dependency>
                 <dependency>
                 <dependency>
                     <groupId>org.openapitools</groupId>
                     <groupId>org.openapitools</groupId>
@@ -89,29 +78,4 @@
             </build>
             </build>
         </profile>
         </profile>
     </profiles>
     </profiles>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>com.github.ngeor</groupId>
-                <artifactId>yak4j-json-yaml-converter-maven-plugin</artifactId>
-                <version>0.0.4</version>
-                <executions>
-                    <execution>
-                        <id>yaml2json</id>
-                        <goals>
-                            <goal>yaml2json</goal>
-                        </goals>
-                        <configuration>
-                            <sourceDirectory>src/main/resources/swagger</sourceDirectory>
-                            <includes>
-                                <include>*.yaml</include>
-                            </includes>
-                            <outputDirectory>target/generated-sources/swagger</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
 </project>
 </project>

+ 1 - 1
pom.xml

@@ -19,7 +19,7 @@
 		<org.mapstruct.version>1.3.1.Final</org.mapstruct.version>
 		<org.mapstruct.version>1.3.1.Final</org.mapstruct.version>
 		<org.projectlombok.version>1.18.10</org.projectlombok.version>
 		<org.projectlombok.version>1.18.10</org.projectlombok.version>
 		<git.revision>latest</git.revision>
 		<git.revision>latest</git.revision>
-		<zkclient.version>0.2</zkclient.version>
+		<zkclient.version>0.11</zkclient.version>
 		<kafka-clients.version>2.4.0</kafka-clients.version>
 		<kafka-clients.version>2.4.0</kafka-clients.version>
 		<node.version>v12.13.1</node.version>
 		<node.version>v12.13.1</node.version>
 		<dockerfile-maven-plugin.version>1.4.10</dockerfile-maven-plugin.version>
 		<dockerfile-maven-plugin.version>1.4.10</dockerfile-maven-plugin.version>