Add a failover recovery for empty cluster versions (#2473)

* Add a failover recovery for empty cluster versions

* Review suggestions
This commit is contained in:
Roman Zabaluev 2022-08-30 12:06:05 +04:00 committed by GitHub
parent 56e4cbf60f
commit 9e1f8d773f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 6 deletions

View file

@ -90,10 +90,15 @@ public class ReactiveAdminClient implements Closeable {
}
private static SupportedFeature getSupportedUpdateFeatureForVersion(String versionStr) {
try {
float version = NumberUtil.parserClusterVersion(versionStr);
return version <= 2.3f
? SupportedFeature.ALTER_CONFIGS
: SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
} catch (NumberFormatException e) {
log.info("Assuming non-incremental alter configs due to version parsing error");
return SupportedFeature.ALTER_CONFIGS;
}
}
//TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here

View file

@ -13,7 +13,8 @@ public class NumberUtil {
return value != null && NumberUtils.isCreatable(value.toString());
}
public static float parserClusterVersion(String version) {
public static float parserClusterVersion(String version) throws NumberFormatException {
log.trace("Parsing cluster version [{}]", version);
try {
final String[] parts = version.split("\\.");
if (parts.length > 2) {
@ -21,7 +22,7 @@ public class NumberUtil {
}
return Float.parseFloat(version.split("-")[0]);
} catch (Exception e) {
log.error("Conversion clusterVersion {} to float value failed", version);
log.error("Conversion clusterVersion [{}] to float value failed", version, e);
throw e;
}
}