Fix rebase issues

This commit is contained in:
gokhanimral 2024-02-25 17:18:29 +04:00
parent 6b7c2cfddc
commit 66dc115cab

View file

@ -15,7 +15,7 @@ import org.apache.kafka.common.TopicPartition;
@Slf4j
@Getter
public class OffsetsInfo {
class OffsetsInfo {
private final Consumer<?, ?> consumer;
@ -25,16 +25,15 @@ public class OffsetsInfo {
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
private final Set<TopicPartition> emptyPartitions = new HashSet<>();
public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
OffsetsInfo(Consumer<?, ?> consumer, String topic) {
this(consumer,
consumer.partitionsFor(topic).stream()
.map(pi -> new TopicPartition(topic, pi.partition()))
.collect(Collectors.toList())
.toList()
);
}
public OffsetsInfo(Consumer<?, ?> consumer,
Collection<TopicPartition> targetPartitions) {
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions);
@ -48,8 +47,8 @@ public class OffsetsInfo {
});
}
public boolean assignedPartitionsFullyPolled() {
for (var tp: consumer.assignment()) {
boolean assignedPartitionsFullyPolled() {
for (var tp : consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp));
if (endOffsets.get(tp) > consumer.position(tp)) {
return false;
@ -58,13 +57,13 @@ public class OffsetsInfo {
return true;
}
public long summaryOffsetsRange() {
long summaryOffsetsRange() {
MutableLong cnt = new MutableLong();
nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
return cnt.getValue();
}
public Set<TopicPartition> allTargetPartitions() {
Set<TopicPartition> allTargetPartitions() {
return Sets.union(nonEmptyPartitions, emptyPartitions);
}