123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package com.provectus.kafka.ui.model;
- import static java.util.stream.Collectors.toMap;
- import com.provectus.kafka.ui.exception.ValidationException;
- import java.util.List;
- import java.util.Map;
- import java.util.Optional;
- import java.util.stream.Collectors;
- import java.util.stream.Stream;
- import javax.annotation.Nullable;
- import org.apache.commons.lang3.tuple.Pair;
- import org.apache.kafka.common.TopicPartition;
- import org.springframework.util.StringUtils;
- public record ConsumerPosition(PollingModeDTO pollingMode,
- String topic,
- List<TopicPartition> partitions, //all partitions if list is empty
- @Nullable Long timestamp,
- @Nullable Offsets offsets) {
- // one of properties will be null
- public record Offsets(@Nullable Long offset, //should be applied to all partitions
- @Nullable Map<TopicPartition, Long> tpOffsets) {
- }
- public static ConsumerPosition create(PollingModeDTO pollingMode,
- String topic,
- @Nullable List<Integer> partitions,
- @Nullable Long timestamp,
- @Nullable String offsetsStr) {
- @Nullable var offsets = parseAndValidateOffsets(pollingMode, topic, offsetsStr);
- var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
- .stream()
- .map(p -> new TopicPartition(topic, p))
- .collect(Collectors.toList());
- // if offsets are specified - inferring partitions list from there
- topicPartitions = (offsets != null && offsets.tpOffsets() != null)
- ? List.copyOf(offsets.tpOffsets().keySet())
- : topicPartitions;
- return new ConsumerPosition(
- pollingMode,
- topic,
- topicPartitions,
- validateTimestamp(pollingMode, timestamp),
- offsets
- );
- }
- private static Long validateTimestamp(PollingModeDTO pollingMode, @Nullable Long ts) {
- if (pollingMode == PollingModeDTO.FROM_TIMESTAMP || pollingMode == PollingModeDTO.TO_TIMESTAMP) {
- if (ts == null) {
- throw new ValidationException("timestamp not provided for " + pollingMode);
- }
- }
- return ts;
- }
- private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode,
- String topic,
- @Nullable String offsetsStr) {
- Offsets offsets = null;
- if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) {
- if (!StringUtils.hasText(offsetsStr)) {
- throw new ValidationException("offsets not provided for " + pollingMode);
- }
- if (!offsetsStr.contains(":")) {
- offsets = new Offsets(Long.parseLong(offsetsStr), null);
- } else {
- Map<TopicPartition, Long> tpOffsets = Stream.of(offsetsStr.split(","))
- .map(p -> {
- String[] split = p.split(":");
- if (split.length != 2) {
- throw new IllegalArgumentException(
- "Wrong seekTo argument format. See API docs for details");
- }
- return Pair.of(
- new TopicPartition(topic, Integer.parseInt(split[0])),
- Long.parseLong(split[1])
- );
- })
- .collect(toMap(Pair::getKey, Pair::getValue));
- offsets = new Offsets(null, tpOffsets);
- }
- }
- return offsets;
- }
- }
|