Merge branch 'master' into audit-be-minor-refactoring

Commit 66de0245df by Ilya Kuramshin, 2023-08-01 19:04:49 +04:00, committed by GitHub
21 changed files with 282 additions and 128 deletions

pom.xml

@@ -114,6 +114,11 @@
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

AbstractAuthSecurityConfig.java

@@ -13,6 +13,7 @@ abstract class AbstractAuthSecurityConfig {
"/resources/**",
"/actuator/health/**",
"/actuator/info",
"/actuator/prometheus",
"/auth",
"/login",
"/logout",

AbstractEmitter.java

@@ -2,37 +2,28 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
public abstract class AbstractEmitter {
private final MessagesProcessing messagesProcessing;
private final PollingThrottler throttler;
protected final PollingSettings pollingSettings;
protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
this.messagesProcessing = messagesProcessing;
this.pollingSettings = pollingSettings;
this.throttler = pollingSettings.getPollingThrottler();
}
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
protected PolledRecords poll(
FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
return poll(sink, consumer, pollingSettings.getPollTimeout());
}
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
Instant start = Instant.now();
ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
Instant finish = Instant.now();
int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
throttler.throttleAfterPoll(polledBytes);
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
var records = consumer.pollEnhanced(timeout);
sendConsuming(sink, records);
return records;
}
@@ -49,10 +40,8 @@ public abstract class AbstractEmitter {
messagesProcessing.sendPhase(sink, name);
}
protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> records,
long elapsed) {
return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
messagesProcessing.sentConsumingInfo(sink, records);
}
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
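Note: the Instant-based timing removed above now lives in EnhancedConsumer, which wraps the poll in a Guava Stopwatch. A minimal standalone sketch of that pattern, with Thread.sleep standing in for the real consumer.poll and a made-up demo class name:

import com.google.common.base.Stopwatch;

public class StopwatchDemo {
  public static void main(String[] args) throws InterruptedException {
    Stopwatch stopwatch = Stopwatch.createStarted();
    Thread.sleep(100); // stand-in for consumer.poll(timeout)
    // elapsed() yields a java.time.Duration, which PolledRecords exposes as elapsed()
    System.out.println("polled in " + stopwatch.elapsed().toMillis() + " ms");
  }
}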

BackwardRecordEmitter.java

@@ -9,9 +9,7 @@ import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
@@ -22,12 +20,12 @@ public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
public BackwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
@@ -41,7 +39,7 @@ public class BackwardRecordEmitter
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting backward polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Created consumer");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
@@ -91,7 +89,7 @@ public class BackwardRecordEmitter
TopicPartition tp,
long fromOffset,
long toOffset,
Consumer<Bytes, Bytes> consumer,
EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink
) {
consumer.assign(Collections.singleton(tp));
@@ -101,13 +99,13 @@ public class BackwardRecordEmitter
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
emptyPolls.count(polledRecords);
emptyPolls.count(polledRecords.count());
log.debug("{} records polled from {}", polledRecords.count(), tp);
@@ -115,7 +113,7 @@
.filter(r -> r.offset() < toOffset)
.toList();
if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
// we already read all messages in target offsets interval
break;
}
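For context, backward paging seeks to fromOffset, polls forward, and keeps only offsets below toOffset; a non-empty poll whose filtered page is empty means the interval is exhausted (the break above). A pure-Java sketch of that filter with hypothetical offsets:

import java.util.List;
import java.util.stream.LongStream;

public class BackwardPageDemo {
  public static void main(String[] args) {
    long fromOffset = 100; // seek position for this page
    long toOffset = 110;   // exclusive upper bound of the requested page
    // a forward poll may overshoot toOffset; pretend it returned offsets 100..124
    List<Long> polledOffsets = LongStream.range(fromOffset, fromOffset + 25).boxed().toList();
    List<Long> page = polledOffsets.stream().filter(o -> o < toOffset).toList();
    System.out.println(page); // offsets 100..109 only
  }
}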

ConsumingStats.java

@@ -2,9 +2,6 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
class ConsumingStats {
@@ -13,23 +10,17 @@ class ConsumingStats {
private int records = 0;
private long elapsed = 0;
/**
* returns bytes polled.
*/
int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed,
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
PolledRecords polledRecords,
int filterApplyErrors) {
int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
bytes += polledBytes;
bytes += polledRecords.bytes();
this.records += polledRecords.count();
this.elapsed += elapsed;
this.elapsed += polledRecords.elapsed().toMillis();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
.consuming(createConsumingStats(sink, filterApplyErrors))
);
return polledBytes;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {

EmptyPollsCounter.java

@@ -17,8 +17,8 @@ public class EmptyPollsCounter {
this.maxEmptyPolls = maxEmptyPolls;
}
public void count(ConsumerRecords<?, ?> polled) {
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
public void count(int polledCount) {
emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
}
public boolean noDataEmptyPollsReached() {
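count() now takes the plain int from PolledRecords.count() instead of a ConsumerRecords batch. A standalone sketch of the streak rule it implements; the maxEmptyPolls value is made up:

public class EmptyPollsDemo {
  public static void main(String[] args) {
    int maxEmptyPolls = 3;
    int emptyPolls = 0;
    for (int polledCount : new int[] {0, 0, 5, 0, 0, 0}) {
      emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0; // same rule as count(int) above
      System.out.println(polledCount + " -> streak " + emptyPolls
          + ", reached=" + (emptyPolls >= maxEmptyPolls));
    }
  }
}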

EnhancedConsumer.java

@@ -0,0 +1,82 @@
package com.provectus.kafka.ui.emitter;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {
private final PollingThrottler throttler;
private final ApplicationMetrics metrics;
private String pollingTopic;
public EnhancedConsumer(Properties properties,
PollingThrottler throttler,
ApplicationMetrics metrics) {
super(properties, new BytesDeserializer(), new BytesDeserializer());
this.throttler = throttler;
this.metrics = metrics;
metrics.activeConsumers().incrementAndGet();
}
public PolledRecords pollEnhanced(Duration dur) {
var stopwatch = Stopwatch.createStarted();
ConsumerRecords<Bytes, Bytes> polled = poll(dur);
PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
return polledEnhanced;
}
@Override
public void assign(Collection<TopicPartition> partitions) {
super.assign(partitions);
Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
Preconditions.checkState(assignedTopics.size() == 1);
this.pollingTopic = assignedTopics.iterator().next();
}
@Override
public void subscribe(Pattern pattern) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void close(Duration timeout) {
metrics.activeConsumers().decrementAndGet();
super.close(timeout);
}
}
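A usage sketch, wired the way the tests below do it (noop throttler and metrics); the broker address and topic are hypothetical. assign() must see exactly one topic, because that topic tags the poll metrics:

import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.PollingThrottler;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;

public class EnhancedConsumerDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
    // no deserializer properties needed: the constructor hardcodes BytesDeserializer for key and value
    try (var consumer = new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop())) {
      consumer.assign(List.of(new TopicPartition("demo-topic", 0))); // exactly one topic
      var polled = consumer.pollEnhanced(Duration.ofSeconds(1));
      System.out.printf("%d records / %d bytes in %s%n", polled.count(), polled.bytes(), polled.elapsed());
    }
  }
}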

ForwardRecordEmitter.java

@@ -5,8 +5,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@@ -16,11 +14,11 @@ public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition position;
public ForwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition position,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
@@ -32,7 +30,7 @@ public class ForwardRecordEmitter
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting forward polling for {}", position);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
@@ -44,8 +42,8 @@ public class ForwardRecordEmitter
&& !emptyPolls.noDataEmptyPollsReached()) {
sendPhase(sink, "Polling");
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
emptyPolls.count(records);
var records = poll(sink, consumer);
emptyPolls.count(records.count());
log.debug("{} records polled", records.count());

MessagesProcessing.java

@@ -8,7 +8,6 @@ import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@@ -54,13 +53,10 @@ public class MessagesProcessing {
}
}
int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed) {
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
if (!sink.isCancelled()) {
return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
}
return 0;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {

PolledRecords.java

@@ -0,0 +1,48 @@
package com.provectus.kafka.ui.emitter;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;
public record PolledRecords(int count,
int bytes,
Duration elapsed,
ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {
static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
return new PolledRecords(
polled.count(),
calculatePolledRecSize(polled),
pollDuration,
polled
);
}
public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
return records.records(tp);
}
@Override
public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
return records.iterator();
}
private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
for (Header header : rec.headers()) {
polledBytes +=
(header.key() != null ? header.key().getBytes().length : 0)
+ (header.value() != null ? header.value().length : 0);
}
polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
}
return polledBytes;
}
}
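A quick sketch of the aggregate view this record gives emitters. It uses an empty batch so it runs without a broker; since create() is package-private, it assumes the com.provectus.kafka.ui.emitter package, and the Duration is made up:

package com.provectus.kafka.ui.emitter;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;

public class PolledRecordsDemo {
  public static void main(String[] args) {
    ConsumerRecords<Bytes, Bytes> batch = ConsumerRecords.empty();
    PolledRecords polled = PolledRecords.create(batch, Duration.ofMillis(150));
    // count and bytes come from the batch; elapsed is the measured poll time
    System.out.println(polled.count() + " records, " + polled.bytes() + " bytes in " + polled.elapsed());
  }
}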

PollingThrottler.java

@@ -3,11 +3,8 @@ package com.provectus.kafka.ui.emitter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
@Slf4j
public class PollingThrottler {
@@ -36,18 +33,17 @@ public class PollingThrottler {
return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
}
public void throttleAfterPoll(int polledBytes) {
//returns true if polling was throttled
public boolean throttleAfterPoll(int polledBytes) {
if (polledBytes > 0) {
double sleptSeconds = rateLimiter.acquire(polledBytes);
if (!throttled && sleptSeconds > 0.0) {
throttled = true;
log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
return true;
}
}
}
public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
return false;
}
}
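The throttling mechanics are unchanged: Guava's RateLimiter makes acquire() sleep once the byte budget is spent, and the new boolean return only reports that this happened. A standalone sketch with a deliberately tiny, hypothetical budget:

import com.google.common.util.concurrent.RateLimiter;

public class ThrottleDemo {
  public static void main(String[] args) {
    RateLimiter limiter = RateLimiter.create(1024); // 1 KiB/sec budget
    for (int poll = 1; poll <= 3; poll++) {
      double sleptSeconds = limiter.acquire(2048); // each "poll" returns 2 KiB
      System.out.printf("poll %d slept %.2fs%n", poll, sleptSeconds); // > 0 once throttled
    }
  }
}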

TailingEmitter.java

@@ -5,19 +5,17 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.HashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class TailingEmitter extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
@@ -29,7 +27,7 @@ public class TailingEmitter extends AbstractEmitter
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting tailing polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
assignAndSeek(consumer);
while (!sink.isCancelled()) {
sendPhase(sink, "Polling");
@@ -47,7 +45,7 @@
}
}
private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
private void assignAndSeek(EnhancedConsumer consumer) {
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions

Permission.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@@ -73,6 +74,10 @@ public class Permission {
}
private List<String> getAllActionValues() {
if (resource == null) {
return Collections.emptyList();
}
return switch (this.resource) {
case APPLICATIONCONFIG -> Arrays.stream(ApplicationConfigAction.values()).map(Enum::toString).toList();
case CLUSTERCONFIG -> Arrays.stream(ClusterConfigAction.values()).map(Enum::toString).toList();

ConsumerGroupService.java

@@ -2,12 +2,14 @@ package com.provectus.kafka.ui.service;
import com.google.common.collect.Streams;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.ArrayList;
import java.util.Collection;
@@ -26,11 +28,8 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@@ -248,25 +247,27 @@ public class ConsumerGroupService {
.flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
}
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
public EnhancedConsumer createConsumer(KafkaCluster cluster) {
return createConsumer(cluster, Map.of());
}
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
public EnhancedConsumer createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
Properties props = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
props.putAll(properties);
return new KafkaConsumer<>(props);
return new EnhancedConsumer(
props,
cluster.getPollingSettings().getPollingThrottler(),
ApplicationMetrics.forCluster(cluster)
);
}
}

TopicAnalysisService.java

@@ -1,9 +1,9 @@
package com.provectus.kafka.ui.service.analyze;
import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.OffsetsInfo;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.PollingThrottler;
import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
@@ -20,11 +20,9 @@ import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -84,12 +82,11 @@ public class TopicAnalysisService {
private final int partitionsCnt;
private final long approxNumberOfMsgs;
private final EmptyPollsCounter emptyPollsCounter;
private final PollingThrottler throttler;
private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
private final KafkaConsumer<Bytes, Bytes> consumer;
private final EnhancedConsumer consumer;
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
long approxNumberOfMsgs, PollingSettings pollingSettings) {
@@ -104,7 +101,6 @@ public class TopicAnalysisService {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
)
);
this.throttler = pollingSettings.getPollingThrottler();
this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
}
@@ -127,9 +123,8 @@ public class TopicAnalysisService {
var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
var polled = consumer.poll(Duration.ofSeconds(3));
throttler.throttleAfterPoll(polled);
emptyPollsCounter.count(polled);
var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
emptyPollsCounter.count(polled.count());
polled.forEach(r -> {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);

ApplicationMetrics.java

@@ -0,0 +1,82 @@
package com.provectus.kafka.ui.util;
import static lombok.AccessLevel.PRIVATE;
import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.emitter.PolledRecords;
import com.provectus.kafka.ui.model.KafkaCluster;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor(access = PRIVATE)
public class ApplicationMetrics {
private final String clusterName;
private final MeterRegistry registry;
public static ApplicationMetrics forCluster(KafkaCluster cluster) {
return new ApplicationMetrics(cluster.getName(), Metrics.globalRegistry);
}
@VisibleForTesting
public static ApplicationMetrics noop() {
return new ApplicationMetrics("noop", new SimpleMeterRegistry());
}
public void meterPolledRecords(String topic, PolledRecords polled, boolean throttled) {
pollTimer(topic).record(polled.elapsed());
polledRecords(topic).increment(polled.count());
polledBytes(topic).record(polled.bytes());
if (throttled) {
pollThrottlingActivations().increment();
}
}
private Counter polledRecords(String topic) {
return Counter.builder("topic_records_polled")
.description("Number of records polled from topic")
.tag("cluster", clusterName)
.tag("topic", topic)
.register(registry);
}
private DistributionSummary polledBytes(String topic) {
return DistributionSummary.builder("topic_polled_bytes")
.description("Bytes polled from kafka topic")
.tag("cluster", clusterName)
.tag("topic", topic)
.register(registry);
}
private Timer pollTimer(String topic) {
return Timer.builder("topic_poll_time")
.description("Time spend in polling for topic")
.tag("cluster", clusterName)
.tag("topic", topic)
.register(registry);
}
private Counter pollThrottlingActivations() {
return Counter.builder("poll_throttling_activations")
.description("Number of poll throttling activations")
.tag("cluster", clusterName)
.register(registry);
}
public AtomicInteger activeConsumers() {
var count = new AtomicInteger();
Gauge.builder("active_consumers", () -> count)
.description("Number of active consumers")
.tag("cluster", clusterName)
.register(registry);
return count;
}
}
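With micrometer-registry-prometheus on the classpath (pom.xml above), meters registered this way are rendered in the Prometheus exposition format. A sketch against a throwaway PrometheusMeterRegistry instead of Metrics.globalRegistry; the meter name matches the code above, the values are illustrative:

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

public class ScrapeDemo {
  public static void main(String[] args) {
    var registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    Counter.builder("topic_records_polled")
        .description("Number of records polled from topic")
        .tag("cluster", "local")
        .tag("topic", "demo-topic")
        .register(registry)
        .increment(42);
    // prints roughly: topic_records_polled_total{cluster="local",topic="demo-topic",} 42.0
    System.out.println(registry.scrape());
  }
}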

ConsumerRecordsUtil.java

@@ -1,29 +0,0 @@
package com.provectus.kafka.ui.util;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;
public class ConsumerRecordsUtil {
public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
int polledBytes = 0;
for (Header header : rec.headers()) {
polledBytes +=
(header.key() != null ? header.key().getBytes().length : 0)
+ (header.value() != null ? header.value().length : 0);
}
polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
return polledBytes;
}
public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
polledBytes += calculatePolledRecSize(rec);
}
return polledBytes;
}
}

application.yml

@@ -10,7 +10,7 @@ management:
endpoints:
web:
exposure:
include: "info,health"
include: "info,health,prometheus"
logging:
level:
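Adding prometheus to the exposure list is what publishes /actuator/prometheus (the path whitelisted in AbstractAuthSecurityConfig above). A hypothetical smoke check, assuming the app runs on localhost:8080:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PrometheusProbe {
  public static void main(String[] args) throws Exception {
    var request = HttpRequest.newBuilder(URI.create("http://localhost:8080/actuator/prometheus")).build();
    var response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // expect "# HELP ..." / "# TYPE ..." lines followed by samples
    response.body().lines().limit(5).forEach(System.out::println);
  }
}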

RecordEmitterTest.java

@@ -8,9 +8,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.PollingThrottler;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -18,6 +20,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,7 +41,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -325,22 +327,20 @@ class RecordEmitterTest extends AbstractIntegrationTest {
assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
}
private KafkaConsumer<Bytes, Bytes> createConsumer() {
private EnhancedConsumer createConsumer() {
return createConsumer(Map.of());
}
private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
private EnhancedConsumer createConsumer(Map<String, Object> properties) {
final Map<String, ? extends Serializable> map = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
);
Properties props = new Properties();
props.putAll(map);
props.putAll(properties);
return new KafkaConsumer<>(props);
return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
}
@Value

UserInfo.tsx

@@ -19,8 +19,8 @@ const UserInfo = () => {
</S.Wrapper>
}
>
<DropdownItem>
<S.LogoutLink href={`${window.basePath}/logout`}>Log out</S.LogoutLink>
<DropdownItem href={`${window.basePath}/logout`}>
<S.LogoutLink>Log out</S.LogoutLink>
</DropdownItem>
</Dropdown>
) : null;

UserInfo.spec.tsx

@@ -34,7 +34,6 @@ describe('UserInfo', () => {
const logout = screen.getByText('Log out');
expect(logout).toBeInTheDocument();
expect(logout).toHaveAttribute('href', '/logout');
});
it('should render correct url during basePath initialization', async () => {
@@ -50,7 +49,6 @@ describe('UserInfo', () => {
const logout = screen.getByText('Log out');
expect(logout).toBeInTheDocument();
expect(logout).toHaveAttribute('href', `${baseUrl}/logout`);
});
it('should not render anything if the username does not exists', () => {