OffsetsResetService.java 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package com.provectus.kafka.ui.service;
  2. import static java.util.stream.Collectors.toMap;
  3. import static java.util.stream.Collectors.toSet;
  4. import static org.apache.kafka.common.ConsumerGroupState.DEAD;
  5. import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
  6. import com.google.common.base.Preconditions;
  7. import com.provectus.kafka.ui.exception.NotFoundException;
  8. import com.provectus.kafka.ui.exception.ValidationException;
  9. import com.provectus.kafka.ui.model.KafkaCluster;
  10. import java.util.Collection;
  11. import java.util.HashMap;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.Set;
  15. import javax.annotation.Nullable;
  16. import lombok.RequiredArgsConstructor;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.apache.kafka.clients.admin.OffsetSpec;
  19. import org.apache.kafka.common.TopicPartition;
  20. import org.springframework.stereotype.Component;
  21. import reactor.core.publisher.Mono;
  22. /**
  23. * Implementation follows https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
  24. * to works like "kafka-consumer-groups --reset-offsets" console command
  25. * (see kafka.admin.ConsumerGroupCommand)
  26. */
  27. @Slf4j
  28. @Component
  29. @RequiredArgsConstructor
  30. public class OffsetsResetService {
  31. private final AdminClientService adminClientService;
  32. public Mono<Void> resetToEarliest(
  33. KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
  34. return checkGroupCondition(cluster, group)
  35. .flatMap(ac ->
  36. offsets(ac, topic, partitions, OffsetSpec.earliest())
  37. .flatMap(offsets -> resetOffsets(ac, group, offsets)));
  38. }
  39. private Mono<Map<TopicPartition, Long>> offsets(ReactiveAdminClient client,
  40. String topic,
  41. @Nullable Collection<Integer> partitions,
  42. OffsetSpec spec) {
  43. if (partitions == null) {
  44. return client.listTopicOffsets(topic, spec, true);
  45. }
  46. return client.listOffsets(
  47. partitions.stream().map(idx -> new TopicPartition(topic, idx)).collect(toSet()),
  48. spec,
  49. true
  50. );
  51. }
  52. public Mono<Void> resetToLatest(
  53. KafkaCluster cluster, String group, String topic, Collection<Integer> partitions) {
  54. return checkGroupCondition(cluster, group)
  55. .flatMap(ac ->
  56. offsets(ac, topic, partitions, OffsetSpec.latest())
  57. .flatMap(offsets -> resetOffsets(ac, group, offsets)));
  58. }
  59. public Mono<Void> resetToTimestamp(
  60. KafkaCluster cluster, String group, String topic, Collection<Integer> partitions,
  61. long targetTimestamp) {
  62. return checkGroupCondition(cluster, group)
  63. .flatMap(ac ->
  64. offsets(ac, topic, partitions, OffsetSpec.forTimestamp(targetTimestamp))
  65. .flatMap(
  66. foundOffsets -> offsets(ac, topic, partitions, OffsetSpec.latest())
  67. .map(endOffsets -> editTsOffsets(foundOffsets, endOffsets))
  68. )
  69. .flatMap(offsets -> resetOffsets(ac, group, offsets))
  70. );
  71. }
  72. public Mono<Void> resetToOffsets(
  73. KafkaCluster cluster, String group, String topic, Map<Integer, Long> targetOffsets) {
  74. Preconditions.checkNotNull(targetOffsets);
  75. var partitionOffsets = targetOffsets.entrySet().stream()
  76. .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue));
  77. return checkGroupCondition(cluster, group).flatMap(
  78. ac ->
  79. ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.earliest(), true)
  80. .flatMap(earliest ->
  81. ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.latest(), true)
  82. .map(latest -> editOffsetsBounds(partitionOffsets, earliest, latest))
  83. .flatMap(offsetsToCommit -> resetOffsets(ac, group, offsetsToCommit)))
  84. );
  85. }
  86. private Mono<ReactiveAdminClient> checkGroupCondition(KafkaCluster cluster, String groupId) {
  87. return adminClientService.get(cluster)
  88. .flatMap(ac ->
  89. // we need to call listConsumerGroups() to check group existence, because
  90. // describeConsumerGroups() will return consumer group even if it doesn't exist
  91. ac.listConsumerGroupNames()
  92. .filter(cgs -> cgs.stream().anyMatch(g -> g.equals(groupId)))
  93. .flatMap(cgs -> ac.describeConsumerGroups(List.of(groupId)))
  94. .filter(cgs -> cgs.containsKey(groupId))
  95. .map(cgs -> cgs.get(groupId))
  96. .flatMap(cg -> {
  97. if (!Set.of(DEAD, EMPTY).contains(cg.state())) {
  98. return Mono.error(
  99. new ValidationException(
  100. String.format(
  101. "Group's offsets can be reset only if group is inactive,"
  102. + " but group is in %s state",
  103. cg.state()
  104. )
  105. )
  106. );
  107. }
  108. return Mono.just(ac);
  109. })
  110. .switchIfEmpty(Mono.error(new NotFoundException("Consumer group not found")))
  111. );
  112. }
  113. private Map<TopicPartition, Long> editTsOffsets(Map<TopicPartition, Long> foundTsOffsets,
  114. Map<TopicPartition, Long> endOffsets) {
  115. // for partitions where we didnt find offset by timestamp, we use end offsets
  116. Map<TopicPartition, Long> result = new HashMap<>(endOffsets);
  117. result.putAll(foundTsOffsets);
  118. return result;
  119. }
  120. /**
  121. * Checks if submitted offsets is between earliest and latest offsets. If case of range change
  122. * fail we reset offset to either earliest or latest offsets (To follow logic from
  123. * kafka.admin.ConsumerGroupCommand.scala)
  124. */
  125. private Map<TopicPartition, Long> editOffsetsBounds(Map<TopicPartition, Long> offsetsToCheck,
  126. Map<TopicPartition, Long> earliestOffsets,
  127. Map<TopicPartition, Long> latestOffsets) {
  128. var result = new HashMap<TopicPartition, Long>();
  129. offsetsToCheck.forEach((tp, offset) -> {
  130. if (earliestOffsets.get(tp) > offset) {
  131. log.warn("Offset for partition {} is lower than earliest offset, resetting to earliest",
  132. tp);
  133. result.put(tp, earliestOffsets.get(tp));
  134. } else if (latestOffsets.get(tp) < offset) {
  135. log.warn("Offset for partition {} is greater than latest offset, resetting to latest", tp);
  136. result.put(tp, latestOffsets.get(tp));
  137. } else {
  138. result.put(tp, offset);
  139. }
  140. });
  141. return result;
  142. }
  143. private Mono<Void> resetOffsets(ReactiveAdminClient adminClient,
  144. String groupId,
  145. Map<TopicPartition, Long> offsets) {
  146. return adminClient.alterConsumerGroupOffsets(groupId, offsets);
  147. }
  148. }