KafkaConnectServiceTests.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package com.provectus.kafka.ui;
  2. import com.provectus.kafka.ui.model.*;
  3. import lombok.extern.log4j.Log4j2;
  4. import org.junit.jupiter.api.AfterEach;
  5. import org.junit.jupiter.api.BeforeEach;
  6. import org.junit.jupiter.api.Test;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
  9. import org.springframework.core.ParameterizedTypeReference;
  10. import org.springframework.test.context.ContextConfiguration;
  11. import org.springframework.test.web.reactive.server.WebTestClient;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.UUID;
  15. import static java.util.function.Predicate.not;
  16. import static org.junit.jupiter.api.Assertions.assertEquals;
  17. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  18. @Log4j2
  19. @AutoConfigureWebTestClient(timeout = "60000")
  20. public class KafkaConnectServiceTests extends AbstractBaseTest {
  21. private final String connectName = "kafka-connect";
  22. private final String connectorName = UUID.randomUUID().toString();
  23. private final Map<String, Object> config = Map.of(
  24. "name", connectorName,
  25. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  26. "tasks.max", "1",
  27. "topics", "output-topic",
  28. "file", "/tmp/test"
  29. );
  30. @Autowired
  31. private WebTestClient webTestClient;
  32. @BeforeEach
  33. public void setUp() {
  34. webTestClient.post()
  35. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  36. .bodyValue(new NewConnector()
  37. .name(connectorName)
  38. .config(Map.of(
  39. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  40. "tasks.max", "1",
  41. "topics", "output-topic",
  42. "file", "/tmp/test"
  43. ))
  44. )
  45. .exchange()
  46. .expectStatus().isOk();
  47. }
  48. @AfterEach
  49. public void tearDown() {
  50. webTestClient.delete()
  51. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
  52. .exchange()
  53. .expectStatus().isOk();
  54. }
  55. @Test
  56. public void shouldListConnectors() {
  57. webTestClient.get()
  58. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  59. .exchange()
  60. .expectStatus().isOk()
  61. .expectBody()
  62. .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
  63. .exists();
  64. }
  65. @Test
  66. public void shouldReturnNotFoundForNonExistingCluster() {
  67. webTestClient.get()
  68. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster", connectName)
  69. .exchange()
  70. .expectStatus().isNotFound();
  71. }
  72. @Test
  73. public void shouldReturnNotFoundForNonExistingConnectName() {
  74. webTestClient.get()
  75. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, "nonExistingConnect")
  76. .exchange()
  77. .expectStatus().isNotFound();
  78. }
  79. @Test
  80. public void shouldRetrieveConnector() {
  81. Connector expected = (Connector) new Connector()
  82. .status(new ConnectorStatus()
  83. .state(ConnectorStatus.StateEnum.RUNNING)
  84. .workerId("kafka-connect:8083"))
  85. .tasks(List.of(new TaskId()
  86. .connector(connectorName)
  87. .task(0)))
  88. .type(Connector.TypeEnum.SINK)
  89. .name(connectorName)
  90. .config(config);
  91. webTestClient.get()
  92. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
  93. .exchange()
  94. .expectStatus().isOk()
  95. .expectBody(Connector.class)
  96. .value(connector -> assertEquals(expected, connector));
  97. }
  98. @Test
  99. public void shouldUpdateConfig() {
  100. webTestClient.put()
  101. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
  102. .bodyValue(Map.of(
  103. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  104. "tasks.max", "1",
  105. "topics", "another-topic",
  106. "file", "/tmp/new"
  107. )
  108. )
  109. .exchange()
  110. .expectStatus().isOk();
  111. webTestClient.get()
  112. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
  113. .exchange()
  114. .expectStatus().isOk()
  115. .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
  116. })
  117. .isEqualTo(Map.of(
  118. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  119. "tasks.max", "1",
  120. "topics", "another-topic",
  121. "file", "/tmp/new",
  122. "name", connectorName
  123. ));
  124. }
  125. @Test
  126. public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() {
  127. var connectorName = UUID.randomUUID().toString();
  128. webTestClient.post()
  129. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  130. .bodyValue(Map.of(
  131. "name", connectorName,
  132. "config", Map.of(
  133. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  134. "tasks.max", "invalid number",
  135. "topics", "another-topic",
  136. "file", "/tmp/test"
  137. ))
  138. )
  139. .exchange()
  140. .expectStatus().isBadRequest();
  141. webTestClient.get()
  142. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  143. .exchange()
  144. .expectStatus().isOk()
  145. .expectBody()
  146. .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
  147. .doesNotExist();
  148. }
  149. @Test
  150. public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {
  151. var connectorName = UUID.randomUUID().toString();
  152. webTestClient.post()
  153. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  154. .bodyValue(Map.of(
  155. "name", connectorName,
  156. "config", Map.of(
  157. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
  158. ))
  159. )
  160. .exchange()
  161. .expectStatus().isBadRequest();
  162. webTestClient.get()
  163. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
  164. .exchange()
  165. .expectStatus().isOk()
  166. .expectBody()
  167. .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
  168. .doesNotExist();
  169. }
  170. @Test
  171. public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
  172. webTestClient.put()
  173. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
  174. .bodyValue(Map.of(
  175. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  176. "tasks.max", "invalid number",
  177. "topics", "another-topic",
  178. "file", "/tmp/test"
  179. )
  180. )
  181. .exchange()
  182. .expectStatus().isBadRequest();
  183. webTestClient.get()
  184. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
  185. .exchange()
  186. .expectStatus().isOk()
  187. .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
  188. })
  189. .isEqualTo(Map.of(
  190. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  191. "tasks.max", "1",
  192. "topics", "output-topic",
  193. "file", "/tmp/test",
  194. "name", connectorName
  195. ));
  196. }
  197. @Test
  198. public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
  199. webTestClient.put()
  200. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
  201. .bodyValue(Map.of(
  202. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
  203. )
  204. )
  205. .exchange()
  206. .expectStatus().isBadRequest();
  207. webTestClient.get()
  208. .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
  209. .exchange()
  210. .expectStatus().isOk()
  211. .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
  212. })
  213. .isEqualTo(Map.of(
  214. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  215. "tasks.max", "1",
  216. "topics", "output-topic",
  217. "file", "/tmp/test",
  218. "name", connectorName
  219. ));
  220. }
  221. @Test
  222. public void shouldRetrieveConnectorPlugins() {
  223. webTestClient.get()
  224. .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins", LOCAL, connectName)
  225. .exchange()
  226. .expectStatus().isOk()
  227. .expectBodyList(ConnectorPlugin.class)
  228. .value(plugins -> assertEquals(13, plugins.size()));
  229. }
  230. @Test
  231. public void shouldSuccessfullyValidateConnectorPluginConfiguration() {
  232. var pluginName = "FileStreamSinkConnector";
  233. webTestClient.put()
  234. .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
  235. .bodyValue(Map.of(
  236. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  237. "tasks.max", "1",
  238. "topics", "output-topic",
  239. "file", "/tmp/test",
  240. "name", connectorName
  241. )
  242. )
  243. .exchange()
  244. .expectStatus().isOk()
  245. .expectBody(ConnectorPluginConfigValidationResponse.class)
  246. .value(response -> assertEquals(0, response.getErrorCount()));
  247. }
  248. @Test
  249. public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
  250. var pluginName = "FileStreamSinkConnector";
  251. webTestClient.put()
  252. .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
  253. .bodyValue(Map.of(
  254. "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
  255. "tasks.max", "0",
  256. "topics", "output-topic",
  257. "file", "/tmp/test",
  258. "name", connectorName
  259. )
  260. )
  261. .exchange()
  262. .expectStatus().isOk()
  263. .expectBody(ConnectorPluginConfigValidationResponse.class)
  264. .value(response -> {
  265. assertEquals(1, response.getErrorCount());
  266. var error = response.getConfigs().stream()
  267. .map(ConnectorPluginConfig::getValue)
  268. .map(ConnectorPluginConfigValue::getErrors)
  269. .filter(not(List::isEmpty))
  270. .findFirst().get();
  271. assertEquals(
  272. "Invalid value 0 for configuration tasks.max: Value must be at least 1",
  273. error.get(0)
  274. );
  275. });
  276. }
  277. }