ClientQuotasController.java 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package com.provectus.kafka.ui.controller;
  2. import static java.util.stream.Collectors.toMap;
  3. import com.provectus.kafka.ui.api.ClientQuotasApi;
  4. import com.provectus.kafka.ui.model.ClientQuotasDTO;
  5. import com.provectus.kafka.ui.service.audit.AuditService;
  6. import com.provectus.kafka.ui.service.quota.ClientQuotaRecord;
  7. import com.provectus.kafka.ui.service.quota.QuotaService;
  8. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  9. import java.math.BigDecimal;
  10. import java.util.Map;
  11. import java.util.Optional;
  12. import java.util.stream.Collectors;
  13. import lombok.RequiredArgsConstructor;
  14. import org.springframework.http.ResponseEntity;
  15. import org.springframework.web.bind.annotation.RestController;
  16. import org.springframework.web.server.ServerWebExchange;
  17. import reactor.core.publisher.Flux;
  18. import reactor.core.publisher.Mono;
  19. @RestController
  20. @RequiredArgsConstructor
  21. public class ClientQuotasController extends AbstractController implements ClientQuotasApi {
  22. private final QuotaService quotaService;
  23. private final AccessControlService accessControlService;
  24. private final AuditService auditService;
  25. @Override
  26. public Mono<ResponseEntity<Flux<ClientQuotasDTO>>> listQuotas(String clusterName,
  27. ServerWebExchange exchange) {
  28. return Mono.just(quotaService.all(getCluster(clusterName)).map(this::map))
  29. .map(ResponseEntity::ok);
  30. }
  31. @Override
  32. public Mono<ResponseEntity<Void>> upsertClientQuotas(String clusterName,
  33. Mono<ClientQuotasDTO> clientQuotasDTO,
  34. ServerWebExchange exchange) {
  35. return clientQuotasDTO.flatMap(
  36. quotas ->
  37. quotaService.upsert(
  38. getCluster(clusterName),
  39. quotas.getUser(),
  40. quotas.getClientId(),
  41. quotas.getIp(),
  42. Optional.ofNullable(quotas.getQuotas()).orElse(Map.of())
  43. .entrySet()
  44. .stream()
  45. .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue()))
  46. )
  47. ).map(statusCode -> ResponseEntity.status(statusCode).build());
  48. }
  49. private ClientQuotasDTO map(ClientQuotaRecord quotaRecord) {
  50. return new ClientQuotasDTO()
  51. .user(quotaRecord.user())
  52. .clientId(quotaRecord.clientId())
  53. .ip(quotaRecord.ip())
  54. .quotas(
  55. quotaRecord.quotas().entrySet().stream()
  56. .collect(toMap(Map.Entry::getKey, e -> BigDecimal.valueOf(e.getValue())))
  57. );
  58. }
  59. }