
Topic messages API. Reactive consumer (#50)

* Topic messages API. Reactive consumer

* Review changes
Anton Petrov 5 years ago
parent
commit
803f0be7d7

+ 18 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -8,6 +8,7 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
+
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -19,6 +20,7 @@ import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;

+import java.time.OffsetDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -30,6 +32,7 @@ public class ClusterService {
     private final ClustersStorage clustersStorage;
     private final ClusterMapper clusterMapper;
     private final KafkaService kafkaService;
+    private final ConsumingService consumingService;

     public List<Cluster> getClusters() {
         return clustersStorage.getKafkaClusters()
@@ -54,15 +57,17 @@ public class ClusterService {
     }

     public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
-        return clustersStorage.getClusterByName(name).flatMap(
-                c -> Optional.ofNullable(c.getTopics().get(topicName))
-        ).map(clusterMapper::toTopicDetails);
+        return clustersStorage.getClusterByName(name)
+                .map(KafkaCluster::getTopics)
+                .map(t -> t.get(topicName))
+                .map(clusterMapper::toTopicDetails);
     }

     public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
-        return clustersStorage.getClusterByName(name).flatMap(
-                c -> Optional.ofNullable(c.getTopics().get(topicName))
-        ).map( t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
+        return clustersStorage.getClusterByName(name)
+                .map(KafkaCluster::getTopics)
+                .map(t -> t.get(topicName))
+                .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
     }

     public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
@@ -135,4 +140,11 @@ public class ClusterService {
                     }).collect(Collectors.toList())))
                 .flatMapMany(Flux::fromIterable);
     }
+
+    public Flux<TopicMessage> getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) {
+        return clustersStorage.getClusterByName(clusterName)
+                .map(c -> consumingService.loadMessages(c, topicName))
+                .orElse(Flux.empty());
+
+    }
 }
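
As a usage sketch (not part of the commit), the new getMessages method can be exercised as below; the cluster and topic names are made up, and the partition/offset/timestamp arguments are accepted by the signature but not yet honoured in this commit:

import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.TopicMessage;
import reactor.core.publisher.Flux;

class GetMessagesSketch {
    // Hypothetical caller; "local" and "users" are illustrative names.
    static void printFirstBatch(ClusterService clusterService) {
        // Returns Flux.empty() when the cluster name is unknown.
        Flux<TopicMessage> messages =
                clusterService.getMessages("local", "users", null, null, null);

        // ConsumingService caps the stream with limitRequest(BATCH_SIZE), so
        // blockLast() returns once 20 records have arrived (assuming the topic
        // holds at least that many; otherwise the emitter keeps polling).
        messages.doOnNext(m -> System.out.printf("partition=%d offset=%d%n",
                        m.getPartition(), m.getOffset()))
                .blockLast();
    }
}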

+ 83 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -0,0 +1,83 @@
+package com.provectus.kafka.ui.cluster.service;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.springframework.stereotype.Service;
+
+import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
+import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.model.TopicMessage;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.scheduler.Schedulers;
+
+@Service
+@Log4j2
+@RequiredArgsConstructor
+public class ConsumingService {
+
+
+	// TODO: make this configurable
+	private static final int BATCH_SIZE = 20;
+
+	private final KafkaService kafkaService;
+
+	public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic) {
+		RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic);
+		return Flux.create(emitter::emit)
+				.subscribeOn(Schedulers.boundedElastic())
+				.map(ClusterUtil::mapToTopicMessage)
+				.limitRequest(BATCH_SIZE);
+	}
+
+	@RequiredArgsConstructor
+	private static class RecordEmitter {
+
+		private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
+
+		private final KafkaService kafkaService;
+		private final KafkaCluster cluster;
+		private final String topic;
+
+		public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
+			try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
+				assignPartitions(consumer, topic);
+				while (!sink.isCancelled()) {
+					ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
+					log.info("{} records polled", records.count());
+					records.iterator()
+							.forEachRemaining(sink::next);
+				}
+			} catch (Exception e) {
+				log.error("Error occurred while consuming records", e);
+				throw new RuntimeException(e);
+			}
+		}
+
+		private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer, String topicName) {
+			List<TopicPartition> partitions =  Optional.ofNullable(cluster.getTopics().get(topicName))
+					.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName))
+					.getPartitions().stream()
+					.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition()))
+					.collect(Collectors.toList());
+
+			consumer.assign(partitions);
+			// TODO: seek to requested offsets
+			consumer.seekToBeginning(partitions);
+		}
+	}
+}
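
The emitter relies on Reactor's cancellation to terminate: limitRequest(BATCH_SIZE) cancels the FluxSink once enough records have been delivered, the while (!sink.isCancelled()) loop exits, and try-with-resources closes the consumer. A minimal self-contained sketch of that pattern, with plain integers standing in for Kafka records:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class EmitterCancellationDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux.<Integer>create(sink -> {
                    int i = 0;
                    while (!sink.isCancelled()) { // same guard as RecordEmitter.emit
                        sink.next(i++);
                    }
                    System.out.println("emitter observed cancellation");
                })
                .subscribeOn(Schedulers.boundedElastic()) // keep blocking work off the caller thread
                .limitRequest(5)   // analogue of limitRequest(BATCH_SIZE)
                .doOnNext(n -> System.out.println("received " + n))
                .blockLast();      // returns once 5 items arrived and upstream was cancelled

        Thread.sleep(100); // give the emitter thread a moment to observe the cancel
    }
}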

+ 46 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -4,15 +4,25 @@ import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail;
 import com.provectus.kafka.ui.model.ServerStatus;
+import com.provectus.kafka.ui.model.TopicMessage;
+
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Bytes;
+
 import reactor.core.publisher.Mono;

+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -23,6 +33,8 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_

 public class ClusterUtil {

+    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+
     public static <T> Mono<T> toMono(KafkaFuture<T> future){
         return Mono.create(sink -> future.whenComplete((res, ex)->{
             if (ex!=null) {
@@ -127,4 +139,38 @@ public class ClusterUtil {
         return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
     }

+    public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
+        OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
+        TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
+        Map<String, String> headers = new HashMap<>();
+        consumerRecord.headers().iterator()
+                .forEachRemaining(header -> headers.put(header.key(), new String(header.value())));
+
+        TopicMessage topicMessage = new TopicMessage();
+
+        topicMessage.setPartition(consumerRecord.partition());
+        topicMessage.setOffset(consumerRecord.offset());
+        topicMessage.setTimestamp(timestamp);
+        topicMessage.setTimestampType(timestampType);
+        if (consumerRecord.key() != null) {
+            topicMessage.setKey(consumerRecord.key().toString());
+        }
+        topicMessage.setHeaders(headers);
+        topicMessage.setContent(consumerRecord.value().toString());
+
+        return topicMessage;
+    }
+
+    private static TopicMessage.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
+        switch (timestampType) {
+            case CREATE_TIME:
+                return TopicMessage.TimestampTypeEnum.CREATE_TIME;
+            case LOG_APPEND_TIME:
+                return TopicMessage.TimestampTypeEnum.LOG_APPEND_TIME;
+            case NO_TIMESTAMP_TYPE:
+                return TopicMessage.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
+            default:
+                throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
+        }
+    }
 }
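
A hedged sketch of mapToTopicMessage on a hand-built record (the five-argument ConsumerRecord constructor leaves the timestamp type as NO_TIMESTAMP_TYPE, exercising the third switch branch). Note that setContent calls consumerRecord.value().toString(), so a tombstone record with a null value would throw a NullPointerException as of this commit:

import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.model.TopicMessage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

class MapToTopicMessageSketch {
    public static void main(String[] args) {
        // Illustrative topic/partition/offset/payload, not from the commit.
        ConsumerRecord<Bytes, Bytes> record = new ConsumerRecord<>(
                "users", 0, 42L,
                Bytes.wrap("key-1".getBytes()),
                Bytes.wrap("{\"name\":\"alice\"}".getBytes()));

        TopicMessage message = ClusterUtil.mapToTopicMessage(record);
        System.out.println(message.getOffset());        // 42
        System.out.println(message.getTimestampType()); // NO_TIMESTAMP_TYPE
    }
}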

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -13,9 +13,13 @@ import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -240,6 +244,16 @@ public class KafkaService {
                         .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
     }

+    public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+
+        return new KafkaConsumer<>(props);
+    }
+

     @SneakyThrows
     private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
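
Since createConsumer sets no ConsumerConfig.GROUP_ID_CONFIG, the returned consumer only supports the assign()/poll() flow used by RecordEmitter; subscribe() or offset commits would fail without a group id. A usage sketch (cluster wiring and topic name are illustrative):

import java.time.Duration;
import java.util.Collections;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.kafka.KafkaService;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;

class CreateConsumerSketch {
    static void readOnce(KafkaService kafkaService, KafkaCluster cluster) {
        try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
            TopicPartition tp = new TopicPartition("users", 0); // illustrative
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            ConsumerRecords<Bytes, Bytes> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println(records.count() + " records polled");
        } // try-with-resources closes the consumer, as RecordEmitter.emit does
    }
}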

+ 3 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -61,7 +61,9 @@ public class MetricsRestController implements ApiClustersApi {

     @Override
     public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid Integer partition, @Valid Long offset, @Valid OffsetDateTime timestamp, ServerWebExchange exchange) {
-        return Mono.error(new UnsupportedOperationException());
+        return Mono.just(
+                ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, partition, offset, timestamp))
+        );
     }

     @Override
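
For completeness, a hedged sketch of calling the now-implemented endpoint with Spring's WebTestClient; the route /api/clusters/{clusterName}/topics/{topicName}/messages is assumed from the generated ApiClustersApi contract and is not shown in this diff:

import com.provectus.kafka.ui.model.TopicMessage;
import org.springframework.test.web.reactive.server.WebTestClient;

class GetTopicMessagesCall {
    static void fetch(WebTestClient client) {
        client.get()
                .uri("/api/clusters/{cluster}/topics/{topic}/messages", "local", "users")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(TopicMessage.class); // streamed Flux<TopicMessage>, capped at 20
    }
}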