Browse Source

Text search for topic messages (#62)

* Text search for topic messages

* Code optimization
Anton Petrov 5 years ago
parent
commit
fd60bdafa8

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -147,9 +147,9 @@ public class ClusterService {
                 });
     }
 
-    public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, Integer limit) {
+    public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) {
         return clustersStorage.getClusterByName(clusterName)
-                .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, limit))
+                .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
                 .orElse(Flux.empty());
 
     }

+ 39 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

@@ -4,11 +4,13 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 
 import java.time.Duration;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -16,6 +18,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
 import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
@@ -36,12 +40,12 @@ public class ConsumingService {
 
 	private static final int MAX_RECORD_LIMIT = 100;
 	private static final int DEFAULT_RECORD_LIMIT = 20;
-	private static final int MAX_POLLS_COUNT = 30;
 
 	private final KafkaService kafkaService;
 	private final DeserializationService deserializationService;
+	private final ObjectMapper objectMapper = new ObjectMapper();
 
-	public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
+	public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, String query, Integer limit) {
 		int recordsLimit = Optional.ofNullable(limit)
 				.map(s -> Math.min(s, MAX_RECORD_LIMIT))
 				.orElse(DEFAULT_RECORD_LIMIT);
@@ -50,12 +54,44 @@ public class ConsumingService {
 		return Flux.create(emitter::emit)
 				.subscribeOn(Schedulers.boundedElastic())
 				.map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
+				.filter(m -> filterTopicMessage(m, query))
 				.limitRequest(recordsLimit);
 	}
 
+	private boolean filterTopicMessage(TopicMessage message, String query) {
+		if (StringUtils.isEmpty(query)) {
+			return true;
+		}
+
+		Object content = message.getContent();
+		JsonNode tree = objectMapper.valueToTree(content);
+		return treeContainsValue(tree, query);
+	}
+
+	private boolean treeContainsValue(JsonNode tree, String query) {
+		LinkedList<JsonNode> nodesForSearch = new LinkedList<>();
+		nodesForSearch.add(tree);
+
+		while (!nodesForSearch.isEmpty()) {
+			JsonNode node = nodesForSearch.removeFirst();
+
+			if (node.isContainerNode()) {
+				node.elements().forEachRemaining(nodesForSearch::add);
+				continue;
+			}
+
+			String nodeValue = node.asText();
+			if (nodeValue.contains(query)) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+
 	@RequiredArgsConstructor
 	private static class RecordEmitter {
-
+		private static final int MAX_POLLS_COUNT = 30;
 		private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
 
 		private final KafkaService kafkaService;

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -65,9 +65,9 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo, @Valid Integer limit, ServerWebExchange exchange) {
+    public Mono<ResponseEntity<Flux<TopicMessage>>> getTopicMessages(String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo, @Valid Integer limit, @Valid String q, ServerWebExchange exchange) {
         return parseConsumerPosition(seekType, seekTo)
-                .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, limit)));
+                .map(consumerPosition -> ResponseEntity.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
     }
 
     @Override

+ 4 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -229,6 +229,10 @@ paths:
           in: query
           schema:
             type: integer
+        - name: q
+          in: query
+          schema:
+            type: string
       responses:
         200:
           description: OK