KsqlClient.java 1007 B

1234567891011121314151617181920212223242526272829
  1. package com.provectus.kafka.ui.client;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
  4. import lombok.RequiredArgsConstructor;
  5. import lombok.extern.log4j.Log4j2;
  6. import org.springframework.http.MediaType;
  7. import org.springframework.stereotype.Service;
  8. import org.springframework.web.reactive.function.BodyInserters;
  9. import org.springframework.web.reactive.function.client.WebClient;
  10. import reactor.core.publisher.Mono;
  11. @Service
  12. @RequiredArgsConstructor
  13. @Log4j2
  14. public final class KsqlClient {
  15. private final WebClient webClient;
  16. public Mono<Object> execute(KsqlStatementStrategy ksqlStatement) {
  17. return webClient.post()
  18. .uri(ksqlStatement.getUri())
  19. .accept(new MediaType("application","vnd.ksql.v1+json"))
  20. .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
  21. .retrieve()
  22. .bodyToMono(String.class)
  23. .map(ksqlStatement::serializeResponse)
  24. .doOnError(log::error);
  25. }
  26. }