diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 25e8496abc..2b6c5bd32c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -8,6 +8,7 @@ import com.provectus.kafka.ui.kafka.KafkaService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; + import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -19,6 +20,7 @@ import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.OffsetDateTime; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,6 +32,7 @@ public class ClusterService { private final ClustersStorage clustersStorage; private final ClusterMapper clusterMapper; private final KafkaService kafkaService; + private final ConsumingService consumingService; public List getClusters() { return clustersStorage.getKafkaClusters() @@ -54,15 +57,17 @@ public class ClusterService { } public Optional getTopicDetails(String name, String topicName) { - return clustersStorage.getClusterByName(name).flatMap( - c -> Optional.ofNullable(c.getTopics().get(topicName)) - ).map(clusterMapper::toTopicDetails); + return clustersStorage.getClusterByName(name) + .map(KafkaCluster::getTopics) + .map(t -> t.get(topicName)) + .map(clusterMapper::toTopicDetails); } public Optional> getTopicConfigs(String name, String topicName) { - return clustersStorage.getClusterByName(name).flatMap( - c -> Optional.ofNullable(c.getTopics().get(topicName)) - ).map( t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList())); + return clustersStorage.getClusterByName(name) + .map(KafkaCluster::getTopics) + .map(t -> t.get(topicName)) + .map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList())); } public Mono createTopic(String name, Mono topicFormData) { @@ -135,4 +140,11 @@ public class ClusterService { }).collect(Collectors.toList()))) .flatMapMany(Flux::fromIterable); } + + public Flux getMessages(String clusterName, String topicName, Integer partition, Long offset, OffsetDateTime timestamp) { + return clustersStorage.getClusterByName(clusterName) + .map(c -> consumingService.loadMessages(c, topicName)) + .orElse(Flux.empty()); + + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java new file mode 100644 index 0000000000..0fcf15f23b --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -0,0 +1,83 @@ +package com.provectus.kafka.ui.cluster.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.springframework.stereotype.Service; + +import com.provectus.kafka.ui.cluster.model.InternalTopic; +import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.cluster.util.ClusterUtil; +import com.provectus.kafka.ui.kafka.KafkaService; +import com.provectus.kafka.ui.model.TopicMessage; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Schedulers; + +@Service +@Log4j2 +@RequiredArgsConstructor +public class ConsumingService { + + + // TODO: make this configurable + private static final int BATCH_SIZE = 20; + + private final KafkaService kafkaService; + + public Flux loadMessages(KafkaCluster cluster, String topic) { + RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic); + return Flux.create(emitter::emit) + .subscribeOn(Schedulers.boundedElastic()) + .map(ClusterUtil::mapToTopicMessage) + .limitRequest(BATCH_SIZE); + } + + @RequiredArgsConstructor + private static class RecordEmitter { + + private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); + + private final KafkaService kafkaService; + private final KafkaCluster cluster; + private final String topic; + + public void emit(FluxSink> sink) { + try (KafkaConsumer consumer = kafkaService.createConsumer(cluster)) { + assignPartitions(consumer, topic); + while (!sink.isCancelled()) { + ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); + log.info("{} records polled", records.count()); + records.iterator() + .forEachRemaining(sink::next); + } + } catch (Exception e) { + log.error("Error occurred while consuming records", e); + throw new RuntimeException(e); + } + } + + private void assignPartitions(KafkaConsumer consumer, String topicName) { + List partitions = Optional.ofNullable(cluster.getTopics().get(topicName)) + .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topicName)) + .getPartitions().stream() + .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.getPartition())) + .collect(Collectors.toList()); + + consumer.assign(partitions); + // TODO: seek to requested offsets + consumer.seekToBeginning(partitions); + } + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java index 1f73a14851..af10406c20 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java @@ -4,15 +4,25 @@ import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail; import com.provectus.kafka.ui.model.ServerStatus; +import com.provectus.kafka.ui.model.TopicMessage; + import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.Bytes; + import reactor.core.publisher.Mono; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -23,6 +33,8 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_ public class ClusterUtil { + private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); + public static Mono toMono(KafkaFuture future){ return Mono.create(sink -> future.whenComplete((res, ex)->{ if (ex!=null) { @@ -127,4 +139,38 @@ public class ClusterUtil { return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0; } + public static TopicMessage mapToTopicMessage(ConsumerRecord consumerRecord) { + OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID); + TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType()); + Map headers = new HashMap<>(); + consumerRecord.headers().iterator() + .forEachRemaining(header -> headers.put(header.key(), new String(header.value()))); + + TopicMessage topicMessage = new TopicMessage(); + + topicMessage.setPartition(consumerRecord.partition()); + topicMessage.setOffset(consumerRecord.offset()); + topicMessage.setTimestamp(timestamp); + topicMessage.setTimestampType(timestampType); + if (consumerRecord.key() != null) { + topicMessage.setKey(consumerRecord.key().toString()); + } + topicMessage.setHeaders(headers); + topicMessage.setContent(consumerRecord.value().toString()); + + return topicMessage; + } + + private static TopicMessage.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) { + switch (timestampType) { + case CREATE_TIME: + return TopicMessage.TimestampTypeEnum.CREATE_TIME; + case LOG_APPEND_TIME: + return TopicMessage.TimestampTypeEnum.LOG_APPEND_TIME; + case NO_TIMESTAMP_TYPE: + return TopicMessage.TimestampTypeEnum.NO_TIMESTAMP_TYPE; + default: + throw new IllegalArgumentException("Unknown timestampType: " + timestampType); + } + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index a4990b6f5d..c98ec42a0f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -13,9 +13,13 @@ import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.admin.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -240,6 +244,16 @@ public class KafkaService { .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList())); } + public KafkaConsumer createConsumer(KafkaCluster cluster) { + Properties props = new Properties(); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + + return new KafkaConsumer<>(props); + } + @SneakyThrows private Mono createTopic(AdminClient adminClient, NewTopic newTopic) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 1e9d6e1083..fce720040b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -61,7 +61,9 @@ public class MetricsRestController implements ApiClustersApi { @Override public Mono>> getTopicMessages(String clusterName, String topicName, @Valid Integer partition, @Valid Long offset, @Valid OffsetDateTime timestamp, ServerWebExchange exchange) { - return Mono.error(new UnsupportedOperationException()); + return Mono.just( + ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, partition, offset, timestamp)) + ); } @Override