|
@@ -3,14 +3,24 @@ package com.provectus.kafka.ui.cluster.util;
|
|
import com.provectus.kafka.ui.cluster.model.*;
|
|
import com.provectus.kafka.ui.cluster.model.*;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import com.provectus.kafka.ui.model.TopicMessage;
|
|
|
|
+
|
|
import org.apache.kafka.clients.admin.*;
|
|
import org.apache.kafka.clients.admin.*;
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
import org.apache.kafka.common.KafkaFuture;
|
|
import org.apache.kafka.common.KafkaFuture;
|
|
import org.apache.kafka.common.Node;
|
|
import org.apache.kafka.common.Node;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
|
|
+import org.apache.kafka.common.record.TimestampType;
|
|
|
|
+import org.apache.kafka.common.utils.Bytes;
|
|
|
|
+
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
+import java.time.Instant;
|
|
|
|
+import java.time.OffsetDateTime;
|
|
|
|
+import java.time.ZoneId;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Stream;
|
|
import java.util.stream.Stream;
|
|
@@ -23,6 +33,8 @@ public class ClusterUtil {
|
|
|
|
|
|
private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
|
|
private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
|
|
|
|
|
|
|
|
+ private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
|
|
|
|
+
|
|
public static <T> Mono<T> toMono(KafkaFuture<T> future){
|
|
public static <T> Mono<T> toMono(KafkaFuture<T> future){
|
|
return Mono.create(sink -> future.whenComplete((res, ex)->{
|
|
return Mono.create(sink -> future.whenComplete((res, ex)->{
|
|
if (ex!=null) {
|
|
if (ex!=null) {
|
|
@@ -137,6 +149,40 @@ public class ClusterUtil {
|
|
return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
|
|
return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
|
|
|
|
+ OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
|
|
|
|
+ TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
|
|
|
|
+ Map<String, String> headers = new HashMap<>();
|
|
|
|
+ consumerRecord.headers().iterator()
|
|
|
|
+ .forEachRemaining(header -> headers.put(header.key(), new String(header.value())));
|
|
|
|
+
|
|
|
|
+ TopicMessage topicMessage = new TopicMessage();
|
|
|
|
+
|
|
|
|
+ topicMessage.setPartition(consumerRecord.partition());
|
|
|
|
+ topicMessage.setOffset(consumerRecord.offset());
|
|
|
|
+ topicMessage.setTimestamp(timestamp);
|
|
|
|
+ topicMessage.setTimestampType(timestampType);
|
|
|
|
+ if (consumerRecord.key() != null) {
|
|
|
|
+ topicMessage.setKey(consumerRecord.key().toString());
|
|
|
|
+ }
|
|
|
|
+ topicMessage.setHeaders(headers);
|
|
|
|
+ topicMessage.setContent(consumerRecord.value().toString());
|
|
|
|
+
|
|
|
|
+ return topicMessage;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static TopicMessage.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
|
|
|
|
+ switch (timestampType) {
|
|
|
|
+ case CREATE_TIME:
|
|
|
|
+ return TopicMessage.TimestampTypeEnum.CREATE_TIME;
|
|
|
|
+ case LOG_APPEND_TIME:
|
|
|
|
+ return TopicMessage.TimestampTypeEnum.LOG_APPEND_TIME;
|
|
|
|
+ case NO_TIMESTAMP_TYPE:
|
|
|
|
+ return TopicMessage.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
|
|
|
|
+ default:
|
|
|
|
+ throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(AdminClient adminClient) {
|
|
public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(AdminClient adminClient) {
|
|
return ClusterUtil.toMono(adminClient.describeCluster().controller())
|
|
return ClusterUtil.toMono(adminClient.describeCluster().controller())
|
|
.map(Node::id)
|
|
.map(Node::id)
|