diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index b5b72041e7..0b10510262 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -39,7 +39,7 @@ public class MessagesController extends AbstractController implements MessagesAp getCluster(clusterName), topicName, Optional.ofNullable(partitions).orElse(List.of()) - ).map(ResponseEntity::ok); + ).thenReturn(ResponseEntity.ok().build()); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index abc1f6cb57..e09e2865ad 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -15,7 +15,6 @@ import com.provectus.kafka.ui.serde.RecordSerDe; import com.provectus.kafka.ui.util.FilterTopicMessageEvents; import com.provectus.kafka.ui.util.OffsetsSeekBackward; import com.provectus.kafka.ui.util.OffsetsSeekForward; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -25,8 +24,7 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -36,7 +34,6 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; @@ -68,15 +65,18 @@ public class MessagesService { } private Mono> offsetsForDeletion(KafkaCluster cluster, String topicName, - List partitionsToInclude) { - return Mono.fromSupplier(() -> { - try (KafkaConsumer consumer = consumerGroupService.createConsumer(cluster)) { - return significantOffsets(consumer, topicName, partitionsToInclude); - } catch (Exception e) { - log.error("Error occurred while consuming records", e); - throw new RuntimeException(e); - } - }); + List partitionsToInclude) { + return adminClientService.get(cluster).flatMap(ac -> + ac.listOffsets(topicName, OffsetSpec.earliest()) + .zipWith(ac.listOffsets(topicName, OffsetSpec.latest()), + (start, end) -> + end.entrySet().stream() + .filter(e -> partitionsToInclude.isEmpty() + || partitionsToInclude.contains(e.getKey().partition())) + // we only need non-empty partitions (where start offset != end offset) + .filter(entry -> !entry.getValue().equals(start.get(entry.getKey()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + ); } public Mono sendMessage(KafkaCluster cluster, String topic, @@ -162,25 +162,6 @@ public class MessagesService { .share(); } - /** - * returns end offsets for partitions where start offset != end offsets. - * This is useful when we need to verify that partition is not empty. - */ - public static Map significantOffsets(Consumer consumer, - String topicName, - Collection - partitionsToInclude) { - var partitions = consumer.partitionsFor(topicName).stream() - .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition())) - .map(p -> new TopicPartition(topicName, p.partition())) - .collect(Collectors.toList()); - var beginningOffsets = consumer.beginningOffsets(partitions); - var endOffsets = consumer.endOffsets(partitions); - return endOffsets.entrySet().stream() - .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - private boolean filterTopicMessage(TopicMessageEventDTO message, String query) { if (StringUtils.isEmpty(query) || !message.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) {