|
@@ -163,6 +163,8 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.google.common.io.BaseEncoding;
|
|
|
|
|
|
public class FessEsClient implements Client {
|
|
@@ -396,21 +398,37 @@ public class FessEsClient implements Client {
|
|
|
if (ResourceUtil.isExist(dataPath)) {
|
|
|
try {
|
|
|
final BulkRequestBuilder builder = client.prepareBulk();
|
|
|
- Arrays.stream(FileUtil.readUTF8(dataPath).split("\n")).reduce((prev, line) -> {
|
|
|
- if (StringUtil.isBlank(prev)) {
|
|
|
- if (line.startsWith("{\"index\":{")) {
|
|
|
- return line;
|
|
|
- } else if (line.startsWith("{\"update\":{")) {
|
|
|
- return line;
|
|
|
- } else if (line.startsWith("{\"delete\":{")) {
|
|
|
+ ObjectMapper mapper = new ObjectMapper();
|
|
|
+ Arrays.stream(FileUtil.readUTF8(dataPath).split("\n")).reduce(
|
|
|
+ (prev, line) -> {
|
|
|
+ try {
|
|
|
+ if (StringUtil.isBlank(prev)) {
|
|
|
+ Map<String, Map<String, String>> result =
|
|
|
+ mapper.readValue(line, new TypeReference<Map<String, Map<String, String>>>() {
|
|
|
+ });
|
|
|
+ if (result.keySet().contains("index")) {
|
|
|
+ return line;
|
|
|
+ } else if (result.keySet().contains("update")) {
|
|
|
+ return line;
|
|
|
+ } else if (result.keySet().contains("delete")) {
|
|
|
+ return StringUtil.EMPTY;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ Map<String, Map<String, String>> result =
|
|
|
+ mapper.readValue(prev, new TypeReference<Map<String, Map<String, String>>>() {
|
|
|
+ });
|
|
|
+ if (result.keySet().contains("index")) {
|
|
|
+ final IndexRequestBuilder requestBuilder =
|
|
|
+ client.prepareIndex(configIndex, configType, result.get("index").get("_id"))
|
|
|
+ .setSource(line);
|
|
|
+ builder.add(requestBuilder);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.warn("Failed to parse " + configIndex + "/" + configType + " mapping.");
|
|
|
+ }
|
|
|
return StringUtil.EMPTY;
|
|
|
- }
|
|
|
- } else if (prev.startsWith("{\"index\":{")) {
|
|
|
- final IndexRequestBuilder requestBuilder = client.prepareIndex(configIndex, configType).setSource(line);
|
|
|
- builder.add(requestBuilder);
|
|
|
- }
|
|
|
- return StringUtil.EMPTY;
|
|
|
- });
|
|
|
+ });
|
|
|
final BulkResponse response = builder.execute().actionGet();
|
|
|
if (response.hasFailures()) {
|
|
|
logger.warn("Failed to register " + dataPath.toString() + ": " + response.buildFailureMessage());
|