ISSUE-2680: Smart filters stats propagation to UI (#2771)

ISSUE-2680: Smart filters stats propagation to UI:
1. DONE event type implemented to signal the end of polling
2. MessageFilterStats added to the emitters' Reactor context to count filter-apply errors
Ilya Kuramshin 2022-10-31 17:41:11 +04:00 committed by GitHub
parent aa6c3083c2
commit 2c74b4e1c2
10 changed files with 96 additions and 38 deletions

AbstractEmitter.java

@@ -64,6 +64,18 @@ public abstract class AbstractEmitter {
   protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink,
                                ConsumerRecords<Bytes, Bytes> records,
                                long elapsed) {
-    consumingStats.sendConsumingEvt(sink, records, elapsed);
+    consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
   }
+
+  protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
+    consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink));
+    sink.complete();
+  }
+
+  protected Number getFilterApplyErrors(FluxSink<?> sink) {
+    return sink.contextView()
+        .<MessageFilterStats>getOrEmpty(MessageFilterStats.class)
+        .<Number>map(MessageFilterStats::getFilterApplyErrors)
+        .orElse(0);
+  }
 }
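getFilterApplyErrors works because Reactor propagates the subscriber context upstream: whatever MessagesService puts into the context via contextWrite becomes visible to the emitter through sink.contextView(). A minimal, self-contained sketch of that mechanism (class and variable names here are illustrative, not from this PR):

import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

class ContextLookupDemo {
  public static void main(String[] args) {
    AtomicLong errorCounter = new AtomicLong(3);
    Flux.<String>create(sink -> {
          // emitter side: read what the subscriber side put into the context
          long errors = sink.contextView()
              .getOrDefault(AtomicLong.class, new AtomicLong())
              .get();
          sink.next("filterApplyErrors=" + errors);
          sink.complete();
        })
        // subscriber side: the context flows upstream at subscription time
        .contextWrite(ctx -> ctx.put(AtomicLong.class, errorCounter))
        .subscribe(System.out::println); // prints filterApplyErrors=3
  }
}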

BackwardRecordEmitter.java

@@ -80,7 +80,7 @@ public class BackwardRecordEmitter
           log.debug("sink is cancelled after partitions poll iteration");
         }
       }
-      sink.complete();
+      sendFinishStatsAndCompleteSink(sink);
       log.debug("Polling finished");
     } catch (Exception e) {
       log.error("Error occurred while consuming records", e);

ConsumingStats.java

@@ -16,7 +16,8 @@ class ConsumingStats {
   void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
                         ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed) {
+                        long elapsed,
+                        Number filterApplyErrors) {
     for (ConsumerRecord<Bytes, Bytes> rec : polledRecords) {
       for (Header header : rec.headers()) {
         bytes +=
@@ -27,15 +28,28 @@
     }
     this.records += polledRecords.count();
     this.elapsed += elapsed;
-    final TopicMessageConsumingDTO consuming = new TopicMessageConsumingDTO()
-        .bytesConsumed(this.bytes)
-        .elapsedMs(this.elapsed)
-        .isCancelled(sink.isCancelled())
-        .messagesConsumed(this.records);
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
-            .consuming(consuming)
+            .consuming(createConsumingStats(sink, filterApplyErrors))
     );
   }
+
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Number filterApplyErrors) {
+    sink.next(
+        new TopicMessageEventDTO()
+            .type(TopicMessageEventDTO.TypeEnum.DONE)
+            .consuming(createConsumingStats(sink, filterApplyErrors))
+    );
+  }
+
+  private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
+                                                        Number filterApplyErrors) {
+    return new TopicMessageConsumingDTO()
+        .bytesConsumed(this.bytes)
+        .elapsedMs(this.elapsed)
+        .isCancelled(sink.isCancelled())
+        .filterApplyErrors(filterApplyErrors.intValue())
+        .messagesConsumed(this.records);
+  }
 }
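Both event types now carry the same consuming payload; DONE is simply the terminal marker the UI can key off. A sketch of what sendFinishEvent produces, assuming the project's generated fluent DTOs (the literal values are made up):

TopicMessageEventDTO done = new TopicMessageEventDTO()
    .type(TopicMessageEventDTO.TypeEnum.DONE)
    .consuming(new TopicMessageConsumingDTO()
        .bytesConsumed(4096L)     // running totals accumulated across polls
        .elapsedMs(250L)
        .isCancelled(false)
        .filterApplyErrors(2)     // the counter introduced by this PR
        .messagesConsumed(100));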

ForwardRecordEmitter.java

@@ -54,7 +54,7 @@ public class ForwardRecordEmitter
          }
        }
      }
-      sink.complete();
+      sendFinishStatsAndCompleteSink(sink);
      log.info("Polling finished");
    } catch (Exception e) {
      log.error("Error occurred while consuming records", e);

MessageFilterStats.java (new file)

@@ -0,0 +1,16 @@
+package com.provectus.kafka.ui.emitter;
+
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.AccessLevel;
+import lombok.Getter;
+
+public class MessageFilterStats {
+
+  @Getter(AccessLevel.PACKAGE)
+  private final AtomicLong filterApplyErrors = new AtomicLong();
+
+  public final void incrementApplyErrors() {
+    filterApplyErrors.incrementAndGet();
+  }
+
+}
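The counter is an AtomicLong because increments happen on the thread running the filter while the emitter thread reads the running total; and since AtomicLong extends java.lang.Number, the package-private getter maps straight to Number in AbstractEmitter.getFilterApplyErrors. Usage sketch:

MessageFilterStats stats = new MessageFilterStats();
stats.incrementApplyErrors();
stats.incrementApplyErrors();
// within the emitter package, the running total is read via the
// package-private getter: stats.getFilterApplyErrors().get() == 2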

MessageFilters.java

@@ -9,6 +9,7 @@ import javax.annotation.Nullable;
 import javax.script.CompiledScript;
 import javax.script.ScriptEngineManager;
 import javax.script.ScriptException;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl;
@@ -40,7 +41,10 @@ public class MessageFilters {
   static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
     var compiledScript = compileScript(script);
     var jsonSlurper = new JsonSlurper();
-    return msg -> {
+    return new Predicate<TopicMessageDTO>() {
+      @SneakyThrows
+      @Override
+      public boolean test(TopicMessageDTO msg) {
         var bindings = getGroovyEngine().createBindings();
         bindings.put("partition", msg.getPartition());
         bindings.put("offset", msg.getOffset());
@@ -50,15 +54,13 @@
         bindings.put("headers", msg.getHeaders());
         bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey()));
         bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent()));
-      try {
         var result = compiledScript.eval(bindings);
         if (result instanceof Boolean) {
           return (Boolean) result;
+        } else {
+          throw new ValidationException(
+              String.format("Unexpected script result: %s, Boolean should be returned instead", result));
         }
-        return false;
-      } catch (Exception e) {
-        log.trace("Error executing filter script '{}' on message '{}' ", script, msg, e);
-        return false;
       }
     };
   }
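The move from a lambda to an anonymous Predicate is what enables @SneakyThrows: the annotation attaches to a method declaration, and a lambda body has none. With it, the checked ScriptException thrown by CompiledScript.eval propagates unchecked, letting MessagesService own the catch-and-count policy instead of the filter silently returning false. The pattern in isolation (illustrative, not from this PR):

import java.io.IOException;
import java.util.function.Predicate;
import lombok.SneakyThrows;

class SneakyPredicateDemo {
  static Predicate<String> strictPredicate() {
    return new Predicate<String>() {
      @SneakyThrows // rethrows checked exceptions without declaring them
      @Override
      public boolean test(String s) {
        if (s.isEmpty()) {
          throw new IOException("checked exception, propagates unchecked");
        }
        return s.length() > 3;
      }
    };
  }
}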

AdminClientServiceImpl.java

@@ -37,6 +37,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
           properties
               .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
           properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
+          properties.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "kafka-ui-app");
           return AdminClient.create(properties);
         })
         .flatMap(ReactiveAdminClient::create)
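A small drive-by fix: the AdminClient gets a recognizable default client.id, and putIfAbsent ensures a client.id already present in the properties (for example, one supplied via cluster configuration) still wins. Illustration of the Properties semantics (names are made up):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClientConfig;

class ClientIdDefaultDemo {
  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(AdminClientConfig.CLIENT_ID_CONFIG, "my-custom-id"); // e.g. user-configured
    // the fallback only applies when no client.id was set earlier
    properties.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "kafka-ui-app");
    System.out.println(properties.get(AdminClientConfig.CLIENT_ID_CONFIG)); // my-custom-id
  }
}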

MessagesService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;

 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.MessageFilterStats;
 import com.provectus.kafka.ui.emitter.MessageFilters;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
@@ -172,8 +173,10 @@ public class MessagesService {
           recordDeserializer
       );
     }
+    MessageFilterStats filterStats = new MessageFilterStats();
     return Flux.create(emitter)
-        .filter(getMsgFilter(query, filterQueryType))
+        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
+        .filter(getMsgFilter(query, filterQueryType, filterStats))
         .takeWhile(createTakeWhilePredicate(seekDirection, limit))
         .subscribeOn(Schedulers.boundedElastic())
         .share();
@@ -186,7 +189,9 @@
         : new ResultSizeLimiter(limit);
   }

-  private Predicate<TopicMessageEventDTO> getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) {
+  private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
+                                                       MessageFilterTypeDTO filterQueryType,
+                                                       MessageFilterStats filterStats) {
     if (StringUtils.isEmpty(query)) {
       return evt -> true;
     }
@@ -194,7 +199,13 @@
     return evt -> {
       // we only apply filter for message events
       if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
-        return messageFilter.test(evt.getMessage());
+        try {
+          return messageFilter.test(evt.getMessage());
+        } catch (Exception e) {
+          filterStats.incrementApplyErrors();
+          log.trace("Error applying filter '{}' for message {}", query, evt.getMessage());
+          return false;
+        }
       }
       return true;
     };
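Note the division of labor this establishes: the Groovy predicate above now throws on failure, and this wrapper catches once per message, bumps the shared counter, and drops the record so a single bad message cannot error out the whole stream. The same pattern extracted into a standalone class (hypothetical, not part of this PR):

import java.util.function.Predicate;

final class CountingFilter<T> implements Predicate<T> {
  private final Predicate<T> delegate;     // e.g. the compiled Groovy filter
  private final MessageFilterStats stats;  // shared with the emitter via the context

  CountingFilter(Predicate<T> delegate, MessageFilterStats stats) {
    this.delegate = delegate;
    this.stats = stats;
  }

  @Override
  public boolean test(T value) {
    try {
      return delegate.test(value);
    } catch (Exception e) {
      stats.incrementApplyErrors(); // surfaced in CONSUMING/DONE events
      return false;                 // drop the element, keep the stream alive
    }
  }
}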

RecordEmitterTest.java

@@ -120,17 +120,17 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         RECORD_DESERIALIZER
     );

-    StepVerifier.create(
-        Flux.create(forwardEmitter)
-            .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
-            .take(100)
-    ).expectNextCount(0).expectComplete().verify();
+    StepVerifier.create(Flux.create(forwardEmitter))
+        .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.PHASE))
+        .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.DONE))
+        .expectComplete()
+        .verify();

-    StepVerifier.create(
-        Flux.create(backwardEmitter)
-            .filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
-            .take(100)
-    ).expectNextCount(0).expectComplete().verify();
+    StepVerifier.create(Flux.create(backwardEmitter))
+        .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.PHASE))
+        .expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.DONE))
+        .expectComplete()
+        .verify();
   }
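The updated test pins the exact sequence for an empty topic: one PHASE event followed by the terminal DONE. For a non-empty topic, the same terminal guarantee could be asserted along these lines (a sketch reusing the test's forwardEmitter, not part of this PR):

StepVerifier.create(Flux.create(forwardEmitter))
    // consume PHASE/CONSUMING/MESSAGE events until the terminal marker
    .thenConsumeWhile(m -> !TopicMessageEventDTO.TypeEnum.DONE.equals(m.getType()))
    .expectNextMatches(m -> TopicMessageEventDTO.TypeEnum.DONE.equals(m.getType()))
    .expectComplete()
    .verify();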

kafka-ui-api.yaml (OpenAPI contract)

@@ -2408,6 +2408,8 @@ components:
           type: boolean
         messagesConsumed:
           type: integer
+        filterApplyErrors:
+          type: integer

     TopicMessage:
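Once the contract is regenerated, TopicMessageConsumingDTO exposes the new property through the same fluent accessor style used throughout the server code (a sketch, assuming the project's usual openapi-generator output):

TopicMessageConsumingDTO consuming = new TopicMessageConsumingDTO()
    .filterApplyErrors(3);
Integer errors = consuming.getFilterApplyErrors(); // 3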