Browse Source

Review changes

Anton Petrov 5 years ago
parent
commit
802b1b34c9

+ 11 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -5,14 +5,17 @@ import lombok.extern.log4j.Log4j2;
 
 
 import java.time.Duration;
 import java.time.Duration;
 import java.util.List;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 
 
+import com.provectus.kafka.ui.cluster.model.InternalTopic;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.kafka.KafkaService;
@@ -50,11 +53,11 @@ public class ConsumingService {
 		private final KafkaCluster cluster;
 		private final KafkaCluster cluster;
 		private final String topic;
 		private final String topic;
 
 
-		public void emit(FluxSink<ConsumerRecord<String, String>> sink) {
-			try (KafkaConsumer<String, String> consumer = kafkaService.createConsumer(cluster)) {
+		public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
+			try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
 				assignPartitions(consumer, topic);
 				assignPartitions(consumer, topic);
 				while (!sink.isCancelled()) {
 				while (!sink.isCancelled()) {
-					ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
+					ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
 					log.info("{} records polled", records.count());
 					log.info("{} records polled", records.count());
 					records.iterator()
 					records.iterator()
 							.forEachRemaining(sink::next);
 							.forEachRemaining(sink::next);
@@ -65,9 +68,11 @@ public class ConsumingService {
 			}
 			}
 		}
 		}
 
 
-		private void assignPartitions(KafkaConsumer<String, String> consumer, String topic) {
-			List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
-					.map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+		private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer, String topicName) {
+			List<TopicPartition> partitions =  Optional.ofNullable(cluster.getTopics().get(topicName))
+					.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName))
+					.getPartitions().stream()
+					.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition()))
 					.collect(Collectors.toList());
 					.collect(Collectors.toList());
 
 
 			consumer.assign(partitions);
 			consumer.assign(partitions);

+ 7 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -15,6 +15,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Bytes;
 
 
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
@@ -138,15 +139,12 @@ public class ClusterUtil {
         return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
         return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
     }
     }
 
 
-    public static TopicMessage mapToTopicMessage(ConsumerRecord<String, String> consumerRecord) {
+    public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
         OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
         OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
         TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
         TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
         Map<String, String> headers = new HashMap<>();
         Map<String, String> headers = new HashMap<>();
         consumerRecord.headers().iterator()
         consumerRecord.headers().iterator()
-                .forEachRemaining(header -> {
-                    headers.put(header.key(), new String(header.value()));
-                });
-
+                .forEachRemaining(header -> headers.put(header.key(), new String(header.value())));
 
 
         TopicMessage topicMessage = new TopicMessage();
         TopicMessage topicMessage = new TopicMessage();
 
 
@@ -154,9 +152,11 @@ public class ClusterUtil {
         topicMessage.setOffset(consumerRecord.offset());
         topicMessage.setOffset(consumerRecord.offset());
         topicMessage.setTimestamp(timestamp);
         topicMessage.setTimestamp(timestamp);
         topicMessage.setTimestampType(timestampType);
         topicMessage.setTimestampType(timestampType);
-        topicMessage.setKey(consumerRecord.key());
+        if (consumerRecord.key() != null) {
+            topicMessage.setKey(consumerRecord.key().toString());
+        }
         topicMessage.setHeaders(headers);
         topicMessage.setHeaders(headers);
-        topicMessage.setContent(consumerRecord.value());
+        topicMessage.setContent(consumerRecord.value().toString());
 
 
         return topicMessage;
         return topicMessage;
     }
     }

+ 5 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -18,7 +18,8 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple2;
@@ -243,12 +244,12 @@ public class KafkaService {
                         .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
                         .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
     }
     }
 
 
-    public KafkaConsumer<String, String> createConsumer(KafkaCluster cluster) {
+    public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
         Properties props = new Properties();
         Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
 
 
         return new KafkaConsumer<>(props);
         return new KafkaConsumer<>(props);
     }
     }