|
@@ -4,15 +4,25 @@ import com.provectus.kafka.ui.cluster.model.*;
|
|
import com.provectus.kafka.ui.model.ConsumerGroup;
|
|
import com.provectus.kafka.ui.model.ConsumerGroup;
|
|
import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail;
|
|
import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail;
|
|
import com.provectus.kafka.ui.model.ServerStatus;
|
|
import com.provectus.kafka.ui.model.ServerStatus;
|
|
|
|
+import com.provectus.kafka.ui.model.TopicMessage;
|
|
|
|
+
|
|
import org.apache.kafka.clients.admin.ConfigEntry;
|
|
import org.apache.kafka.clients.admin.ConfigEntry;
|
|
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
|
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
|
import org.apache.kafka.clients.admin.MemberDescription;
|
|
import org.apache.kafka.clients.admin.MemberDescription;
|
|
import org.apache.kafka.clients.admin.TopicDescription;
|
|
import org.apache.kafka.clients.admin.TopicDescription;
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
import org.apache.kafka.common.KafkaFuture;
|
|
import org.apache.kafka.common.KafkaFuture;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
|
+import org.apache.kafka.common.record.TimestampType;
|
|
|
|
+import org.apache.kafka.common.utils.Bytes;
|
|
|
|
+
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
+import java.time.Instant;
|
|
|
|
+import java.time.OffsetDateTime;
|
|
|
|
+import java.time.ZoneId;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
@@ -23,6 +33,8 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
|
|
|
|
|
|
public class ClusterUtil {
|
|
public class ClusterUtil {
|
|
|
|
|
|
|
|
+ private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
|
|
|
|
+
|
|
public static <T> Mono<T> toMono(KafkaFuture<T> future){
|
|
public static <T> Mono<T> toMono(KafkaFuture<T> future){
|
|
return Mono.create(sink -> future.whenComplete((res, ex)->{
|
|
return Mono.create(sink -> future.whenComplete((res, ex)->{
|
|
if (ex!=null) {
|
|
if (ex!=null) {
|
|
@@ -127,6 +139,40 @@ public class ClusterUtil {
|
|
return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
|
|
return serverStatus.equals(ServerStatus.ONLINE) ? 1 : 0;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public static TopicMessage mapToTopicMessage(ConsumerRecord<Bytes, Bytes> consumerRecord) {
|
|
|
|
+ OffsetDateTime timestamp = OffsetDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), UTC_ZONE_ID);
|
|
|
|
+ TopicMessage.TimestampTypeEnum timestampType = mapToTimestampType(consumerRecord.timestampType());
|
|
|
|
+ Map<String, String> headers = new HashMap<>();
|
|
|
|
+ consumerRecord.headers().iterator()
|
|
|
|
+ .forEachRemaining(header -> headers.put(header.key(), new String(header.value())));
|
|
|
|
+
|
|
|
|
+ TopicMessage topicMessage = new TopicMessage();
|
|
|
|
+
|
|
|
|
+ topicMessage.setPartition(consumerRecord.partition());
|
|
|
|
+ topicMessage.setOffset(consumerRecord.offset());
|
|
|
|
+ topicMessage.setTimestamp(timestamp);
|
|
|
|
+ topicMessage.setTimestampType(timestampType);
|
|
|
|
+ if (consumerRecord.key() != null) {
|
|
|
|
+ topicMessage.setKey(consumerRecord.key().toString());
|
|
|
|
+ }
|
|
|
|
+ topicMessage.setHeaders(headers);
|
|
|
|
+ topicMessage.setContent(consumerRecord.value().toString());
|
|
|
|
+
|
|
|
|
+ return topicMessage;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static TopicMessage.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
|
|
|
|
+ switch (timestampType) {
|
|
|
|
+ case CREATE_TIME:
|
|
|
|
+ return TopicMessage.TimestampTypeEnum.CREATE_TIME;
|
|
|
|
+ case LOG_APPEND_TIME:
|
|
|
|
+ return TopicMessage.TimestampTypeEnum.LOG_APPEND_TIME;
|
|
|
|
+ case NO_TIMESTAMP_TYPE:
|
|
|
|
+ return TopicMessage.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
|
|
|
|
+ default:
|
|
|
|
+ throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
|
|
public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
|
|
return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|