|
@@ -74,6 +74,7 @@ import org.elasticsearch.action.DocWriteRequest;
|
|
|
import org.elasticsearch.action.DocWriteRequest.OpType;
|
|
|
import org.elasticsearch.action.DocWriteResponse.Result;
|
|
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
|
|
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
|
|
|
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
|
|
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
|
|
@@ -302,7 +303,7 @@ public class FessEsClient implements Client {
|
|
|
if (values.length == 2) {
|
|
|
final String configIndex = values[0];
|
|
|
final String configType = values[1];
|
|
|
- boolean exists = false;
|
|
|
+
|
|
|
final String indexName;
|
|
|
final boolean isFessIndex = configIndex.equals("fess");
|
|
|
if (isFessIndex) {
|
|
@@ -310,156 +311,224 @@ public class FessEsClient implements Client {
|
|
|
} else {
|
|
|
indexName = configIndex;
|
|
|
}
|
|
|
- try {
|
|
|
- final IndicesExistsResponse response =
|
|
|
- client.admin().indices().prepareExists(indexName).execute().actionGet(fessConfig.getIndexSearchTimeout());
|
|
|
- exists = response.isExists();
|
|
|
- } catch (final Exception e) {
|
|
|
- // ignore
|
|
|
- }
|
|
|
- if (!exists) {
|
|
|
- waitForConfigSyncStatus();
|
|
|
- configListMap.getOrDefault(configIndex, Collections.emptyList()).forEach(
|
|
|
- path -> {
|
|
|
- String source = null;
|
|
|
- final String filePath = indexConfigPath + "/" + configIndex + "/" + path;
|
|
|
- try {
|
|
|
- source = FileUtil.readUTF8(filePath);
|
|
|
- try (CurlResponse response =
|
|
|
- Curl.post(org.codelibs.fess.util.ResourceUtil.getElasticsearchHttpUrl() + "/_configsync/file")
|
|
|
- .header("Content-Type", "application/json").param("path", path).body(source).execute()) {
|
|
|
- if (response.getHttpStatusCode() == 200) {
|
|
|
- logger.info("Register " + path + " to " + configIndex);
|
|
|
- } else {
|
|
|
- if (response.getContentException() != null) {
|
|
|
- logger.warn("Invalid request for " + path + ".", response.getContentException());
|
|
|
- } else {
|
|
|
- logger.warn("Invalid request for " + path + ". The response is "
|
|
|
- + response.getContentAsString());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (final Exception e) {
|
|
|
- logger.warn("Failed to register " + filePath, e);
|
|
|
- }
|
|
|
- });
|
|
|
- try (CurlResponse response =
|
|
|
- Curl.post(org.codelibs.fess.util.ResourceUtil.getElasticsearchHttpUrl() + "/_configsync/flush")
|
|
|
- .header("Content-Type", "application/json").execute()) {
|
|
|
- if (response.getHttpStatusCode() == 200) {
|
|
|
- logger.info("Flushed config files.");
|
|
|
+ boolean exists = existsIndex(indexName);
|
|
|
+ if (!exists) {
|
|
|
+ final String createdIndexName;
|
|
|
+ if (isFessIndex) {
|
|
|
+ createdIndexName = generateNewIndexName(configIndex);
|
|
|
} else {
|
|
|
- logger.warn("Failed to flush config files.");
|
|
|
+ createdIndexName = configIndex;
|
|
|
}
|
|
|
- } catch (final Exception e) {
|
|
|
- logger.warn("Failed to flush config files.", e);
|
|
|
+ createIndex(configIndex, configType, createdIndexName);
|
|
|
+ createAlias(configIndex, createdIndexName);
|
|
|
}
|
|
|
|
|
|
- final String createdIndexName;
|
|
|
+ final String updatedIndexName;
|
|
|
if (isFessIndex) {
|
|
|
- createdIndexName = generateNewIndexName(configIndex);
|
|
|
- } else {
|
|
|
- createdIndexName = configIndex;
|
|
|
- }
|
|
|
- final String indexConfigFile = indexConfigPath + "/" + configIndex + ".json";
|
|
|
- try {
|
|
|
- String source = FileUtil.readUTF8(indexConfigFile);
|
|
|
- final String dictionaryPath = System.getProperty("fess.dictionary.path", StringUtil.EMPTY);
|
|
|
- source = source.replaceAll(Pattern.quote("${fess.dictionary.path}"), dictionaryPath);
|
|
|
- final CreateIndexResponse indexResponse =
|
|
|
- client.admin().indices().prepareCreate(createdIndexName)
|
|
|
- .setSource(source, XContentFactory.xContentType(source)).execute()
|
|
|
+ client.admin().cluster().prepareHealth(fessConfig.getIndexDocumentUpdateIndex()).setWaitForYellowStatus().execute()
|
|
|
+ .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
+ final GetIndexResponse response =
|
|
|
+ client.admin().indices().prepareGetIndex().addIndices(fessConfig.getIndexDocumentUpdateIndex()).execute()
|
|
|
.actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
- if (indexResponse.isAcknowledged()) {
|
|
|
- logger.info("Created " + createdIndexName + " index.");
|
|
|
- } else if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("Failed to create " + createdIndexName + " index.");
|
|
|
+ final String[] indices = response.indices();
|
|
|
+ if (indices.length == 1) {
|
|
|
+ updatedIndexName = indices[0];
|
|
|
+ } else {
|
|
|
+ updatedIndexName = configIndex;
|
|
|
}
|
|
|
- } catch (final Exception e) {
|
|
|
- logger.warn(indexConfigFile + " is not found.", e);
|
|
|
+ } else {
|
|
|
+ updatedIndexName = configIndex;
|
|
|
}
|
|
|
+ addMapping(configIndex, configType, updatedIndexName);
|
|
|
+ } else {
|
|
|
+ logger.warn("Invalid index config name: " + configName);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
- // alias
|
|
|
- final String aliasConfigDirPath = indexConfigPath + "/" + configIndex + "/alias";
|
|
|
- try {
|
|
|
- final File aliasConfigDir = ResourceUtil.getResourceAsFile(aliasConfigDirPath);
|
|
|
- if (aliasConfigDir.isDirectory()) {
|
|
|
- stream(aliasConfigDir.listFiles((dir, name) -> name.endsWith(".json"))).of(
|
|
|
- stream -> stream.forEach(f -> {
|
|
|
- final String aliasName = f.getName().replaceFirst(".json$", "");
|
|
|
- String source = FileUtil.readUTF8(f);
|
|
|
- if (source.trim().equals("{}")) {
|
|
|
- source = null;
|
|
|
- }
|
|
|
- final IndicesAliasesResponse response =
|
|
|
- client.admin().indices().prepareAliases().addAlias(createdIndexName, aliasName, source)
|
|
|
- .execute().actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
- if (response.isAcknowledged()) {
|
|
|
- logger.info("Created " + aliasName + " alias for " + createdIndexName);
|
|
|
- } else if (logger.isDebugEnabled()) {
|
|
|
- logger.debug("Failed to create " + aliasName + " alias for " + createdIndexName);
|
|
|
- }
|
|
|
- }));
|
|
|
- }
|
|
|
- } catch (final ResourceNotFoundRuntimeException e) {
|
|
|
- // ignore
|
|
|
- } catch (final Exception e) {
|
|
|
- logger.warn(aliasConfigDirPath + " is not found.", e);
|
|
|
- }
|
|
|
+ public boolean existsIndex(final String indexName) {
|
|
|
+ final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
+ boolean exists = false;
|
|
|
+ try {
|
|
|
+ final IndicesExistsResponse response =
|
|
|
+ client.admin().indices().prepareExists(indexName).execute().actionGet(fessConfig.getIndexSearchTimeout());
|
|
|
+ exists = response.isExists();
|
|
|
+ } catch (final Exception e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
+ return exists;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean reindex(final String fromIndex, final String toIndex, final boolean waitForCompletion) {
|
|
|
+ final String source = "{\"source\":{\"index\":\"" + fromIndex + "\"},\"dest\":{\"index\":\"" + toIndex + "\"}}";
|
|
|
+ try (CurlResponse response =
|
|
|
+ Curl.post(org.codelibs.fess.util.ResourceUtil.getElasticsearchHttpUrl() + "/_reindex")
|
|
|
+ .header("Content-Type", "application/json").param("wait_for_completion", Boolean.toString(waitForCompletion))
|
|
|
+ .body(source).execute()) {
|
|
|
+ if (response.getHttpStatusCode() == 200) {
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ logger.warn("Failed to reindex from " + fromIndex + " to " + toIndex);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ logger.warn("Failed to reindex from " + fromIndex + " to " + toIndex, e);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean createIndex(final String index, final String docType, final String indexName) {
|
|
|
+ final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
+
|
|
|
+ waitForConfigSyncStatus();
|
|
|
+ sendConfigFiles(index);
|
|
|
+
|
|
|
+ final String indexConfigFile = indexConfigPath + "/" + index + ".json";
|
|
|
+ try {
|
|
|
+ String source = FileUtil.readUTF8(indexConfigFile);
|
|
|
+ final String dictionaryPath = System.getProperty("fess.dictionary.path", StringUtil.EMPTY);
|
|
|
+ source = source.replaceAll(Pattern.quote("${fess.dictionary.path}"), dictionaryPath);
|
|
|
+ final CreateIndexResponse indexResponse =
|
|
|
+ client.admin().indices().prepareCreate(indexName).setSource(source, XContentFactory.xContentType(source)).execute()
|
|
|
+ .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
+ if (indexResponse.isAcknowledged()) {
|
|
|
+ logger.info("Created " + indexName + " index.");
|
|
|
+ return true;
|
|
|
+ } else if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("Failed to create " + indexName + " index.");
|
|
|
}
|
|
|
+ } catch (final Exception e) {
|
|
|
+ logger.warn(indexConfigFile + " is not found.", e);
|
|
|
+ }
|
|
|
|
|
|
- final String updatedIndexName;
|
|
|
- if (isFessIndex) {
|
|
|
- client.admin().cluster().prepareHealth(fessConfig.getIndexDocumentUpdateIndex()).setWaitForYellowStatus().execute()
|
|
|
- .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
- final GetIndexResponse response =
|
|
|
- client.admin().indices().prepareGetIndex().addIndices(fessConfig.getIndexDocumentUpdateIndex()).execute()
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void addMapping(final String index, final String docType, final String indexName) {
|
|
|
+ final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
+
|
|
|
+ final GetMappingsResponse getMappingsResponse =
|
|
|
+ client.admin().indices().prepareGetMappings(indexName).execute().actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
+ final ImmutableOpenMap<String, MappingMetaData> indexMappings = getMappingsResponse.mappings().get(indexName);
|
|
|
+ if (indexMappings == null || !indexMappings.containsKey(docType)) {
|
|
|
+ String source = null;
|
|
|
+ final String mappingFile = indexConfigPath + "/" + index + "/" + docType + ".json";
|
|
|
+ try {
|
|
|
+ source = FileUtil.readUTF8(mappingFile);
|
|
|
+ } catch (final Exception e) {
|
|
|
+ logger.warn(mappingFile + " is not found.", e);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ final PutMappingResponse putMappingResponse =
|
|
|
+ client.admin().indices().preparePutMapping(indexName).setType(docType)
|
|
|
+ .setSource(source, XContentFactory.xContentType(source)).execute()
|
|
|
.actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
- final String[] indices = response.indices();
|
|
|
- if (indices.length == 1) {
|
|
|
- updatedIndexName = indices[0];
|
|
|
+ if (putMappingResponse.isAcknowledged()) {
|
|
|
+ logger.info("Created " + indexName + "/" + docType + " mapping.");
|
|
|
} else {
|
|
|
- updatedIndexName = configIndex;
|
|
|
+ logger.warn("Failed to create " + indexName + "/" + docType + " mapping.");
|
|
|
}
|
|
|
- } else {
|
|
|
- updatedIndexName = configIndex;
|
|
|
- }
|
|
|
- final GetMappingsResponse getMappingsResponse =
|
|
|
- client.admin().indices().prepareGetMappings(updatedIndexName).execute().actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
- final ImmutableOpenMap<String, MappingMetaData> indexMappings = getMappingsResponse.mappings().get(updatedIndexName);
|
|
|
- if (indexMappings == null || !indexMappings.containsKey(configType)) {
|
|
|
- String source = null;
|
|
|
- final String mappingFile = indexConfigPath + "/" + configIndex + "/" + configType + ".json";
|
|
|
- try {
|
|
|
- source = FileUtil.readUTF8(mappingFile);
|
|
|
- } catch (final Exception e) {
|
|
|
- logger.warn(mappingFile + " is not found.", e);
|
|
|
+
|
|
|
+ final String dataPath = indexConfigPath + "/" + index + "/" + docType + ".bulk";
|
|
|
+ if (ResourceUtil.isExist(dataPath)) {
|
|
|
+ insertBulkData(fessConfig, index, docType, dataPath);
|
|
|
}
|
|
|
- try {
|
|
|
- final PutMappingResponse putMappingResponse =
|
|
|
- client.admin().indices().preparePutMapping(updatedIndexName).setType(configType)
|
|
|
- .setSource(source, XContentFactory.xContentType(source)).execute()
|
|
|
- .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
- if (putMappingResponse.isAcknowledged()) {
|
|
|
- logger.info("Created " + updatedIndexName + "/" + configType + " mapping.");
|
|
|
- } else {
|
|
|
- logger.warn("Failed to create " + updatedIndexName + "/" + configType + " mapping.");
|
|
|
- }
|
|
|
+ } catch (final Exception e) {
|
|
|
+ logger.warn("Failed to create " + indexName + "/" + docType + " mapping.", e);
|
|
|
+ }
|
|
|
+ } else if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug(indexName + "/" + docType + " mapping exists.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean updateAlias(final String newIndex) {
|
|
|
+ final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
+ String updateAlias = fessConfig.getIndexDocumentUpdateIndex();
|
|
|
+ String searchAlias = fessConfig.getIndexDocumentSearchIndex();
|
|
|
+ final GetIndexResponse response1 =
|
|
|
+ client.admin().indices().prepareGetIndex().addIndices(updateAlias).execute().actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
+ final String[] updateIndices = response1.indices();
|
|
|
+ final GetIndexResponse response2 =
|
|
|
+ client.admin().indices().prepareGetIndex().addIndices(searchAlias).execute().actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
+ final String[] searchIndices = response2.indices();
|
|
|
+
|
|
|
+ IndicesAliasesRequestBuilder builder =
|
|
|
+ client.admin().indices().prepareAliases().addAlias(newIndex, updateAlias).addAlias(newIndex, searchAlias);
|
|
|
+ for (String index : updateIndices) {
|
|
|
+ builder.removeAlias(index, updateAlias);
|
|
|
+ }
|
|
|
+ for (String index : searchIndices) {
|
|
|
+ builder.removeAlias(index, searchAlias);
|
|
|
+ }
|
|
|
+ IndicesAliasesResponse response = builder.execute().actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
+ return response.isAcknowledged();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void createAlias(final String index, final String createdIndexName) {
|
|
|
+ final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
+ // alias
|
|
|
+ final String aliasConfigDirPath = indexConfigPath + "/" + index + "/alias";
|
|
|
+ try {
|
|
|
+ final File aliasConfigDir = ResourceUtil.getResourceAsFile(aliasConfigDirPath);
|
|
|
+ if (aliasConfigDir.isDirectory()) {
|
|
|
+ stream(aliasConfigDir.listFiles((dir, name) -> name.endsWith(".json"))).of(
|
|
|
+ stream -> stream.forEach(f -> {
|
|
|
+ final String aliasName = f.getName().replaceFirst(".json$", "");
|
|
|
+ String source = FileUtil.readUTF8(f);
|
|
|
+ if (source.trim().equals("{}")) {
|
|
|
+ source = null;
|
|
|
+ }
|
|
|
+ final IndicesAliasesResponse response =
|
|
|
+ client.admin().indices().prepareAliases().addAlias(createdIndexName, aliasName, source).execute()
|
|
|
+ .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
+ if (response.isAcknowledged()) {
|
|
|
+ logger.info("Created " + aliasName + " alias for " + createdIndexName);
|
|
|
+ } else if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("Failed to create " + aliasName + " alias for " + createdIndexName);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ } catch (final ResourceNotFoundRuntimeException e) {
|
|
|
+ // ignore
|
|
|
+ } catch (final Exception e) {
|
|
|
+ logger.warn(aliasConfigDirPath + " is not found.", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- final String dataPath = indexConfigPath + "/" + configIndex + "/" + configType + ".bulk";
|
|
|
- if (ResourceUtil.isExist(dataPath)) {
|
|
|
- insertBulkData(fessConfig, configIndex, configType, dataPath);
|
|
|
+ protected void sendConfigFiles(final String index) {
|
|
|
+ configListMap.getOrDefault(index, Collections.emptyList()).forEach(
|
|
|
+ path -> {
|
|
|
+ String source = null;
|
|
|
+ final String filePath = indexConfigPath + "/" + index + "/" + path;
|
|
|
+ try {
|
|
|
+ source = FileUtil.readUTF8(filePath);
|
|
|
+ try (CurlResponse response =
|
|
|
+ Curl.post(org.codelibs.fess.util.ResourceUtil.getElasticsearchHttpUrl() + "/_configsync/file")
|
|
|
+ .header("Content-Type", "application/json").param("path", path).body(source).execute()) {
|
|
|
+ if (response.getHttpStatusCode() == 200) {
|
|
|
+ logger.info("Register " + path + " to " + index);
|
|
|
+ } else {
|
|
|
+ if (response.getContentException() != null) {
|
|
|
+ logger.warn("Invalid request for " + path + ".", response.getContentException());
|
|
|
+ } else {
|
|
|
+ logger.warn("Invalid request for " + path + ". The response is " + response.getContentAsString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (final Exception e) {
|
|
|
+ logger.warn("Failed to register " + filePath, e);
|
|
|
}
|
|
|
- } catch (final Exception e) {
|
|
|
- logger.warn("Failed to create " + updatedIndexName + "/" + configType + " mapping.", e);
|
|
|
- }
|
|
|
- } else if (logger.isDebugEnabled()) {
|
|
|
- logger.debug(updatedIndexName + "/" + configType + " mapping exists.");
|
|
|
+ });
|
|
|
+ try (CurlResponse response =
|
|
|
+ Curl.post(org.codelibs.fess.util.ResourceUtil.getElasticsearchHttpUrl() + "/_configsync/flush")
|
|
|
+ .header("Content-Type", "application/json").execute()) {
|
|
|
+ if (response.getHttpStatusCode() == 200) {
|
|
|
+ logger.info("Flushed config files.");
|
|
|
+ } else {
|
|
|
+ logger.warn("Failed to flush config files.");
|
|
|
}
|
|
|
- } else {
|
|
|
- logger.warn("Invalid index config name: " + configName);
|
|
|
+ } catch (final Exception e) {
|
|
|
+ logger.warn("Failed to flush config files.", e);
|
|
|
}
|
|
|
- }) ;
|
|
|
}
|
|
|
|
|
|
protected String generateNewIndexName(final String configIndex) {
|