SeekOperationsTest.java 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package com.provectus.kafka.ui.emitter;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import com.provectus.kafka.ui.model.SeekTypeDTO;
  4. import java.util.Map;
  5. import java.util.stream.Collectors;
  6. import java.util.stream.Stream;
  7. import org.apache.kafka.clients.consumer.MockConsumer;
  8. import org.apache.kafka.clients.consumer.OffsetResetStrategy;
  9. import org.apache.kafka.common.PartitionInfo;
  10. import org.apache.kafka.common.TopicPartition;
  11. import org.apache.kafka.common.utils.Bytes;
  12. import org.junit.jupiter.api.BeforeEach;
  13. import org.junit.jupiter.api.Nested;
  14. import org.junit.jupiter.api.Test;
  15. class SeekOperationsTest {
  16. final String topic = "test";
  17. final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
  18. final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
  19. final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
  20. final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
  21. MockConsumer<Bytes, Bytes> consumer;
  22. @BeforeEach
  23. void initMockConsumer() {
  24. consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
  25. consumer.updatePartitions(
  26. topic,
  27. Stream.of(tp0, tp1, tp2, tp3)
  28. .map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null))
  29. .collect(Collectors.toList()));
  30. consumer.updateBeginningOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 0L, tp3, 25L));
  31. consumer.updateEndOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L));
  32. }
  33. @Nested
  34. class GetOffsetsForSeek {
  35. @Test
  36. void latest() {
  37. var offsets = SeekOperations.getOffsetsForSeek(
  38. consumer,
  39. new OffsetsInfo(consumer, topic),
  40. SeekTypeDTO.LATEST,
  41. null
  42. );
  43. assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L));
  44. }
  45. @Test
  46. void beginning() {
  47. var offsets = SeekOperations.getOffsetsForSeek(
  48. consumer,
  49. new OffsetsInfo(consumer, topic),
  50. SeekTypeDTO.BEGINNING,
  51. null
  52. );
  53. assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L));
  54. }
  55. @Test
  56. void offsets() {
  57. var offsets = SeekOperations.getOffsetsForSeek(
  58. consumer,
  59. new OffsetsInfo(consumer, topic),
  60. SeekTypeDTO.OFFSET,
  61. Map.of(tp1, 10L, tp2, 10L, tp3, 26L)
  62. );
  63. assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 10L, tp3, 26L));
  64. }
  65. @Test
  66. void offsetsWithBoundsFixing() {
  67. var offsets = SeekOperations.getOffsetsForSeek(
  68. consumer,
  69. new OffsetsInfo(consumer, topic),
  70. SeekTypeDTO.OFFSET,
  71. Map.of(tp1, 10L, tp2, 21L, tp3, 24L)
  72. );
  73. assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 25L));
  74. }
  75. }
  76. }