package com.provectus.kafka.ui.controller; import static com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction.DELETE; import static com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction.RESET_OFFSETS; import static com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction.VIEW; import static java.util.stream.Collectors.toMap; import com.provectus.kafka.ui.api.ConsumerGroupsApi; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ConsumerGroupMapper; import com.provectus.kafka.ui.model.ConsumerGroupDTO; import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; import com.provectus.kafka.ui.model.ConsumerGroupOffsetsResetDTO; import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO; import com.provectus.kafka.ui.model.ConsumerGroupsPageResponseDTO; import com.provectus.kafka.ui.model.PartitionOffsetDTO; import com.provectus.kafka.ui.model.SortOrderDTO; import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.TopicAction; import com.provectus.kafka.ui.service.ConsumerGroupService; import com.provectus.kafka.ui.service.OffsetsResetService; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.ResponseEntity; import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController @RequiredArgsConstructor @Slf4j public class ConsumerGroupsController extends AbstractController implements ConsumerGroupsApi { private final ConsumerGroupService consumerGroupService; private final OffsetsResetService offsetsResetService; @Value("${consumer.groups.page.size:25}") private int defaultConsumerGroupsPageSize; @Override public Mono> deleteConsumerGroup(String clusterName, String id, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) .consumerGroup(id) .consumerGroupActions(DELETE) .operationName("deleteConsumerGroup") .build(); return validateAccess(context) .then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id)) .doOnEach(sig -> audit(context, sig)) .thenReturn(ResponseEntity.ok().build()); } @Override public Mono> getConsumerGroup(String clusterName, String consumerGroupId, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) .consumerGroup(consumerGroupId) .consumerGroupActions(VIEW) .operationName("getConsumerGroup") .build(); return validateAccess(context) .then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId) .map(ConsumerGroupMapper::toDetailsDto) .map(ResponseEntity::ok)) .doOnEach(sig -> audit(context, sig)); } @Override public Mono>> getTopicConsumerGroups(String clusterName, String topicName, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) .topic(topicName) .topicActions(TopicAction.VIEW) .operationName("getTopicConsumerGroups") .build(); Mono>> job = consumerGroupService.getConsumerGroupsForTopic(getCluster(clusterName), topicName) .flatMapMany(Flux::fromIterable) .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.getGroupId(), clusterName)) .map(ConsumerGroupMapper::toDto) .collectList() .map(Flux::fromIterable) .map(ResponseEntity::ok) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); return validateAccess(context) .then(job) .doOnEach(sig -> audit(context, sig)); } @Override public Mono> getConsumerGroupsPage( String clusterName, Integer page, Integer perPage, String search, ConsumerGroupOrderingDTO orderBy, SortOrderDTO sortOrderDto, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) // consumer group access validation is within the service .operationName("getConsumerGroupsPage") .build(); return validateAccess(context).then( consumerGroupService.getConsumerGroupsPage( getCluster(clusterName), Optional.ofNullable(page).filter(i -> i > 0).orElse(1), Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize), search, Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME), Optional.ofNullable(sortOrderDto).orElse(SortOrderDTO.ASC) ) .map(this::convertPage) .map(ResponseEntity::ok) ).doOnEach(sig -> audit(context, sig)); } @Override public Mono> resetConsumerGroupOffsets(String clusterName, String group, Mono resetDto, ServerWebExchange exchange) { return resetDto.flatMap(reset -> { var context = AccessContext.builder() .cluster(clusterName) .topic(reset.getTopic()) .topicActions(TopicAction.VIEW) .consumerGroupActions(RESET_OFFSETS) .operationName("resetConsumerGroupOffsets") .build(); Supplier> mono = () -> { var cluster = getCluster(clusterName); switch (reset.getResetType()) { case EARLIEST: return offsetsResetService .resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions()); case LATEST: return offsetsResetService .resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions()); case TIMESTAMP: if (reset.getResetToTimestamp() == null) { return Mono.error( new ValidationException( "resetToTimestamp is required when TIMESTAMP reset type used" ) ); } return offsetsResetService .resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(), reset.getResetToTimestamp()); case OFFSET: if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) { return Mono.error( new ValidationException( "partitionsOffsets is required when OFFSET reset type used" ) ); } Map offsets = reset.getPartitionsOffsets().stream() .collect(toMap(PartitionOffsetDTO::getPartition, PartitionOffsetDTO::getOffset)); return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets); default: return Mono.error( new ValidationException("Unknown resetType " + reset.getResetType()) ); } }; return validateAccess(context) .then(mono.get()) .doOnEach(sig -> audit(context, sig)); }).thenReturn(ResponseEntity.ok().build()); } private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage consumerGroupConsumerGroupsPage) { return new ConsumerGroupsPageResponseDTO() .pageCount(consumerGroupConsumerGroupsPage.totalPages()) .consumerGroups(consumerGroupConsumerGroupsPage.consumerGroups() .stream() .map(ConsumerGroupMapper::toDto) .collect(Collectors.toList())); } }