SchemaRegistryPaginationTest.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import static org.mockito.ArgumentMatchers.anyList;
  4. import static org.mockito.ArgumentMatchers.isA;
  5. import static org.mockito.Mockito.mock;
  6. import static org.mockito.Mockito.when;
  7. import com.provectus.kafka.ui.controller.SchemasController;
  8. import com.provectus.kafka.ui.model.KafkaCluster;
  9. import com.provectus.kafka.ui.model.SchemaSubjectDTO;
  10. import com.provectus.kafka.ui.service.audit.AuditService;
  11. import com.provectus.kafka.ui.sr.model.Compatibility;
  12. import com.provectus.kafka.ui.sr.model.SchemaSubject;
  13. import com.provectus.kafka.ui.util.AccessControlServiceMock;
  14. import com.provectus.kafka.ui.util.ReactiveFailover;
  15. import java.util.Comparator;
  16. import java.util.List;
  17. import java.util.Optional;
  18. import java.util.stream.IntStream;
  19. import org.junit.jupiter.api.Test;
  20. import reactor.core.publisher.Mono;
  21. public class SchemaRegistryPaginationTest {
  22. private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
  23. private SchemasController controller;
  24. private void init(List<String> subjects) {
  25. ClustersStorage clustersStorage = mock(ClustersStorage.class);
  26. when(clustersStorage.getClusterByName(isA(String.class)))
  27. .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
  28. SchemaRegistryService schemaRegistryService = mock(SchemaRegistryService.class);
  29. when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class)))
  30. .thenReturn(Mono.just(subjects));
  31. when(schemaRegistryService
  32. .getAllLatestVersionSchemas(isA(KafkaCluster.class), anyList())).thenCallRealMethod();
  33. when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class)))
  34. .thenAnswer(a -> Mono.just(
  35. new SchemaRegistryService.SubjectWithCompatibilityLevel(
  36. new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
  37. this.controller = new SchemasController(schemaRegistryService);
  38. this.controller.setAccessControlService(new AccessControlServiceMock().getMock());
  39. this.controller.setAuditService(mock(AuditService.class));
  40. this.controller.setClustersStorage(clustersStorage);
  41. }
  42. @Test
  43. void shouldListFirst25andThen10Schemas() {
  44. init(
  45. IntStream.rangeClosed(1, 100)
  46. .boxed()
  47. .map(num -> "subject" + num)
  48. .toList()
  49. );
  50. var schemasFirst25 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  51. null, null, null, null).block();
  52. assertThat(schemasFirst25.getBody().getPageCount()).isEqualTo(4);
  53. assertThat(schemasFirst25.getBody().getSchemas()).hasSize(25);
  54. assertThat(schemasFirst25.getBody().getSchemas())
  55. .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
  56. var schemasFirst10 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  57. null, 10, null, null).block();
  58. assertThat(schemasFirst10.getBody().getPageCount()).isEqualTo(10);
  59. assertThat(schemasFirst10.getBody().getSchemas()).hasSize(10);
  60. assertThat(schemasFirst10.getBody().getSchemas())
  61. .isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
  62. }
  63. @Test
  64. void shouldListSchemasContaining_1() {
  65. init(
  66. IntStream.rangeClosed(1, 100)
  67. .boxed()
  68. .map(num -> "subject" + num)
  69. .toList()
  70. );
  71. var schemasSearch7 = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  72. null, null, "1", null).block();
  73. assertThat(schemasSearch7.getBody().getPageCount()).isEqualTo(1);
  74. assertThat(schemasSearch7.getBody().getSchemas()).hasSize(20);
  75. }
  76. @Test
  77. void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
  78. init(
  79. IntStream.rangeClosed(1, 100)
  80. .boxed()
  81. .map(num -> "subject" + num)
  82. .toList()
  83. );
  84. var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  85. 0, -1, null, null).block();
  86. assertThat(schemas.getBody().getPageCount()).isEqualTo(4);
  87. assertThat(schemas.getBody().getSchemas()).hasSize(25);
  88. assertThat(schemas.getBody().getSchemas()).isSortedAccordingTo(Comparator.comparing(SchemaSubjectDTO::getSubject));
  89. }
  90. @Test
  91. void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
  92. init(
  93. IntStream.rangeClosed(1, 100)
  94. .boxed()
  95. .map(num -> "subject" + num)
  96. .toList()
  97. );
  98. var schemas = controller.getSchemas(LOCAL_KAFKA_CLUSTER_NAME,
  99. 4, 33, null, null).block();
  100. assertThat(schemas.getBody().getPageCount()).isEqualTo(4);
  101. assertThat(schemas.getBody().getSchemas()).hasSize(1);
  102. assertThat(schemas.getBody().getSchemas().get(0).getSubject()).isEqualTo("subject99");
  103. }
  104. private KafkaCluster buildKafkaCluster(String clusterName) {
  105. return KafkaCluster.builder()
  106. .name(clusterName)
  107. .schemaRegistryClient(mock(ReactiveFailover.class))
  108. .build();
  109. }
  110. }