ConsumerGroupsController.java 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package com.provectus.kafka.ui.controller;
  2. import static java.util.stream.Collectors.toMap;
  3. import com.provectus.kafka.ui.api.ConsumerGroupsApi;
  4. import com.provectus.kafka.ui.exception.ValidationException;
  5. import com.provectus.kafka.ui.mapper.ConsumerGroupMapper;
  6. import com.provectus.kafka.ui.model.ConsumerGroupDTO;
  7. import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
  8. import com.provectus.kafka.ui.model.ConsumerGroupOffsetsResetDTO;
  9. import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
  10. import com.provectus.kafka.ui.model.ConsumerGroupsPageResponseDTO;
  11. import com.provectus.kafka.ui.model.PartitionOffsetDTO;
  12. import com.provectus.kafka.ui.service.ConsumerGroupService;
  13. import com.provectus.kafka.ui.service.OffsetsResetService;
  14. import java.util.Map;
  15. import java.util.Optional;
  16. import java.util.stream.Collectors;
  17. import lombok.RequiredArgsConstructor;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.springframework.beans.factory.annotation.Value;
  20. import org.springframework.http.ResponseEntity;
  21. import org.springframework.util.CollectionUtils;
  22. import org.springframework.web.bind.annotation.RestController;
  23. import org.springframework.web.server.ServerWebExchange;
  24. import reactor.core.publisher.Flux;
  25. import reactor.core.publisher.Mono;
  26. @RestController
  27. @RequiredArgsConstructor
  28. @Slf4j
  29. public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi {
  30. private final ConsumerGroupService consumerGroupService;
  31. private final OffsetsResetService offsetsResetService;
  32. @Value("${consumer.groups.page.size:25}")
  33. private int defaultConsumerGroupsPageSize;
  34. @Override
  35. public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName, String id,
  36. ServerWebExchange exchange) {
  37. return consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id)
  38. .thenReturn(ResponseEntity.ok().build());
  39. }
  40. @Override
  41. public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(
  42. String clusterName, String consumerGroupId, ServerWebExchange exchange) {
  43. return consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
  44. .map(ConsumerGroupMapper::toDetailsDto)
  45. .map(ResponseEntity::ok);
  46. }
  47. @Override
  48. public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getConsumerGroups(String clusterName,
  49. ServerWebExchange exchange) {
  50. return consumerGroupService.getAllConsumerGroups(getCluster(clusterName))
  51. .map(Flux::fromIterable)
  52. .map(f -> f.map(ConsumerGroupMapper::toDto))
  53. .map(ResponseEntity::ok)
  54. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  55. }
  56. @Override
  57. public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(
  58. String clusterName, String topicName, ServerWebExchange exchange) {
  59. return consumerGroupService.getConsumerGroupsForTopic(getCluster(clusterName), topicName)
  60. .map(Flux::fromIterable)
  61. .map(f -> f.map(ConsumerGroupMapper::toDto))
  62. .map(ResponseEntity::ok)
  63. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  64. }
  65. @Override
  66. public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage(
  67. String clusterName,
  68. Integer page,
  69. Integer perPage,
  70. String search,
  71. ConsumerGroupOrderingDTO orderBy,
  72. ServerWebExchange exchange) {
  73. return consumerGroupService.getConsumerGroupsPage(
  74. getCluster(clusterName),
  75. Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
  76. Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize),
  77. search,
  78. Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME)
  79. )
  80. .map(this::convertPage)
  81. .map(ResponseEntity::ok);
  82. }
  83. private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage
  84. consumerGroupConsumerGroupsPage) {
  85. return new ConsumerGroupsPageResponseDTO()
  86. .pageCount(consumerGroupConsumerGroupsPage.getTotalPages())
  87. .consumerGroups(consumerGroupConsumerGroupsPage.getConsumerGroups()
  88. .stream()
  89. .map(ConsumerGroupMapper::toDto)
  90. .collect(Collectors.toList()));
  91. }
  92. @Override
  93. public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName, String group,
  94. Mono<ConsumerGroupOffsetsResetDTO>
  95. consumerGroupOffsetsReset,
  96. ServerWebExchange exchange) {
  97. return consumerGroupOffsetsReset.flatMap(reset -> {
  98. var cluster = getCluster(clusterName);
  99. switch (reset.getResetType()) {
  100. case EARLIEST:
  101. return offsetsResetService
  102. .resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions());
  103. case LATEST:
  104. return offsetsResetService
  105. .resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions());
  106. case TIMESTAMP:
  107. if (reset.getResetToTimestamp() == null) {
  108. return Mono.error(
  109. new ValidationException(
  110. "resetToTimestamp is required when TIMESTAMP reset type used"
  111. )
  112. );
  113. }
  114. return offsetsResetService
  115. .resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(),
  116. reset.getResetToTimestamp());
  117. case OFFSET:
  118. if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) {
  119. return Mono.error(
  120. new ValidationException(
  121. "partitionsOffsets is required when OFFSET reset type used"
  122. )
  123. );
  124. }
  125. Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
  126. .collect(toMap(PartitionOffsetDTO::getPartition, PartitionOffsetDTO::getOffset));
  127. return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
  128. default:
  129. return Mono.error(
  130. new ValidationException("Unknown resetType " + reset.getResetType())
  131. );
  132. }
  133. }).thenReturn(ResponseEntity.ok().build());
  134. }
  135. }