diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index d4e3258bbb..7812e59914 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -147,9 +147,9 @@ public class ClusterService { }); } - public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, Integer limit) { + public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) { return clustersStorage.getClusterByName(clusterName) - .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, limit)) + .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit)) .orElse(Flux.empty()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index 90b8166e68..797d30f57b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -4,11 +4,13 @@ import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import java.time.Duration; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -16,6 +18,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.cluster.deserialization.DeserializationService; import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer; import com.provectus.kafka.ui.cluster.model.ConsumerPosition; @@ -36,12 +40,12 @@ public class ConsumingService { private static final int MAX_RECORD_LIMIT = 100; private static final int DEFAULT_RECORD_LIMIT = 20; - private static final int MAX_POLLS_COUNT = 30; private final KafkaService kafkaService; private final DeserializationService deserializationService; + private final ObjectMapper objectMapper = new ObjectMapper(); - public Flux loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) { + public Flux loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, String query, Integer limit) { int recordsLimit = Optional.ofNullable(limit) .map(s -> Math.min(s, MAX_RECORD_LIMIT)) .orElse(DEFAULT_RECORD_LIMIT); @@ -50,12 +54,44 @@ public class ConsumingService { return Flux.create(emitter::emit) .subscribeOn(Schedulers.boundedElastic()) .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer)) + .filter(m -> filterTopicMessage(m, query)) .limitRequest(recordsLimit); } + private boolean filterTopicMessage(TopicMessage message, String query) { + if (StringUtils.isEmpty(query)) { + return true; + } + + Object content = message.getContent(); + JsonNode tree = objectMapper.valueToTree(content); + return treeContainsValue(tree, query); + } + + private boolean treeContainsValue(JsonNode tree, String query) { + LinkedList nodesForSearch = new LinkedList<>(); + nodesForSearch.add(tree); + + while (!nodesForSearch.isEmpty()) { + JsonNode node = nodesForSearch.removeFirst(); + + if (node.isContainerNode()) { + node.elements().forEachRemaining(nodesForSearch::add); + continue; + } + + String nodeValue = node.asText(); + if (nodeValue.contains(query)) { + return true; + } + } + + return false; + } + @RequiredArgsConstructor private static class RecordEmitter { - + private static final int MAX_POLLS_COUNT = 30; private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); private final KafkaService kafkaService; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 0b8d9396c6..10c559ae64 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -65,9 +65,9 @@ public class MetricsRestController implements ApiClustersApi { } @Override - public Mono>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List seekTo, @Valid Integer limit, ServerWebExchange exchange) { + public Mono>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List seekTo, @Valid Integer limit, @Valid String q, ServerWebExchange exchange) { return parseConsumerPosition(seekType, seekTo) - .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, limit))); + .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit))); } @Override diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 4e515c6c85..18f92deabd 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -229,6 +229,10 @@ paths: in: query schema: type: integer + - name: q + in: query + schema: + type: string responses: 200: description: OK