KsqlClient.java 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package com.provectus.kafka.ui.client;
  2. import com.fasterxml.jackson.databind.JsonNode;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.provectus.kafka.ui.exception.UnprocessableEntityException;
  5. import com.provectus.kafka.ui.model.KafkaCluster;
  6. import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
  7. import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
  8. import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
  9. import lombok.RequiredArgsConstructor;
  10. import lombok.SneakyThrows;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.http.HttpStatus;
  13. import org.springframework.http.MediaType;
  14. import org.springframework.stereotype.Service;
  15. import org.springframework.web.reactive.function.BodyInserters;
  16. import org.springframework.web.reactive.function.client.ClientResponse;
  17. import org.springframework.web.reactive.function.client.WebClient;
  18. import reactor.core.publisher.Mono;
  19. @Service
  20. @RequiredArgsConstructor
  21. @Slf4j
  22. public class KsqlClient {
  23. private final WebClient webClient;
  24. private final ObjectMapper mapper;
  25. public Mono<KsqlCommandResponseDTO> execute(BaseStrategy ksqlStatement, KafkaCluster cluster) {
  26. return webClient.post()
  27. .uri(ksqlStatement.getUri())
  28. .headers(httpHeaders -> KsqlApiClient.setBasicAuthIfEnabled(httpHeaders, cluster))
  29. .accept(new MediaType("application", "vnd.ksql.v1+json"))
  30. .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
  31. .retrieve()
  32. .onStatus(HttpStatus::isError, this::getErrorMessage)
  33. .bodyToMono(byte[].class)
  34. .map(this::toJson)
  35. .map(ksqlStatement::serializeResponse);
  36. }
  37. private Mono<Throwable> getErrorMessage(ClientResponse response) {
  38. return response
  39. .bodyToMono(byte[].class)
  40. .map(this::toJson)
  41. .map(jsonNode -> jsonNode.get("message").asText())
  42. .flatMap(error -> Mono.error(new UnprocessableEntityException(error)));
  43. }
  44. @SneakyThrows
  45. private JsonNode toJson(byte[] content) {
  46. return this.mapper.readTree(content);
  47. }
  48. }