diff --git a/kafka-ui-api/Dockerfile b/kafka-ui-api/Dockerfile index 147e42bd914cf48d01e0789540b54b645ed9a0dc..15cb59717a07219420a89be29d319fcb990e1d2b 100644 --- a/kafka-ui-api/Dockerfile +++ b/kafka-ui-api/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:13 +FROM adoptopenjdk/openjdk13:x86_64-alpine-jre-13.0.2_8 VOLUME /tmp ARG JAR_FILE COPY "/target/${JAR_FILE}" "/kafka-ui-api.jar" diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 11c07f77d537ed2bda686c467a6daf30f059a858..28caf8b9f3c5764e40e75cacf069be2e873b7891 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -46,20 +46,15 @@ kafka-ui-contract ${project.version} - - org.springdoc - springdoc-openapi-webflux-ui - ${springdoc-openapi-webflux-ui.version} - org.apache.kafka kafka-clients ${kafka-clients.version} - org.apache.kafka - kafka_2.13 - ${kafka.version} + org.apache.commons + commons-lang3 + 3.9 org.apache.zookeeper diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CorsGlobalConfiguration.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CorsGlobalConfiguration.java index b54f9b06f668a61e6623162813d07e373681d603..bbb743daac1c829f92f190f7effbeb869440cb67 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CorsGlobalConfiguration.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CorsGlobalConfiguration.java @@ -1,14 +1,10 @@ package com.provectus.kafka.ui.config; -import static org.springdoc.core.Constants.CLASSPATH_RESOURCE_LOCATION; - import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.web.reactive.config.CorsRegistry; import org.springframework.web.reactive.config.EnableWebFlux; -import org.springframework.web.reactive.config.ResourceHandlerRegistry; import org.springframework.web.reactive.config.WebFluxConfigurer; -import org.springframework.web.reactive.resource.WebJarsResourceResolver; @Configuration @EnableWebFlux @@ -23,12 +19,4 @@ public class CorsGlobalConfiguration implements WebFluxConfigurer { .allowedHeaders("*") .allowCredentials(true); } - - @Override - public void addResourceHandlers(ResourceHandlerRegistry registry) { - registry.addResourceHandler("/webjars/**") - .addResourceLocations(CLASSPATH_RESOURCE_LOCATION + "/webjars/") - .resourceChain(true) - .addResolver(new WebJarsResourceResolver()); - } } \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index 830168559cf14e41548c9f951f7f30837e5a1624..e706c2132e3e1ea82f25d84ea6a7a481a51b6f61 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -5,7 +5,7 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.SeekType; -import com.provectus.kafka.ui.model.TopicMessage; +import com.provectus.kafka.ui.model.TopicMessageEvent; import com.provectus.kafka.ui.model.TopicMessageSchema; import com.provectus.kafka.ui.service.ClusterService; import java.util.Collections; @@ -40,15 +40,17 @@ public class MessagesController implements MessagesApi { ).map(ResponseEntity::ok); } - @Override - public Mono>> getTopicMessages( + public Mono>> getTopicMessages( String clusterName, String topicName, @Valid SeekType seekType, @Valid List seekTo, @Valid Integer limit, @Valid String q, @Valid SeekDirection seekDirection, ServerWebExchange exchange) { return parseConsumerPosition(topicName, seekType, seekTo, seekDirection) - .map(consumerPosition -> ResponseEntity - .ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit))); + .map(position -> + ResponseEntity.ok( + clusterService.getMessages(clusterName, topicName, position, q, limit) + ) + ); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java new file mode 100644 index 0000000000000000000000000000000000000000..c1e139660d1e730bb58572c623ebb452ab0877cd --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -0,0 +1,81 @@ +package com.provectus.kafka.ui.emitter; + +import com.provectus.kafka.ui.model.TopicMessage; +import com.provectus.kafka.ui.model.TopicMessageConsuming; +import com.provectus.kafka.ui.model.TopicMessageEvent; +import com.provectus.kafka.ui.model.TopicMessagePhase; +import com.provectus.kafka.ui.serde.RecordSerDe; +import com.provectus.kafka.ui.util.ClusterUtil; +import java.time.Duration; +import java.time.Instant; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.utils.Bytes; +import reactor.core.publisher.FluxSink; + +public abstract class AbstractEmitter { + private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); + + private final RecordSerDe recordDeserializer; + private long bytes = 0; + private int records = 0; + private long elapsed = 0; + + public AbstractEmitter(RecordSerDe recordDeserializer) { + this.recordDeserializer = recordDeserializer; + } + + protected ConsumerRecords poll( + FluxSink sink, Consumer consumer) { + Instant start = Instant.now(); + ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); + Instant finish = Instant.now(); + sendConsuming(sink, records, Duration.between(start, finish).toMillis()); + return records; + } + + protected FluxSink sendMessage(FluxSink sink, + ConsumerRecord msg) { + final TopicMessage topicMessage = ClusterUtil.mapToTopicMessage(msg, recordDeserializer); + return sink.next( + new TopicMessageEvent() + .type(TopicMessageEvent.TypeEnum.MESSAGE) + .message(topicMessage) + ); + } + + protected void sendPhase(FluxSink sink, String name) { + sink.next( + new TopicMessageEvent() + .type(TopicMessageEvent.TypeEnum.PHASE) + .phase(new TopicMessagePhase().name(name)) + ); + } + + protected void sendConsuming(FluxSink sink, + ConsumerRecords records, + long elapsed) { + for (ConsumerRecord record : records) { + for (Header header : record.headers()) { + bytes += + (header.key() != null ? header.key().getBytes().length : 0L) + + (header.value() != null ? header.value().length : 0L); + } + bytes += record.serializedKeySize() + record.serializedValueSize(); + } + this.records += records.count(); + this.elapsed += elapsed; + final TopicMessageConsuming consuming = new TopicMessageConsuming() + .bytesConsumed(this.bytes) + .elapsedMs(this.elapsed) + .isCancelled(sink.isCancelled()) + .messagesConsumed(this.records); + sink.next( + new TopicMessageEvent() + .type(TopicMessageEvent.TypeEnum.CONSUMING) + .consuming(consuming) + ); + } +} \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index 8bf2bab547fe2be51c5a64be572ce46c69fab5ee..034ab4b84953a62c603562cfdf7a5a3f626327e9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -1,13 +1,16 @@ package com.provectus.kafka.ui.emitter; +import com.provectus.kafka.ui.model.TopicMessageEvent; +import com.provectus.kafka.ui.serde.RecordSerDe; import com.provectus.kafka.ui.util.OffsetsSeekBackward; -import java.time.Duration; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.function.Function; import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -17,36 +20,50 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import reactor.core.publisher.FluxSink; -@RequiredArgsConstructor @Log4j2 public class BackwardRecordEmitter - implements java.util.function.Consumer>> { - - private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); + extends AbstractEmitter + implements java.util.function.Consumer> { private final Function, KafkaConsumer> consumerSupplier; private final OffsetsSeekBackward offsetsSeek; + public BackwardRecordEmitter( + Function, KafkaConsumer> consumerSupplier, + OffsetsSeekBackward offsetsSeek, + RecordSerDe recordDeserializer) { + super(recordDeserializer); + this.offsetsSeek = offsetsSeek; + this.consumerSupplier = consumerSupplier; + } + @Override - public void accept(FluxSink> sink) { + public void accept(FluxSink sink) { try (KafkaConsumer configConsumer = consumerSupplier.apply(Map.of())) { final List requestedPartitions = offsetsSeek.getRequestedPartitions(configConsumer); + sendPhase(sink, "Request partitions"); final int msgsPerPartition = offsetsSeek.msgsPerPartition(requestedPartitions.size()); try (KafkaConsumer consumer = consumerSupplier.apply( Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, msgsPerPartition) ) ) { - final Map partitionsOffsets = - offsetsSeek.getPartitionsOffsets(consumer); + sendPhase(sink, "Created consumer"); + + SortedMap partitionsOffsets = + new TreeMap<>(Comparator.comparingInt(TopicPartition::partition)); + partitionsOffsets.putAll(offsetsSeek.getPartitionsOffsets(consumer)); + + sendPhase(sink, "Requested partitions offsets"); log.debug("partition offsets: {}", partitionsOffsets); var waitingOffsets = offsetsSeek.waitingOffsets(consumer, partitionsOffsets.keySet()); - log.debug("waittin offsets {} {}", + log.debug("waiting offsets {} {}", waitingOffsets.getBeginOffsets(), waitingOffsets.getEndOffsets() ); + while (!sink.isCancelled() && !waitingOffsets.beginReached()) { for (Map.Entry entry : partitionsOffsets.entrySet()) { final Long lowest = waitingOffsets.getBeginOffsets().get(entry.getKey().partition()); @@ -55,7 +72,10 @@ public class BackwardRecordEmitter final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition); log.debug("Polling {} from {}", entry.getKey(), offset); consumer.seek(entry.getKey(), offset); - ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); + sendPhase(sink, + String.format("Consuming partition: %s from %s", entry.getKey(), offset) + ); + final ConsumerRecords records = poll(sink, consumer); final List> partitionRecords = records.records(entry.getKey()).stream() .filter(r -> r.offset() < partitionsOffsets.get(entry.getKey())) @@ -73,7 +93,7 @@ public class BackwardRecordEmitter for (ConsumerRecord msg : partitionRecords) { if (!sink.isCancelled() && !waitingOffsets.beginReached()) { - sink.next(msg); + sendMessage(sink, msg); waitingOffsets.markPolled(msg); } else { log.info("Begin reached"); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index 2b007dcc7090413477abc142bf75808ce6d5f118..4df7428107ed983172cf0d67f33dde902ba91d8d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -1,9 +1,11 @@ package com.provectus.kafka.ui.emitter; +import com.provectus.kafka.ui.model.TopicMessageEvent; +import com.provectus.kafka.ui.serde.RecordSerDe; import com.provectus.kafka.ui.util.OffsetsSeek; import java.time.Duration; +import java.time.Instant; import java.util.function.Supplier; -import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -11,33 +13,43 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.utils.Bytes; import reactor.core.publisher.FluxSink; -@RequiredArgsConstructor @Log4j2 public class ForwardRecordEmitter - implements java.util.function.Consumer>> { + extends AbstractEmitter + implements java.util.function.Consumer> { private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); private final Supplier> consumerSupplier; private final OffsetsSeek offsetsSeek; + public ForwardRecordEmitter( + Supplier> consumerSupplier, + OffsetsSeek offsetsSeek, + RecordSerDe recordDeserializer) { + super(recordDeserializer); + this.consumerSupplier = consumerSupplier; + this.offsetsSeek = offsetsSeek; + } + @Override - public void accept(FluxSink> sink) { + public void accept(FluxSink sink) { try (KafkaConsumer consumer = consumerSupplier.get()) { + sendPhase(sink, "Assigning partitions"); var waitingOffsets = offsetsSeek.assignAndSeek(consumer); while (!sink.isCancelled() && !waitingOffsets.endReached()) { - ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); + sendPhase(sink, "Polling"); + ConsumerRecords records = poll(sink, consumer); log.info("{} records polled", records.count()); for (ConsumerRecord msg : records) { if (!sink.isCancelled() && !waitingOffsets.endReached()) { - sink.next(msg); + sendMessage(sink, msg); waitingOffsets.markPolled(msg); } else { break; } } - } sink.complete(); log.info("Polling finished"); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java index 6a1129d7f784614bc1027aa6e981d5ac0816bea3..0726c3cc7d0bd6b609c611b5b719c33ec715d72a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java @@ -5,6 +5,9 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.util.JsonFormat; import com.provectus.kafka.ui.model.MessageSchema; import com.provectus.kafka.ui.model.TopicMessageSchema; +import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; +import com.provectus.kafka.ui.serde.schemaregistry.MessageFormatter; +import com.provectus.kafka.ui.util.ConsumerRecordUtil; import com.provectus.kafka.ui.util.jsonschema.JsonSchema; import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; @@ -44,10 +47,16 @@ public class ProtobufFileRecordSerDe implements RecordSerDe { @Override public DeserializedKeyValue deserialize(ConsumerRecord msg) { try { - return new DeserializedKeyValue( - msg.key() != null ? new String(msg.key().get()) : null, - msg.value() != null ? parse(msg.value().get()) : null - ); + var builder = DeserializedKeyValue.builder(); + if (msg.key() != null) { + builder.key(new String(msg.key().get())); + builder.keyFormat(MessageFormat.UNKNOWN); + } + if (msg.value() != null) { + builder.value(parse(msg.value().get())); + builder.valueFormat(MessageFormat.PROTOBUF); + } + return builder.build(); } catch (Throwable e) { throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java index 60a71081846fc9ff330371a06a195463c122eba5..432cf4c9df0bc19a78963eb898ff3091cdb7305a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/RecordSerDe.java @@ -1,7 +1,9 @@ package com.provectus.kafka.ui.serde; import com.provectus.kafka.ui.model.TopicMessageSchema; +import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; import javax.annotation.Nullable; +import lombok.Builder; import lombok.Value; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; @@ -10,9 +12,14 @@ import org.apache.kafka.common.utils.Bytes; public interface RecordSerDe { @Value + @Builder class DeserializedKeyValue { @Nullable String key; @Nullable String value; + @Nullable MessageFormat keyFormat; + @Nullable MessageFormat valueFormat; + @Nullable String keySchemaId; + @Nullable String valueSchemaId; } DeserializedKeyValue deserialize(ConsumerRecord msg); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java index b838709eb52dec30d9571b85bf16e31721b05ae0..47233e899900bb00339f2b7953b25be01c657062 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java @@ -3,6 +3,8 @@ package com.provectus.kafka.ui.serde; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.model.MessageSchema; import com.provectus.kafka.ui.model.TopicMessageSchema; +import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; +import com.provectus.kafka.ui.util.ConsumerRecordUtil; import com.provectus.kafka.ui.util.jsonschema.JsonSchema; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -13,10 +15,16 @@ public class SimpleRecordSerDe implements RecordSerDe { @Override public DeserializedKeyValue deserialize(ConsumerRecord msg) { - return new DeserializedKeyValue( - msg.key() != null ? new String(msg.key().get()) : null, - msg.value() != null ? new String(msg.value().get()) : null - ); + var builder = DeserializedKeyValue.builder(); + if (msg.key() != null) { + builder.key(new String(msg.key().get())) + .keyFormat(MessageFormat.UNKNOWN); + } + if (msg.value() != null) { + builder.value(new String(msg.value().get())) + .valueFormat(MessageFormat.UNKNOWN); + } + return builder.build(); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageFormatter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageFormatter.java index b0bc920359fe8c62537a9c0efda526bdda4c9cbf..817ade0fb51a4dade9b139d03ccf552c6c6d23ec 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageFormatter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/AvroMessageFormatter.java @@ -20,4 +20,9 @@ public class AvroMessageFormatter implements MessageFormatter { byte[] jsonBytes = AvroSchemaUtils.toJson(avroRecord); return new String(jsonBytes); } + + @Override + public MessageFormat getFormat() { + return MessageFormat.AVRO; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java index 3435851b4ee6dcc8a7b0436526bf5810f35e8c5d..5127285e509a17d28c3e7909583ecb9018a80d66 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonSchemaMessageFormatter.java @@ -17,4 +17,9 @@ public class JsonSchemaMessageFormatter implements MessageFormatter { JsonNode json = jsonSchemaDeserializer.deserialize(topic, value); return json.toString(); } + + @Override + public MessageFormat getFormat() { + return MessageFormat.JSON; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java index b1e875b76099f9568a5f572df6f1cf162603f507..db0e6a4a36034329e29c679dad31fa02943e9fc6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java @@ -3,5 +3,6 @@ package com.provectus.kafka.ui.serde.schemaregistry; public enum MessageFormat { AVRO, JSON, - PROTOBUF + PROTOBUF, + UNKNOWN } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormatter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormatter.java index c50b17774498627f4f5d1f40f609027dc2a32d6e..e7fec895e73e36f96296c86ac5de47e8caeb620d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormatter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormatter.java @@ -2,4 +2,8 @@ package com.provectus.kafka.ui.serde.schemaregistry; public interface MessageFormatter { String format(String topic, byte[] value); + + default MessageFormat getFormat() { + return MessageFormat.UNKNOWN; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageFormatter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageFormatter.java index 119b98ed5f7a862420752cf5ae3cf21ac4bd5f4f..adfdaf031130ee3c7be32601dbf336014112310e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageFormatter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageFormatter.java @@ -20,4 +20,9 @@ public class ProtobufMessageFormatter implements MessageFormatter { byte[] jsonBytes = ProtobufSchemaUtils.toJson(message); return new String(jsonBytes); } + + @Override + public MessageFormat getFormat() { + return MessageFormat.PROTOBUF; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java index 747f5c61afa857cc6ae562cfc6d7a4201d302701..fd58804760e494acc26bee66f01ec1634af661f5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java @@ -10,6 +10,7 @@ import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.MessageSchema; import com.provectus.kafka.ui.model.TopicMessageSchema; import com.provectus.kafka.ui.serde.RecordSerDe; +import com.provectus.kafka.ui.util.ConsumerRecordUtil; import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter; import com.provectus.kafka.ui.util.jsonschema.JsonSchema; import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter; @@ -114,14 +115,28 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { public DeserializedKeyValue deserialize(ConsumerRecord msg) { try { - return new DeserializedKeyValue( - msg.key() != null - ? getMessageFormatter(msg, true).format(msg.topic(), msg.key().get()) - : null, - msg.value() != null - ? getMessageFormatter(msg, false).format(msg.topic(), msg.value().get()) - : null - ); + var builder = DeserializedKeyValue.builder(); + if (msg.key() != null) { + MessageFormatter messageFormatter = getMessageFormatter(msg, true); + builder.key(messageFormatter.format(msg.topic(), msg.key().get())); + builder.keyFormat(messageFormatter.getFormat()); + builder.keySchemaId( + getSchemaId(msg.key(), messageFormatter.getFormat()) + .map(String::valueOf) + .orElse(null) + ); + } + if (msg.value() != null) { + MessageFormatter messageFormatter = getMessageFormatter(msg, false); + builder.value(messageFormatter.format(msg.topic(), msg.value().get())); + builder.valueFormat(messageFormatter.getFormat()); + builder.valueSchemaId( + getSchemaId(msg.value(), messageFormatter.getFormat()) + .map(String::valueOf) + .orElse(null) + ); + } + return builder.build(); } catch (Throwable e) { throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e); } @@ -293,6 +308,16 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { return result; } + private Optional getSchemaId(Bytes value, MessageFormat format) { + if (format != MessageFormat.AVRO + && format != MessageFormat.PROTOBUF + && format != MessageFormat.JSON) { + return Optional.empty(); + } + ByteBuffer buffer = ByteBuffer.wrap(value.get()); + return buffer.get() == 0 ? Optional.of(buffer.getInt()) : Optional.empty(); + } + @SneakyThrows private Optional getSchemaBySubject(String topic, boolean isKey) { return Optional.ofNullable(schemaRegistryClient) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 37938f9c73c74346b3f844a2e55d45a825e2c7b3..df417b234011cce140fe98a471a73af0a2dfcac3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -32,6 +32,7 @@ import com.provectus.kafka.ui.model.TopicConfig; import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicDetails; import com.provectus.kafka.ui.model.TopicMessage; +import com.provectus.kafka.ui.model.TopicMessageEvent; import com.provectus.kafka.ui.model.TopicMessageSchema; import com.provectus.kafka.ui.model.TopicUpdate; import com.provectus.kafka.ui.model.TopicsResponse; @@ -160,9 +161,7 @@ public class ClusterService { public Optional getTopicDetails(String name, String topicName) { return clustersStorage.getClusterByName(name) .flatMap(c -> - Optional.ofNullable( - c.getTopics().get(topicName) - ).map( + Optional.ofNullable(c.getTopics()).map(l -> l.get(topicName)).map( t -> t.toBuilder().partitions( kafkaService.getTopicPartitions(c, t) ).build() @@ -275,7 +274,7 @@ public class ClusterService { .orElse(Mono.error(new ClusterNotFoundException())); } - public Flux getMessages(String clusterName, String topicName, + public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) { return clustersStorage.getClusterByName(clusterName) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java index 91ce6b534ac339fccdbceb14e7e6aeaa1b2f5eb0..f209650936adc4dc82ad3aad04197fd0a8dafdf9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java @@ -7,9 +7,10 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.TopicMessage; +import com.provectus.kafka.ui.model.TopicMessageEvent; import com.provectus.kafka.ui.serde.DeserializationService; import com.provectus.kafka.ui.serde.RecordSerDe; -import com.provectus.kafka.ui.util.ClusterUtil; +import com.provectus.kafka.ui.util.FilterTopicMessageEvents; import com.provectus.kafka.ui.util.OffsetsSeekBackward; import com.provectus.kafka.ui.util.OffsetsSeekForward; import java.util.Collection; @@ -19,13 +20,12 @@ import java.util.Optional; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; @@ -43,32 +43,34 @@ public class ConsumingService { private final DeserializationService deserializationService; private final ObjectMapper objectMapper = new ObjectMapper(); - public Flux loadMessages(KafkaCluster cluster, String topic, - ConsumerPosition consumerPosition, String query, - Integer limit) { + public Flux loadMessages(KafkaCluster cluster, String topic, + ConsumerPosition consumerPosition, String query, + Integer limit) { int recordsLimit = Optional.ofNullable(limit) .map(s -> Math.min(s, MAX_RECORD_LIMIT)) .orElse(DEFAULT_RECORD_LIMIT); - java.util.function.Consumer>> emitter; + java.util.function.Consumer> emitter; + RecordSerDe recordDeserializer = + deserializationService.getRecordDeserializerForCluster(cluster); if (consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)) { emitter = new ForwardRecordEmitter( () -> kafkaService.createConsumer(cluster), - new OffsetsSeekForward(topic, consumerPosition) + new OffsetsSeekForward(topic, consumerPosition), + recordDeserializer ); } else { emitter = new BackwardRecordEmitter( (Map props) -> kafkaService.createConsumer(cluster, props), - new OffsetsSeekBackward(topic, consumerPosition, recordsLimit) + new OffsetsSeekBackward(topic, consumerPosition, recordsLimit), + recordDeserializer ); } - RecordSerDe recordDeserializer = - deserializationService.getRecordDeserializerForCluster(cluster); return Flux.create(emitter) - .subscribeOn(Schedulers.boundedElastic()) - .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer)) .filter(m -> filterTopicMessage(m, query)) - .limitRequest(recordsLimit); + .takeWhile(new FilterTopicMessageEvents(recordsLimit)) + .subscribeOn(Schedulers.elastic()) + .share(); } public Mono> offsetsForDeletion(KafkaCluster cluster, String topicName, @@ -102,12 +104,16 @@ public class ConsumingService { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - private boolean filterTopicMessage(TopicMessage message, String query) { - if (StringUtils.isEmpty(query)) { + private boolean filterTopicMessage(TopicMessageEvent message, String query) { + log.info("filter"); + if (StringUtils.isEmpty(query) + || !message.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) { return true; } - return (StringUtils.isNotEmpty(message.getKey()) && message.getKey().contains(query)) - || (StringUtils.isNotEmpty(message.getContent()) && message.getContent().contains(query)); + + final TopicMessage msg = message.getMessage(); + return (!StringUtils.isEmpty(msg.getKey()) && msg.getKey().contains(query)) + || (!StringUtils.isEmpty(msg.getContent()) && msg.getContent().contains(query)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 4829b5789573f968d54bf362cfe6fdec84d6fbf5..109ed6d35610e731378005323a62eb25a83fbeb3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -30,10 +30,11 @@ import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; @Service @Log4j2 @@ -59,7 +60,7 @@ public class KafkaConnectService { return getConnects(clusterName) .flatMapMany(Function.identity()) .flatMap(connect -> getConnectorNames(clusterName, connect)) - .flatMap(pair -> getConnector(clusterName, pair.getLeft(), pair.getRight())) + .flatMap(pair -> getConnector(clusterName, pair.getT1(), pair.getT2())) .flatMap(connector -> getConnectorConfig(clusterName, connector.getConnect(), connector.getName()) .map(config -> InternalConnectInfo.builder() @@ -96,19 +97,19 @@ public class KafkaConnectService { private Predicate matchesSearchTerm(final String search) { return (connector) -> getSearchValues(connector) - .anyMatch(value -> value.contains( - StringUtils.defaultString( - search, - StringUtils.EMPTY) - .toUpperCase())); + .anyMatch(value -> value.contains( + StringUtils.defaultString( + search, + StringUtils.EMPTY) + .toUpperCase())); } private Stream getSearchValues(FullConnectorInfo fullConnectorInfo) { return Stream.of( - fullConnectorInfo.getName(), - fullConnectorInfo.getStatus().getState().getValue(), - fullConnectorInfo.getType().getValue()) - .map(String::toUpperCase); + fullConnectorInfo.getName(), + fullConnectorInfo.getStatus().getState().getValue(), + fullConnectorInfo.getType().getValue()) + .map(String::toUpperCase); } private Mono getConnectorTopics(String clusterName, String connectClusterName, @@ -121,13 +122,13 @@ public class KafkaConnectService { ); } - private Flux> getConnectorNames(String clusterName, Connect connect) { + private Flux> getConnectorNames(String clusterName, Connect connect) { return getConnectors(clusterName, connect.getName()) .collectList().map(e -> e.get(0)) // for some reason `getConnectors` method returns the response as a single string .map(this::parseToList) .flatMapMany(Flux::fromIterable) - .map(connector -> Pair.of(connect.getName(), connector)); + .map(connector -> Tuples.of(connect.getName(), connector)); } @SneakyThrows diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index 3b51318ab54499bff18d2bfc7c3ae4aa3ae1f35d..5e3aa8aa190584fdd503363a133c038f061fdf71 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -81,6 +81,9 @@ import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.LogDirNotFoundException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesDeserializer; @@ -110,7 +113,10 @@ public class KafkaService { private int clientTimeout; public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) { - final Map topics = new HashMap<>(cluster.getTopics()); + final Map topics = + Optional.ofNullable(cluster.getTopics()).map( + t -> new HashMap<>(cluster.getTopics()) + ).orElse(new HashMap<>()); topics.put(updatedTopic.getName(), updatedTopic); return cluster.toBuilder().topics(topics).build(); } @@ -160,8 +166,8 @@ public class KafkaService { Throwable zookeeperException = null; try { zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster) - ? ServerStatus.ONLINE - : ServerStatus.OFFLINE; + ? ServerStatus.ONLINE + : ServerStatus.OFFLINE; } catch (Throwable e) { zookeeperException = e; } @@ -338,7 +344,7 @@ public class KafkaService { .collect(Collectors.toList()); return ClusterUtil.toMono(adminClient.describeConfigs(resources, - new DescribeConfigsOptions().includeSynonyms(true)).all()) + new DescribeConfigsOptions().includeSynonyms(true)).all()) .map(configs -> configs.entrySet().stream().collect(Collectors.toMap( c -> c.getKey().name(), @@ -391,8 +397,8 @@ public class KafkaService { getConsumerGroupsInternal( cluster, s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())) - ) - ); + ) + ); } public Mono> getConsumerGroupsInternal( @@ -425,17 +431,17 @@ public class KafkaService { } return consumerGroups.map(c -> - c.stream() - .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic)) - .filter(Optional::isPresent) - .map(Optional::get) - .map(g -> - g.toBuilder().endOffsets( - topicPartitionsEndOffsets(cluster, g.getOffsets().keySet()) - ).build() - ) - .collect(Collectors.toList()) - ); + c.stream() + .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(g -> + g.toBuilder().endOffsets( + topicPartitionsEndOffsets(cluster, g.getOffsets().keySet()) + ).build() + ) + .collect(Collectors.toList()) + ); } public Mono> groupMetadata(KafkaCluster cluster, @@ -736,12 +742,18 @@ public class KafkaService { properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); try (KafkaProducer producer = new KafkaProducer<>(properties)) { - final ProducerRecord producerRecord = serde.serialize( + ProducerRecord producerRecord = serde.serialize( topic, msg.getKey(), msg.getContent(), msg.getPartition() ); + producerRecord = new ProducerRecord<>( + producerRecord.topic(), + producerRecord.partition(), + producerRecord.key(), + producerRecord.value(), + createHeaders(msg.getHeaders())); CompletableFuture cf = new CompletableFuture<>(); producer.send(producerRecord, (metadata, exception) -> { @@ -755,6 +767,15 @@ public class KafkaService { } } + private Iterable
createHeaders(Map clientHeaders) { + if (clientHeaders == null) { + return null; + } + RecordHeaders headers = new RecordHeaders(); + clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes()))); + return headers; + } + private Mono increaseTopicPartitions(AdminClient adminClient, String topicName, Map newPartitionsMap @@ -949,7 +970,7 @@ public class KafkaService { } public Mono updateBrokerLogDir(KafkaCluster cluster, Integer broker, - BrokerLogdirUpdate brokerLogDir) { + BrokerLogdirUpdate brokerLogDir) { return getOrCreateAdminClient(cluster) .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker)); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index 4cd10f4207b513f8bf334bf9d82f9c5687ebf557..4b67335a5ad6e16d94eec8bf412ba525a47e505a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -15,6 +15,7 @@ import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; +import com.provectus.kafka.ui.model.MessageFormat; import com.provectus.kafka.ui.model.ServerStatus; import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.serde.RecordSerDe; @@ -299,6 +300,17 @@ public class ClusterUtil { var parsed = recordDeserializer.deserialize(consumerRecord); topicMessage.setKey(parsed.getKey()); topicMessage.setContent(parsed.getValue()); + topicMessage.setKeyFormat(parsed.getKeyFormat() != null + ? MessageFormat.valueOf(parsed.getKeyFormat().name()) + : null); + topicMessage.setValueFormat(parsed.getValueFormat() != null + ? MessageFormat.valueOf(parsed.getValueFormat().name()) + : null); + topicMessage.setKeySize(ConsumerRecordUtil.getKeySize(consumerRecord)); + topicMessage.setValueSize(ConsumerRecordUtil.getValueSize(consumerRecord)); + topicMessage.setKeySchemaId(parsed.getKeySchemaId()); + topicMessage.setValueSchemaId(parsed.getValueSchemaId()); + topicMessage.setHeadersSize(ConsumerRecordUtil.getHeadersSize(consumerRecord)); return topicMessage; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..a671659989111b07a0efa12b92780571636d7fd5 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordUtil.java @@ -0,0 +1,37 @@ +package com.provectus.kafka.ui.util; + +import java.util.Arrays; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.utils.Bytes; + +public class ConsumerRecordUtil { + + private ConsumerRecordUtil() { + } + + public static Long getHeadersSize(ConsumerRecord consumerRecord) { + Headers headers = consumerRecord.headers(); + if (headers != null) { + return Arrays.stream(consumerRecord.headers().toArray()) + .mapToLong(ConsumerRecordUtil::headerSize) + .sum(); + } + return 0L; + } + + public static Long getKeySize(ConsumerRecord consumerRecord) { + return consumerRecord.key() != null ? (long) consumerRecord.key().get().length : null; + } + + public static Long getValueSize(ConsumerRecord consumerRecord) { + return consumerRecord.value() != null ? (long) consumerRecord.value().get().length : null; + } + + private static int headerSize(Header header) { + int key = header.key() != null ? header.key().getBytes().length : 0; + int val = header.value() != null ? header.value().length : 0; + return key + val; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/FilterTopicMessageEvents.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/FilterTopicMessageEvents.java new file mode 100644 index 0000000000000000000000000000000000000000..b8cdb75f1bc325c95ab9aee757e70fb19966fd7b --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/FilterTopicMessageEvents.java @@ -0,0 +1,25 @@ +package com.provectus.kafka.ui.util; + +import com.provectus.kafka.ui.model.TopicMessageEvent; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +public class FilterTopicMessageEvents implements Predicate { + private final AtomicInteger processed = new AtomicInteger(); + private final int limit; + + public FilterTopicMessageEvents(int limit) { + this.limit = limit; + } + + @Override + public boolean test(TopicMessageEvent event) { + if (event.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) { + final int i = processed.incrementAndGet(); + if (i > limit) { + return false; + } + } + return true; + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java index 6948c64684d16c6a377c3aa7c8713f56a6047f38..d4b914ee4b8b426c46c7d8779fff23683157f2b0 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java @@ -1,12 +1,15 @@ package com.provectus.kafka.ui; +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.http.MediaType.TEXT_EVENT_STREAM; + import com.provectus.kafka.ui.api.model.TopicConfig; import com.provectus.kafka.ui.model.BrokerConfig; import com.provectus.kafka.ui.model.PartitionsIncrease; import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicDetails; -import com.provectus.kafka.ui.model.TopicMessage; +import com.provectus.kafka.ui.model.TopicMessageEvent; import com.provectus.kafka.ui.producer.KafkaTestProducer; import java.util.List; import java.util.Map; @@ -49,13 +52,20 @@ public class KafkaConsumerTests extends AbstractBaseTest { .forEach(value -> producer.send(topicName, value)); } - webTestClient.get() + long count = webTestClient.get() .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName) + .accept(TEXT_EVENT_STREAM) .exchange() .expectStatus() .isOk() - .expectBodyList(TopicMessage.class) - .hasSize(4); + .expectBodyList(TopicMessageEvent.class) + .returnResult() + .getResponseBody() + .stream() + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .count(); + + assertThat(count).isEqualTo(4); webTestClient.delete() .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName) @@ -63,13 +73,19 @@ public class KafkaConsumerTests extends AbstractBaseTest { .expectStatus() .isOk(); - webTestClient.get() + count = webTestClient.get() .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName) .exchange() .expectStatus() .isOk() - .expectBodyList(TopicMessage.class) - .hasSize(0); + .expectBodyList(TopicMessageEvent.class) + .returnResult() + .getResponseBody() + .stream() + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .count(); + + assertThat(count).isZero(); } @Test diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java index 1f891b6819b7d1109f321faba68dbf969ac763ae..7aaa94ecd7c8029682fdc102d3644ad1970c5398 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat; import com.provectus.kafka.ui.serde.schemaregistry.SchemaRegistryAwareRecordSerDe; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.utils.Bytes; @@ -25,13 +26,23 @@ class SchemaRegistryRecordDeserializerTest { var deserializedRecord = deserializer.deserialize( new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), Bytes.wrap(value.getBytes()))); - assertEquals(new DeserializedKeyValue("key", value), deserializedRecord); + DeserializedKeyValue expected = DeserializedKeyValue.builder() + .key("key") + .keyFormat(MessageFormat.UNKNOWN) + .value(value) + .valueFormat(MessageFormat.UNKNOWN) + .build(); + assertEquals(expected, deserializedRecord); } @Test public void shouldDeserializeNullValueRecordToEmptyMap() { var deserializedRecord = deserializer .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), null)); - assertEquals(new DeserializedKeyValue("key", null), deserializedRecord); + DeserializedKeyValue expected = DeserializedKeyValue.builder() + .key("key") + .keyFormat(MessageFormat.UNKNOWN) + .build(); + assertEquals(expected, deserializedRecord); } } \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java index ff075c772016f1b4f4bb06858a75c390402cca53..0a5d5f613bc2bcc5953f7c8cd87d90cb84f43f01 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java @@ -29,7 +29,9 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.test.context.ContextConfiguration; +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) public class OffsetsResetServiceTest extends AbstractBaseTest { private static final int PARTITIONS = 5; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java index 65346a17a6b8fd95c6148ea22a0b3517b3840816..39f57c797e4ab2aa7b8e6919f91e08d9bd2c669f 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -8,7 +8,9 @@ import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.SeekType; +import com.provectus.kafka.ui.model.TopicMessageEvent; import com.provectus.kafka.ui.producer.KafkaTestProducer; +import com.provectus.kafka.ui.serde.SimpleRecordSerDe; import com.provectus.kafka.ui.util.OffsetsSeekBackward; import com.provectus.kafka.ui.util.OffsetsSeekForward; import java.io.Serializable; @@ -24,19 +26,19 @@ import lombok.Value; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.BytesDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.test.context.ContextConfiguration; import reactor.core.publisher.Flux; @Log4j2 +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) class RecordEmitterTest extends AbstractBaseTest { static final int PARTITIONS = 5; @@ -80,7 +82,7 @@ class RecordEmitterTest extends AbstractBaseTest { this::createConsumer, new OffsetsSeekForward(EMPTY_TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD) - ) + ), new SimpleRecordSerDe() ); var backwardEmitter = new BackwardRecordEmitter( @@ -89,10 +91,11 @@ class RecordEmitterTest extends AbstractBaseTest { EMPTY_TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.BACKWARD), 100 - ) + ), new SimpleRecordSerDe() ); Long polledValues = Flux.create(forwardEmitter) + .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) .limitRequest(100) .count() .block(); @@ -100,6 +103,7 @@ class RecordEmitterTest extends AbstractBaseTest { assertThat(polledValues).isZero(); polledValues = Flux.create(backwardEmitter) + .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) .limitRequest(100) .count() .block(); @@ -114,7 +118,7 @@ class RecordEmitterTest extends AbstractBaseTest { this::createConsumer, new OffsetsSeekForward(TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD) - ) + ), new SimpleRecordSerDe() ); var backwardEmitter = new BackwardRecordEmitter( @@ -122,12 +126,15 @@ class RecordEmitterTest extends AbstractBaseTest { new OffsetsSeekBackward(TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD), PARTITIONS * MSGS_PER_PARTITION - ) + ), new SimpleRecordSerDe() ); var polledValues = Flux.create(forwardEmitter) - .map(this::deserialize) + .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) .limitRequest(Long.MAX_VALUE) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .collect(Collectors.toList()) .block(); @@ -135,8 +142,11 @@ class RecordEmitterTest extends AbstractBaseTest { SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList())); polledValues = Flux.create(backwardEmitter) - .map(this::deserialize) + .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) .limitRequest(Long.MAX_VALUE) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .collect(Collectors.toList()) .block(); @@ -157,7 +167,7 @@ class RecordEmitterTest extends AbstractBaseTest { this::createConsumer, new OffsetsSeekForward(TOPIC, new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.FORWARD) - ) + ), new SimpleRecordSerDe() ); var backwardEmitter = new BackwardRecordEmitter( @@ -165,12 +175,15 @@ class RecordEmitterTest extends AbstractBaseTest { new OffsetsSeekBackward(TOPIC, new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD), PARTITIONS * MSGS_PER_PARTITION - ) + ), new SimpleRecordSerDe() ); var polledValues = Flux.create(forwardEmitter) - .map(this::deserialize) + .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) .limitRequest(Long.MAX_VALUE) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .collect(Collectors.toList()) .block(); @@ -187,8 +200,11 @@ class RecordEmitterTest extends AbstractBaseTest { .collect(Collectors.toList()); polledValues = Flux.create(backwardEmitter) - .map(this::deserialize) + .filter(m -> m.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) .limitRequest(Long.MAX_VALUE) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .collect(Collectors.toList()) .block(); @@ -214,7 +230,7 @@ class RecordEmitterTest extends AbstractBaseTest { this::createConsumer, new OffsetsSeekForward(TOPIC, new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.FORWARD) - ) + ), new SimpleRecordSerDe() ); var backwardEmitter = new BackwardRecordEmitter( @@ -222,11 +238,13 @@ class RecordEmitterTest extends AbstractBaseTest { new OffsetsSeekBackward(TOPIC, new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.BACKWARD), PARTITIONS * MSGS_PER_PARTITION - ) + ), new SimpleRecordSerDe() ); var polledValues = Flux.create(forwardEmitter) - .map(this::deserialize) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .limitRequest(Long.MAX_VALUE) .collect(Collectors.toList()) .block(); @@ -239,7 +257,9 @@ class RecordEmitterTest extends AbstractBaseTest { assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); polledValues = Flux.create(backwardEmitter) - .map(this::deserialize) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .limitRequest(Long.MAX_VALUE) .collect(Collectors.toList()) .block(); @@ -266,11 +286,13 @@ class RecordEmitterTest extends AbstractBaseTest { new OffsetsSeekBackward(TOPIC, new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD), numMessages - ) + ), new SimpleRecordSerDe() ); var polledValues = Flux.create(backwardEmitter) - .map(this::deserialize) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .limitRequest(numMessages) .collect(Collectors.toList()) .block(); @@ -297,11 +319,13 @@ class RecordEmitterTest extends AbstractBaseTest { new OffsetsSeekBackward(TOPIC, new ConsumerPosition(SeekType.OFFSET, offsets, SeekDirection.BACKWARD), 100 - ) + ), new SimpleRecordSerDe() ); var polledValues = Flux.create(backwardEmitter) - .map(this::deserialize) + .filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .map(m -> m.getContent().toString()) .limitRequest(Long.MAX_VALUE) .collect(Collectors.toList()) .block(); @@ -327,10 +351,6 @@ class RecordEmitterTest extends AbstractBaseTest { return new KafkaConsumer<>(props); } - private String deserialize(ConsumerRecord rec) { - return new StringDeserializer().deserialize(TOPIC, rec.value().get()); - } - @Value static class Record { String value; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index f3b1988f5ca32359b323077a5e16f79ce99f82d2..7bd16af27253848009af05ddbd977606ffd12d83 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -7,9 +7,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.AbstractBaseTest; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessage; +import com.provectus.kafka.ui.model.MessageFormat; import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.SeekType; import com.provectus.kafka.ui.model.TopicMessage; +import com.provectus.kafka.ui.model.TopicMessageEvent; import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.json.JsonSchema; @@ -24,7 +26,9 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) public class SendAndReadTests extends AbstractBaseTest { private static final AvroSchema AVRO_SCHEMA_1 = new AvroSchema( @@ -358,6 +362,73 @@ public class SendAndReadTests extends AbstractBaseTest { .assertSendThrowsException(); } + @Test + void topicMessageMetadataAvro() { + new SendAndReadSpec() + .withKeySchema(AVRO_SCHEMA_1) + .withValueSchema(AVRO_SCHEMA_2) + .withMsgToSend( + new CreateTopicMessage() + .key(AVRO_SCHEMA_1_JSON_RECORD) + .content(AVRO_SCHEMA_2_JSON_RECORD) + ) + .doAssert(polled -> { + assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD); + assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD); + assertThat(polled.getKeySize()).isEqualTo(15L); + assertThat(polled.getValueSize()).isEqualTo(15L); + assertThat(polled.getKeyFormat()).isEqualTo(MessageFormat.AVRO); + assertThat(polled.getValueFormat()).isEqualTo(MessageFormat.AVRO); + assertThat(polled.getKeySchemaId()).isNotEmpty(); + assertThat(polled.getValueSchemaId()).isNotEmpty(); + }); + } + + @Test + void topicMessageMetadataProtobuf() { + new SendAndReadSpec() + .withKeySchema(PROTOBUF_SCHEMA) + .withValueSchema(PROTOBUF_SCHEMA) + .withMsgToSend( + new CreateTopicMessage() + .key(PROTOBUF_SCHEMA_JSON_RECORD) + .content(PROTOBUF_SCHEMA_JSON_RECORD) + ) + .doAssert(polled -> { + assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD); + assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD); + assertThat(polled.getKeySize()).isEqualTo(18L); + assertThat(polled.getValueSize()).isEqualTo(18L); + assertThat(polled.getKeyFormat()).isEqualTo(MessageFormat.PROTOBUF); + assertThat(polled.getValueFormat()).isEqualTo(MessageFormat.PROTOBUF); + assertThat(polled.getKeySchemaId()).isNotEmpty(); + assertThat(polled.getValueSchemaId()).isNotEmpty(); + }); + } + + @Test + void topicMessageMetadataJson() { + new SendAndReadSpec() + .withKeySchema(JSON_SCHEMA) + .withValueSchema(JSON_SCHEMA) + .withMsgToSend( + new CreateTopicMessage() + .key(JSON_SCHEMA_RECORD) + .content(JSON_SCHEMA_RECORD) + .headers(Map.of("header1", "value1")) + ) + .doAssert(polled -> { + assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD); + assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD); + assertThat(polled.getKeyFormat()).isEqualTo(MessageFormat.JSON); + assertThat(polled.getValueFormat()).isEqualTo(MessageFormat.JSON); + assertThat(polled.getKeySchemaId()).isNotEmpty(); + assertThat(polled.getValueSchemaId()).isNotEmpty(); + assertThat(polled.getKeySize()).isEqualTo(57L); + assertThat(polled.getValueSize()).isEqualTo(57L); + assertThat(polled.getHeadersSize()).isEqualTo(13L); + }); + } @SneakyThrows private void assertJsonEqual(String actual, String expected) { @@ -396,8 +467,10 @@ public class SendAndReadTests extends AbstractBaseTest { if (valueSchema != null) { schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema); } + // need to update to see new topic & schemas clustersMetricsScheduler.updateMetrics(); + return topic; } @@ -425,7 +498,9 @@ public class SendAndReadTests extends AbstractBaseTest { ), null, 1 - ).blockLast(Duration.ofSeconds(5)); + ).filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE)) + .map(TopicMessageEvent::getMessage) + .blockLast(Duration.ofSeconds(5000)); assertThat(polled).isNotNull(); assertThat(polled.getPartition()).isEqualTo(0); diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 4d74b7c1a97e32d180543a9c5053fcc345b6fbf1..278e79df3157188b9a72d1ad7367f9ec07e8d6e3 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -503,11 +503,11 @@ paths: 200: description: OK content: - application/json: + text/event-stream: schema: type: array items: - $ref: '#/components/schemas/TopicMessage' + $ref: '#/components/schemas/TopicMessageEvent' delete: tags: - Messages @@ -1793,6 +1793,14 @@ components: - DEAD - EMPTY + MessageFormat: + type: string + enum: + - AVRO + - JSON + - PROTOBUF + - UNKNOWN + ConsumerGroup: type: object properties: @@ -1859,6 +1867,44 @@ components: - source - schema + TopicMessageEvent: + type: object + properties: + type: + type: string + enum: + - PHASE + - MESSAGE + - CONSUMING + - DONE + message: + $ref: "#/components/schemas/TopicMessage" + phase: + $ref: "#/components/schemas/TopicMessagePhase" + consuming: + $ref: "#/components/schemas/TopicMessageConsuming" + + TopicMessagePhase: + type: object + properties: + name: + type: string + + TopicMessageConsuming: + type: object + properties: + bytesConsumed: + type: integer + format: int64 + elapsedMs: + type: integer + format: int64 + isCancelled: + type: boolean + messagesConsumed: + type: integer + + TopicMessage: type: object properties: @@ -1884,6 +1930,23 @@ components: type: string content: type: string + keyFormat: + $ref: "#/components/schemas/MessageFormat" + valueFormat: + $ref: "#/components/schemas/MessageFormat" + keySize: + type: integer + format: int64 + valueSize: + type: integer + format: int64 + keySchemaId: + type: string + valueSchemaId: + type: string + headersSize: + type: integer + format: int64 required: - partition - offset diff --git a/kafka-ui-react-app/package-lock.json b/kafka-ui-react-app/package-lock.json index 8b4f2ec6f6f7bc2d9defd55984a1e1b1d0d29206..074466341625888891e96a014ad0401dd6d4d868 100644 --- a/kafka-ui-react-app/package-lock.json +++ b/kafka-ui-react-app/package-lock.json @@ -2732,6 +2732,11 @@ "integrity": "sha512-LfZwXoGUDo0C3me81HXgkBg5CTQYb6xzEl+fNmbO4JdRiSKQ8A0GD1OBBvKAIsbCUgoyAty7m99GqqMQe784ew==", "dev": true }, + "@types/eventsource": { + "version": "1.1.6", + "resolved": "https://registry.npmjs.org/@types/eventsource/-/eventsource-1.1.6.tgz", + "integrity": "sha512-y4xcLJ+lcoZ6mN9ndSdKOWg24Nj5uQc4Z/NRdy3HbiGGt5hfH3RLwAXr6V+RzGzOljAk48a09n6iY4BMNumEng==" + }, "@types/express-serve-static-core": { "version": "4.17.22", "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.22.tgz", @@ -6538,6 +6543,11 @@ "resolved": "https://registry.npmjs.org/date-fns/-/date-fns-2.22.1.tgz", "integrity": "sha512-yUFPQjrxEmIsMqlHhAhmxkuH769baF21Kk+nZwZGyrMoyLA+LugaQtC0+Tqf9CBUUULWwUJt6Q5ySI3LJDDCGg==" }, + "dayjs": { + "version": "1.10.6", + "resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.10.6.tgz", + "integrity": "sha512-AztC/IOW4L1Q41A86phW5Thhcrco3xuAA+YX/BLpLWWjRcTj5TOt/QImBLmCKlrF7u7k47arTnOyL6GnbG8Hvw==" + }, "debug": { "version": "2.6.9", "resolved": "https://registry.npmjs.org/debug/-/debug-2.6.9.tgz", diff --git a/kafka-ui-react-app/package.json b/kafka-ui-react-app/package.json index 886ce5dc174af98837ae9811c0bea0d24c2bb701..aeac4a837360bdcc6f1a190d30b74d3466ef13c0 100644 --- a/kafka-ui-react-app/package.json +++ b/kafka-ui-react-app/package.json @@ -8,13 +8,14 @@ "@hookform/error-message": "^2.0.0", "@hookform/resolvers": "^2.5.1", "@rooks/use-outside-click-ref": "^4.10.1", + "@types/eventsource": "^1.1.6", "@types/yup": "^0.29.13", "@testing-library/react": "^12.0.0", "ace-builds": "^1.4.12", "bulma": "^0.9.3", "bulma-switch": "^2.0.0", "classnames": "^2.2.6", - "date-fns": "^2.19.0", + "dayjs": "^1.10.6", "eslint-import-resolver-node": "^0.3.4", "eslint-import-resolver-typescript": "^2.4.0", "json-schema-yup-transformer": "^1.6.0", diff --git a/kafka-ui-react-app/src/components/App.scss b/kafka-ui-react-app/src/components/App.scss index 3e920f736a6103e84ff33c210dc0632f906bf808..de58a7477dca1cc86f03f1084bca3327d866d656 100644 --- a/kafka-ui-react-app/src/components/App.scss +++ b/kafka-ui-react-app/src/components/App.scss @@ -2,6 +2,8 @@ $header-height: 52px; $navbar-width: 250px; .Layout { + min-width: 1200px; + &__header { box-shadow: 0 0.46875rem 2.1875rem rgba(4,9,20,0.03), 0 0.9375rem 1.40625rem rgba(4,9,20,0.03), @@ -63,6 +65,8 @@ $navbar-width: 250px; @media screen and (max-width: 1023px) { .Layout { + min-width: initial; + &__container { margin-left: initial; margin-top: 1.5rem; diff --git a/kafka-ui-react-app/src/components/Connect/List/__tests__/__snapshots__/ListItem.spec.tsx.snap b/kafka-ui-react-app/src/components/Connect/List/__tests__/__snapshots__/ListItem.spec.tsx.snap index f15a2368b2f7a2d17c2ce087047e5c5398571718..26e55d350ebebaee2ea88307d7bcca4c8345cf87 100644 --- a/kafka-ui-react-app/src/components/Connect/List/__tests__/__snapshots__/ListItem.spec.tsx.snap +++ b/kafka-ui-react-app/src/components/Connect/List/__tests__/__snapshots__/ListItem.spec.tsx.snap @@ -176,7 +176,7 @@ exports[`Connectors ListItem matches snapshot 1`] = ` + ) : ( + + )} + + +
+
+
+ + +
+
+
+
+
{isFetching && phaseMessage}
+
+ + + + {Math.max(elapsedMs || 0, 0)}ms +
+
+ + + + +
+
+ + + + {messagesConsumed} +
+
+
+
+ + ); +}; + +export default Filters; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Filters/FiltersContainer.ts b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Filters/FiltersContainer.ts new file mode 100644 index 0000000000000000000000000000000000000000..0f02b863f7c8219b55b308116e56916063db3403 --- /dev/null +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Filters/FiltersContainer.ts @@ -0,0 +1,67 @@ +import { connect } from 'react-redux'; +import { Action, ClusterName, RootState, TopicName } from 'redux/interfaces'; +import { withRouter, RouteComponentProps } from 'react-router-dom'; +import { ThunkDispatch } from 'redux-thunk'; +import { + addTopicMessage, + resetTopicMessages, + updateTopicMessagesMeta, + updateTopicMessagesPhase, + setTopicMessagesFetchingStatus, +} from 'redux/actions'; +import { TopicMessage, TopicMessageConsuming } from 'generated-sources'; +import { + getTopicMessgesMeta, + getTopicMessgesPhase, + getIsTopicMessagesFetching, +} from 'redux/reducers/topicMessages/selectors'; +import { getPartitionsByTopicName } from 'redux/reducers/topics/selectors'; + +import Filters from './Filters'; + +interface RouteProps { + clusterName: ClusterName; + topicName: TopicName; +} + +type OwnProps = RouteComponentProps; + +const mapStateToProps = ( + state: RootState, + { + match: { + params: { topicName, clusterName }, + }, + }: OwnProps +) => ({ + clusterName, + topicName, + phaseMessage: getTopicMessgesPhase(state), + partitions: getPartitionsByTopicName(state, topicName), + meta: getTopicMessgesMeta(state), + isFetching: getIsTopicMessagesFetching(state), +}); + +const mapDispatchToProps = ( + dispatch: ThunkDispatch +) => ({ + addMessage: (message: TopicMessage) => { + dispatch(addTopicMessage(message)); + }, + resetMessages: () => { + dispatch(resetTopicMessages()); + }, + updatePhase: (phase: string) => { + dispatch(updateTopicMessagesPhase(phase)); + }, + updateMeta: (meta: TopicMessageConsuming) => { + dispatch(updateTopicMessagesMeta(meta)); + }, + setIsFetching: (status: boolean) => { + dispatch(setTopicMessagesFetchingStatus(status)); + }, +}); + +export default withRouter( + connect(mapStateToProps, mapDispatchToProps)(Filters) +); diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Filters/utils.ts b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Filters/utils.ts new file mode 100644 index 0000000000000000000000000000000000000000..6328d96ac9b2c8548394ec0ad814eaf866deb28c --- /dev/null +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Filters/utils.ts @@ -0,0 +1,70 @@ +import { Partition, SeekType } from 'generated-sources'; +import { compact } from 'lodash'; +import { Option } from 'react-multi-select-component/dist/lib/interfaces'; + +export const filterOptions = (options: Option[], filter: string) => { + if (!filter) { + return options; + } + return options.filter( + ({ value }) => value.toString() && value.toString() === filter + ); +}; + +export const getOffsetFromSeekToParam = (params: URLSearchParams) => { + if (params.get('seekType') === SeekType.OFFSET) { + // seekTo format = ?seekTo=0::123,1::123,2::0 + const offsets = params + .get('seekTo') + ?.split(',') + .map((item) => Number(item.split('::')[1])); + return String(Math.max(...(offsets || []), 0)); + } + + return ''; +}; + +export const getTimestampFromSeekToParam = (params: URLSearchParams) => { + if (params.get('seekType') === SeekType.TIMESTAMP) { + // seekTo format = ?seekTo=0::1627333200000,1::1627333200000 + const offsets = params + .get('seekTo') + ?.split(',') + .map((item) => Number(item.split('::')[1])); + + return new Date(Math.max(...(offsets || []), 0)); + } + + return null; +}; + +export const getSelectedPartitionsFromSeekToParam = ( + params: URLSearchParams, + partitions: Partition[] +) => { + const seekTo = params.get('seekTo'); + + if (seekTo) { + const selectedPartitionIds = seekTo + .split(',') + .map((item) => Number(item.split('::')[0])); + + return compact( + partitions.map(({ partition }) => { + if (selectedPartitionIds?.includes(partition)) { + return { + value: partition, + label: partition.toString(), + }; + } + + return undefined; + }) + ); + } + + return partitions.map(({ partition }) => ({ + value: partition, + label: partition.toString(), + })); +}; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Message.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Message.tsx new file mode 100644 index 0000000000000000000000000000000000000000..0e17f4742765a30a59299d9c8ccb7ffbd4fb5708 --- /dev/null +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Message.tsx @@ -0,0 +1,147 @@ +import * as React from 'react'; +import dayjs from 'dayjs'; +import { TopicMessage } from 'generated-sources'; +import JSONViewer from 'components/common/JSONViewer/JSONViewer'; +import Dropdown from 'components/common/Dropdown/Dropdown'; +import DropdownItem from 'components/common/Dropdown/DropdownItem'; +import useDataSaver from 'lib/hooks/useDataSaver'; + +type Tab = 'key' | 'content' | 'headers'; + +const Message: React.FC<{ message: TopicMessage }> = ({ + message: { + timestamp, + timestampType, + offset, + key, + partition, + content, + headers, + }, +}) => { + const [isOpen, setIsOpen] = React.useState(false); + const [activeTab, setActiveTab] = React.useState('content'); + const { copyToClipboard, saveFile } = useDataSaver( + 'topic-message', + content || '' + ); + + const toggleIsOpen = () => setIsOpen(!isOpen); + const handleKeyTabClick = (e: React.MouseEvent) => { + e.preventDefault(); + setActiveTab('key'); + }; + const handleContentTabClick = (e: React.MouseEvent) => { + e.preventDefault(); + setActiveTab('content'); + }; + const handleHeadersTabClick = (e: React.MouseEvent) => { + e.preventDefault(); + setActiveTab('headers'); + }; + + const activeTabContent = () => { + switch (activeTab) { + case 'content': + return content; + case 'key': + return key; + default: + return JSON.stringify(headers); + } + }; + + return ( + <> + + + + + + + {offset} + {partition} + + {key} + + +
+ {dayjs(timestamp).format('MM.DD.YYYY HH:mm:ss')} +
+ + + {content} + + + + + + } + right + > + + Copy to clipboard + + Save as a file + + + + {isOpen && ( + + + +
Timestamp Type
+
{timestampType}
+
Timestamp
+
{timestamp}
+ + + + + + )} + + ); +}; + +export default Message; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessageContent.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessageContent.tsx deleted file mode 100644 index 9d1b1c3cda3ea130b879e64696ba905956dbfe5e..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessageContent.tsx +++ /dev/null @@ -1,37 +0,0 @@ -import React from 'react'; - -import FullMessage from './FullMessage'; - -interface MessageContentProps { - message: string; -} - -const MessageContent: React.FC = ({ message }) => { - const [isFolded, setIsFolded] = React.useState(message.length > 250); - - return ( -
- {isFolded ? ( -

- {`${message.slice(0, 250)}...`} -

- ) : ( - - )} - {isFolded && ( - - )} -
- ); -}; - -export default MessageContent; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessageItem.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessageItem.tsx deleted file mode 100644 index 9614da98ad3433a864570f4d679605f7162a65fc..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessageItem.tsx +++ /dev/null @@ -1,57 +0,0 @@ -import React from 'react'; -import { format } from 'date-fns'; -import { TopicMessage } from 'generated-sources'; -import Dropdown from 'components/common/Dropdown/Dropdown'; -import DropdownItem from 'components/common/Dropdown/DropdownItem'; -import useDataSaver from 'lib/hooks/useDataSaver'; - -import MessageContent from './MessageContent'; - -export interface MessageItemProp { - partition: TopicMessage['partition']; - offset: TopicMessage['offset']; - timestamp: TopicMessage['timestamp']; - content?: TopicMessage['content']; - messageKey?: TopicMessage['key']; -} - -const MessageItem: React.FC = ({ - partition, - offset, - timestamp, - content, - messageKey, -}) => { - const { copyToClipboard, saveFile } = useDataSaver( - 'topic-message', - content || '' - ); - return ( - - {format(timestamp, 'yyyy-MM-dd HH:mm:ss')} - {messageKey} - {offset} - {partition} - - {content && } - - - - - - } - right - > - - Copy to clipboard - - Save as a file - - - - ); -}; - -export default MessageItem; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Messages.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Messages.tsx index 63d193dd90bdfaf1668716cced739384e5bea3c2..605fbc45e99b67edf08bc3c75d6e68e6a8658ab7 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Messages.tsx +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/Messages.tsx @@ -1,327 +1,13 @@ -import 'react-datepicker/dist/react-datepicker.css'; -import React, { useCallback, useEffect, useRef } from 'react'; -import { groupBy, map, concat, maxBy, minBy } from 'lodash'; -import DatePicker from 'react-datepicker'; -import MultiSelect from 'react-multi-select-component'; -import { Option } from 'react-multi-select-component/dist/lib/interfaces'; -import { useDebouncedCallback } from 'use-debounce'; -import { - ClusterName, - TopicMessageQueryParams, - TopicName, -} from 'redux/interfaces'; -import { - TopicMessage, - Partition, - SeekType, - SeekDirection, -} from 'generated-sources'; -import PageLoader from 'components/common/PageLoader/PageLoader'; +import React from 'react'; +import FiltersContainer from './Filters/FiltersContainer'; import MessagesTable from './MessagesTable'; -export interface Props { - clusterName: ClusterName; - topicName: TopicName; - isFetched: boolean; - fetchTopicMessages: ( - clusterName: ClusterName, - topicName: TopicName, - queryParams: Partial - ) => void; - messages: TopicMessage[]; - partitions: Partition[]; -} - -interface FilterProps { - offset: TopicMessage['offset']; - partition: TopicMessage['partition']; -} - -function usePrevious(value: Date | null) { - const ref = useRef(); - useEffect(() => { - ref.current = value; - }); - return ref.current; -} - -const Messages: React.FC = ({ - isFetched, - clusterName, - topicName, - messages, - partitions, - fetchTopicMessages, -}) => { - const [searchQuery, setSearchQuery] = React.useState(''); - const [searchTimestamp, setSearchTimestamp] = React.useState( - null - ); - const [filterProps, setFilterProps] = React.useState([]); - const [selectedSeekType, setSelectedSeekType] = React.useState( - SeekType.OFFSET - ); - const [searchOffset, setSearchOffset] = React.useState(); - const [selectedPartitions, setSelectedPartitions] = React.useState( - partitions.map((p) => ({ - value: p.partition, - label: p.partition.toString(), - })) - ); - const [queryParams, setQueryParams] = React.useState< - Partial - >({ limit: 100 }); - const debouncedCallback = useDebouncedCallback( - (query: Partial) => - setQueryParams({ ...queryParams, ...query }), - 1000 - ); - const [selectedSeekDirection, setSelectedSeekDirection] = - React.useState(SeekDirection.FORWARD); - - const prevSearchTimestamp = usePrevious(searchTimestamp); - - const getUniqueDataForEachPartition: FilterProps[] = React.useMemo(() => { - const partitionUniqs: FilterProps[] = partitions.map((p) => ({ - offset: 0, - partition: p.partition, - })); - const messageUniqs: FilterProps[] = map( - groupBy(messages, 'partition'), - (v) => - selectedSeekDirection === SeekDirection.FORWARD - ? maxBy(v, 'offset') - : minBy(v, 'offset') - ).map((v) => ({ - offset: v ? v.offset : 0, - partition: v ? v.partition : 0, - })); - - return map( - groupBy(concat(partitionUniqs, messageUniqs), 'partition'), - (v) => maxBy(v, 'offset') as FilterProps - ); - }, [messages, partitions]); - - const getSeekToValuesForPartitions = (partition: Option) => { - const foundedValues = filterProps.find( - (prop) => prop.partition === partition.value - ); - if (selectedSeekType === SeekType.OFFSET) { - return foundedValues ? foundedValues.offset : 0; - } - return searchTimestamp ? searchTimestamp.getTime() : null; - }; - - React.useEffect(() => { - fetchTopicMessages(clusterName, topicName, queryParams); - }, []); - - React.useEffect(() => { - setFilterProps(getUniqueDataForEachPartition); - }, [messages, partitions]); - - const handleQueryChange = (event: React.ChangeEvent) => { - const query = event.target.value; - - setSearchQuery(query); - debouncedCallback({ q: query }); - }; - - const handleDateTimeChange = () => { - if (searchTimestamp !== prevSearchTimestamp) { - if (searchTimestamp) { - const timestamp: number = searchTimestamp.getTime(); - - setSearchTimestamp(searchTimestamp); - setQueryParams({ - ...queryParams, - seekType: SeekType.TIMESTAMP, - seekTo: selectedPartitions.map((p) => `${p.value}::${timestamp}`), - }); - } else { - setSearchTimestamp(null); - const { seekTo, seekType, ...queryParamsWithoutSeek } = queryParams; - setQueryParams(queryParamsWithoutSeek); - } - } - }; - - const handleSeekTypeChange = ( - event: React.ChangeEvent - ) => { - setSelectedSeekType(event.target.value as SeekType); - }; - - const handleOffsetChange = (event: React.ChangeEvent) => { - const offset = event.target.value || '0'; - setSearchOffset(offset); - debouncedCallback({ - seekType: SeekType.OFFSET, - seekTo: selectedPartitions.map((p) => `${p.value}::${offset}`), - }); - }; - - const handlePartitionsChange = (options: Option[]) => { - setSelectedPartitions(options); - - debouncedCallback({ - seekType: options.length > 0 ? selectedSeekType : undefined, - seekTo: - options.length > 0 - ? options.map((p) => `${p.value}::${getSeekToValuesForPartitions(p)}`) - : undefined, - }); - }; - - const handleFiltersSubmit = useCallback(() => { - fetchTopicMessages(clusterName, topicName, queryParams); - }, [clusterName, topicName, queryParams]); - - const onNext = (event: React.MouseEvent) => { - event.preventDefault(); - - const seekTo: string[] = filterProps - .filter( - (value) => - selectedPartitions.findIndex((p) => p.value === value.partition) > -1 - ) - .map((p) => `${p.partition}::${p.offset}`); - - fetchTopicMessages(clusterName, topicName, { - ...queryParams, - seekType: SeekType.OFFSET, - seekTo, - }); - }; - - const filterOptions = (options: Option[], filter: string) => { - if (!filter) { - return options; - } - return options.filter( - ({ value }) => value.toString() && value.toString() === filter - ); - }; - - const toggleSeekDirection = () => { - const nextSeekDirectionValue = - selectedSeekDirection === SeekDirection.FORWARD - ? SeekDirection.BACKWARD - : SeekDirection.FORWARD; - setSelectedSeekDirection(nextSeekDirectionValue); - - debouncedCallback({ - seekDirection: nextSeekDirectionValue, - }); - - fetchTopicMessages(clusterName, topicName, { - ...queryParams, - seekDirection: nextSeekDirectionValue, - }); - }; - - if (!isFetched) { - return ; - } - - return ( -
-
-
- - ({ - label: `Partition #${p.partition.toString()}`, - value: p.partition, - }))} - filterOptions={filterOptions} - value={selectedPartitions} - onChange={handlePartitionsChange} - labelledBy="Select partitions" - /> -
-
- -
- -
-
-
- {selectedSeekType === SeekType.OFFSET ? ( - <> - - - - ) : ( - <> - - setSearchTimestamp(date)} - onCalendarClose={handleDateTimeChange} - showTimeInput - timeInputLabel="Time:" - dateFormat="MMMM d, yyyy h:mm aa" - className="input" - /> - - )} -
-
- - -
-
- -
-
-
-
-
- - -
-
-
- -
- ); -}; +const Messages: React.FC = () => ( +
+ + +
+); export default Messages; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessagesContainer.ts b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessagesContainer.ts deleted file mode 100644 index b56a1a8459c6a915b74d81978dd59d8a2a8ff8bd..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessagesContainer.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { connect } from 'react-redux'; -import { ClusterName, RootState, TopicName } from 'redux/interfaces'; -import { RouteComponentProps, withRouter } from 'react-router-dom'; -import { fetchTopicMessages } from 'redux/actions'; -import { - getIsTopicMessagesFetched, - getPartitionsByTopicName, - getTopicMessages, -} from 'redux/reducers/topics/selectors'; - -import Messages from './Messages'; - -interface RouteProps { - clusterName: ClusterName; - topicName: TopicName; -} - -type OwnProps = RouteComponentProps; - -const mapStateToProps = ( - state: RootState, - { - match: { - params: { topicName, clusterName }, - }, - }: OwnProps -) => ({ - clusterName, - topicName, - isFetched: getIsTopicMessagesFetched(state), - messages: getTopicMessages(state), - partitions: getPartitionsByTopicName(state, topicName), -}); - -const mapDispatchToProps = { - fetchTopicMessages, -}; - -export default withRouter( - connect(mapStateToProps, mapDispatchToProps)(Messages) -); diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessagesTable.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessagesTable.tsx index 27f0ab4c67402e66d91cb4f5c7b13be96305c66c..8f7c0beaeeb5e1ff685ec6ce42701eb32d08c099 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessagesTable.tsx +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/MessagesTable.tsx @@ -1,58 +1,135 @@ -import React from 'react'; -import { TopicMessage } from 'generated-sources'; +import PageLoader from 'components/common/PageLoader/PageLoader'; import CustomParamButton from 'components/Topics/shared/Form/CustomParams/CustomParamButton'; +import { + Partition, + SeekDirection, + TopicMessage, + TopicMessageConsuming, +} from 'generated-sources'; +import { compact, concat, groupBy, map, maxBy, minBy } from 'lodash'; +import React from 'react'; +import { useSelector } from 'react-redux'; +import { useHistory, useLocation } from 'react-router'; +import { ClusterName, TopicName } from 'redux/interfaces'; +import { + getTopicMessges, + getIsTopicMessagesFetching, +} from 'redux/reducers/topicMessages/selectors'; -import MessageItem from './MessageItem'; +import Message from './Message'; -export interface MessagesTableProp { +export interface MessagesProps { + clusterName: ClusterName; + topicName: TopicName; messages: TopicMessage[]; - onNext(event: React.MouseEvent): void; + phaseMessage?: string; + partitions: Partition[]; + meta: TopicMessageConsuming; + addMessage(message: TopicMessage): void; + resetMessages(): void; + updatePhase(phase: string): void; + updateMeta(meta: TopicMessageConsuming): void; } -const MessagesTable: React.FC = ({ messages, onNext }) => ( - <> - - - - - - - - - - - - - {messages.map( - ({ partition, offset, timestamp, content, key }: TopicMessage) => ( - - ) - )} - {messages.length === 0 && ( +const MessagesTable: React.FC = () => { + const location = useLocation(); + const history = useHistory(); + + const searchParams = React.useMemo( + () => new URLSearchParams(location.search), + [location, history] + ); + + const messages = useSelector(getTopicMessges); + const isFetching = useSelector(getIsTopicMessagesFetching); + + const handleNextClick = React.useCallback(() => { + const seekTo = searchParams.get('seekTo'); + + if (seekTo) { + const selectedPartitions = seekTo.split(',').map((item) => { + const [partition] = item.split('::'); + return { offset: 0, partition: parseInt(partition, 10) }; + }); + + const messageUniqs = map(groupBy(messages, 'partition'), (v) => + searchParams.get('seekDirection') === SeekDirection.BACKWARD + ? minBy(v, 'offset') + : maxBy(v, 'offset') + ).map((message) => ({ + offset: message?.offset || 0, + partition: message?.partition || 0, + })); + + const nextSeekTo = compact( + map( + groupBy(concat(selectedPartitions, messageUniqs), 'partition'), + (v) => maxBy(v, 'offset') + ) + ) + .map(({ offset, partition }) => `${partition}::${offset}`) + .join(','); + + searchParams.set('seekTo', nextSeekTo); + + history.push({ + search: `?${searchParams.toString()}`, + }); + } + }, [searchParams, history, messages]); + + return ( + <> +
TimestampKeyOffsetPartitionContent
+ - + + + + + + + - )} - -
No messages at selected topic OffsetPartitionKeyTimestampContent
-
-
- + + + {messages.map((message: TopicMessage) => ( + + ))} + {isFetching && ( + + + + + + )} + {messages.length === 0 && !isFetching && ( + + No messages found + + )} + + +
+
+ +
-
- -); + + ); +}; export default MessagesTable; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/FiltersContainer.spec.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/FiltersContainer.spec.tsx new file mode 100644 index 0000000000000000000000000000000000000000..e4c8e6e37bdd683019de967534c0a24101080366 --- /dev/null +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/FiltersContainer.spec.tsx @@ -0,0 +1,27 @@ +import React from 'react'; +import { mount } from 'enzyme'; +import { Provider } from 'react-redux'; +import { StaticRouter } from 'react-router-dom'; +import configureStore from 'redux/store/configureStore'; +import FiltersContainer from 'components/Topics/Topic/Details/Messages/Filters/FiltersContainer'; + +const store = configureStore(); + +jest.mock( + 'components/Topics/Topic/Details/Messages/Filters/Filters', + () => 'mock-Filters' +); + +describe('FiltersContainer', () => { + it('renders view with initial state of storage', () => { + const wrapper = mount( + + + + + + ); + + expect(wrapper.exists('mock-Filters')).toBeTruthy(); + }); +}); diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessageContent.spec.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessageContent.spec.tsx deleted file mode 100644 index b0fc36d099b7be9bdaf66e1083d44521a4718ef1..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessageContent.spec.tsx +++ /dev/null @@ -1,21 +0,0 @@ -import { shallow } from 'enzyme'; -import React from 'react'; -import MessageContent from 'components/Topics/Topic/Details/Messages/MessageContent'; - -import { messageContent } from './fixtures'; - -describe('MessageContent', () => { - const component = shallow(); - describe('when it is folded', () => { - it('matches the snapshot', () => { - expect(component).toMatchSnapshot(); - }); - }); - - describe('when it is unfolded', () => { - it('matches the snapshot', () => { - component.find('button').simulate('click'); - expect(component).toMatchSnapshot(); - }); - }); -}); diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessageItem.spec.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessageItem.spec.tsx deleted file mode 100644 index b723f7517a5206b94c91c3f5851c98be41bec6ca..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessageItem.spec.tsx +++ /dev/null @@ -1,31 +0,0 @@ -import React from 'react'; -import { shallow } from 'enzyme'; -import MessageItem from 'components/Topics/Topic/Details/Messages/MessageItem'; - -import { messages } from './fixtures'; - -jest.mock('date-fns', () => ({ - format: () => `mocked date`, -})); - -describe('MessageItem', () => { - describe('when content is defined', () => { - it('renders table row with MessageContent', () => { - const wrapper = shallow(); - - expect(wrapper.find('tr').length).toEqual(1); - expect(wrapper.find('td').length).toEqual(6); - expect(wrapper.find('MessageContent').length).toEqual(1); - }); - - it('matches snapshot', () => { - expect(shallow()).toMatchSnapshot(); - }); - }); - - describe('when content is undefined', () => { - it('matches snapshot', () => { - expect(shallow()).toMatchSnapshot(); - }); - }); -}); diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/Messages.spec.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/Messages.spec.tsx deleted file mode 100644 index d3ca29b347a3d7ba6593b37439f5814b71afb31e..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/Messages.spec.tsx +++ /dev/null @@ -1,168 +0,0 @@ -import React from 'react'; -import { Provider } from 'react-redux'; -import { mount, shallow } from 'enzyme'; -import DatePicker from 'react-datepicker'; -import Messages, { - Props, -} from 'components/Topics/Topic/Details/Messages/Messages'; -import MessagesContainer from 'components/Topics/Topic/Details/Messages/MessagesContainer'; -import PageLoader from 'components/common/PageLoader/PageLoader'; -import configureStore from 'redux/store/configureStore'; - -describe('Messages', () => { - describe('Container', () => { - const store = configureStore(); - - it('renders view', () => { - const component = shallow( - - - - ); - expect(component.exists()).toBeTruthy(); - }); - }); - - describe('View', () => { - beforeEach(() => { - jest.restoreAllMocks(); - }); - - const setupWrapper = (props: Partial = {}) => ( - - ); - - describe('Initial state', () => { - it('renders PageLoader', () => { - expect( - shallow(setupWrapper({ isFetched: false })).exists(PageLoader) - ).toBeTruthy(); - }); - }); - - describe('Table', () => { - describe('With messages', () => { - const messagesWrapper = mount( - setupWrapper({ - messages: [ - { - partition: 1, - offset: 2, - timestamp: new Date('05-05-1994'), - content: '[1, 2, 3]', - }, - ], - }) - ); - it('renders table', () => { - expect(messagesWrapper.exists('.table.is-fullwidth')).toBeTruthy(); - }); - it('renders MessageContent', () => { - expect(messagesWrapper.find('MessageContent').length).toEqual(1); - }); - it('parses message content correctly', () => { - const messages = [ - { - partition: 1, - offset: 2, - timestamp: new Date('05-05-1994'), - content: '[1, 2, 3]', - }, - ]; - const content = JSON.stringify(messages[0].content); - expect(JSON.parse(content)).toEqual(messages[0].content); - }); - }); - describe('Without messages', () => { - it('renders string', () => { - const wrapper = mount(setupWrapper()); - expect(wrapper.text()).toContain('No messages at selected topic'); - }); - }); - }); - - describe('Offset field', () => { - describe('Seek Type dependency', () => { - const wrapper = mount(setupWrapper()); - it('renders DatePicker', () => { - wrapper - .find('[id="selectSeekType"]') - .simulate('change', { target: { value: 'TIMESTAMP' } }); - expect( - wrapper.find('[id="selectSeekType"]').first().props().value - ).toEqual('TIMESTAMP'); - expect(wrapper.exists(DatePicker)).toBeTruthy(); - }); - }); - - describe('With defined offset value', () => { - const wrapper = shallow(setupWrapper()); - it('shows offset value in input', () => { - const offset = '10'; - wrapper - .find('#searchOffset') - .simulate('change', { target: { value: offset } }); - expect(wrapper.find('#searchOffset').first().props().value).toEqual( - offset - ); - }); - }); - describe('With invalid offset value', () => { - const wrapper = shallow(setupWrapper()); - it('shows 0 in input', () => { - wrapper - .find('#searchOffset') - .simulate('change', { target: { value: null } }); - expect(wrapper.find('#searchOffset').first().props().value).toBe('0'); - }); - }); - }); - - describe('Search field', () => { - it('renders input correctly', () => { - const query = 20; - const wrapper = shallow(setupWrapper()); - expect(wrapper.exists('#searchText')).toBeTruthy(); - wrapper - .find('#searchText') - .simulate('change', { target: { value: query } }); - expect(wrapper.find('#searchText').at(0).props().value).toEqual(query); - }); - }); - - describe('Submit button', () => { - it('fetches topic messages', () => { - const mockedfetchTopicMessages = jest.fn(); - const wrapper = mount( - setupWrapper({ fetchTopicMessages: mockedfetchTopicMessages }) - ); - - wrapper.find('[type="submit"]').simulate('click'); - expect(mockedfetchTopicMessages).toHaveBeenCalled(); - }); - }); - - describe('Seek Direction', () => { - it('fetches topic messages', () => { - const mockedfetchTopicMessages = jest.fn(); - const wrapper = mount( - setupWrapper({ fetchTopicMessages: mockedfetchTopicMessages }) - ); - - wrapper.find('input[type="checkbox"]').simulate('change'); - expect(mockedfetchTopicMessages).toHaveBeenCalled(); - - wrapper.find('input[type="checkbox"]').simulate('change'); - expect(mockedfetchTopicMessages).toHaveBeenCalled(); - }); - }); - }); -}); diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessagesTable.spec.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessagesTable.spec.tsx deleted file mode 100644 index 25063a97fff6958f8fbb43f72fe0962a29dd89df..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/MessagesTable.spec.tsx +++ /dev/null @@ -1,50 +0,0 @@ -import React from 'react'; -import { shallow } from 'enzyme'; -import MessagesTable, { - MessagesTableProp, -} from 'components/Topics/Topic/Details/Messages/MessagesTable'; - -import { messages } from './fixtures'; - -jest.mock('date-fns', () => ({ - format: () => `mocked date`, -})); - -describe('MessagesTable', () => { - const setupWrapper = (props: Partial = {}) => ( - - ); - - describe('when topic is empty', () => { - it('renders table row with JSONEditor', () => { - const wrapper = shallow(setupWrapper()); - expect(wrapper.find('td').text()).toEqual( - 'No messages at selected topic' - ); - }); - - it('matches snapshot', () => { - expect(shallow(setupWrapper())).toMatchSnapshot(); - }); - }); - - describe('when topic contains messages', () => { - const onNext = jest.fn(); - const wrapper = shallow(setupWrapper({ messages, onNext })); - - it('renders table row without JSONEditor', () => { - expect(wrapper.exists('table')).toBeTruthy(); - expect(wrapper.exists('CustomParamButton')).toBeTruthy(); - expect(wrapper.find('MessageItem').length).toEqual(2); - }); - - it('handles CustomParamButton click', () => { - wrapper.find('CustomParamButton').simulate('click'); - expect(onNext).toHaveBeenCalled(); - }); - - it('matches snapshot', () => { - expect(shallow(setupWrapper({ messages, onNext }))).toMatchSnapshot(); - }); - }); -}); diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessageContent.spec.tsx.snap b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessageContent.spec.tsx.snap deleted file mode 100644 index ba722d2ed016e03fca21f10c37415e03c9f6441b..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessageContent.spec.tsx.snap +++ /dev/null @@ -1,54 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`MessageContent when it is folded matches the snapshot 1`] = ` -
-

- { - "_id": "609fab8aed527f514f4e648d", - "name": "in nostrud", - "desc": "Dolore nostrud commodo magna velit ut magna voluptate sint aute. Excepteur aute culpa culpa dolor ipsum. Tempor est ut officia tempor laborum consectetur. -Amet officia eu veni... -

- -
-`; - -exports[`MessageContent when it is unfolded matches the snapshot 1`] = ` -
- -
-`; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessageItem.spec.tsx.snap b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessageItem.spec.tsx.snap deleted file mode 100644 index be58a15360488cdd986f0c33cc6ee9a08b108642..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessageItem.spec.tsx.snap +++ /dev/null @@ -1,139 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`MessageItem when content is defined matches snapshot 1`] = ` - - - mocked date - - - - 2 - - - 1 - - - - - - - - - } - right={true} - > - - Copy to clipboard - - - Save as a file - - - - -`; - -exports[`MessageItem when content is undefined matches snapshot 1`] = ` - - - mocked date - - - - 20 - - - 2 - - - - - - - } - right={true} - > - - Copy to clipboard - - - Save as a file - - - - -`; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessagesTable.spec.tsx.snap b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessagesTable.spec.tsx.snap deleted file mode 100644 index 1dc6d4bdbb05d8678714f983c4ba60e61be46d6a..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/__snapshots__/MessagesTable.spec.tsx.snap +++ /dev/null @@ -1,117 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`MessagesTable when topic contains messages matches snapshot 1`] = ` - - - - - - - - - - - - - - - - -
- Timestamp - - Key - - Offset - - Partition - - Content - - -
-
-
- -
-
-
-`; - -exports[`MessagesTable when topic is empty matches snapshot 1`] = ` - - - - - - - - - - - - - - - - - -
- Timestamp - - Key - - Offset - - Partition - - Content - - -
- No messages at selected topic -
-
-
- -
-
-
-`; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/fixtures.ts b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/fixtures.ts deleted file mode 100644 index ee58b342df1e7ce292b6d66bc47a8321895a36fd..0000000000000000000000000000000000000000 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/fixtures.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { TopicMessage } from 'generated-sources'; - -export const messages: TopicMessage[] = [ - { - partition: 1, - offset: 2, - timestamp: new Date(Date.UTC(1995, 5, 5)), - content: JSON.stringify({ - foo: 'bar', - key: 'val', - }), - key: '1', - }, - { - partition: 2, - offset: 20, - timestamp: new Date(Date.UTC(2020, 7, 5)), - content: undefined, - key: '1', - }, -]; - -export const messageContent = `{ - "_id": "609fab8aed527f514f4e648d", - "name": "in nostrud", - "desc": "Dolore nostrud commodo magna velit ut magna voluptate sint aute. Excepteur aute culpa culpa dolor ipsum. Tempor est ut officia tempor laborum consectetur.\r\nAmet officia eu veniam Lorem enim aliqua aute voluptate elit do sunt in magna occaecat. Nisi sit non est adipisicing adipisicing consequat duis duis tempor consequat deserunt ea quis ad. Veniam sunt culpa nostrud adipisicing cillum voluptate non est cupidatat. Eiusmod tempor officia irure et deserunt est ex laboris occaecat adipisicing occaecat in aliquip aliqua. Do laboris culpa cupidatat cillum non. Ullamco excepteur mollit voluptate anim in nisi anim elit culpa aute. Ad officia sunt proident ut ullamco officia ea fugiat culpa cillum et fugiat aliquip.\r\nAmet non labore anim in ipsum. Et Lorem velit dolor ipsum. Irure id proident excepteur aliquip deserunt id officia dolor deserunt amet in sint. Aute in nostrud nulla ut laboris Lorem commodo nulla ipsum. Aliqua nulla commodo Lorem labore magna esse proident id ea in pariatur consectetur sint Lorem.\r\nCupidatat deserunt mollit tempor aliqua. Fugiat ullamco magna pariatur quis nulla magna. Esse duis labore ipsum nisi ullamco qui aute duis duis amet est laborum adipisicing magna. Est aliquip quis qui do aliquip nisi elit tempor ex aliquip. Excepteur aliquip ea deserunt amet adipisicing voluptate eiusmod sit sint exercitation exercitation. Id labore amet mollit ex commodo. Proident ex adipisicing deserunt esse Lorem tempor laborum nostrud commodo incididunt ea id.\r\n", - "semster": "spring19", - "profile": "cs", - "degree": "bachelor", - "degreee": "master", - "degreeeee": "bachelor" -}`; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/utils.spec.ts b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/utils.spec.ts new file mode 100644 index 0000000000000000000000000000000000000000..526682ae56a144b4f1366153fc1269ff9a4a11b3 --- /dev/null +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/Messages/__test__/utils.spec.ts @@ -0,0 +1,33 @@ +import { Option } from 'react-multi-select-component/dist/lib/interfaces'; +import { filterOptions } from 'components/Topics/Topic/Details/Messages/Filters/utils'; + +const options: Option[] = [ + { + value: 0, + label: 'Partition #0', + }, + { + value: 1, + label: 'Partition #1', + }, + { + value: 11, + label: 'Partition #11', + }, + { + value: 21, + label: 'Partition #21', + }, +]; + +describe('utils', () => { + describe('filterOptions', () => { + it('returns options if no filter is defined', () => { + expect(filterOptions(options, '')).toEqual(options); + }); + + it('returns filtered options', () => { + expect(filterOptions(options, '11')).toEqual([options[2]]); + }); + }); +}); diff --git a/kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamButton.tsx b/kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamButton.tsx index 27e1f6dc60567526fd0d6d8f562dafe55012bec2..9ef74cff53add791e3079a19c3e7ebc2aae4d07c 100644 --- a/kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamButton.tsx +++ b/kafka-ui-react-app/src/components/Topics/shared/Form/CustomParams/CustomParamButton.tsx @@ -5,6 +5,7 @@ interface Props { className: string; type: 'fa-plus' | 'fa-minus' | 'fa-chevron-right'; btnText?: string; + disabled?: boolean; } const CustomParamButton: React.FC = ({ diff --git a/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.tsx b/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.tsx index 76bfbb050569fc572e01b98594c32613f596916b..e0f36b9c5e86fd46928c85faa3c55ece6be7b090 100644 --- a/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.tsx +++ b/kafka-ui-react-app/src/components/common/Dropdown/Dropdown.tsx @@ -27,7 +27,7 @@ const Dropdown: React.FC = ({ label, right, up, children }) => {