KafkaConnectServiceTests.java 14 KB


  1. package com.provectus.kafka.ui;
  2. import static java.util.function.Predicate.not;
  3. import static org.junit.jupiter.api.Assertions.assertEquals;
  4. import com.provectus.kafka.ui.model.ConnectorDTO;
  5. import com.provectus.kafka.ui.model.ConnectorPluginConfigDTO;
  6. import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
  7. import com.provectus.kafka.ui.model.ConnectorPluginConfigValueDTO;
  8. import com.provectus.kafka.ui.model.ConnectorPluginDTO;
  9. import com.provectus.kafka.ui.model.ConnectorStateDTO;
  10. import com.provectus.kafka.ui.model.ConnectorStatusDTO;
  11. import com.provectus.kafka.ui.model.ConnectorTypeDTO;
  12. import com.provectus.kafka.ui.model.NewConnectorDTO;
  13. import com.provectus.kafka.ui.model.TaskIdDTO;
  14. import java.util.List;
  15. import java.util.Map;
  16. import java.util.UUID;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.junit.jupiter.api.AfterEach;
  19. import org.junit.jupiter.api.BeforeEach;
  20. import org.junit.jupiter.api.Test;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
  23. import org.springframework.core.ParameterizedTypeReference;
  24. import org.springframework.test.context.ContextConfiguration;
  25. import org.springframework.test.web.reactive.server.WebTestClient;
  26. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  27. @Slf4j
  28. @AutoConfigureWebTestClient(timeout = "60000")
  29. public class KafkaConnectServiceTests extends AbstractBaseTest {
  30. private final String connectName = "kafka-connect";
  31. private final String connectorName = UUID.randomUUID().toString();
  32. private final Map<String, Object> config = Map.of(
  33. "name", connectorName,
  34. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  35. "tasks.max", "1",
  36. "topics", "output-topic",
  37. "file", "/tmp/test",
  38. "test.password", "******"
  39. );
  40. @Autowired
  41. private WebTestClient webTestClient;
  42. @BeforeEach
  43. public void setUp() {
  44. webTestClient.post()
  45. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  46. .bodyValue(new NewConnectorDTO()
  47. .name(connectorName)
  48. .config(Map.of(
  49. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  50. "tasks.max", "1",
  51. "topics", "output-topic",
  52. "file", "/tmp/test",
  53. "test.password", "test-credentials"
  54. ))
  55. )
  56. .exchange()
  57. .expectStatus().isOk();
  58. }
  59. @AfterEach
  60. public void tearDown() {
  61. webTestClient.delete()
  62. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL,
  63. connectName, connectorName)
  64. .exchange()
  65. .expectStatus().isOk();
  66. }
  67. @Test
  68. public void shouldListAllConnectors() {
  69. webTestClient.get()
  70. .uri("/api/clusters/{clusterName}/connectors", LOCAL)
  71. .exchange()
  72. .expectStatus().isOk()
  73. .expectBody()
  74. .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
  75. .exists();
  76. }
  77. @Test
  78. public void shouldFilterByNameConnectors() {
  79. webTestClient.get()
  80. .uri(
  81. "/api/clusters/{clusterName}/connectors?search={search}",
  82. LOCAL,
  83. connectorName.split("-")[1])
  84. .exchange()
  85. .expectStatus().isOk()
  86. .expectBody()
  87. .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
  88. .exists();
  89. }
  90. @Test
  91. public void shouldFilterByStatusConnectors() {
  92. webTestClient.get()
  93. .uri(
  94. "/api/clusters/{clusterName}/connectors?search={search}",
  95. LOCAL,
  96. "running")
  97. .exchange()
  98. .expectStatus().isOk()
  99. .expectBody()
  100. .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
  101. .exists();
  102. }
  103. @Test
  104. public void shouldFilterByTypeConnectors() {
  105. webTestClient.get()
  106. .uri(
  107. "/api/clusters/{clusterName}/connectors?search={search}",
  108. LOCAL,
  109. "sink")
  110. .exchange()
  111. .expectStatus().isOk()
  112. .expectBody()
  113. .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
  114. .exists();
  115. }
  116. @Test
  117. public void shouldNotFilterConnectors() {
  118. webTestClient.get()
  119. .uri(
  120. "/api/clusters/{clusterName}/connectors?search={search}",
  121. LOCAL,
  122. "something-else")
  123. .exchange()
  124. .expectStatus().isOk()
  125. .expectBody()
  126. .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
  127. .doesNotExist();
  128. }
  129. @Test
  130. public void shouldListConnectors() {
  131. webTestClient.get()
  132. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  133. .exchange()
  134. .expectStatus().isOk()
  135. .expectBody()
  136. .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
  137. .exists();
  138. }
  139. @Test
  140. public void shouldReturnNotFoundForNonExistingCluster() {
  141. webTestClient.get()
  142. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster",
  143. connectName)
  144. .exchange()
  145. .expectStatus().isNotFound();
  146. }
  147. @Test
  148. public void shouldReturnNotFoundForNonExistingConnectName() {
  149. webTestClient.get()
  150. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL,
  151. "nonExistingConnect")
  152. .exchange()
  153. .expectStatus().isNotFound();
  154. }
  155. @Test
  156. public void shouldRetrieveConnector() {
  157. ConnectorDTO expected = (ConnectorDTO) new ConnectorDTO()
  158. .connect(connectName)
  159. .status(new ConnectorStatusDTO()
  160. .state(ConnectorStateDTO.RUNNING)
  161. .workerId("kafka-connect:8083"))
  162. .tasks(List.of(new TaskIdDTO()
  163. .connector(connectorName)
  164. .task(0)))
  165. .type(ConnectorTypeDTO.SINK)
  166. .name(connectorName)
  167. .config(config);
  168. webTestClient.get()
  169. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL,
  170. connectName, connectorName)
  171. .exchange()
  172. .expectStatus().isOk()
  173. .expectBody(ConnectorDTO.class)
  174. .value(connector -> assertEquals(expected, connector));
  175. }
  176. @Test
  177. public void shouldUpdateConfig() {
  178. webTestClient.put()
  179. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
  180. LOCAL, connectName, connectorName)
  181. .bodyValue(Map.of(
  182. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  183. "tasks.max", "1",
  184. "topics", "another-topic",
  185. "file", "/tmp/new"
  186. )
  187. )
  188. .exchange()
  189. .expectStatus().isOk();
  190. webTestClient.get()
  191. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
  192. LOCAL, connectName, connectorName)
  193. .exchange()
  194. .expectStatus().isOk()
  195. .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
  196. })
  197. .isEqualTo(Map.of(
  198. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  199. "tasks.max", "1",
  200. "topics", "another-topic",
  201. "file", "/tmp/new",
  202. "name", connectorName
  203. ));
  204. }
  205. @Test
  206. public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() {
  207. var connectorName = UUID.randomUUID().toString();
  208. webTestClient.post()
  209. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  210. .bodyValue(Map.of(
  211. "name", connectorName,
  212. "config", Map.of(
  213. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  214. "tasks.max", "invalid number",
  215. "topics", "another-topic",
  216. "file", "/tmp/test"
  217. ))
  218. )
  219. .exchange()
  220. .expectStatus().isBadRequest();
  221. webTestClient.get()
  222. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  223. .exchange()
  224. .expectStatus().isOk()
  225. .expectBody()
  226. .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
  227. .doesNotExist();
  228. }
  229. @Test
  230. public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {
  231. var connectorName = UUID.randomUUID().toString();
  232. webTestClient.post()
  233. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  234. .bodyValue(Map.of(
  235. "name", connectorName,
  236. "config", Map.of(
  237. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
  238. ))
  239. )
  240. .exchange()
  241. .expectStatus().isBadRequest();
  242. webTestClient.get()
  243. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  244. .exchange()
  245. .expectStatus().isOk()
  246. .expectBody()
  247. .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
  248. .doesNotExist();
  249. }
  250. @Test
  251. public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
  252. webTestClient.put()
  253. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
  254. LOCAL, connectName, connectorName)
  255. .bodyValue(Map.of(
  256. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  257. "tasks.max", "invalid number",
  258. "topics", "another-topic",
  259. "file", "/tmp/test"
  260. )
  261. )
  262. .exchange()
  263. .expectStatus().isBadRequest();
  264. webTestClient.get()
  265. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
  266. LOCAL, connectName, connectorName)
  267. .exchange()
  268. .expectStatus().isOk()
  269. .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
  270. })
  271. .isEqualTo(Map.of(
  272. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  273. "tasks.max", "1",
  274. "topics", "output-topic",
  275. "file", "/tmp/test",
  276. "name", connectorName,
  277. "test.password", "******"
  278. ));
  279. }
  280. @Test
  281. public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
  282. webTestClient.put()
  283. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
  284. LOCAL, connectName, connectorName)
  285. .bodyValue(Map.of(
  286. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
  287. )
  288. )
  289. .exchange()
  290. .expectStatus().isBadRequest();
  291. webTestClient.get()
  292. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
  293. LOCAL, connectName, connectorName)
  294. .exchange()
  295. .expectStatus().isOk()
  296. .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
  297. })
  298. .isEqualTo(Map.of(
  299. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  300. "tasks.max", "1",
  301. "topics", "output-topic",
  302. "file", "/tmp/test",
  303. "test.password", "******",
  304. "name", connectorName
  305. ));
  306. }
  307. @Test
  308. public void shouldRetrieveConnectorPlugins() {
  309. webTestClient.get()
  310. .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins", LOCAL, connectName)
  311. .exchange()
  312. .expectStatus().isOk()
  313. .expectBodyList(ConnectorPluginDTO.class)
  314. .value(plugins -> assertEquals(14, plugins.size()));
  315. }
  316. @Test
  317. public void shouldSuccessfullyValidateConnectorPluginConfiguration() {
  318. var pluginName = "FileStreamSinkConnector";
  319. var path =
  320. "/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate";
  321. webTestClient.put()
  322. .uri(path, LOCAL, connectName, pluginName)
  323. .bodyValue(Map.of(
  324. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  325. "tasks.max", "1",
  326. "topics", "output-topic",
  327. "file", "/tmp/test",
  328. "name", connectorName
  329. )
  330. )
  331. .exchange()
  332. .expectStatus().isOk()
  333. .expectBody(ConnectorPluginConfigValidationResponseDTO.class)
  334. .value(response -> assertEquals(0, response.getErrorCount()));
  335. }
  336. @Test
  337. public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
  338. var pluginName = "FileStreamSinkConnector";
  339. var path =
  340. "/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate";
  341. webTestClient.put()
  342. .uri(path, LOCAL, connectName, pluginName)
  343. .bodyValue(Map.of(
  344. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  345. "tasks.max", "0",
  346. "topics", "output-topic",
  347. "file", "/tmp/test",
  348. "name", connectorName
  349. )
  350. )
  351. .exchange()
  352. .expectStatus().isOk()
  353. .expectBody(ConnectorPluginConfigValidationResponseDTO.class)
  354. .value(response -> {
  355. assertEquals(1, response.getErrorCount());
  356. var error = response.getConfigs().stream()
  357. .map(ConnectorPluginConfigDTO::getValue)
  358. .map(ConnectorPluginConfigValueDTO::getErrors)
  359. .filter(not(List::isEmpty))
  360. .findFirst().get();
  361. assertEquals(
  362. "Invalid value 0 for configuration tasks.max: Value must be at least 1",
  363. error.get(0)
  364. );
  365. });
  366. }
  367. @Test
  368. public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
  369. webTestClient.post()
  370. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  371. .bodyValue(new NewConnectorDTO()
  372. .name(connectorName)
  373. .config(Map.of(
  374. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  375. "tasks.max", "1",
  376. "topics", "output-topic",
  377. "file", "/tmp/test"
  378. ))
  379. )
  380. .exchange()
  381. .expectStatus()
  382. .isBadRequest();
  383. }
  384. }