ReactiveAdminClientTest.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package com.provectus.kafka.ui.service;
  2. import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExceptionFilter;
  3. import static java.util.Objects.requireNonNull;
  4. import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
  5. import static org.assertj.core.api.Assertions.assertThat;
  6. import static org.assertj.core.api.Assertions.assertThatThrownBy;
  7. import static org.assertj.core.api.ThrowableAssert.ThrowingCallable;
  8. import com.provectus.kafka.ui.AbstractIntegrationTest;
  9. import com.provectus.kafka.ui.exception.ValidationException;
  10. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  11. import java.time.Duration;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.Map;
  15. import java.util.Properties;
  16. import java.util.UUID;
  17. import java.util.function.Function;
  18. import java.util.stream.Stream;
  19. import lombok.SneakyThrows;
  20. import org.apache.kafka.clients.admin.AdminClient;
  21. import org.apache.kafka.clients.admin.AlterConfigOp;
  22. import org.apache.kafka.clients.admin.Config;
  23. import org.apache.kafka.clients.admin.ConfigEntry;
  24. import org.apache.kafka.clients.admin.NewTopic;
  25. import org.apache.kafka.clients.admin.OffsetSpec;
  26. import org.apache.kafka.clients.admin.TopicDescription;
  27. import org.apache.kafka.clients.consumer.ConsumerConfig;
  28. import org.apache.kafka.clients.consumer.KafkaConsumer;
  29. import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  30. import org.apache.kafka.clients.producer.ProducerRecord;
  31. import org.apache.kafka.common.KafkaFuture;
  32. import org.apache.kafka.common.Node;
  33. import org.apache.kafka.common.TopicPartition;
  34. import org.apache.kafka.common.TopicPartitionInfo;
  35. import org.apache.kafka.common.config.ConfigResource;
  36. import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
  37. import org.apache.kafka.common.internals.KafkaFutureImpl;
  38. import org.apache.kafka.common.serialization.StringDeserializer;
  39. import org.assertj.core.api.ThrowableAssert;
  40. import org.junit.function.ThrowingRunnable;
  41. import org.junit.jupiter.api.AfterEach;
  42. import org.junit.jupiter.api.BeforeEach;
  43. import org.junit.jupiter.api.Test;
  44. import reactor.test.StepVerifier;
  45. class ReactiveAdminClientTest extends AbstractIntegrationTest {
  46. private final List<ThrowingRunnable> clearings = new ArrayList<>();
  47. private AdminClient adminClient;
  48. private ReactiveAdminClient reactiveAdminClient;
  49. @BeforeEach
  50. void init() {
  51. AdminClientService adminClientService = applicationContext.getBean(AdminClientService.class);
  52. ClustersStorage clustersStorage = applicationContext.getBean(ClustersStorage.class);
  53. reactiveAdminClient = requireNonNull(adminClientService.get(clustersStorage.getClusterByName(LOCAL).get()).block());
  54. adminClient = reactiveAdminClient.getClient();
  55. }
  56. @AfterEach
  57. void tearDown() {
  58. for (ThrowingRunnable clearing : clearings) {
  59. try {
  60. clearing.run();
  61. } catch (Throwable th) {
  62. //NOOP
  63. }
  64. }
  65. }
  66. @Test
  67. void testUpdateTopicConfigs() throws Exception {
  68. String topic = UUID.randomUUID().toString();
  69. createTopics(new NewTopic(topic, 1, (short) 1));
  70. var configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
  71. adminClient.incrementalAlterConfigs(
  72. Map.of(
  73. configResource,
  74. List.of(
  75. new AlterConfigOp(new ConfigEntry("compression.type", "gzip"), AlterConfigOp.OpType.SET),
  76. new AlterConfigOp(new ConfigEntry("retention.bytes", "12345678"), AlterConfigOp.OpType.SET)
  77. )
  78. )
  79. ).all().get();
  80. StepVerifier.create(
  81. reactiveAdminClient.updateTopicConfig(
  82. topic,
  83. Map.of(
  84. "compression.type", "snappy", //changing existing config
  85. "file.delete.delay.ms", "12345" // adding new one
  86. )
  87. )
  88. ).expectComplete().verify();
  89. Config config = adminClient.describeConfigs(List.of(configResource)).values().get(configResource).get();
  90. assertThat(config.get("retention.bytes").value()).isNotEqualTo("12345678"); // wes reset to default
  91. assertThat(config.get("compression.type").value()).isEqualTo("snappy");
  92. assertThat(config.get("file.delete.delay.ms").value()).isEqualTo("12345");
  93. }
  94. @SneakyThrows
  95. void createTopics(NewTopic... topics) {
  96. adminClient.createTopics(List.of(topics)).all().get();
  97. clearings.add(() -> adminClient.deleteTopics(Stream.of(topics).map(NewTopic::name).toList()).all().get());
  98. }
  99. void fillTopic(String topic, int msgsCnt) {
  100. try (var producer = KafkaTestProducer.forKafka(kafka)) {
  101. for (int i = 0; i < msgsCnt; i++) {
  102. producer.send(topic, UUID.randomUUID().toString());
  103. }
  104. }
  105. }
  106. @Test
  107. void testToMonoWithExceptionFilter() {
  108. var failedFuture = new KafkaFutureImpl<String>();
  109. failedFuture.completeExceptionally(new UnknownTopicOrPartitionException());
  110. var okFuture = new KafkaFutureImpl<String>();
  111. okFuture.complete("done");
  112. var emptyFuture = new KafkaFutureImpl<String>();
  113. emptyFuture.complete(null);
  114. Map<String, KafkaFuture<String>> arg = Map.of(
  115. "failure", failedFuture,
  116. "ok", okFuture,
  117. "empty", emptyFuture
  118. );
  119. StepVerifier.create(toMonoWithExceptionFilter(arg, UnknownTopicOrPartitionException.class))
  120. .assertNext(result -> assertThat(result).hasSize(1).containsEntry("ok", "done"))
  121. .verifyComplete();
  122. }
  123. @Test
  124. void filterPartitionsWithLeaderCheckSkipsPartitionsFromTopicWhereSomePartitionsHaveNoLeader() {
  125. var filteredPartitions = ReactiveAdminClient.filterPartitionsWithLeaderCheck(
  126. List.of(
  127. // contains partitions with no leader
  128. new TopicDescription("noLeaderTopic", false,
  129. List.of(
  130. new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
  131. new TopicPartitionInfo(1, null, List.of(), List.of()))),
  132. // should be skipped by predicate
  133. new TopicDescription("skippingByPredicate", false,
  134. List.of(
  135. new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))),
  136. // good topic
  137. new TopicDescription("good", false,
  138. List.of(
  139. new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
  140. new TopicPartitionInfo(1, new Node(2, "n2", 9092), List.of(), List.of()))
  141. )),
  142. p -> !p.topic().equals("skippingByPredicate"),
  143. false
  144. );
  145. assertThat(filteredPartitions)
  146. .containsExactlyInAnyOrder(
  147. new TopicPartition("good", 0),
  148. new TopicPartition("good", 1)
  149. );
  150. }
  151. @Test
  152. void filterPartitionsWithLeaderCheckThrowExceptionIfThereIsSomePartitionsWithoutLeaderAndFlagSet() {
  153. ThrowingCallable call = () -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(
  154. List.of(
  155. // contains partitions with no leader
  156. new TopicDescription("t1", false,
  157. List.of(
  158. new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
  159. new TopicPartitionInfo(1, null, List.of(), List.of()))),
  160. new TopicDescription("t2", false,
  161. List.of(
  162. new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))
  163. )),
  164. p -> true,
  165. // setting failOnNoLeader flag
  166. true
  167. );
  168. assertThatThrownBy(call).isInstanceOf(ValidationException.class);
  169. }
  170. @Test
  171. void testListOffsetsUnsafe() {
  172. String topic = UUID.randomUUID().toString();
  173. createTopics(new NewTopic(topic, 2, (short) 1));
  174. // sending messages to have non-zero offsets for tp
  175. try (var producer = KafkaTestProducer.forKafka(kafka)) {
  176. producer.send(new ProducerRecord<>(topic, 1, "k", "v"));
  177. producer.send(new ProducerRecord<>(topic, 1, "k", "v"));
  178. }
  179. var requestedPartitions = List.of(
  180. new TopicPartition(topic, 0),
  181. new TopicPartition(topic, 1)
  182. );
  183. StepVerifier.create(reactiveAdminClient.listOffsetsUnsafe(requestedPartitions, OffsetSpec.earliest()))
  184. .assertNext(offsets -> {
  185. assertThat(offsets)
  186. .hasSize(2)
  187. .containsEntry(new TopicPartition(topic, 0), 0L)
  188. .containsEntry(new TopicPartition(topic, 1), 0L);
  189. })
  190. .verifyComplete();
  191. StepVerifier.create(reactiveAdminClient.listOffsetsUnsafe(requestedPartitions, OffsetSpec.latest()))
  192. .assertNext(offsets -> {
  193. assertThat(offsets)
  194. .hasSize(2)
  195. .containsEntry(new TopicPartition(topic, 0), 0L)
  196. .containsEntry(new TopicPartition(topic, 1), 2L);
  197. })
  198. .verifyComplete();
  199. }
  200. @Test
  201. void testListConsumerGroupOffsets() throws Exception {
  202. String topic = UUID.randomUUID().toString();
  203. String anotherTopic = UUID.randomUUID().toString();
  204. createTopics(new NewTopic(topic, 2, (short) 1), new NewTopic(anotherTopic, 1, (short) 1));
  205. fillTopic(topic, 10);
  206. Function<String, KafkaConsumer<String, String>> consumerSupplier = groupName -> {
  207. Properties p = new Properties();
  208. p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
  209. p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
  210. p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  211. p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  212. p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  213. p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  214. return new KafkaConsumer<String, String>(p);
  215. };
  216. String fullyPolledConsumer = UUID.randomUUID().toString();
  217. try (KafkaConsumer<String, String> c = consumerSupplier.apply(fullyPolledConsumer)) {
  218. c.subscribe(List.of(topic));
  219. int polled = 0;
  220. while (polled < 10) {
  221. polled += c.poll(Duration.ofMillis(50)).count();
  222. }
  223. c.commitSync();
  224. }
  225. String polled1MsgConsumer = UUID.randomUUID().toString();
  226. try (KafkaConsumer<String, String> c = consumerSupplier.apply(polled1MsgConsumer)) {
  227. c.subscribe(List.of(topic));
  228. c.poll(Duration.ofMillis(100));
  229. c.commitSync(Map.of(tp(topic, 0), new OffsetAndMetadata(1)));
  230. }
  231. String noCommitConsumer = UUID.randomUUID().toString();
  232. try (KafkaConsumer<String, String> c = consumerSupplier.apply(noCommitConsumer)) {
  233. c.subscribe(List.of(topic));
  234. c.poll(Duration.ofMillis(100));
  235. }
  236. Map<TopicPartition, ListOffsetsResultInfo> endOffsets = adminClient.listOffsets(Map.of(
  237. tp(topic, 0), OffsetSpec.latest(),
  238. tp(topic, 1), OffsetSpec.latest())).all().get();
  239. StepVerifier.create(
  240. reactiveAdminClient.listConsumerGroupOffsets(
  241. List.of(fullyPolledConsumer, polled1MsgConsumer, noCommitConsumer),
  242. List.of(
  243. tp(topic, 0),
  244. tp(topic, 1),
  245. tp(anotherTopic, 0))
  246. )
  247. ).assertNext(table -> {
  248. assertThat(table.row(polled1MsgConsumer))
  249. .containsEntry(tp(topic, 0), 1L)
  250. .hasSize(1);
  251. assertThat(table.row(noCommitConsumer))
  252. .isEmpty();
  253. assertThat(table.row(fullyPolledConsumer))
  254. .containsEntry(tp(topic, 0), endOffsets.get(tp(topic, 0)).offset())
  255. .containsEntry(tp(topic, 1), endOffsets.get(tp(topic, 1)).offset())
  256. .hasSize(2);
  257. })
  258. .verifyComplete();
  259. }
  260. private static TopicPartition tp(String topic, int partition) {
  261. return new TopicPartition(topic, partition);
  262. }
  263. }