This commit is contained in:
iliax 2023-03-22 23:29:48 +04:00 committed by gokhanimral
parent e43e62bcb7
commit 1037e41889

View file

@ -20,7 +20,8 @@ public record ConsumerPosition(PollingModeDTO pollingMode,
@Nullable Long timestamp,
@Nullable Offsets offsets) {
public record Offsets(@Nullable Long offset,
// one of properties will be null
public record Offsets(@Nullable Long offset, //should be applied to all partitions
@Nullable Map<TopicPartition, Long> tpOffsets) {
}
@ -29,20 +30,22 @@ public record ConsumerPosition(PollingModeDTO pollingMode,
@Nullable List<Integer> partitions,
@Nullable Long timestamp,
@Nullable String offsetsStr) {
Offsets offsets = parseAndValidateOffsets(pollingMode, topic, offsetsStr);
@Nullable var offsets = parseAndValidateOffsets(pollingMode, topic, offsetsStr);
var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
.stream()
.map(p -> new TopicPartition(topic, p))
.collect(Collectors.toList());
// if offsets are specified -inferring partitions list from there
topicPartitions = offsets.tpOffsets == null ? topicPartitions : List.copyOf(offsets.tpOffsets.keySet());
// if offsets are specified - inferring partitions list from there
topicPartitions = (offsets != null && offsets.tpOffsets() != null)
? List.copyOf(offsets.tpOffsets().keySet())
: topicPartitions;
return new ConsumerPosition(
pollingMode,
topic,
Optional.ofNullable(topicPartitions).orElse(List.of()),
topicPartitions,
validateTimestamp(pollingMode, timestamp),
offsets
);
@ -65,7 +68,7 @@ public record ConsumerPosition(PollingModeDTO pollingMode,
if (!StringUtils.hasText(offsetsStr)) {
throw new ValidationException("offsets not provided for " + pollingMode);
}
if (offsetsStr.contains(":")) {
if (!offsetsStr.contains(":")) {
offsets = new Offsets(Long.parseLong(offsetsStr), null);
} else {
Map<TopicPartition, Long> tpOffsets = Stream.of(offsetsStr.split(","))