Merge branch 'master' into ISSUE_754_acl
This commit is contained in:
commit
fd6fb51eb2
10 changed files with 96 additions and 38 deletions
|
@ -64,6 +64,18 @@ public abstract class AbstractEmitter {
|
|||
protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink,
|
||||
ConsumerRecords<Bytes, Bytes> records,
|
||||
long elapsed) {
|
||||
consumingStats.sendConsumingEvt(sink, records, elapsed);
|
||||
consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
|
||||
}
|
||||
|
||||
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
|
||||
consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink));
|
||||
sink.complete();
|
||||
}
|
||||
|
||||
protected Number getFilterApplyErrors(FluxSink<?> sink) {
|
||||
return sink.contextView()
|
||||
.<MessageFilterStats>getOrEmpty(MessageFilterStats.class)
|
||||
.<Number>map(MessageFilterStats::getFilterApplyErrors)
|
||||
.orElse(0);
|
||||
}
|
||||
}
|
|
@ -80,7 +80,7 @@ public class BackwardRecordEmitter
|
|||
log.debug("sink is cancelled after partitions poll iteration");
|
||||
}
|
||||
}
|
||||
sink.complete();
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.debug("Polling finished");
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
|
|
|
@ -16,7 +16,8 @@ class ConsumingStats {
|
|||
|
||||
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
|
||||
ConsumerRecords<Bytes, Bytes> polledRecords,
|
||||
long elapsed) {
|
||||
long elapsed,
|
||||
Number filterApplyErrors) {
|
||||
for (ConsumerRecord<Bytes, Bytes> rec : polledRecords) {
|
||||
for (Header header : rec.headers()) {
|
||||
bytes +=
|
||||
|
@ -27,15 +28,28 @@ class ConsumingStats {
|
|||
}
|
||||
this.records += polledRecords.count();
|
||||
this.elapsed += elapsed;
|
||||
final TopicMessageConsumingDTO consuming = new TopicMessageConsumingDTO()
|
||||
.bytesConsumed(this.bytes)
|
||||
.elapsedMs(this.elapsed)
|
||||
.isCancelled(sink.isCancelled())
|
||||
.messagesConsumed(this.records);
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
|
||||
.consuming(consuming)
|
||||
.consuming(createConsumingStats(sink, filterApplyErrors))
|
||||
);
|
||||
}
|
||||
|
||||
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Number filterApplyErrors) {
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.DONE)
|
||||
.consuming(createConsumingStats(sink, filterApplyErrors))
|
||||
);
|
||||
}
|
||||
|
||||
private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
|
||||
Number filterApplyErrors) {
|
||||
return new TopicMessageConsumingDTO()
|
||||
.bytesConsumed(this.bytes)
|
||||
.elapsedMs(this.elapsed)
|
||||
.isCancelled(sink.isCancelled())
|
||||
.filterApplyErrors(filterApplyErrors.intValue())
|
||||
.messagesConsumed(this.records);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ public class ForwardRecordEmitter
|
|||
}
|
||||
}
|
||||
}
|
||||
sink.complete();
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.info("Polling finished");
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Getter;
|
||||
|
||||
public class MessageFilterStats {
|
||||
|
||||
@Getter(AccessLevel.PACKAGE)
|
||||
private final AtomicLong filterApplyErrors = new AtomicLong();
|
||||
|
||||
public final void incrementApplyErrors() {
|
||||
filterApplyErrors.incrementAndGet();
|
||||
}
|
||||
|
||||
}
|
|
@ -9,6 +9,7 @@ import javax.annotation.Nullable;
|
|||
import javax.script.CompiledScript;
|
||||
import javax.script.ScriptEngineManager;
|
||||
import javax.script.ScriptException;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl;
|
||||
|
@ -40,25 +41,26 @@ public class MessageFilters {
|
|||
static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
|
||||
var compiledScript = compileScript(script);
|
||||
var jsonSlurper = new JsonSlurper();
|
||||
return msg -> {
|
||||
var bindings = getGroovyEngine().createBindings();
|
||||
bindings.put("partition", msg.getPartition());
|
||||
bindings.put("offset", msg.getOffset());
|
||||
bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli());
|
||||
bindings.put("keyAsText", msg.getKey());
|
||||
bindings.put("valueAsText", msg.getContent());
|
||||
bindings.put("headers", msg.getHeaders());
|
||||
bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey()));
|
||||
bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent()));
|
||||
try {
|
||||
return new Predicate<TopicMessageDTO>() {
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public boolean test(TopicMessageDTO msg) {
|
||||
var bindings = getGroovyEngine().createBindings();
|
||||
bindings.put("partition", msg.getPartition());
|
||||
bindings.put("offset", msg.getOffset());
|
||||
bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli());
|
||||
bindings.put("keyAsText", msg.getKey());
|
||||
bindings.put("valueAsText", msg.getContent());
|
||||
bindings.put("headers", msg.getHeaders());
|
||||
bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey()));
|
||||
bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent()));
|
||||
var result = compiledScript.eval(bindings);
|
||||
if (result instanceof Boolean) {
|
||||
return (Boolean) result;
|
||||
} else {
|
||||
throw new ValidationException(
|
||||
String.format("Unexpected script result: %s, Boolean should be returned instead", result));
|
||||
}
|
||||
return false;
|
||||
} catch (Exception e) {
|
||||
log.trace("Error executing filter script '{}' on message '{}' ", script, msg, e);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
|
|||
properties
|
||||
.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
||||
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
||||
properties.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "kafka-ui-app");
|
||||
return AdminClient.create(properties);
|
||||
})
|
||||
.flatMap(ReactiveAdminClient::create)
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
|
|||
|
||||
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
|
||||
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
|
||||
import com.provectus.kafka.ui.emitter.MessageFilterStats;
|
||||
import com.provectus.kafka.ui.emitter.MessageFilters;
|
||||
import com.provectus.kafka.ui.emitter.TailingEmitter;
|
||||
import com.provectus.kafka.ui.exception.TopicNotFoundException;
|
||||
|
@ -172,8 +173,10 @@ public class MessagesService {
|
|||
recordDeserializer
|
||||
);
|
||||
}
|
||||
MessageFilterStats filterStats = new MessageFilterStats();
|
||||
return Flux.create(emitter)
|
||||
.filter(getMsgFilter(query, filterQueryType))
|
||||
.contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
|
||||
.filter(getMsgFilter(query, filterQueryType, filterStats))
|
||||
.takeWhile(createTakeWhilePredicate(seekDirection, limit))
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.share();
|
||||
|
@ -186,7 +189,9 @@ public class MessagesService {
|
|||
: new ResultSizeLimiter(limit);
|
||||
}
|
||||
|
||||
private Predicate<TopicMessageEventDTO> getMsgFilter(String query, MessageFilterTypeDTO filterQueryType) {
|
||||
private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
|
||||
MessageFilterTypeDTO filterQueryType,
|
||||
MessageFilterStats filterStats) {
|
||||
if (StringUtils.isEmpty(query)) {
|
||||
return evt -> true;
|
||||
}
|
||||
|
@ -194,7 +199,13 @@ public class MessagesService {
|
|||
return evt -> {
|
||||
// we only apply filter for message events
|
||||
if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
|
||||
return messageFilter.test(evt.getMessage());
|
||||
try {
|
||||
return messageFilter.test(evt.getMessage());
|
||||
} catch (Exception e) {
|
||||
filterStats.incrementApplyErrors();
|
||||
log.trace("Error applying filter '{}' for message {}", query, evt.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
|
|
@ -120,17 +120,17 @@ class RecordEmitterTest extends AbstractIntegrationTest {
|
|||
RECORD_DESERIALIZER
|
||||
);
|
||||
|
||||
StepVerifier.create(
|
||||
Flux.create(forwardEmitter)
|
||||
.filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
||||
.take(100)
|
||||
).expectNextCount(0).expectComplete().verify();
|
||||
StepVerifier.create(Flux.create(forwardEmitter))
|
||||
.expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.PHASE))
|
||||
.expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.DONE))
|
||||
.expectComplete()
|
||||
.verify();
|
||||
|
||||
StepVerifier.create(
|
||||
Flux.create(backwardEmitter)
|
||||
.filter(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
|
||||
.take(100)
|
||||
).expectNextCount(0).expectComplete().verify();
|
||||
StepVerifier.create(Flux.create(backwardEmitter))
|
||||
.expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.PHASE))
|
||||
.expectNextMatches(m -> m.getType().equals(TopicMessageEventDTO.TypeEnum.DONE))
|
||||
.expectComplete()
|
||||
.verify();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -2514,6 +2514,8 @@ components:
|
|||
type: boolean
|
||||
messagesConsumed:
|
||||
type: integer
|
||||
filterApplyErrors:
|
||||
type: integer
|
||||
|
||||
|
||||
TopicMessage:
|
||||
|
|
Loading…
Add table
Reference in a new issue