|
@@ -6,12 +6,14 @@ import com.provectus.kafka.ui.deserialization.DeserializationService;
|
|
|
import com.provectus.kafka.ui.deserialization.RecordDeserializer;
|
|
|
import com.provectus.kafka.ui.model.ConsumerPosition;
|
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
|
-import com.provectus.kafka.ui.model.SeekType;
|
|
|
+import com.provectus.kafka.ui.model.SeekDirection;
|
|
|
import com.provectus.kafka.ui.model.TopicMessage;
|
|
|
import com.provectus.kafka.ui.util.ClusterUtil;
|
|
|
+import com.provectus.kafka.ui.util.OffsetsSeek;
|
|
|
+import com.provectus.kafka.ui.util.OffsetsSeekBackward;
|
|
|
+import com.provectus.kafka.ui.util.OffsetsSeekForward;
|
|
|
import java.time.Duration;
|
|
|
import java.util.Collection;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -53,7 +55,10 @@ public class ConsumingService {
|
|
|
.orElse(DEFAULT_RECORD_LIMIT);
|
|
|
RecordEmitter emitter = new RecordEmitter(
|
|
|
() -> kafkaService.createConsumer(cluster),
|
|
|
- new OffsetsSeek(topic, consumerPosition));
|
|
|
+ consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)
|
|
|
+ ? new OffsetsSeekForward(topic, consumerPosition)
|
|
|
+ : new OffsetsSeekBackward(topic, consumerPosition, recordsLimit)
|
|
|
+ );
|
|
|
RecordDeserializer recordDeserializer =
|
|
|
deserializationService.getRecordDeserializerForCluster(cluster);
|
|
|
return Flux.create(emitter)
|
|
@@ -79,7 +84,7 @@ public class ConsumingService {
|
|
|
* returns end offsets for partitions where start offset != end offsets.
|
|
|
* This is useful when we need to verify that partition is not empty.
|
|
|
*/
|
|
|
- private static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
|
|
|
+ public static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
|
|
|
String topicName,
|
|
|
Collection<Integer>
|
|
|
partitionsToInclude) {
|
|
@@ -159,98 +164,4 @@ public class ConsumingService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @RequiredArgsConstructor
|
|
|
- static class OffsetsSeek {
|
|
|
-
|
|
|
- private final String topic;
|
|
|
- private final ConsumerPosition consumerPosition;
|
|
|
-
|
|
|
- public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
|
|
|
- SeekType seekType = consumerPosition.getSeekType();
|
|
|
- log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
|
|
|
- switch (seekType) {
|
|
|
- case OFFSET:
|
|
|
- assignAndSeekForOffset(consumer);
|
|
|
- break;
|
|
|
- case TIMESTAMP:
|
|
|
- assignAndSeekForTimestamp(consumer);
|
|
|
- break;
|
|
|
- case BEGINNING:
|
|
|
- assignAndSeekFromBeginning(consumer);
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new IllegalArgumentException("Unknown seekType: " + seekType);
|
|
|
- }
|
|
|
- log.info("Assignment: {}", consumer.assignment());
|
|
|
- return new WaitingOffsets(topic, consumer);
|
|
|
- }
|
|
|
-
|
|
|
- private List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
|
|
|
- Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
|
|
|
- return consumer.partitionsFor(topic).stream()
|
|
|
- .filter(
|
|
|
- p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition()))
|
|
|
- .map(p -> new TopicPartition(p.topic(), p.partition()))
|
|
|
- .collect(Collectors.toList());
|
|
|
- }
|
|
|
-
|
|
|
- private void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer) {
|
|
|
- List<TopicPartition> partitions = getRequestedPartitions(consumer);
|
|
|
- consumer.assign(partitions);
|
|
|
- consumerPosition.getSeekTo().forEach((partition, offset) -> {
|
|
|
- TopicPartition topicPartition = new TopicPartition(topic, partition);
|
|
|
- consumer.seek(topicPartition, offset);
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- private void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer) {
|
|
|
- Map<TopicPartition, Long> timestampsToSearch =
|
|
|
- consumerPosition.getSeekTo().entrySet().stream()
|
|
|
- .collect(Collectors.toMap(
|
|
|
- partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
|
|
|
- Map.Entry::getValue
|
|
|
- ));
|
|
|
- Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
|
|
|
- .entrySet().stream()
|
|
|
- .filter(e -> e.getValue() != null)
|
|
|
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
|
|
|
-
|
|
|
- if (offsetsForTimestamps.isEmpty()) {
|
|
|
- throw new IllegalArgumentException("No offsets were found for requested timestamps");
|
|
|
- }
|
|
|
-
|
|
|
- consumer.assign(offsetsForTimestamps.keySet());
|
|
|
- offsetsForTimestamps.forEach(consumer::seek);
|
|
|
- }
|
|
|
-
|
|
|
- private void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer) {
|
|
|
- List<TopicPartition> partitions = getRequestedPartitions(consumer);
|
|
|
- consumer.assign(partitions);
|
|
|
- consumer.seekToBeginning(partitions);
|
|
|
- }
|
|
|
-
|
|
|
- static class WaitingOffsets {
|
|
|
- final Map<Integer, Long> offsets = new HashMap<>(); // partition number -> offset
|
|
|
-
|
|
|
- WaitingOffsets(String topic, Consumer<?, ?> consumer) {
|
|
|
- var partitions = consumer.assignment().stream()
|
|
|
- .map(TopicPartition::partition)
|
|
|
- .collect(Collectors.toList());
|
|
|
- significantOffsets(consumer, topic, partitions)
|
|
|
- .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1));
|
|
|
- }
|
|
|
-
|
|
|
- void markPolled(ConsumerRecord<?, ?> rec) {
|
|
|
- Long waiting = offsets.get(rec.partition());
|
|
|
- if (waiting != null && waiting <= rec.offset()) {
|
|
|
- offsets.remove(rec.partition());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- boolean endReached() {
|
|
|
- return offsets.isEmpty();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
}
|