KsqlClient.java 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package com.provectus.kafka.ui.client;
  2. import com.fasterxml.jackson.databind.JsonNode;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.provectus.kafka.ui.exception.UnprocessableEntityException;
  5. import com.provectus.kafka.ui.model.KsqlCommandResponse;
  6. import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
  7. import lombok.RequiredArgsConstructor;
  8. import lombok.SneakyThrows;
  9. import lombok.extern.log4j.Log4j2;
  10. import org.springframework.http.HttpStatus;
  11. import org.springframework.http.MediaType;
  12. import org.springframework.stereotype.Service;
  13. import org.springframework.web.reactive.function.BodyInserters;
  14. import org.springframework.web.reactive.function.client.ClientResponse;
  15. import org.springframework.web.reactive.function.client.WebClient;
  16. import reactor.core.publisher.Mono;
  17. @Service
  18. @RequiredArgsConstructor
  19. @Log4j2
  20. public class KsqlClient {
  21. private final WebClient webClient;
  22. private final ObjectMapper mapper;
  23. public Mono<KsqlCommandResponse> execute(BaseStrategy ksqlStatement) {
  24. return webClient.post()
  25. .uri(ksqlStatement.getUri())
  26. .accept(new MediaType("application", "vnd.ksql.v1+json"))
  27. .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
  28. .retrieve()
  29. .onStatus(HttpStatus::isError, this::getErrorMessage)
  30. .bodyToMono(byte[].class)
  31. .map(this::toJson)
  32. .map(ksqlStatement::serializeResponse);
  33. }
  34. private Mono<Throwable> getErrorMessage(ClientResponse response) {
  35. return response
  36. .bodyToMono(byte[].class)
  37. .map(this::toJson)
  38. .map(jsonNode -> jsonNode.get("message").asText())
  39. .flatMap(error -> Mono.error(new UnprocessableEntityException(error)));
  40. }
  41. @SneakyThrows
  42. private JsonNode toJson(byte[] content) {
  43. return this.mapper.readTree(content);
  44. }
  45. }