minor improvements

This commit is contained in:
iliax 2023-08-14 01:26:19 +04:00
parent 89104ce1c3
commit c685bf057a
3 changed files with 3 additions and 5 deletions

View file

@ -5,7 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final MessagesProcessing messagesProcessing;
private final PollingSettings pollingSettings;

View file

@ -14,7 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public abstract class RangePollingEmitter extends AbstractEmitter {
abstract class RangePollingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
protected final ConsumerPosition consumerPosition;

View file

@ -2,7 +2,6 @@ package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.AbstractEmitter;
import com.provectus.kafka.ui.emitter.BackwardEmitter;
import com.provectus.kafka.ui.emitter.ForwardEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
@ -18,7 +17,6 @@ import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.time.Instant;
@ -232,7 +230,7 @@ public class MessagesService {
var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
var filter = getMsgFilter(query, filterQueryType);
AbstractEmitter emitter = switch (seekDirection) {
var emitter = switch (seekDirection) {
case FORWARD -> new ForwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()