KafkaConnectService.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package com.provectus.kafka.ui.service;
  2. import com.fasterxml.jackson.core.type.TypeReference;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.provectus.kafka.ui.client.KafkaConnectClients;
  5. import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
  6. import com.provectus.kafka.ui.connect.model.ConnectorStatus;
  7. import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
  8. import com.provectus.kafka.ui.connect.model.ConnectorTopics;
  9. import com.provectus.kafka.ui.connect.model.TaskStatus;
  10. import com.provectus.kafka.ui.exception.ConnectNotFoundException;
  11. import com.provectus.kafka.ui.exception.ValidationException;
  12. import com.provectus.kafka.ui.mapper.ClusterMapper;
  13. import com.provectus.kafka.ui.mapper.KafkaConnectMapper;
  14. import com.provectus.kafka.ui.model.ConnectDTO;
  15. import com.provectus.kafka.ui.model.ConnectorActionDTO;
  16. import com.provectus.kafka.ui.model.ConnectorDTO;
  17. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
  18. import com.provectus.kafka.ui.model.ConnectorPluginDTO;
  19. import com.provectus.kafka.ui.model.ConnectorStateDTO;
  20. import com.provectus.kafka.ui.model.ConnectorTaskStatusDTO;
  21. import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
  22. import com.provectus.kafka.ui.model.KafkaCluster;
  23. import com.provectus.kafka.ui.model.KafkaConnectCluster;
  24. import com.provectus.kafka.ui.model.NewConnectorDTO;
  25. import com.provectus.kafka.ui.model.TaskDTO;
  26. import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
  27. import java.util.HashMap;
  28. import java.util.List;
  29. import java.util.Map;
  30. import java.util.function.Function;
  31. import java.util.function.Predicate;
  32. import java.util.stream.Collectors;
  33. import java.util.stream.Stream;
  34. import lombok.RequiredArgsConstructor;
  35. import lombok.SneakyThrows;
  36. import lombok.extern.slf4j.Slf4j;
  37. import org.apache.commons.lang3.StringUtils;
  38. import org.springframework.stereotype.Service;
  39. import org.springframework.web.reactive.function.client.WebClientResponseException;
  40. import reactor.core.publisher.Flux;
  41. import reactor.core.publisher.Mono;
  42. import reactor.util.function.Tuple2;
  43. import reactor.util.function.Tuples;
  44. @Service
  45. @Slf4j
  46. @RequiredArgsConstructor
  47. public class KafkaConnectService {
  48. private final ClusterMapper clusterMapper;
  49. private final KafkaConnectMapper kafkaConnectMapper;
  50. private final ObjectMapper objectMapper;
  51. private final KafkaConfigSanitizer kafkaConfigSanitizer;
  52. public Mono<Flux<ConnectDTO>> getConnects(KafkaCluster cluster) {
  53. return Mono.just(
  54. Flux.fromIterable(
  55. cluster.getKafkaConnect().stream()
  56. .map(clusterMapper::toKafkaConnect)
  57. .collect(Collectors.toList())
  58. )
  59. );
  60. }
  61. public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
  62. final String search) {
  63. return getConnects(cluster)
  64. .flatMapMany(Function.identity())
  65. .flatMap(connect -> getConnectorNames(cluster, connect.getName()))
  66. .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
  67. .flatMap(connector ->
  68. getConnectorConfig(cluster, connector.getConnect(), connector.getName())
  69. .map(config -> InternalConnectInfo.builder()
  70. .connector(connector)
  71. .config(config)
  72. .build()
  73. )
  74. )
  75. .flatMap(connectInfo -> {
  76. ConnectorDTO connector = connectInfo.getConnector();
  77. return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
  78. .collectList()
  79. .map(tasks -> InternalConnectInfo.builder()
  80. .connector(connector)
  81. .config(connectInfo.getConfig())
  82. .tasks(tasks)
  83. .build()
  84. );
  85. })
  86. .flatMap(connectInfo -> {
  87. ConnectorDTO connector = connectInfo.getConnector();
  88. return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
  89. .map(ct -> InternalConnectInfo.builder()
  90. .connector(connector)
  91. .config(connectInfo.getConfig())
  92. .tasks(connectInfo.getTasks())
  93. .topics(ct.getTopics())
  94. .build()
  95. );
  96. })
  97. .map(kafkaConnectMapper::fullConnectorInfoFromTuple)
  98. .filter(matchesSearchTerm(search));
  99. }
  100. private Predicate<FullConnectorInfoDTO> matchesSearchTerm(final String search) {
  101. return connector -> getSearchValues(connector)
  102. .anyMatch(value -> value.contains(
  103. StringUtils.defaultString(
  104. search,
  105. StringUtils.EMPTY)
  106. .toUpperCase()));
  107. }
  108. private Stream<String> getSearchValues(FullConnectorInfoDTO fullConnectorInfo) {
  109. return Stream.of(
  110. fullConnectorInfo.getName(),
  111. fullConnectorInfo.getStatus().getState().getValue(),
  112. fullConnectorInfo.getType().getValue())
  113. .map(String::toUpperCase);
  114. }
  115. private Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
  116. String connectorName) {
  117. return withConnectClient(cluster, connectClusterName)
  118. .flatMap(c -> c.getConnectorTopics(connectorName).map(result -> result.get(connectorName)))
  119. // old connectors don't have this api, setting empty list for
  120. // backward-compatibility
  121. .onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
  122. }
  123. private Flux<Tuple2<String, String>> getConnectorNames(KafkaCluster cluster, String connectName) {
  124. return getConnectors(cluster, connectName)
  125. .collectList().map(e -> e.get(0))
  126. // for some reason `getConnectors` method returns the response as a single string
  127. .map(this::parseToList)
  128. .flatMapMany(Flux::fromIterable)
  129. .map(connector -> Tuples.of(connectName, connector));
  130. }
  131. @SneakyThrows
  132. private List<String> parseToList(String json) {
  133. return objectMapper.readValue(json, new TypeReference<>() {
  134. });
  135. }
  136. public Flux<String> getConnectors(KafkaCluster cluster, String connectName) {
  137. return withConnectClient(cluster, connectName)
  138. .flatMapMany(client ->
  139. client.getConnectors(null)
  140. .doOnError(e -> log.error("Unexpected error upon getting connectors", e))
  141. );
  142. }
  143. public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
  144. Mono<NewConnectorDTO> connector) {
  145. return withConnectClient(cluster, connectName)
  146. .flatMap(client ->
  147. connector
  148. .flatMap(c -> connectorExists(cluster, connectName, c.getName())
  149. .map(exists -> {
  150. if (Boolean.TRUE.equals(exists)) {
  151. throw new ValidationException(
  152. String.format("Connector with name %s already exists", c.getName()));
  153. }
  154. return c;
  155. }))
  156. .map(kafkaConnectMapper::toClient)
  157. .flatMap(client::createConnector)
  158. .flatMap(c -> getConnector(cluster, connectName, c.getName()))
  159. );
  160. }
  161. private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
  162. String connectorName) {
  163. return getConnectorNames(cluster, connectName)
  164. .map(Tuple2::getT2)
  165. .collectList()
  166. .map(connectorNames -> connectorNames.contains(connectorName));
  167. }
  168. public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
  169. String connectorName) {
  170. return withConnectClient(cluster, connectName)
  171. .flatMap(client -> client.getConnector(connectorName)
  172. .map(kafkaConnectMapper::fromClient)
  173. .flatMap(connector ->
  174. client.getConnectorStatus(connector.getName())
  175. // status request can return 404 if tasks not assigned yet
  176. .onErrorResume(WebClientResponseException.NotFound.class,
  177. e -> emptyStatus(connectorName))
  178. .map(connectorStatus -> {
  179. var status = connectorStatus.getConnector();
  180. final Map<String, Object> obfuscatedConfig = connector.getConfig().entrySet()
  181. .stream()
  182. .collect(Collectors.toMap(
  183. Map.Entry::getKey,
  184. e -> kafkaConfigSanitizer.sanitize(e.getKey(), e.getValue())
  185. ));
  186. ConnectorDTO result = (ConnectorDTO) new ConnectorDTO()
  187. .connect(connectName)
  188. .status(kafkaConnectMapper.fromClient(status))
  189. .type(connector.getType())
  190. .tasks(connector.getTasks())
  191. .name(connector.getName())
  192. .config(obfuscatedConfig);
  193. if (connectorStatus.getTasks() != null) {
  194. boolean isAnyTaskFailed = connectorStatus.getTasks().stream()
  195. .map(TaskStatus::getState)
  196. .anyMatch(TaskStatus.StateEnum.FAILED::equals);
  197. if (isAnyTaskFailed) {
  198. result.getStatus().state(ConnectorStateDTO.TASK_FAILED);
  199. }
  200. }
  201. return result;
  202. })
  203. )
  204. );
  205. }
  206. private Mono<ConnectorStatus> emptyStatus(String connectorName) {
  207. return Mono.just(new ConnectorStatus()
  208. .name(connectorName)
  209. .tasks(List.of())
  210. .connector(new ConnectorStatusConnector()
  211. .state(ConnectorStatusConnector.StateEnum.UNASSIGNED)));
  212. }
  213. public Mono<Map<String, Object>> getConnectorConfig(KafkaCluster cluster, String connectName,
  214. String connectorName) {
  215. return withConnectClient(cluster, connectName)
  216. .flatMap(c -> c.getConnectorConfig(connectorName))
  217. .map(connectorConfig -> {
  218. final Map<String, Object> obfuscatedMap = new HashMap<>();
  219. connectorConfig.forEach((key, value) ->
  220. obfuscatedMap.put(key, kafkaConfigSanitizer.sanitize(key, value)));
  221. return obfuscatedMap;
  222. });
  223. }
  224. public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,
  225. String connectorName, Mono<Object> requestBody) {
  226. return withConnectClient(cluster, connectName)
  227. .flatMap(c ->
  228. requestBody
  229. .flatMap(body -> c.setConnectorConfig(connectorName, (Map<String, Object>) body))
  230. .map(kafkaConnectMapper::fromClient));
  231. }
  232. public Mono<Void> deleteConnector(
  233. KafkaCluster cluster, String connectName, String connectorName) {
  234. return withConnectClient(cluster, connectName)
  235. .flatMap(c -> c.deleteConnector(connectorName));
  236. }
  237. public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
  238. String connectorName, ConnectorActionDTO action) {
  239. return withConnectClient(cluster, connectName)
  240. .flatMap(client -> {
  241. switch (action) {
  242. case RESTART:
  243. return client.restartConnector(connectorName, false, false);
  244. case RESTART_ALL_TASKS:
  245. return restartTasks(cluster, connectName, connectorName, task -> true);
  246. case RESTART_FAILED_TASKS:
  247. return restartTasks(cluster, connectName, connectorName,
  248. t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
  249. case PAUSE:
  250. return client.pauseConnector(connectorName);
  251. case RESUME:
  252. return client.resumeConnector(connectorName);
  253. default:
  254. throw new IllegalStateException("Unexpected value: " + action);
  255. }
  256. });
  257. }
  258. private Mono<Void> restartTasks(KafkaCluster cluster, String connectName,
  259. String connectorName, Predicate<TaskDTO> taskFilter) {
  260. return getConnectorTasks(cluster, connectName, connectorName)
  261. .filter(taskFilter)
  262. .flatMap(t ->
  263. restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask()))
  264. .then();
  265. }
  266. public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
  267. return withConnectClient(cluster, connectName)
  268. .flatMapMany(client ->
  269. client.getConnectorTasks(connectorName)
  270. .onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty())
  271. .map(kafkaConnectMapper::fromClient)
  272. .flatMap(task ->
  273. client
  274. .getConnectorTaskStatus(connectorName, task.getId().getTask())
  275. .onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
  276. .map(kafkaConnectMapper::fromClient)
  277. .map(task::status)
  278. ));
  279. }
  280. public Mono<Void> restartConnectorTask(KafkaCluster cluster, String connectName,
  281. String connectorName, Integer taskId) {
  282. return withConnectClient(cluster, connectName)
  283. .flatMap(client -> client.restartConnectorTask(connectorName, taskId));
  284. }
  285. public Mono<Flux<ConnectorPluginDTO>> getConnectorPlugins(KafkaCluster cluster,
  286. String connectName) {
  287. return withConnectClient(cluster, connectName)
  288. .map(client -> client.getConnectorPlugins().map(kafkaConnectMapper::fromClient));
  289. }
  290. public Mono<ConnectorPluginConfigValidationResponseDTO> validateConnectorPluginConfig(
  291. KafkaCluster cluster, String connectName, String pluginName, Mono<Object> requestBody) {
  292. return withConnectClient(cluster, connectName)
  293. .flatMap(client ->
  294. requestBody
  295. .flatMap(body ->
  296. client.validateConnectorPluginConfig(pluginName, (Map<String, Object>) body))
  297. .map(kafkaConnectMapper::fromClient)
  298. );
  299. }
  300. private Mono<KafkaConnectClientApi> withConnectClient(KafkaCluster cluster, String connectName) {
  301. return Mono.justOrEmpty(cluster.getKafkaConnect().stream()
  302. .filter(connect -> connect.getName().equals(connectName))
  303. .findFirst()
  304. .map(KafkaConnectCluster::getAddress))
  305. .switchIfEmpty(Mono.error(ConnectNotFoundException::new))
  306. .map(KafkaConnectClients::withBaseUrl);
  307. }
  308. }