This commit is contained in:
Shinsuke Sugaya 2014-05-02 15:16:47 +09:00
parent 8dba6984af
commit 834d4df791
4 changed files with 63 additions and 26 deletions

View file

@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHashMap;
import jp.sf.fess.FessSystemException;
import jp.sf.fess.job.JobExecutor;
import jp.sf.fess.util.InputStreamThread;
import jp.sf.fess.util.JobProcess;
import org.apache.commons.io.IOUtils;
import org.seasar.framework.container.annotation.tiger.DestroyMethod;
@ -32,9 +34,9 @@ public class JobHelper {
private static final Logger logger = LoggerFactory
.getLogger(JobHelper.class);
private final ConcurrentHashMap<String, Process> runningProcessMap = new ConcurrentHashMap<String, Process>();
private final ConcurrentHashMap<String, JobProcess> runningProcessMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, JobExecutor> runningJobExecutorMap = new ConcurrentHashMap<Long, JobExecutor>();
private final ConcurrentHashMap<Long, JobExecutor> runningJobExecutorMap = new ConcurrentHashMap<>();
@DestroyMethod
public void destroy() {
@ -43,31 +45,39 @@ public class JobHelper {
}
}
public Process startCrawlerProcess(final String sessionId,
public JobProcess startCrawlerProcess(final String sessionId,
final ProcessBuilder processBuilder) {
destroyCrawlerProcess(sessionId);
Process currentProcess;
JobProcess jobProcess;
try {
currentProcess = processBuilder.start();
jobProcess = new JobProcess(processBuilder.start());
destroyCrawlerProcess(runningProcessMap.putIfAbsent(sessionId,
currentProcess));
return currentProcess;
jobProcess));
return jobProcess;
} catch (final IOException e) {
throw new FessSystemException("Crawler Process terminated.", e);
}
}
public void destroyCrawlerProcess(final String sessionId) {
final Process process = runningProcessMap.remove(sessionId);
destroyCrawlerProcess(process);
final JobProcess jobProcess = runningProcessMap.remove(sessionId);
destroyCrawlerProcess(jobProcess);
}
public boolean isCrawlProcessRunning() {
return !runningProcessMap.isEmpty();
}
protected void destroyCrawlerProcess(final Process process) {
if (process != null) {
protected void destroyCrawlerProcess(final JobProcess jobProcess) {
if (jobProcess != null) {
InputStreamThread ist = jobProcess.getInputStreamThread();
try {
ist.interrupt();
} catch (Exception e) {
logger.warn("Could not interrupt a thread of an input stream.",
e);
}
Process process = jobProcess.getProcess();
try {
IOUtils.closeQuietly(process.getInputStream());
} catch (final Exception e) {

View file

@ -33,6 +33,7 @@ import jp.sf.fess.helper.SystemHelper;
import jp.sf.fess.job.JobExecutor.ShutdownListener;
import jp.sf.fess.util.ComponentUtil;
import jp.sf.fess.util.InputStreamThread;
import jp.sf.fess.util.JobProcess;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
@ -303,13 +304,13 @@ public class CrawlJob {
pb.redirectErrorStream(true);
try {
final Process currentProcess = jobHelper.startCrawlerProcess(
final JobProcess jobProcess = jobHelper.startCrawlerProcess(
sessionId, pb);
final InputStreamThread it = new InputStreamThread(
currentProcess.getInputStream(), Constants.UTF_8);
InputStreamThread it = jobProcess.getInputStreamThread();
it.start();
Process currentProcess = jobProcess.getProcess();
currentProcess.waitFor();
it.join(5000);

View file

@ -17,7 +17,6 @@
package jp.sf.fess.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
@ -49,21 +48,24 @@ public class InputStreamThread extends Thread {
@Override
public void run() {
for (;;) {
boolean running = true;
while (running) {
try {
final String line = br.readLine();
if (line == null) {
break;
running = false;
} else {
if (logger.isDebugEnabled()) {
logger.debug(line);
}
list.add(line);
if (list.size() > MAX_BUFFER_SIZE) {
list.remove(0);
}
}
if (logger.isDebugEnabled()) {
logger.debug(line);
}
list.add(line);
if (list.size() > MAX_BUFFER_SIZE) {
list.remove(0);
}
} catch (final IOException e) {
throw new FessSystemException(e);
} catch (final Exception e) {
running = false;
logger.error("Failed to process an input stream.", e);
}
}
}

View file

@ -0,0 +1,24 @@
package jp.sf.fess.util;
import jp.sf.fess.Constants;
public class JobProcess {
protected Process process;
protected InputStreamThread inputStreamThread;
public JobProcess(Process process) {
this.process = process;
this.inputStreamThread = new InputStreamThread(
process.getInputStream(), Constants.UTF_8);
}
public Process getProcess() {
return process;
}
public InputStreamThread getInputStreamThread() {
return inputStreamThread;
}
}