This commit is contained in:
iliax 2023-07-28 16:48:08 +04:00
parent 0dd25eb507
commit 3ddd31e327
3 changed files with 52 additions and 38 deletions

View file

@ -4,60 +4,79 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
@RequiredArgsConstructor
public class EnhancedConsumer implements Consumer<Bytes, Bytes> {
public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {
@Delegate
private final Consumer<Bytes, Bytes> consumer;
private final PollingThrottler throttler;
private final ApplicationMetrics metrics;
private String pollingTopic;
public static EnhancedConsumer create(Properties properties,
PollingThrottler throttler,
ApplicationMetrics metrics) {
return new EnhancedConsumer(createInternalConsumer(properties, metrics), throttler, metrics);
}
private static KafkaConsumer<Bytes, Bytes> createInternalConsumer(Properties properties, ApplicationMetrics metrics) {
public EnhancedConsumer(Properties properties,
PollingThrottler throttler,
ApplicationMetrics metrics) {
super(properties, new BytesDeserializer(), new BytesDeserializer());
this.throttler = throttler;
this.metrics = metrics;
metrics.activeConsumers().incrementAndGet();
try {
return new KafkaConsumer<>(properties) {
@Override
public void close(Duration timeout) {
metrics.activeConsumers().decrementAndGet();
super.close(timeout);
}
};
} catch (Exception e) {
metrics.activeConsumers().decrementAndGet();
throw e;
}
}
public PolledRecords pollEnhanced(Duration dur) {
var stopwatch = Stopwatch.createStarted();
ConsumerRecords<Bytes, Bytes> polled = consumer.poll(dur);
ConsumerRecords<Bytes, Bytes> polled = poll(dur);
PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
metrics.meterPolledRecords(topic(), polledEnhanced, throttled);
metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
return polledEnhanced;
}
private String topic() {
var topics = consumer.assignment().stream().map(TopicPartition::topic).toList();
// we assume that consumer will always read single topic
Preconditions.checkArgument(topics.size() == 1);
return topics.get(0);
@Override
public void assign(Collection<TopicPartition> partitions) {
super.assign(partitions);
Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
Preconditions.checkState(assignedTopics.size() == 1);
this.pollingTopic = assignedTopics.iterator().next();
}
@Override
public void subscribe(Pattern pattern) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void close(Duration timeout) {
metrics.activeConsumers().decrementAndGet();
super.close(timeout);
}
}

View file

@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@ -259,14 +258,12 @@ public class ConsumerGroupService {
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
props.putAll(properties);
return EnhancedConsumer.create(
return new EnhancedConsumer(
props,
cluster.getPollingSettings().getPollingThrottler(),
ApplicationMetrics.forCluster(cluster)

View file

@ -335,14 +335,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
final Map<String, ? extends Serializable> map = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
);
Properties props = new Properties();
props.putAll(map);
props.putAll(properties);
return new EnhancedConsumer(new KafkaConsumer<>(props), PollingThrottler.noop(), ApplicationMetrics.noop());
return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
}
@Value