This commit is contained in:
iliax 2023-08-07 11:42:46 +04:00
parent 7d1d2fbc47
commit 81999d9369
4 changed files with 34 additions and 37 deletions

View file

@ -8,12 +8,13 @@ import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@Slf4j
@Getter
public class OffsetsInfo {
class OffsetsInfo {
private final Consumer<?, ?> consumer;
@ -23,7 +24,7 @@ public class OffsetsInfo {
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
private final Set<TopicPartition> emptyPartitions = new HashSet<>();
public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
OffsetsInfo(Consumer<?, ?> consumer, String topic) {
this(consumer,
consumer.partitionsFor(topic).stream()
.map(pi -> new TopicPartition(topic, pi.partition()))
@ -31,8 +32,7 @@ public class OffsetsInfo {
);
}
public OffsetsInfo(Consumer<?, ?> consumer,
Collection<TopicPartition> targetPartitions) {
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions);
@ -46,8 +46,8 @@ public class OffsetsInfo {
});
}
public boolean assignedPartitionsFullyPolled() {
for (var tp: consumer.assignment()) {
boolean assignedPartitionsFullyPolled() {
for (var tp : consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp));
if (endOffsets.get(tp) > consumer.position(tp)) {
return false;
@ -56,4 +56,10 @@ public class OffsetsInfo {
return true;
}
long summaryOffsetsRange() {
MutableLong cnt = new MutableLong();
nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
return cnt.getValue();
}
}

View file

@ -52,7 +52,12 @@ public class SeekOperations {
return offsetsInfo.assignedPartitionsFullyPolled();
}
//TODO: document
// sum of (end - start) offsets for all partitions
public long summaryOffsetsRange() {
return offsetsInfo.summaryOffsetsRange();
}
// sum of differences between initial consumer seek and current consumer position (across all partitions)
public long offsetsProcessedFromSeek() {
MutableLong count = new MutableLong();
offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));

View file

@ -92,14 +92,12 @@ class AnalysisTasksStore {
.result(completedState);
}
@Value
@Builder(toBuilder = true)
private static class RunningAnalysis {
Instant startedAt;
double completenessPercent;
long msgsScanned;
long bytesScanned;
Closeable task;
private record RunningAnalysis(Instant startedAt,
double completenessPercent,
long msgsScanned,
long bytesScanned,
Closeable task) {
TopicAnalysisProgressDTO toDto() {
return new TopicAnalysisProgressDTO()

View file

@ -47,26 +47,16 @@ public class TopicAnalysisService {
public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
return topicsService.getTopicDetails(cluster, topicName)
.doOnNext(topic ->
startAnalysis(
cluster,
topicName,
topic.getPartitions().values()
.stream()
.mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
.sum()
)
).then();
.doOnNext(topic -> startAnalysis(cluster, topicName))
.then();
}
private synchronized void startAnalysis(KafkaCluster cluster,
String topic,
long approxNumberOfMsgs) {
private synchronized void startAnalysis(KafkaCluster cluster, String topic) {
var topicId = new TopicIdentity(cluster, topic);
if (analysisTasksStore.isAnalysisInProgress(topicId)) {
throw new TopicAnalysisException("Topic is already analyzing");
}
var task = new AnalysisTask(cluster, topicId, approxNumberOfMsgs);
var task = new AnalysisTask(cluster, topicId);
analysisTasksStore.registerNewTask(topicId, task);
SCHEDULER.schedule(task);
}
@ -84,16 +74,14 @@ public class TopicAnalysisService {
private final Instant startedAt = Instant.now();
private final TopicIdentity topicId;
private final long approxNumberOfMsgs;
private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
private final EnhancedConsumer consumer;
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, long approxNumberOfMsgs) {
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId) {
this.topicId = topicId;
this.approxNumberOfMsgs = approxNumberOfMsgs;
this.consumer = consumerGroupService.createConsumer(
cluster,
// to improve polling throughput
@ -114,9 +102,10 @@ public class TopicAnalysisService {
try {
log.info("Starting {} topic analysis", topicId);
consumer.partitionsFor(topicId.topicName)
.forEach(i -> partitionStats.put(i.partition(), new TopicAnalysisStats()));
.forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
seekOperations.assignAndSeekNonEmptyPartitions();
while (!seekOperations.assignedPartitionsFullyPolled()) {
@ -125,7 +114,7 @@ public class TopicAnalysisService {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);
});
updateProgress(seekOperations);
updateProgress(seekOperations.offsetsProcessedFromSeek(), summaryOffsetsRange);
}
analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
log.info("{} topic analysis finished", topicId);
@ -141,14 +130,13 @@ public class TopicAnalysisService {
}
}
private void updateProgress(SeekOperations seekOperations) {
long processedOffsets = seekOperations.offsetsProcessedFromSeek();
if (processedOffsets > 0 && approxNumberOfMsgs != 0) {
private void updateProgress(long processedOffsets, long summaryOffsetsRange) {
if (processedOffsets > 0 && summaryOffsetsRange != 0) {
analysisTasksStore.updateProgress(
topicId,
totalStats.totalMsgs,
totalStats.keysSize.sum + totalStats.valuesSize.sum,
Math.min(100.0, (((double) processedOffsets) / approxNumberOfMsgs) * 100)
Math.min(100.0, (((double) processedOffsets) / summaryOffsetsRange) * 100)
);
}
}