ISSUE-297: Sonar warnings fixes (#298)

* ISSUE-297: Sonar warnings fixes
This commit is contained in:
iliax 2021-03-23 21:54:03 +03:00 committed by GitHub
parent 106c42e4cc
commit 31fe89d1c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 116 additions and 66 deletions

View file

@ -1,6 +1,6 @@
package com.provectus.kafka.ui.config; package com.provectus.kafka.ui.config;
import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.ReadOnlyModeException; import com.provectus.kafka.ui.exception.ReadOnlyModeException;
import com.provectus.kafka.ui.service.ClustersStorage; import com.provectus.kafka.ui.service.ClustersStorage;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -39,7 +39,8 @@ public class ReadOnlyModeFilter implements WebFilter {
var clusterName = matcher.group("clusterName"); var clusterName = matcher.group("clusterName");
var kafkaCluster = clustersStorage.getClusterByName(clusterName) var kafkaCluster = clustersStorage.getClusterByName(clusterName)
.orElseThrow( .orElseThrow(
() -> new NotFoundException(String.format("No cluster for name '%s'", clusterName))); () -> new ClusterNotFoundException(
String.format("No cluster for name '%s'", clusterName)));
if (!kafkaCluster.getReadOnly()) { if (!kafkaCluster.getReadOnly()) {
return chain.filter(exchange); return chain.filter(exchange);

View file

@ -32,7 +32,6 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
return clusterService.getConsumerGroups(clusterName) return clusterService.getConsumerGroups(clusterName)
.map(Flux::fromIterable) .map(Flux::fromIterable)
.map(ResponseEntity::ok) .map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound() .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
.build())); // TODO: check behaviour on cluster not found and empty groups list
} }
} }

View file

@@ -0,0 +1,17 @@
package com.provectus.kafka.ui.exception;
/**
 * Thrown when a request references a Kafka cluster that is not registered
 * in the application's cluster storage. Reported to clients via
 * {@code ErrorCode.CLUSTER_NOT_FOUND}.
 */
public class ClusterNotFoundException extends CustomBaseException {

  /** Message used when the caller supplies no cluster-specific detail. */
  private static final String DEFAULT_MESSAGE = "Cluster not found";

  public ClusterNotFoundException() {
    this(DEFAULT_MESSAGE);
  }

  public ClusterNotFoundException(String message) {
    super(message);
  }

  @Override
  public ErrorCode getErrorCode() {
    return ErrorCode.CLUSTER_NOT_FOUND;
  }
}

View file

@@ -0,0 +1,13 @@
package com.provectus.kafka.ui.exception;
/**
 * Thrown when a request references a Kafka Connect instance that is not
 * configured for the targeted cluster. Reported to clients via
 * {@code ErrorCode.CONNECT_NOT_FOUND}.
 */
public class ConnectNotFoundException extends CustomBaseException {

  /** Fixed message; this exception carries no caller-supplied detail. */
  private static final String DEFAULT_MESSAGE = "Connect not found";

  public ConnectNotFoundException() {
    super(DEFAULT_MESSAGE);
  }

  @Override
  public ErrorCode getErrorCode() {
    return ErrorCode.CONNECT_NOT_FOUND;
  }
}

View file

@ -2,22 +2,23 @@ package com.provectus.kafka.ui.exception;
public abstract class CustomBaseException extends RuntimeException { public abstract class CustomBaseException extends RuntimeException {
public CustomBaseException() { protected CustomBaseException() {
super();
} }
public CustomBaseException(String message) { protected CustomBaseException(String message) {
super(message); super(message);
} }
public CustomBaseException(String message, Throwable cause) { protected CustomBaseException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }
public CustomBaseException(Throwable cause) { protected CustomBaseException(Throwable cause) {
super(cause); super(cause);
} }
public CustomBaseException(String message, Throwable cause, boolean enableSuppression, protected CustomBaseException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) { boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace); super(message, cause, enableSuppression, writableStackTrace);
} }

View file

@ -10,11 +10,14 @@ public enum ErrorCode {
UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR), UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR),
BINDING_FAIL(4001, HttpStatus.BAD_REQUEST), BINDING_FAIL(4001, HttpStatus.BAD_REQUEST),
VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST), VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST),
ENTITY_NOT_FOUND(4003, HttpStatus.NOT_FOUND), READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED),
READ_ONLY_MODE_ENABLE(4004, HttpStatus.METHOD_NOT_ALLOWED), REBALANCE_IN_PROGRESS(4004, HttpStatus.CONFLICT),
REBALANCE_IN_PROGRESS(4005, HttpStatus.CONFLICT), DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT),
DUPLICATED_ENTITY(4006, HttpStatus.CONFLICT), UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY),
UNPROCESSABLE_ENTITY(4007, HttpStatus.UNPROCESSABLE_ENTITY); CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND),
TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND),
SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND),
CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND);
static { static {
// codes uniqueness check // codes uniqueness check

View file

@@ -1,14 +0,0 @@
package com.provectus.kafka.ui.exception;
/**
 * Generic "entity not found" exception for resources that have no more
 * specific exception type. Reported to clients via
 * {@code ErrorCode.ENTITY_NOT_FOUND}.
 */
public class NotFoundException extends CustomBaseException {

  public NotFoundException(String message) {
    super(message);
  }

  @Override
  public ErrorCode getErrorCode() {
    return ErrorCode.ENTITY_NOT_FOUND;
  }
}

View file

@@ -0,0 +1,17 @@
package com.provectus.kafka.ui.exception;
/**
 * Thrown when a requested schema (or schema version) does not exist in the
 * schema registry. Reported to clients via
 * {@code ErrorCode.SCHEMA_NOT_FOUND}.
 */
public class SchemaNotFoundException extends CustomBaseException {

  /** Message used when the caller supplies no schema-specific detail. */
  private static final String DEFAULT_MESSAGE = "Schema not found";

  public SchemaNotFoundException() {
    this(DEFAULT_MESSAGE);
  }

  public SchemaNotFoundException(String message) {
    super(message);
  }

  @Override
  public ErrorCode getErrorCode() {
    return ErrorCode.SCHEMA_NOT_FOUND;
  }
}

View file

@@ -0,0 +1,13 @@
package com.provectus.kafka.ui.exception;
/**
 * Thrown when a request references a topic that does not exist in the
 * targeted cluster. Reported to clients via
 * {@code ErrorCode.TOPIC_NOT_FOUND}.
 */
public class TopicNotFoundException extends CustomBaseException {

  /** Fixed message; this exception carries no caller-supplied detail. */
  private static final String DEFAULT_MESSAGE = "Topic not found";

  public TopicNotFoundException() {
    super(DEFAULT_MESSAGE);
  }

  @Override
  public ErrorCode getErrorCode() {
    return ErrorCode.TOPIC_NOT_FOUND;
  }
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.Broker; import com.provectus.kafka.ui.model.Broker;
import com.provectus.kafka.ui.model.BrokerMetrics; import com.provectus.kafka.ui.model.BrokerMetrics;
@ -88,7 +89,7 @@ public class ClusterService {
int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage; var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
var cluster = clustersStorage.getClusterByName(name) var cluster = clustersStorage.getClusterByName(name)
.orElseThrow(() -> new NotFoundException("No such cluster")); .orElseThrow(ClusterNotFoundException::new);
var totalPages = (cluster.getTopics().size() / perPage) var totalPages = (cluster.getTopics().size() / perPage)
+ (cluster.getTopics().size() % perPage == 0 ? 0 : 1); + (cluster.getTopics().size() % perPage == 0 ? 0 : 1);
return new TopicsResponse() return new TopicsResponse()
@ -178,11 +179,10 @@ public class ClusterService {
} }
} }
@SneakyThrows
public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) { public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
return clustersStorage.getClusterByName(clusterName) return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
.map(kafkaService::getConsumerGroups) .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
.orElse(Mono.empty()); .flatMap(kafkaService::getConsumerGroups);
} }
public Flux<Broker> getBrokers(String clusterName) { public Flux<Broker> getBrokers(String clusterName) {
@ -211,10 +211,10 @@ public class ClusterService {
public Mono<Void> deleteTopic(String clusterName, String topicName) { public Mono<Void> deleteTopic(String clusterName, String topicName) {
var cluster = clustersStorage.getClusterByName(clusterName) var cluster = clustersStorage.getClusterByName(clusterName)
.orElseThrow(() -> new NotFoundException("No such cluster")); .orElseThrow(ClusterNotFoundException::new);
getTopicDetails(clusterName, topicName) var topic = getTopicDetails(clusterName, topicName)
.orElseThrow(() -> new NotFoundException("No such topic")); .orElseThrow(TopicNotFoundException::new);
return kafkaService.deleteTopic(cluster, topicName) return kafkaService.deleteTopic(cluster, topic.getName())
.doOnNext(t -> updateCluster(topicName, clusterName, cluster)); .doOnNext(t -> updateCluster(topicName, clusterName, cluster));
} }
@ -243,9 +243,9 @@ public class ClusterService {
public Mono<Void> deleteTopicMessages(String clusterName, String topicName, public Mono<Void> deleteTopicMessages(String clusterName, String topicName,
List<Integer> partitions) { List<Integer> partitions) {
var cluster = clustersStorage.getClusterByName(clusterName) var cluster = clustersStorage.getClusterByName(clusterName)
.orElseThrow(() -> new NotFoundException("No such cluster")); .orElseThrow(ClusterNotFoundException::new);
if (!cluster.getTopics().containsKey(topicName)) { if (!cluster.getTopics().containsKey(topicName)) {
throw new NotFoundException("No such topic"); throw new TopicNotFoundException();
} }
return consumingService.offsetsForDeletion(cluster, topicName, partitions) return consumingService.offsetsForDeletion(cluster, topicName, partitions)
.flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets)); .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));

View file

@ -1,7 +1,8 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.client.KafkaConnectClients; import com.provectus.kafka.ui.client.KafkaConnectClients;
import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.ConnectNotFoundException;
import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.mapper.KafkaConnectMapper; import com.provectus.kafka.ui.mapper.KafkaConnectMapper;
import com.provectus.kafka.ui.model.Connect; import com.provectus.kafka.ui.model.Connect;
@ -181,7 +182,7 @@ public class KafkaConnectService {
private Mono<KafkaCluster> getCluster(String clusterName) { private Mono<KafkaCluster> getCluster(String clusterName) {
return clustersStorage.getClusterByName(clusterName) return clustersStorage.getClusterByName(clusterName)
.map(Mono::just) .map(Mono::just)
.orElse(Mono.error(new NotFoundException("No such cluster"))); .orElse(Mono.error(ClusterNotFoundException::new));
} }
private Mono<String> getConnectAddress(String clusterName, String connectName) { private Mono<String> getConnectAddress(String clusterName, String connectName) {
@ -194,7 +195,7 @@ public class KafkaConnectService {
) )
.flatMap(connect -> connect .flatMap(connect -> connect
.map(Mono::just) .map(Mono::just)
.orElse(Mono.error(new NotFoundException("No such connect cluster"))) .orElse(Mono.error(ConnectNotFoundException::new))
); );
} }
} }

View file

@ -351,6 +351,7 @@ public class KafkaService {
.all(), topicCr.name()); .all(), topicCr.name());
} }
@SuppressWarnings("deprecation")
private Mono<String> alterConfig(TopicFormData topicFormData, ConfigResource topicCr, private Mono<String> alterConfig(TopicFormData topicFormData, ConfigResource topicCr,
ExtendedAdminClient ac) { ExtendedAdminClient ac) {
List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream() List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream()
@ -359,7 +360,6 @@ public class KafkaService {
Config config = new Config(configEntries); Config config = new Config(configEntries);
Map<ConfigResource, Config> map = Collections.singletonMap(topicCr, config); Map<ConfigResource, Config> map = Collections.singletonMap(topicCr, config);
return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name()); return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name());
} }
private InternalTopic mergeWithStats(InternalTopic topic, private InternalTopic mergeWithStats(InternalTopic topic,

View file

@ -3,8 +3,9 @@ package com.provectus.kafka.ui.service;
import static org.springframework.http.HttpStatus.NOT_FOUND; import static org.springframework.http.HttpStatus.NOT_FOUND;
import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY; import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.DuplicateEntityException; import com.provectus.kafka.ui.exception.DuplicateEntityException;
import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.SchemaNotFoundException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.CompatibilityCheckResponse; import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
@ -39,7 +40,6 @@ import reactor.core.publisher.Mono;
public class SchemaRegistryService { public class SchemaRegistryService {
public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s"; public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s";
public static final String NO_SUCH_SCHEMA = "No such schema %s"; public static final String NO_SUCH_SCHEMA = "No such schema %s";
public static final String NO_SUCH_CLUSTER = "No such cluster";
private static final String URL_SUBJECTS = "/subjects"; private static final String URL_SUBJECTS = "/subjects";
private static final String URL_SUBJECT = "/subjects/{schemaName}"; private static final String URL_SUBJECT = "/subjects/{schemaName}";
@ -66,7 +66,7 @@ public class SchemaRegistryService {
.bodyToMono(String[].class) .bodyToMono(String[].class)
.doOnError(log::error) .doOnError(log::error)
) )
.orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); .orElse(Mono.error(ClusterNotFoundException::new));
} }
public Flux<SchemaSubject> getAllVersionsBySubject(String clusterName, String subject) { public Flux<SchemaSubject> getAllVersionsBySubject(String clusterName, String subject) {
@ -82,7 +82,7 @@ public class SchemaRegistryService {
.onStatus(NOT_FOUND::equals, .onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA)) throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA))
).bodyToFlux(Integer.class) ).bodyToFlux(Integer.class)
).orElse(Flux.error(new NotFoundException(NO_SUCH_CLUSTER))); ).orElse(Flux.error(ClusterNotFoundException::new));
} }
public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName,
@ -113,7 +113,7 @@ public class SchemaRegistryService {
return schema; return schema;
}) })
) )
.orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); .orElse(Mono.error(ClusterNotFoundException::new));
} }
/** /**
@ -145,7 +145,7 @@ public class SchemaRegistryService {
.onStatus(NOT_FOUND::equals, .onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
).toBodilessEntity() ).toBodilessEntity()
).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); ).orElse(Mono.error(ClusterNotFoundException::new));
} }
public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(String clusterName, public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(String clusterName,
@ -158,7 +158,7 @@ public class SchemaRegistryService {
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)) throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))
) )
.toBodilessEntity()) .toBodilessEntity())
.orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); .orElse(Mono.error(ClusterNotFoundException::new));
} }
/** /**
@ -181,7 +181,7 @@ public class SchemaRegistryService {
.flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl)) .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl))
.flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject)) .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
) )
.orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); .orElse(Mono.error(ClusterNotFoundException::new));
}); });
} }
@ -219,7 +219,7 @@ public class SchemaRegistryService {
@NotNull @NotNull
private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus( private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(
String formatted) { String formatted) {
return resp -> Mono.error(new NotFoundException(formatted)); return resp -> Mono.error(new SchemaNotFoundException(formatted));
} }
/** /**
@ -241,7 +241,7 @@ public class SchemaRegistryService {
.onStatus(NOT_FOUND::equals, .onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))) throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.bodyToMono(Void.class); .bodyToMono(Void.class);
}).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); }).orElse(Mono.error(ClusterNotFoundException::new));
} }
public Mono<Void> updateSchemaCompatibility(String clusterName, public Mono<Void> updateSchemaCompatibility(String clusterName,
@ -287,7 +287,7 @@ public class SchemaRegistryService {
.bodyToMono(InternalCompatibilityCheck.class) .bodyToMono(InternalCompatibilityCheck.class)
.map(mapper::toCompatibilityCheckResponse) .map(mapper::toCompatibilityCheckResponse)
.log() .log()
).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); ).orElse(Mono.error(ClusterNotFoundException::new));
} }
public String formatted(String str, Object... args) { public String formatted(String str, Object... args) {

View file

@ -158,9 +158,9 @@ public class ClusterUtil {
topic.inSyncReplicas(inSyncReplicasCount); topic.inSyncReplicas(inSyncReplicasCount);
topic.replicationFactor( topic.replicationFactor(
topicDescription.partitions().size() > 0 topicDescription.partitions().isEmpty()
? topicDescription.partitions().get(0).replicas().size() ? 0
: 0 : topicDescription.partitions().get(0).replicas().size()
); );
topic.underReplicatedPartitions(urpCount); topic.underReplicatedPartitions(urpCount);

View file

@ -77,10 +77,9 @@ public class JmxClusterUtil {
var attrNames = msc.getMBeanInfo(name).getAttributes(); var attrNames = msc.getMBeanInfo(name).getAttributes();
for (MBeanAttributeInfo attrName : attrNames) { for (MBeanAttributeInfo attrName : attrNames) {
var value = msc.getAttribute(name, attrName.getName()); var value = msc.getAttribute(name, attrName.getName());
if (value instanceof Number) { if ((value instanceof Number)
if (!(value instanceof Double) || !((Double) value).isInfinite()) { && (!(value instanceof Double) || !((Double) value).isInfinite())) {
resultAttr.put(attrName.getName(), new BigDecimal(value.toString())); resultAttr.put(attrName.getName(), new BigDecimal(value.toString()));
}
} }
} }
} catch (MalformedURLException url) { } catch (MalformedURLException url) {

View file

@ -57,7 +57,7 @@ class OffsetsSeekTest {
new ConsumerPosition(SeekType.BEGINNING, Map.of(0, 0L, 1, 0L))); new ConsumerPosition(SeekType.BEGINNING, Map.of(0, 0L, 1, 0L)));
seek.assignAndSeek(consumer); seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1);
assertThat(consumer.position(tp0)).isEqualTo(0L); assertThat(consumer.position(tp0)).isZero();
assertThat(consumer.position(tp1)).isEqualTo(10L); assertThat(consumer.position(tp1)).isEqualTo(10L);
} }
@ -68,9 +68,9 @@ class OffsetsSeekTest {
new ConsumerPosition(SeekType.BEGINNING, Map.of())); new ConsumerPosition(SeekType.BEGINNING, Map.of()));
seek.assignAndSeek(consumer); seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3);
assertThat(consumer.position(tp0)).isEqualTo(0L); assertThat(consumer.position(tp0)).isZero();
assertThat(consumer.position(tp1)).isEqualTo(10L); assertThat(consumer.position(tp1)).isEqualTo(10L);
assertThat(consumer.position(tp2)).isEqualTo(0L); assertThat(consumer.position(tp2)).isZero();
assertThat(consumer.position(tp3)).isEqualTo(25L); assertThat(consumer.position(tp3)).isEqualTo(25L);
} }
@ -81,7 +81,7 @@ class OffsetsSeekTest {
new ConsumerPosition(SeekType.OFFSET, Map.of(0, 0L, 1, 1L, 2, 2L))); new ConsumerPosition(SeekType.OFFSET, Map.of(0, 0L, 1, 1L, 2, 2L)));
seek.assignAndSeek(consumer); seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2);
assertThat(consumer.position(tp0)).isEqualTo(0L); assertThat(consumer.position(tp0)).isZero();
assertThat(consumer.position(tp1)).isEqualTo(1L); assertThat(consumer.position(tp1)).isEqualTo(1L);
assertThat(consumer.position(tp2)).isEqualTo(2L); assertThat(consumer.position(tp2)).isEqualTo(2L);
} }