ConsumerPosition.java 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package com.provectus.kafka.ui.model;
  2. import static java.util.stream.Collectors.toMap;
  3. import com.provectus.kafka.ui.exception.ValidationException;
  4. import java.util.List;
  5. import java.util.Map;
  6. import java.util.Optional;
  7. import java.util.stream.Collectors;
  8. import java.util.stream.Stream;
  9. import javax.annotation.Nullable;
  10. import org.apache.commons.lang3.tuple.Pair;
  11. import org.apache.kafka.common.TopicPartition;
  12. import org.springframework.util.StringUtils;
  13. public record ConsumerPosition(PollingModeDTO pollingMode,
  14. String topic,
  15. List<TopicPartition> partitions, //all partitions if list is empty
  16. @Nullable Long timestamp,
  17. @Nullable Offsets offsets) {
  18. // one of properties will be null
  19. public record Offsets(@Nullable Long offset, //should be applied to all partitions
  20. @Nullable Map<TopicPartition, Long> tpOffsets) {
  21. }
  22. public static ConsumerPosition create(PollingModeDTO pollingMode,
  23. String topic,
  24. @Nullable List<Integer> partitions,
  25. @Nullable Long timestamp,
  26. @Nullable String offsetsStr) {
  27. @Nullable var offsets = parseAndValidateOffsets(pollingMode, topic, offsetsStr);
  28. var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
  29. .stream()
  30. .map(p -> new TopicPartition(topic, p))
  31. .collect(Collectors.toList());
  32. // if offsets are specified - inferring partitions list from there
  33. topicPartitions = (offsets != null && offsets.tpOffsets() != null)
  34. ? List.copyOf(offsets.tpOffsets().keySet())
  35. : topicPartitions;
  36. return new ConsumerPosition(
  37. pollingMode,
  38. topic,
  39. topicPartitions,
  40. validateTimestamp(pollingMode, timestamp),
  41. offsets
  42. );
  43. }
  44. private static Long validateTimestamp(PollingModeDTO pollingMode, @Nullable Long ts) {
  45. if (pollingMode == PollingModeDTO.FROM_TIMESTAMP || pollingMode == PollingModeDTO.TO_TIMESTAMP) {
  46. if (ts == null) {
  47. throw new ValidationException("timestamp not provided for " + pollingMode);
  48. }
  49. }
  50. return ts;
  51. }
  52. private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode,
  53. String topic,
  54. @Nullable String offsetsStr) {
  55. Offsets offsets = null;
  56. if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) {
  57. if (!StringUtils.hasText(offsetsStr)) {
  58. throw new ValidationException("offsets not provided for " + pollingMode);
  59. }
  60. if (!offsetsStr.contains(":")) {
  61. offsets = new Offsets(Long.parseLong(offsetsStr), null);
  62. } else {
  63. Map<TopicPartition, Long> tpOffsets = Stream.of(offsetsStr.split(","))
  64. .map(p -> {
  65. String[] split = p.split(":");
  66. if (split.length != 2) {
  67. throw new IllegalArgumentException(
  68. "Wrong seekTo argument format. See API docs for details");
  69. }
  70. return Pair.of(
  71. new TopicPartition(topic, Integer.parseInt(split[0])),
  72. Long.parseLong(split[1])
  73. );
  74. })
  75. .collect(toMap(Pair::getKey, Pair::getValue));
  76. offsets = new Offsets(null, tpOffsets);
  77. }
  78. }
  79. return offsets;
  80. }
  81. }