1234567891011121314151617181920212223242526272829 |
- package com.provectus.kafka.ui.client;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.provectus.kafka.ui.strategy.ksqlStatement.KsqlStatementStrategy;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.log4j.Log4j2;
- import org.springframework.http.MediaType;
- import org.springframework.stereotype.Service;
- import org.springframework.web.reactive.function.BodyInserters;
- import org.springframework.web.reactive.function.client.WebClient;
- import reactor.core.publisher.Mono;
- @Service
- @RequiredArgsConstructor
- @Log4j2
- public final class KsqlClient {
- private final WebClient webClient;
- public Mono<Object> execute(KsqlStatementStrategy ksqlStatement) {
- return webClient.post()
- .uri(ksqlStatement.getUri())
- .accept(new MediaType("application","vnd.ksql.v1+json"))
- .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
- .retrieve()
- .bodyToMono(String.class)
- .map(ksqlStatement::serializeResponse)
- .doOnError(log::error);
- }
- }
|