using AdminClient for offsets retrieval instead of Consumer (#1123)

Co-authored-by: iliax <ikuramshin@provectus.com>
This commit is contained in:
Ilya Kuramshin 2021-11-24 14:44:26 +03:00 committed by GitHub
parent 5e1e87140a
commit 0cc39fcd7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 33 deletions

View file

@ -39,7 +39,7 @@ public class MessagesController extends AbstractController implements MessagesAp
getCluster(clusterName), getCluster(clusterName),
topicName, topicName,
Optional.ofNullable(partitions).orElse(List.of()) Optional.ofNullable(partitions).orElse(List.of())
).map(ResponseEntity::ok); ).thenReturn(ResponseEntity.ok().build());
} }
@Override @Override

View file

@ -15,7 +15,6 @@ import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.FilterTopicMessageEvents; import com.provectus.kafka.ui.util.FilterTopicMessageEvents;
import com.provectus.kafka.ui.util.OffsetsSeekBackward; import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward; import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -25,8 +24,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
@ -36,7 +34,6 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -69,14 +66,17 @@ public class MessagesService {
private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName, private Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
List<Integer> partitionsToInclude) { List<Integer> partitionsToInclude) {
return Mono.fromSupplier(() -> { return adminClientService.get(cluster).flatMap(ac ->
try (KafkaConsumer<Bytes, Bytes> consumer = consumerGroupService.createConsumer(cluster)) { ac.listOffsets(topicName, OffsetSpec.earliest())
return significantOffsets(consumer, topicName, partitionsToInclude); .zipWith(ac.listOffsets(topicName, OffsetSpec.latest()),
} catch (Exception e) { (start, end) ->
log.error("Error occurred while consuming records", e); end.entrySet().stream()
throw new RuntimeException(e); .filter(e -> partitionsToInclude.isEmpty()
} || partitionsToInclude.contains(e.getKey().partition()))
}); // we only need non-empty partitions (where start offset != end offset)
.filter(entry -> !entry.getValue().equals(start.get(entry.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
);
} }
public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic, public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
@ -162,25 +162,6 @@ public class MessagesService {
.share(); .share();
} }
/**
* returns end offsets for partitions where start offset != end offsets.
* This is useful when we need to verify that partition is not empty.
*/
public static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
String topicName,
Collection<Integer>
partitionsToInclude) {
var partitions = consumer.partitionsFor(topicName).stream()
.filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
.map(p -> new TopicPartition(topicName, p.partition()))
.collect(Collectors.toList());
var beginningOffsets = consumer.beginningOffsets(partitions);
var endOffsets = consumer.endOffsets(partitions);
return endOffsets.entrySet().stream()
.filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private boolean filterTopicMessage(TopicMessageEventDTO message, String query) { private boolean filterTopicMessage(TopicMessageEventDTO message, String query) {
if (StringUtils.isEmpty(query) if (StringUtils.isEmpty(query)
|| !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) { || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {