|
@@ -210,7 +210,8 @@ public class FessEsClient implements Client {
|
|
|
}
|
|
|
|
|
|
public String getStatus() {
|
|
|
- return admin().cluster().prepareHealth().execute().actionGet().getStatus().name();
|
|
|
+ return admin().cluster().prepareHealth().execute().actionGet(ComponentUtil.getFessConfig().getIndexHealthTimeout()).getStatus()
|
|
|
+ .name();
|
|
|
}
|
|
|
|
|
|
public void setRunner(final ElasticsearchClusterRunner runner) {
|
|
@@ -307,7 +308,7 @@ public class FessEsClient implements Client {
|
|
|
final String configType = values[1];
|
|
|
boolean exists = false;
|
|
|
try {
|
|
|
- client.prepareExists(configIndex).execute().actionGet();
|
|
|
+ client.prepareExists(configIndex).execute().actionGet(fessConfig.getIndexSearchTimeout());
|
|
|
exists = true;
|
|
|
} catch (final IndexNotFoundException e) {
|
|
|
// ignore
|
|
@@ -350,7 +351,8 @@ public class FessEsClient implements Client {
|
|
|
final String dictionaryPath = System.getProperty("fess.dictionary.path", StringUtil.EMPTY);
|
|
|
source = source.replaceAll(Pattern.quote("${fess.dictionary.path}"), dictionaryPath);
|
|
|
final CreateIndexResponse indexResponse =
|
|
|
- client.admin().indices().prepareCreate(configIndex).setSource(source).execute().actionGet();
|
|
|
+ client.admin().indices().prepareCreate(configIndex).setSource(source).execute()
|
|
|
+ .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
if (indexResponse.isAcknowledged()) {
|
|
|
logger.info("Created " + configIndex + " index.");
|
|
|
} else if (logger.isDebugEnabled()) {
|
|
@@ -364,7 +366,8 @@ public class FessEsClient implements Client {
|
|
|
}
|
|
|
|
|
|
final GetMappingsResponse getMappingsResponse =
|
|
|
- client.admin().indices().prepareGetMappings(configIndex).setTypes(configType).execute().actionGet();
|
|
|
+ client.admin().indices().prepareGetMappings(configIndex).setTypes(configType).execute()
|
|
|
+ .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
final ImmutableOpenMap<String, MappingMetaData> indexMappings = getMappingsResponse.mappings().get(configIndex);
|
|
|
if (indexMappings == null || !indexMappings.containsKey(configType)) {
|
|
|
String source = null;
|
|
@@ -375,7 +378,8 @@ public class FessEsClient implements Client {
|
|
|
logger.warn(mappingFile + " is not found.", e);
|
|
|
}
|
|
|
final PutMappingResponse putMappingResponse =
|
|
|
- client.admin().indices().preparePutMapping(configIndex).setType(configType).setSource(source).execute().actionGet();
|
|
|
+ client.admin().indices().preparePutMapping(configIndex).setType(configType).setSource(source).execute()
|
|
|
+ .actionGet(fessConfig.getIndexIndicesTimeout());
|
|
|
if (putMappingResponse.isAcknowledged()) {
|
|
|
logger.info("Created " + configIndex + "/" + configType + " mapping.");
|
|
|
} else {
|
|
@@ -417,7 +421,7 @@ public class FessEsClient implements Client {
|
|
|
}
|
|
|
return StringUtil.EMPTY;
|
|
|
});
|
|
|
- final BulkResponse response = builder.execute().actionGet();
|
|
|
+ final BulkResponse response = builder.execute().actionGet(fessConfig.getIndexBulkTimeout());
|
|
|
if (response.hasFailures()) {
|
|
|
logger.warn("Failed to register " + dataPath + ": " + response.buildFailureMessage());
|
|
|
}
|
|
@@ -435,7 +439,9 @@ public class FessEsClient implements Client {
|
|
|
}
|
|
|
|
|
|
private void waitForYellowStatus() {
|
|
|
- final ClusterHealthResponse response = client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
|
|
|
+ final ClusterHealthResponse response =
|
|
|
+ client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute()
|
|
|
+ .actionGet(ComponentUtil.getFessConfig().getIndexHealthTimeout());
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
logger.debug("Elasticsearch Cluster Status: " + response.getStatus());
|
|
|
}
|
|
@@ -445,7 +451,8 @@ public class FessEsClient implements Client {
|
|
|
@PreDestroy
|
|
|
public void close() {
|
|
|
try {
|
|
|
- client.admin().indices().prepareFlush().setForce(true).execute().actionGet();
|
|
|
+ client.admin().indices().prepareFlush().setForce(true).execute()
|
|
|
+ .actionGet(ComponentUtil.getFessConfig().getIndexIndicesTimeout());
|
|
|
} catch (Exception e) {
|
|
|
logger.warn("Failed to flush indices.", e);
|
|
|
}
|
|
@@ -461,7 +468,8 @@ public class FessEsClient implements Client {
|
|
|
final FessConfig fessConfig = ComponentUtil.getFessConfig();
|
|
|
SearchResponse response =
|
|
|
client.prepareSearch(index).setTypes(type).setScroll(scrollForDelete).setSize(sizeForDelete)
|
|
|
- .addField(fessConfig.getIndexFieldId()).setQuery(queryBuilder).execute().actionGet();
|
|
|
+ .addField(fessConfig.getIndexFieldId()).setQuery(queryBuilder).execute()
|
|
|
+ .actionGet(fessConfig.getIndexScrollSearchTimeoutTimeout());
|
|
|
|
|
|
int count = 0;
|
|
|
String scrollId = response.getScrollId();
|
|
@@ -478,12 +486,13 @@ public class FessEsClient implements Client {
|
|
|
bulkRequest.add(client.prepareDelete(index, type, hit.getId()));
|
|
|
}
|
|
|
count += hits.length;
|
|
|
- final BulkResponse bulkResponse = bulkRequest.execute().actionGet();
|
|
|
+ final BulkResponse bulkResponse = bulkRequest.execute().actionGet(fessConfig.getIndexBulkTimeout());
|
|
|
if (bulkResponse.hasFailures()) {
|
|
|
throw new IllegalBehaviorStateException(bulkResponse.buildFailureMessage());
|
|
|
}
|
|
|
|
|
|
- response = client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet();
|
|
|
+ response =
|
|
|
+ client.prepareSearchScroll(scrollId).setScroll(scrollForDelete).execute().actionGet(fessConfig.getIndexBulkTimeout());
|
|
|
scrollId = response.getScrollId();
|
|
|
}
|
|
|
return count;
|
|
@@ -511,7 +520,7 @@ public class FessEsClient implements Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- response = requestBuilder.execute().actionGet();
|
|
|
+ response = requestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexSearchTimeout());
|
|
|
}
|
|
|
final long execTime = System.currentTimeMillis() - startTime;
|
|
|
|
|
@@ -545,7 +554,7 @@ public class FessEsClient implements Client {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- searchResponse = searchRequestBuilder.execute().actionGet();
|
|
|
+ searchResponse = searchRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexSearchTimeout());
|
|
|
} catch (final SearchPhaseExecutionException e) {
|
|
|
throw new InvalidQueryException(messages -> messages.addErrorsInvalidQueryParseError(UserMessages.GLOBAL_PROPERTY_KEY),
|
|
|
"Invalid query: " + searchRequestBuilder, e);
|
|
@@ -673,7 +682,8 @@ public class FessEsClient implements Client {
|
|
|
|
|
|
public boolean update(final String index, final String type, final String id, final String field, final Object value) {
|
|
|
try {
|
|
|
- return client.prepareUpdate(index, type, id).setDoc(field, value).execute().actionGet().isCreated();
|
|
|
+ return client.prepareUpdate(index, type, id).setDoc(field, value).execute()
|
|
|
+ .actionGet(ComponentUtil.getFessConfig().getIndexIndexTimeout()).isCreated();
|
|
|
} catch (final ElasticsearchException e) {
|
|
|
throw new FessEsClientException("Failed to set " + value + " to " + field + " for doc " + id, e);
|
|
|
}
|
|
@@ -716,7 +726,8 @@ public class FessEsClient implements Client {
|
|
|
|
|
|
public PingResponse ping() {
|
|
|
try {
|
|
|
- final ClusterHealthResponse response = client.admin().cluster().prepareHealth().execute().actionGet();
|
|
|
+ final ClusterHealthResponse response =
|
|
|
+ client.admin().cluster().prepareHealth().execute().actionGet(ComponentUtil.getFessConfig().getIndexHealthTimeout());
|
|
|
return new PingResponse(response);
|
|
|
} catch (final ElasticsearchException e) {
|
|
|
throw new FessEsClientException("Failed to process a ping request.", e);
|
|
@@ -730,7 +741,7 @@ public class FessEsClient implements Client {
|
|
|
final Object id = doc.remove(fessConfig.getIndexFieldId());
|
|
|
bulkRequestBuilder.add(client.prepareIndex(index, type, id.toString()).setSource(doc));
|
|
|
}
|
|
|
- final BulkResponse response = bulkRequestBuilder.execute().actionGet();
|
|
|
+ final BulkResponse response = bulkRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexBulkTimeout());
|
|
|
if (response.hasFailures()) {
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
final List<ActionRequest> requests = bulkRequestBuilder.request().requests();
|
|
@@ -895,12 +906,13 @@ public class FessEsClient implements Client {
|
|
|
if (id == null) {
|
|
|
// create
|
|
|
response =
|
|
|
- client.prepareIndex(index, type).setSource(source).setRefresh(true).setOpType(OpType.CREATE).execute().actionGet();
|
|
|
+ client.prepareIndex(index, type).setSource(source).setRefresh(true).setOpType(OpType.CREATE).execute()
|
|
|
+ .actionGet(fessConfig.getIndexIndexTimeout());
|
|
|
} else {
|
|
|
// create or update
|
|
|
response =
|
|
|
client.prepareIndex(index, type, id).setSource(source).setRefresh(true).setOpType(OpType.INDEX).setVersion(version)
|
|
|
- .execute().actionGet();
|
|
|
+ .execute().actionGet(fessConfig.getIndexIndexTimeout());
|
|
|
}
|
|
|
return response.isCreated();
|
|
|
} catch (final ElasticsearchException e) {
|
|
@@ -914,7 +926,7 @@ public class FessEsClient implements Client {
|
|
|
if (version > 0) {
|
|
|
builder.setVersion(version);
|
|
|
}
|
|
|
- final DeleteResponse response = builder.execute().actionGet();
|
|
|
+ final DeleteResponse response = builder.execute().actionGet(ComponentUtil.getFessConfig().getIndexDeleteTimeout());
|
|
|
return response.isFound();
|
|
|
} catch (final ElasticsearchException e) {
|
|
|
throw new FessEsClientException("Failed to delete: " + index + "/" + type + "/" + id + "/" + version, e);
|