This commit is contained in:
iliax 2023-08-05 17:25:46 +04:00
parent 8d30d14458
commit 8eb6d7073e

View file

@ -61,19 +61,19 @@ public class SeekOperations {
*/ */
@VisibleForTesting @VisibleForTesting
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer, static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
OffsetsInfo offsetsInfo, OffsetsInfo offsetsInfo,
SeekTypeDTO seekType, SeekTypeDTO seekType,
@Nullable Map<TopicPartition, Long> seekTo) { @Nullable Map<TopicPartition, Long> seekTo) {
switch (seekType) { switch (seekType) {
case LATEST: case LATEST:
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions()); return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
case BEGINNING: case BEGINNING:
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions()); return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
case OFFSET: case OFFSET:
Preconditions.checkNotNull(offsetsInfo); Preconditions.checkNotNull(seekTo);
return fixOffsets(offsetsInfo, seekTo); return fixOffsets(offsetsInfo, seekTo);
case TIMESTAMP: case TIMESTAMP:
Preconditions.checkNotNull(offsetsInfo); Preconditions.checkNotNull(seekTo);
return offsetsForTimestamp(consumer, offsetsInfo, seekTo); return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
default: default:
throw new IllegalStateException(); throw new IllegalStateException();