From 2c74b4e1c2ddedcd0f73843eb630ed98340cf3dd Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Mon, 31 Oct 2022 17:41:11 +0400 Subject: [PATCH] ISSUE-2680: Smart filters stats propagation to UI (#2771) ISSUE-2680: Smart filters stats propagation to UI : 1. DONE event type implemented to flag polling end 2. MessageFilterStats added to emitters context to count filter apply errors --- .../kafka/ui/emitter/AbstractEmitter.java | 14 +++++++- .../ui/emitter/BackwardRecordEmitter.java | 2 +- .../kafka/ui/emitter/ConsumingStats.java | 28 ++++++++++++---- .../ui/emitter/ForwardRecordEmitter.java | 2 +- .../kafka/ui/emitter/MessageFilterStats.java | 16 ++++++++++ .../kafka/ui/emitter/MessageFilters.java | 32 ++++++++++--------- .../ui/service/AdminClientServiceImpl.java | 1 + .../kafka/ui/service/MessagesService.java | 17 ++++++++-- .../kafka/ui/service/RecordEmitterTest.java | 20 ++++++------ .../main/resources/swagger/kafka-ui-api.yaml | 2 ++ 10 files changed, 96 insertions(+), 38 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilterStats.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java index ddb5dc3666..35d8a3bef3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -64,6 +64,18 @@ public abstract class AbstractEmitter { protected void sendConsuming(FluxSink sink, ConsumerRecords records, long elapsed) { - consumingStats.sendConsumingEvt(sink, records, elapsed); + consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink)); + } + + protected void sendFinishStatsAndCompleteSink(FluxSink sink) { + consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink)); + sink.complete(); + } + + protected Number getFilterApplyErrors(FluxSink sink) { + return sink.contextView() + .getOrEmpty(MessageFilterStats.class) + .map(MessageFilterStats::getFilterApplyErrors) + .orElse(0); } } \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index d2012355db..fe1644676a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -80,7 +80,7 @@ public class BackwardRecordEmitter log.debug("sink is cancelled after partitions poll iteration"); } } - sink.complete(); + sendFinishStatsAndCompleteSink(sink); log.debug("Polling finished"); } catch (Exception e) { log.error("Error occurred while consuming records", e); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java index 830eb87320..bd0754fc68 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java @@ -16,7 +16,8 @@ class ConsumingStats { void sendConsumingEvt(FluxSink sink, ConsumerRecords polledRecords, - long elapsed) { + long elapsed, + Number filterApplyErrors) { for (ConsumerRecord rec : polledRecords) { for (Header header : rec.headers()) { bytes += @@ -27,15 +28,28 @@ class ConsumingStats { } this.records += polledRecords.count(); this.elapsed += elapsed; - final TopicMessageConsumingDTO consuming = new TopicMessageConsumingDTO() - .bytesConsumed(this.bytes) - .elapsedMs(this.elapsed) - .isCancelled(sink.isCancelled()) - .messagesConsumed(this.records); sink.next( new TopicMessageEventDTO() .type(TopicMessageEventDTO.TypeEnum.CONSUMING) - .consuming(consuming) + .consuming(createConsumingStats(sink, filterApplyErrors)) ); } + + void sendFinishEvent(FluxSink sink, Number filterApplyErrors) { + sink.next( + new TopicMessageEventDTO() + .type(TopicMessageEventDTO.TypeEnum.DONE) + .consuming(createConsumingStats(sink, filterApplyErrors)) + ); + } + + private TopicMessageConsumingDTO createConsumingStats(FluxSink sink, + Number filterApplyErrors) { + return new TopicMessageConsumingDTO() + .bytesConsumed(this.bytes) + .elapsedMs(this.elapsed) + .isCancelled(sink.isCancelled()) + .filterApplyErrors(filterApplyErrors.intValue()) + .messagesConsumed(this.records); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index 69d9801b70..502a8c9a26 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -54,7 +54,7 @@ public class ForwardRecordEmitter } } } - sink.complete(); + sendFinishStatsAndCompleteSink(sink); log.info("Polling finished"); } catch (Exception e) { log.error("Error occurred while consuming records", e); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilterStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilterStats.java new file mode 100644 index 0000000000..3b6df3cdea --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilterStats.java @@ -0,0 +1,16 @@ +package com.provectus.kafka.ui.emitter; + +import java.util.concurrent.atomic.AtomicLong; +import lombok.AccessLevel; +import lombok.Getter; + +public class MessageFilterStats { + + @Getter(AccessLevel.PACKAGE) + private final AtomicLong filterApplyErrors = new AtomicLong(); + + public final void incrementApplyErrors() { + filterApplyErrors.incrementAndGet(); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java index ea0707f102..e48501f6a7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java @@ -9,6 +9,7 @@ import javax.annotation.Nullable; import javax.script.CompiledScript; import javax.script.ScriptEngineManager; import javax.script.ScriptException; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl; @@ -40,25 +41,26 @@ public class MessageFilters { static Predicate groovyScriptFilter(String script) { var compiledScript = compileScript(script); var jsonSlurper = new JsonSlurper(); - return msg -> { - var bindings = getGroovyEngine().createBindings(); - bindings.put("partition", msg.getPartition()); - bindings.put("offset", msg.getOffset()); - bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli()); - bindings.put("keyAsText", msg.getKey()); - bindings.put("valueAsText", msg.getContent()); - bindings.put("headers", msg.getHeaders()); - bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey())); - bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent())); - try { + return new Predicate() { + @SneakyThrows + @Override + public boolean test(TopicMessageDTO msg) { + var bindings = getGroovyEngine().createBindings(); + bindings.put("partition", msg.getPartition()); + bindings.put("offset", msg.getOffset()); + bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli()); + bindings.put("keyAsText", msg.getKey()); + bindings.put("valueAsText", msg.getContent()); + bindings.put("headers", msg.getHeaders()); + bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey())); + bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent())); var result = compiledScript.eval(bindings); if (result instanceof Boolean) { return (Boolean) result; + } else { + throw new ValidationException( + String.format("Unexpected script result: %s, Boolean should be returned instead", result)); } - return false; - } catch (Exception e) { - log.trace("Error executing filter script '{}' on message '{}' ", script, msg, e); - return false; } }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java index 7b93b1420e..9291198df7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java @@ -37,6 +37,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable { properties .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); + properties.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "kafka-ui-app"); return AdminClient.create(properties); }) .flatMap(ReactiveAdminClient::create) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index ecfeda0122..d0bf94c98b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.emitter.BackwardRecordEmitter; import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; +import com.provectus.kafka.ui.emitter.MessageFilterStats; import com.provectus.kafka.ui.emitter.MessageFilters; import com.provectus.kafka.ui.emitter.TailingEmitter; import com.provectus.kafka.ui.exception.TopicNotFoundException; @@ -172,8 +173,10 @@ public class MessagesService { recordDeserializer ); } + MessageFilterStats filterStats = new MessageFilterStats(); return Flux.create(emitter) - .filter(getMsgFilter(query, filterQueryType)) + .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats)) + .filter(getMsgFilter(query, filterQueryType, filterStats)) .takeWhile(createTakeWhilePredicate(seekDirection, limit)) .subscribeOn(Schedulers.boundedElastic()) .share(); @@ -186,7 +189,9 @@ public class MessagesService { : new ResultSizeLimiter(limit); } - private Predicate getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) { + private Predicate getMsgFilter(String query, + MessageFilterTypeDTO filterQueryType, + MessageFilterStats filterStats) { if (StringUtils.isEmpty(query)) { return evt -> true; } @@ -194,7 +199,13 @@ public class MessagesService { return evt -> { // we only apply filter for message events if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) { - return messageFilter.test(evt.getMessage()); + try { + return messageFilter.test(evt.getMessage()); + } catch (Exception e) { + filterStats.incrementApplyErrors(); + log.trace("Error applying filter '{}' for message {}", query, evt.getMessage()); + return false; + } } return true; }; diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java index 3289d177d2..350386fd7e 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -120,17 +120,17 @@ class RecordEmitterTest extends AbstractIntegrationTest { RECORD_DESERIALIZER ); - StepVerifier.create( - Flux.create(forwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .take(100) - ).expectNextCount(0).expectComplete().verify(); + StepVerifier.create(Flux.create(forwardEmitter)) + .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.PHASE)) + .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.DONE)) + .expectComplete() + .verify(); - StepVerifier.create( - Flux.create(backwardEmitter) - .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) - .take(100) - ).expectNextCount(0).expectComplete().verify(); + StepVerifier.create(Flux.create(backwardEmitter)) + .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.PHASE)) + .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.DONE)) + .expectComplete() + .verify(); } @Test diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index a1ba87d85a..1034fc3202 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2408,6 +2408,8 @@ components: type: boolean messagesConsumed: type: integer + filterApplyErrors: + type: integer TopicMessage: