SchemaRegistryPaginationTest.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import static org.mockito.ArgumentMatchers.anyList;
  4. import static org.mockito.ArgumentMatchers.isA;
  5. import static org.mockito.Mockito.mock;
  6. import static org.mockito.Mockito.when;
  7. import com.provectus.kafka.ui.controller.SchemasController;
  8. import com.provectus.kafka.ui.model.KafkaCluster;
  9. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  10. import com.provectus.kafka.ui.service.audit.AuditService;
  11. import com.provectus.kafka.ui.sr.model.Compatibility;
  12. import com.provectus.kafka.ui.sr.model.SchemaSubject;
  13. import com.provectus.kafka.ui.util.AccessControlServiceMock;
  14. import com.provectus.kafka.ui.util.ReactiveFailover;
  15. import java.util.Comparator;
  16. import java.util.List;
  17. import java.util.Optional;
  18. import java.util.stream.IntStream;
  19. import org.junit.jupiter.api.Test;
  20. import reactor.core.publisher.Mono;
  21. public class SchemaRegistryPaginationTest {
  22. private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
  23. private SchemasController controller;
  24. private void init(List<String> subjects) {
  25. ClustersStorage clustersStorage = mock(ClustersStorage.class);
  26. when(clustersStorage.getClusterByName(isA(String.class)))
  27. .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
  28. SchemaRegistryService schemaRegistryService = mock(SchemaRegistryService.class);
  29. when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class)))
  30. .thenReturn(Mono.just(subjects));
  31. when(schemaRegistryService
  32. .getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList())).thenCallRealMethod();
  33. when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class)))
  34. .thenAnswer(a -> Mono.just(
  35. new SchemaRegistryService.SubjectWithCompatibilityLevel(
  36. new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
  37. this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock(),
  38. mock(AuditService.class));
  39. this.controller.setClustersStorage(clustersStorage);
  40. }
  41. @Test
  42. void shouldListFirst25andThen10Schemas() {
  43. init(
  44. IntStream.rangeClosed(1, 100)
  45. .boxed()
  46. .map(num -> "subject" + num)
  47. .toList()
  48. );
  49. var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  50. null, null, null, null).block();
  51. assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);
  52. assertThat(schemasFirst25.getBody().getSchemas()).hasSize(25);
  53. assertThat(schemasFirst25.getBody().getSchemas())
  54. .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
  55. var schemasFirst10 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  56. null, 10, null, null).block();
  57. assertThat(schemasFirst10.getBody().getPageCount()).isEqualTo(10);
  58. assertThat(schemasFirst10.getBody().getSchemas()).hasSize(10);
  59. assertThat(schemasFirst10.getBody().getSchemas())
  60. .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
  61. }
  62. @Test
  63. void shouldListSchemasContaining_1() {
  64. init(
  65. IntStream.rangeClosed(1, 100)
  66. .boxed()
  67. .map(num -> "subject" + num)
  68. .toList()
  69. );
  70. var schemasSearch7 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  71. null, null, "1", null).block();
  72. assertThat(schemasSearch7.getBody().getPageCount()).isEqualTo(1);
  73. assertThat(schemasSearch7.getBody().getSchemas()).hasSize(20);
  74. }
  75. @Test
  76. void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
  77. init(
  78. IntStream.rangeClosed(1, 100)
  79. .boxed()
  80. .map(num -> "subject" + num)
  81. .toList()
  82. );
  83. var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  84. 0, -1, null, null).block();
  85. assertThat(schemas.getBody().getPageCount()).isEqualTo(4);
  86. assertThat(schemas.getBody().getSchemas()).hasSize(25);
  87. assertThat(schemas.getBody().getSchemas()).isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
  88. }
  89. @Test
  90. void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
  91. init(
  92. IntStream.rangeClosed(1, 100)
  93. .boxed()
  94. .map(num -> "subject" + num)
  95. .toList()
  96. );
  97. var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  98. 4, 33, null, null).block();
  99. assertThat(schemas.getBody().getPageCount()).isEqualTo(4);
  100. assertThat(schemas.getBody().getSchemas()).hasSize(1);
  101. assertThat(schemas.getBody().getSchemas().get(0).getSubject()).isEqualTo("subject99");
  102. }
  103. private KafkaCluster buildKafkaCluster(String clusterName) {
  104. return KafkaCluster.builder()
  105. .name(clusterName)
  106. .schemaRegistryClient(mock(ReactiveFailover.class))
  107. .build();
  108. }
  109. }