ConsumerGroupsController.java 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package com.provectus.kafka.ui.controller;
  2. import static java.util.stream.Collectors.toMap;
  3. import com.provectus.kafka.ui.api.ConsumerGroupsApi;
  4. import com.provectus.kafka.ui.exception.ValidationException;
  5. import com.provectus.kafka.ui.mapper.ConsumerGroupMapper;
  6. import com.provectus.kafka.ui.model.ConsumerGroupDTO;
  7. import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
  8. import com.provectus.kafka.ui.model.ConsumerGroupOffsetsResetDTO;
  9. import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
  10. import com.provectus.kafka.ui.model.ConsumerGroupsPageResponseDTO;
  11. import com.provectus.kafka.ui.model.PartitionOffsetDTO;
  12. import com.provectus.kafka.ui.model.SortOrderDTO;
  13. import com.provectus.kafka.ui.service.ConsumerGroupService;
  14. import com.provectus.kafka.ui.service.OffsetsResetService;
  15. import java.util.Map;
  16. import java.util.Optional;
  17. import java.util.stream.Collectors;
  18. import lombok.RequiredArgsConstructor;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.springframework.beans.factory.annotation.Value;
  21. import org.springframework.http.ResponseEntity;
  22. import org.springframework.util.CollectionUtils;
  23. import org.springframework.web.bind.annotation.RestController;
  24. import org.springframework.web.server.ServerWebExchange;
  25. import reactor.core.publisher.Flux;
  26. import reactor.core.publisher.Mono;
  27. @RestController
  28. @RequiredArgsConstructor
  29. @Slf4j
  30. public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi {
  31. private final ConsumerGroupService consumerGroupService;
  32. private final OffsetsResetService offsetsResetService;
  33. @Value("${consumer.groups.page.size:25}")
  34. private int defaultConsumerGroupsPageSize;
  35. @Override
  36. public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName, String id,
  37. ServerWebExchange exchange) {
  38. return consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id)
  39. .thenReturn(ResponseEntity.ok().build());
  40. }
  41. @Override
  42. public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(
  43. String clusterName, String consumerGroupId, ServerWebExchange exchange) {
  44. return consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
  45. .map(ConsumerGroupMapper::toDetailsDto)
  46. .map(ResponseEntity::ok);
  47. }
  48. @Override
  49. public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getConsumerGroups(String clusterName,
  50. ServerWebExchange exchange) {
  51. return consumerGroupService.getAllConsumerGroups(getCluster(clusterName))
  52. .map(Flux::fromIterable)
  53. .map(f -> f.map(ConsumerGroupMapper::toDto))
  54. .map(ResponseEntity::ok)
  55. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  56. }
  57. @Override
  58. public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(
  59. String clusterName, String topicName, ServerWebExchange exchange) {
  60. return consumerGroupService.getConsumerGroupsForTopic(getCluster(clusterName), topicName)
  61. .map(Flux::fromIterable)
  62. .map(f -> f.map(ConsumerGroupMapper::toDto))
  63. .map(ResponseEntity::ok)
  64. .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
  65. }
  66. @Override
  67. public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage(
  68. String clusterName,
  69. Integer page,
  70. Integer perPage,
  71. String search,
  72. ConsumerGroupOrderingDTO orderBy,
  73. SortOrderDTO sortOrderDto,
  74. ServerWebExchange exchange) {
  75. return consumerGroupService.getConsumerGroupsPage(
  76. getCluster(clusterName),
  77. Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
  78. Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize),
  79. search,
  80. Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME),
  81. Optional.ofNullable(sortOrderDto).orElse(SortOrderDTO.ASC)
  82. )
  83. .map(this::convertPage)
  84. .map(ResponseEntity::ok);
  85. }
  86. private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage
  87. consumerGroupConsumerGroupsPage) {
  88. return new ConsumerGroupsPageResponseDTO()
  89. .pageCount(consumerGroupConsumerGroupsPage.getTotalPages())
  90. .consumerGroups(consumerGroupConsumerGroupsPage.getConsumerGroups()
  91. .stream()
  92. .map(ConsumerGroupMapper::toDto)
  93. .collect(Collectors.toList()));
  94. }
  95. @Override
  96. public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName, String group,
  97. Mono<ConsumerGroupOffsetsResetDTO>
  98. consumerGroupOffsetsReset,
  99. ServerWebExchange exchange) {
  100. return consumerGroupOffsetsReset.flatMap(reset -> {
  101. var cluster = getCluster(clusterName);
  102. switch (reset.getResetType()) {
  103. case EARLIEST:
  104. return offsetsResetService
  105. .resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions());
  106. case LATEST:
  107. return offsetsResetService
  108. .resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions());
  109. case TIMESTAMP:
  110. if (reset.getResetToTimestamp() == null) {
  111. return Mono.error(
  112. new ValidationException(
  113. "resetToTimestamp is required when TIMESTAMP reset type used"
  114. )
  115. );
  116. }
  117. return offsetsResetService
  118. .resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(),
  119. reset.getResetToTimestamp());
  120. case OFFSET:
  121. if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) {
  122. return Mono.error(
  123. new ValidationException(
  124. "partitionsOffsets is required when OFFSET reset type used"
  125. )
  126. );
  127. }
  128. Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
  129. .collect(toMap(PartitionOffsetDTO::getPartition, PartitionOffsetDTO::getOffset));
  130. return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
  131. default:
  132. return Mono.error(
  133. new ValidationException("Unknown resetType " + reset.getResetType())
  134. );
  135. }
  136. }).thenReturn(ResponseEntity.ok().build());
  137. }
  138. }