fix #416 : modify mapping check
This commit is contained in:
parent
bf0566cf4d
commit
e0b76ad035
1 changed files with 57 additions and 51 deletions
|
@ -367,8 +367,7 @@ public class FessEsClient implements Client {
|
|||
}
|
||||
|
||||
final GetMappingsResponse getMappingsResponse =
|
||||
client.admin().indices().prepareGetMappings(configIndex).setTypes(configType).execute()
|
||||
.actionGet(fessConfig.getIndexIndicesTimeout());
|
||||
client.admin().indices().prepareGetMappings(configIndex).execute().actionGet(fessConfig.getIndexIndicesTimeout());
|
||||
final ImmutableOpenMap<String, MappingMetaData> indexMappings = getMappingsResponse.mappings().get(configIndex);
|
||||
if (indexMappings == null || !indexMappings.containsKey(configType)) {
|
||||
String source = null;
|
||||
|
@ -378,57 +377,22 @@ public class FessEsClient implements Client {
|
|||
} catch (final Exception e) {
|
||||
logger.warn(mappingFile + " is not found.", e);
|
||||
}
|
||||
final PutMappingResponse putMappingResponse =
|
||||
client.admin().indices().preparePutMapping(configIndex).setType(configType).setSource(source).execute()
|
||||
.actionGet(fessConfig.getIndexIndicesTimeout());
|
||||
if (putMappingResponse.isAcknowledged()) {
|
||||
logger.info("Created " + configIndex + "/" + configType + " mapping.");
|
||||
} else {
|
||||
logger.warn("Failed to create " + configIndex + "/" + configType + " mapping.");
|
||||
}
|
||||
|
||||
final String dataPath = indexConfigPath + "/" + configIndex + "/" + configType + ".bulk";
|
||||
if (ResourceUtil.isExist(dataPath)) {
|
||||
try {
|
||||
final BulkRequestBuilder builder = client.prepareBulk();
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
Arrays.stream(FileUtil.readUTF8(dataPath).split("\n")).reduce(
|
||||
(prev, line) -> {
|
||||
try {
|
||||
if (StringUtil.isBlank(prev)) {
|
||||
final Map<String, Map<String, String>> result =
|
||||
mapper.readValue(line, new TypeReference<Map<String, Map<String, String>>>() {
|
||||
});
|
||||
if (result.keySet().contains("index")) {
|
||||
return line;
|
||||
} else if (result.keySet().contains("update")) {
|
||||
return line;
|
||||
} else if (result.keySet().contains("delete")) {
|
||||
return StringUtil.EMPTY;
|
||||
}
|
||||
} else {
|
||||
final Map<String, Map<String, String>> result =
|
||||
mapper.readValue(prev, new TypeReference<Map<String, Map<String, String>>>() {
|
||||
});
|
||||
if (result.keySet().contains("index")) {
|
||||
final IndexRequestBuilder requestBuilder =
|
||||
client.prepareIndex(configIndex, configType, result.get("index").get("_id"))
|
||||
.setSource(line);
|
||||
builder.add(requestBuilder);
|
||||
}
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to parse " + dataPath);
|
||||
}
|
||||
return StringUtil.EMPTY;
|
||||
});
|
||||
final BulkResponse response = builder.execute().actionGet(fessConfig.getIndexBulkTimeout());
|
||||
if (response.hasFailures()) {
|
||||
logger.warn("Failed to register " + dataPath + ": " + response.buildFailureMessage());
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
try {
|
||||
final PutMappingResponse putMappingResponse =
|
||||
client.admin().indices().preparePutMapping(configIndex).setType(configType).setSource(source).execute()
|
||||
.actionGet(fessConfig.getIndexIndicesTimeout());
|
||||
if (putMappingResponse.isAcknowledged()) {
|
||||
logger.info("Created " + configIndex + "/" + configType + " mapping.");
|
||||
} else {
|
||||
logger.warn("Failed to create " + configIndex + "/" + configType + " mapping.");
|
||||
}
|
||||
|
||||
final String dataPath = indexConfigPath + "/" + configIndex + "/" + configType + ".bulk";
|
||||
if (ResourceUtil.isExist(dataPath)) {
|
||||
insertBulkData(fessConfig, configIndex, configType, dataPath);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to create " + configIndex + "/" + configType + " mapping.", e);
|
||||
}
|
||||
} else if (logger.isDebugEnabled()) {
|
||||
logger.debug(configIndex + "/" + configType + " mapping exists.");
|
||||
|
@ -439,6 +403,48 @@ public class FessEsClient implements Client {
|
|||
}) ;
|
||||
}
|
||||
|
||||
protected void insertBulkData(final FessConfig fessConfig, final String configIndex, final String configType, final String dataPath) {
|
||||
try {
|
||||
final BulkRequestBuilder builder = client.prepareBulk();
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
Arrays.stream(FileUtil.readUTF8(dataPath).split("\n")).reduce(
|
||||
(prev, line) -> {
|
||||
try {
|
||||
if (StringUtil.isBlank(prev)) {
|
||||
final Map<String, Map<String, String>> result =
|
||||
mapper.readValue(line, new TypeReference<Map<String, Map<String, String>>>() {
|
||||
});
|
||||
if (result.keySet().contains("index")) {
|
||||
return line;
|
||||
} else if (result.keySet().contains("update")) {
|
||||
return line;
|
||||
} else if (result.keySet().contains("delete")) {
|
||||
return StringUtil.EMPTY;
|
||||
}
|
||||
} else {
|
||||
final Map<String, Map<String, String>> result =
|
||||
mapper.readValue(prev, new TypeReference<Map<String, Map<String, String>>>() {
|
||||
});
|
||||
if (result.keySet().contains("index")) {
|
||||
final IndexRequestBuilder requestBuilder =
|
||||
client.prepareIndex(configIndex, configType, result.get("index").get("_id")).setSource(line);
|
||||
builder.add(requestBuilder);
|
||||
}
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to parse " + dataPath);
|
||||
}
|
||||
return StringUtil.EMPTY;
|
||||
});
|
||||
final BulkResponse response = builder.execute().actionGet(fessConfig.getIndexBulkTimeout());
|
||||
if (response.hasFailures()) {
|
||||
logger.warn("Failed to register " + dataPath + ": " + response.buildFailureMessage());
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to create " + configIndex + "/" + configType + " mapping.");
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForYellowStatus() {
|
||||
final ClusterHealthResponse response =
|
||||
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute()
|
||||
|
|
Loading…
Add table
Reference in a new issue