Fix consumers sorting (#2447)

* Fix consumers sorting

* Fix tests

* Review fix

* Review suggestions
This commit is contained in:
Roman Zabaluev 2022-09-02 16:27:33 +04:00 committed by GitHub
parent 91b86b5b78
commit c8306f5970
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 2 deletions

View file

@ -161,7 +161,7 @@ public class ConsumerGroupService {
}; };
return Comparator.comparingInt(statesPriorities); return Comparator.comparingInt(statesPriorities);
case MEMBERS: case MEMBERS:
return Comparator.comparingInt(cg -> -cg.members().size()); return Comparator.comparingInt(cg -> cg.members().size());
default: default:
throw new IllegalStateException("Unsupported order by: " + orderBy); throw new IllegalStateException("Unsupported order by: " + orderBy);
} }

View file

@ -14,6 +14,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import lombok.val; import lombok.val;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -126,6 +127,21 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
assertThat(page.getConsumerGroups()) assertThat(page.getConsumerGroups())
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed()); .isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
}); });
webTestClient
.get()
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
+ "=cgPageTest&orderBy=MEMBERS&sortOrder=DESC", LOCAL)
.exchange()
.expectStatus()
.isOk()
.expectBody(ConsumerGroupsPageResponseDTO.class)
.value(page -> {
assertThat(page.getPageCount()).isEqualTo(1);
assertThat(page.getConsumerGroups().size()).isEqualTo(5);
assertThat(page.getConsumerGroups())
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getMembers).reversed());
});
} }
} }
@ -133,7 +149,7 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
String topicName = createTopicWithRandomName(); String topicName = createTopicWithRandomName();
var consumers = var consumers =
Stream.generate(() -> { Stream.generate(() -> {
String groupId = consumerGroupPrefix + UUID.randomUUID(); String groupId = consumerGroupPrefix + RandomStringUtils.randomAlphabetic(5);
val consumer = createTestConsumerWithGroupId(groupId); val consumer = createTestConsumerWithGroupId(groupId);
consumer.subscribe(List.of(topicName)); consumer.subscribe(List.of(topicName));
consumer.poll(Duration.ofMillis(100)); consumer.poll(Duration.ofMillis(100));

View file

@ -12,6 +12,7 @@ public class KafkaConnectContainer extends GenericContainer<KafkaConnectContaine
public KafkaConnectContainer(String version) { public KafkaConnectContainer(String version) {
super("confluentinc/cp-kafka-connect:" + version); super("confluentinc/cp-kafka-connect:" + version);
addExposedPort(CONNECT_PORT); addExposedPort(CONNECT_PORT);
waitStrategy = Wait.forHttp("/") waitStrategy = Wait.forHttp("/")
.withStartupTimeout(Duration.ofMinutes(5)); .withStartupTimeout(Duration.ofMinutes(5));
} }
@ -37,6 +38,7 @@ public class KafkaConnectContainer extends GenericContainer<KafkaConnectContaine
withEnv("CONNECT_INTERNAL_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter"); withEnv("CONNECT_INTERNAL_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
withEnv("CONNECT_INTERNAL_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter"); withEnv("CONNECT_INTERNAL_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect"); withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect");
withEnv("CONNECT_REST_PORT", String.valueOf(CONNECT_PORT));
withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/usr/share/confluent-hub-components"); withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/usr/share/confluent-hub-components");
return self(); return self();
} }