fix #472 : improve targeted schedule jobs

This commit is contained in:
Shinsuke Sugaya 2016-04-09 17:55:10 +09:00
parent 4fe742be6c
commit ac2ab82a1a
10 changed files with 260 additions and 130 deletions

View file

@ -18,10 +18,12 @@ package org.codelibs.fess.app.job;
import javax.annotation.Resource;
import org.codelibs.core.lang.StringUtil;
import org.codelibs.core.timer.TimeoutManager;
import org.codelibs.fess.Constants;
import org.codelibs.fess.app.logic.AccessContextLogic;
import org.codelibs.fess.app.service.ScheduledJobService;
import org.codelibs.fess.es.config.exbhv.JobLogBhv;
import org.codelibs.fess.helper.JobHelper;
import org.codelibs.fess.helper.SystemHelper;
import org.codelibs.fess.mylasta.direction.FessConfig;
import org.codelibs.fess.util.ComponentUtil;
@ -31,9 +33,13 @@ import org.lastaflute.job.LaCron;
import org.lastaflute.job.LaJob;
import org.lastaflute.job.LaJobRunner;
import org.lastaflute.job.LaJobScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AllJobScheduler implements LaJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(AllJobScheduler.class);
protected static final String APP_TYPE = "JOB";
@Resource
@ -51,10 +57,16 @@ public class AllJobScheduler implements LaJobScheduler {
@Resource
private SystemHelper systemHelper;
@Resource
private JobHelper jobHelper;
protected Class<? extends LaJob> jobClass = ScriptExecutorJob.class;
protected long schedulerTime;
@Override
public void schedule(final LaCron cron) {
schedulerTime = System.currentTimeMillis();
scheduledJobService.start(cron);
final String myName = fessConfig.getSchedulerTargetName();
@ -65,6 +77,23 @@ public class AllJobScheduler implements LaJobScheduler {
});
}
TimeoutManager.getInstance().addTimeoutTarget(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Updating scheduled jobs. time:" + schedulerTime);
}
final long now = System.currentTimeMillis();
scheduledJobService.getScheduledJobListAfter(schedulerTime).forEach(scheduledJob -> {
if (logger.isDebugEnabled()) {
logger.debug("Updating job schedule:" + scheduledJob.getName());
}
try {
jobHelper.register(scheduledJob);
} catch (Exception e) {
logger.warn("Failed to update schdule " + scheduledJob, e);
}
});
schedulerTime = now;
}, fessConfig.getSchedulerMonitorIntervalAsInteger(), true);
}
@Override
@ -74,4 +103,7 @@ public class AllJobScheduler implements LaJobScheduler {
});
}
public void setJobClass(Class<? extends LaJob> jobClass) {
this.jobClass = jobClass;
}
}

View file

@ -15,10 +15,11 @@
*/
package org.codelibs.fess.app.job;
import org.codelibs.core.lang.StringUtil;
import org.codelibs.fess.Constants;
import org.codelibs.fess.app.service.JobLogService;
import org.codelibs.fess.es.config.exentity.JobLog;
import org.codelibs.fess.es.config.exentity.ScheduledJob;
import org.codelibs.fess.helper.JobHelper;
import org.codelibs.fess.helper.SystemHelper;
import org.codelibs.fess.job.JobExecutor;
import org.codelibs.fess.job.ScheduledJobException;
@ -35,13 +36,32 @@ public class ScriptExecutorJob implements LaJob {
@Override
public void run(final LaJobRuntime runtime) {
final ScheduledJob scheduledJob = (ScheduledJob) runtime.getParameterMap().get(Constants.SCHEDULED_JOB); // TODO null check
if (!runtime.getParameterMap().containsKey(Constants.SCHEDULED_JOB)) {
logger.warn(Constants.SCHEDULED_JOB + " is empty.");
return;
}
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
final JobManager jobManager = ComponentUtil.getJobManager();
final ScheduledJob scheduledJob = (ScheduledJob) runtime.getParameterMap().get(Constants.SCHEDULED_JOB);
final String id = scheduledJob.getId();
final String target = scheduledJob.getTarget();
if (!isTarget(target)) {
logger.info("Ignore Job " + id + ":" + scheduledJob.getName() + " because of not target: " + scheduledJob.getTarget());
return;
}
final JobHelper jobHelper = ComponentUtil.getJobHelper();
if (!jobHelper.isAvailable(id)) {
logger.info("Job " + id + " is unavailable. Unregistering this job.");
jobHelper.unregister(scheduledJob);
return;
}
final JobLog jobLog = new JobLog(scheduledJob);
final String scriptType = scheduledJob.getScriptType();
final String script = scheduledJob.getScriptData();
final String id = scheduledJob.getId();
final JobExecutor jobExecutor = ComponentUtil.getJobExecutor(scriptType);
if (jobExecutor == null) {
throw new ScheduledJobException("No jobExecutor: " + scriptType);
@ -56,7 +76,7 @@ public class ScriptExecutorJob implements LaJob {
try {
if (scheduledJob.isLoggingEnabled()) {
storeJobLog(jobLog);
jobHelper.store(jobLog);
}
if (logger.isDebugEnabled()) {
@ -87,14 +107,28 @@ public class ScriptExecutorJob implements LaJob {
logger.debug("jobLog: " + jobLog);
}
if (scheduledJob.isLoggingEnabled()) {
storeJobLog(jobLog);
jobHelper.store(jobLog);
}
}
}
private void storeJobLog(final JobLog jobLog) {
final JobLogService jobLogService = ComponentUtil.getComponent(JobLogService.class);
jobLogService.store(jobLog);
protected boolean isTarget(final String target) {
if (StringUtil.isBlank(target)) {
return true;
}
final String myName = ComponentUtil.getFessConfig().getSchedulerTargetName();
final String[] targets = target.split(",");
for (String name : targets) {
name = name.trim();
if (Constants.DEFAULT_JOB_TARGET.equalsIgnoreCase(name)) {
return true;
} else if (StringUtil.isNotBlank(myName) && myName.equalsIgnoreCase(name)) {
return true;
}
}
return false;
}
}

View file

@ -16,30 +16,21 @@
package org.codelibs.fess.app.service;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.codelibs.core.beans.util.BeanUtil;
import org.codelibs.core.lang.StringUtil;
import org.codelibs.fess.Constants;
import org.codelibs.fess.app.pager.SchedulerPager;
import org.codelibs.fess.es.config.cbean.ScheduledJobCB;
import org.codelibs.fess.es.config.exbhv.ScheduledJobBhv;
import org.codelibs.fess.es.config.exentity.ScheduledJob;
import org.codelibs.fess.job.ScheduledJobException;
import org.codelibs.fess.mylasta.direction.FessConfig;
import org.codelibs.fess.util.ComponentUtil;
import org.dbflute.cbean.result.PagingResultBean;
import org.dbflute.optional.OptionalEntity;
import org.dbflute.optional.OptionalThing;
import org.lastaflute.job.JobManager;
import org.lastaflute.job.LaCron;
import org.lastaflute.job.LaScheduledJob;
import org.lastaflute.job.key.LaJobUnique;
import org.lastaflute.job.subsidiary.CronParamsSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,16 +62,22 @@ public class ScheduledJobService implements Serializable {
return scheduledJobList;
}
public List<ScheduledJob> getScheduledJobListAfter(final long updateTime) {
return scheduledJobBhv.selectPage(cb -> {
cb.fetchFirst(fessConfig.getPageScheduledJobMaxFetchSizeAsInteger());
cb.query().setAvailable_Equal(Boolean.TRUE);
cb.query().setUpdatedTime_GreaterThan(updateTime);
});
}
public OptionalEntity<ScheduledJob> getScheduledJob(final String id) {
return scheduledJobBhv.selectByPK(id);
}
public void delete(final ScheduledJob scheduledJob) {
scheduledJobBhv.delete(scheduledJob, op -> {
op.setRefresh(true);
});
}
protected void setupListCondition(final ScheduledJobCB cb, final SchedulerPager scheduledJobPager) {
@ -106,12 +103,9 @@ public class ScheduledJobService implements Serializable {
}
public void store(final ScheduledJob scheduledJob) {
scheduledJobBhv.insertOrUpdate(scheduledJob, op -> {
op.setRefresh(true);
});
final JobManager jobManager = ComponentUtil.getJobManager();
jobManager.schedule(cron -> register(cron, scheduledJob));
}
public List<ScheduledJob> getCrawlerJobList() {
@ -123,107 +117,6 @@ public class ScheduledJobService implements Serializable {
});
}
protected void register(final LaCron cron, final ScheduledJob scheduledJob) {
if (scheduledJob == null) {
throw new ScheduledJobException("No job.");
}
final String id = scheduledJob.getId();
if (!Constants.T.equals(scheduledJob.getAvailable())) {
logger.info("Inactive Job " + id + ":" + scheduledJob.getName());
try {
unregister(scheduledJob);
} catch (final Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to delete Job " + scheduledJob, e);
}
}
return;
}
final String target = scheduledJob.getTarget();
if (!isTarget(target)) {
logger.info("Ignore Job " + id + ":" + scheduledJob.getName() + " because of not target: " + scheduledJob.getTarget());
return;
}
final CronParamsSupplier paramsOp = () -> {
final Map<String, Object> params = new HashMap<>();
params.put(Constants.SCHEDULED_JOB, scheduledJob);
return params;
};
findJobByUniqueOf(LaJobUnique.of(id)).ifPresent(job -> {
if (!job.isUnscheduled()) {
if (StringUtil.isNotBlank(scheduledJob.getCronExpression())) {
logger.info("Starting Job " + id + ":" + scheduledJob.getName());
final String cronExpression = scheduledJob.getCronExpression();
job.reschedule(cronExpression, op -> op.changeNoticeLogToDebug().params(paramsOp));
} else {
logger.info("Inactive Job " + id + ":" + scheduledJob.getName());
job.becomeNonCron();
}
} else if (StringUtil.isNotBlank(scheduledJob.getCronExpression())) {
logger.info("Starting Job " + id + ":" + scheduledJob.getName());
final String cronExpression = scheduledJob.getCronExpression();
job.reschedule(cronExpression, op -> op.changeNoticeLogToDebug().params(paramsOp));
}
}).orElse(
() -> {
if (StringUtil.isNotBlank(scheduledJob.getCronExpression())) {
logger.info("Starting Job " + id + ":" + scheduledJob.getName());
final String cronExpression = scheduledJob.getCronExpression();
cron.register(cronExpression, fessConfig.getSchedulerJobClassAsClass(),
fessConfig.getSchedulerConcurrentExecModeAsEnum(),
op -> op.uniqueBy(id).changeNoticeLogToDebug().params(paramsOp));
} else {
logger.info("Inactive Job " + id + ":" + scheduledJob.getName());
cron.registerNonCron(fessConfig.getSchedulerJobClassAsClass(), fessConfig.getSchedulerConcurrentExecModeAsEnum(),
op -> op.uniqueBy(id).changeNoticeLogToDebug().params(paramsOp));
}
});
}
private OptionalThing<LaScheduledJob> findJobByUniqueOf(final LaJobUnique jobUnique) {
final JobManager jobManager = ComponentUtil.getJobManager();
try {
return jobManager.findJobByUniqueOf(jobUnique);
} catch (final Exception e) {
return OptionalThing.empty();
}
}
public void unregister(final ScheduledJob scheduledJob) {
try {
final JobManager jobManager = ComponentUtil.getJobManager();
if (jobManager.isSchedulingDone()) {
jobManager.findJobByUniqueOf(LaJobUnique.of(scheduledJob.getId())).ifPresent(job -> {
job.unschedule();
}).orElse(() -> logger.debug("Job {} is not scheduled.", scheduledJob.getId()));
}
} catch (final Exception e) {
throw new ScheduledJobException("Failed to delete Job: " + scheduledJob, e);
}
}
protected boolean isTarget(final String target) {
if (StringUtil.isBlank(target)) {
return true;
}
final String myName = fessConfig.getSchedulerTargetName();
final String[] targets = target.split(",");
for (String name : targets) {
name = name.trim();
if (Constants.DEFAULT_JOB_TARGET.equalsIgnoreCase(name)) {
return true;
} else if (StringUtil.isNotBlank(myName) && myName.equalsIgnoreCase(name)) {
return true;
}
}
return false;
}
public void start(final LaCron cron) {
scheduledJobBhv.selectCursor(cb -> {
cb.query().setAvailable_Equal(Constants.T);
@ -231,7 +124,7 @@ public class ScheduledJobService implements Serializable {
cb.query().addOrderBy_Name_Asc();
}, scheduledJob -> {
try {
register(cron, scheduledJob);
ComponentUtil.getJobHelper().register(cron, scheduledJob);
} catch (final Exception e) {
logger.error("Failed to start Job " + scheduledJob.getId(), e);
}

View file

@ -0,0 +1,135 @@
/*
* Copyright 2012-2016 CodeLibs Project and the Others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.codelibs.fess.helper;
import java.util.HashMap;
import java.util.Map;
import org.codelibs.core.lang.StringUtil;
import org.codelibs.fess.Constants;
import org.codelibs.fess.es.config.exbhv.JobLogBhv;
import org.codelibs.fess.es.config.exbhv.ScheduledJobBhv;
import org.codelibs.fess.es.config.exentity.JobLog;
import org.codelibs.fess.es.config.exentity.ScheduledJob;
import org.codelibs.fess.job.ScheduledJobException;
import org.codelibs.fess.mylasta.direction.FessConfig;
import org.codelibs.fess.util.ComponentUtil;
import org.dbflute.optional.OptionalThing;
import org.lastaflute.job.JobManager;
import org.lastaflute.job.LaCron;
import org.lastaflute.job.LaScheduledJob;
import org.lastaflute.job.key.LaJobUnique;
import org.lastaflute.job.subsidiary.CronParamsSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JobHelper {
private static final Logger logger = LoggerFactory.getLogger(JobHelper.class);
public void register(final ScheduledJob scheduledJob) {
final JobManager jobManager = ComponentUtil.getJobManager();
jobManager.schedule(cron -> register(cron, scheduledJob));
}
public void register(final LaCron cron, final ScheduledJob scheduledJob) {
if (scheduledJob == null) {
throw new ScheduledJobException("No job.");
}
final String id = scheduledJob.getId();
if (!Constants.T.equals(scheduledJob.getAvailable())) {
logger.info("Inactive Job " + id + ":" + scheduledJob.getName());
try {
unregister(scheduledJob);
} catch (final Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to delete Job " + scheduledJob, e);
}
}
return;
}
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final CronParamsSupplier paramsOp = () -> {
final Map<String, Object> params = new HashMap<>();
params.put(Constants.SCHEDULED_JOB, scheduledJob);
return params;
};
findJobByUniqueOf(LaJobUnique.of(id)).ifPresent(job -> {
if (!job.isUnscheduled()) {
if (StringUtil.isNotBlank(scheduledJob.getCronExpression())) {
logger.info("Starting Job " + id + ":" + scheduledJob.getName());
final String cronExpression = scheduledJob.getCronExpression();
job.reschedule(cronExpression, op -> op.changeNoticeLogToDebug().params(paramsOp));
} else {
logger.info("Inactive Job " + id + ":" + scheduledJob.getName());
job.becomeNonCron();
}
} else if (StringUtil.isNotBlank(scheduledJob.getCronExpression())) {
logger.info("Starting Job " + id + ":" + scheduledJob.getName());
final String cronExpression = scheduledJob.getCronExpression();
job.reschedule(cronExpression, op -> op.changeNoticeLogToDebug().params(paramsOp));
}
}).orElse(
() -> {
if (StringUtil.isNotBlank(scheduledJob.getCronExpression())) {
logger.info("Starting Job " + id + ":" + scheduledJob.getName());
final String cronExpression = scheduledJob.getCronExpression();
cron.register(cronExpression, fessConfig.getSchedulerJobClassAsClass(),
fessConfig.getSchedulerConcurrentExecModeAsEnum(),
op -> op.uniqueBy(id).changeNoticeLogToDebug().params(paramsOp));
} else {
logger.info("Inactive Job " + id + ":" + scheduledJob.getName());
cron.registerNonCron(fessConfig.getSchedulerJobClassAsClass(), fessConfig.getSchedulerConcurrentExecModeAsEnum(),
op -> op.uniqueBy(id).changeNoticeLogToDebug().params(paramsOp));
}
});
}
private OptionalThing<LaScheduledJob> findJobByUniqueOf(final LaJobUnique jobUnique) {
final JobManager jobManager = ComponentUtil.getJobManager();
try {
return jobManager.findJobByUniqueOf(jobUnique);
} catch (final Exception e) {
return OptionalThing.empty();
}
}
public void unregister(final ScheduledJob scheduledJob) {
try {
final JobManager jobManager = ComponentUtil.getJobManager();
if (jobManager.isSchedulingDone()) {
jobManager.findJobByUniqueOf(LaJobUnique.of(scheduledJob.getId())).ifPresent(job -> {
job.unschedule();
}).orElse(() -> logger.debug("Job {} is not scheduled.", scheduledJob.getId()));
}
} catch (final Exception e) {
throw new ScheduledJobException("Failed to delete Job: " + scheduledJob, e);
}
}
public boolean isAvailable(String id) {
return ComponentUtil.getComponent(ScheduledJobBhv.class).selectByPK(id).filter(e -> Boolean.TRUE.equals(e.getAvailable()))
.isPresent();
}
public void store(JobLog jobLog) {
ComponentUtil.getComponent(JobLogBhv.class).insertOrUpdate(jobLog, op -> {
op.setRefresh(true);
});
}
}

View file

@ -227,7 +227,7 @@ public class CrawlJob {
}
if (jobExecutor != null) {
jobExecutor.addShutdownListener(() -> ComponentUtil.getJobHelper().destroyProcess(sessionId));
jobExecutor.addShutdownListener(() -> ComponentUtil.getProcessHelper().destroyProcess(sessionId));
}
try {
@ -248,7 +248,7 @@ public class CrawlJob {
final String cpSeparator = SystemUtils.IS_OS_WINDOWS ? ";" : ":";
final ServletContext servletContext = ComponentUtil.getComponent(ServletContext.class);
final SystemHelper systemHelper = ComponentUtil.getSystemHelper();
final ProcessHelper processHelper = ComponentUtil.getJobHelper();
final ProcessHelper processHelper = ComponentUtil.getProcessHelper();
final FessConfig fessConfig = ComponentUtil.getFessConfig();
cmdList.add(fessConfig.getJavaCommandPath());

View file

@ -102,7 +102,7 @@ public class SuggestJob {
}
resultBuf.append("Session Id: ").append(sessionId).append("\n");
if (jobExecutor != null) {
jobExecutor.addShutdownListener(() -> ComponentUtil.getJobHelper().destroyProcess(sessionId));
jobExecutor.addShutdownListener(() -> ComponentUtil.getProcessHelper().destroyProcess(sessionId));
}
try {
@ -120,7 +120,7 @@ public class SuggestJob {
final List<String> cmdList = new ArrayList<>();
final String cpSeparator = SystemUtils.IS_OS_WINDOWS ? ";" : ":";
final ServletContext servletContext = ComponentUtil.getComponent(ServletContext.class);
final ProcessHelper processHelper = ComponentUtil.getJobHelper();
final ProcessHelper processHelper = ComponentUtil.getProcessHelper();
final FessConfig fessConfig = ComponentUtil.getFessConfig();
cmdList.add(fessConfig.getJavaCommandPath());

View file

@ -556,6 +556,9 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
/** The key of the configuration. e.g. QUIT */
String SCHEDULER_CONCURRENT_EXEC_MODE = "scheduler.concurrent.exec.mode";
/** The key of the configuration. e.g. 30 */
String SCHEDULER_MONITOR_INTERVAL = "scheduler.monitor.interval";
/** The key of the configuration. e.g. http://fess.codelibs.org/{lang}/{version}/admin/ */
String ONLINE_HELP_BASE_LINK = "online.help.base.link";
@ -2628,6 +2631,21 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
*/
String getSchedulerConcurrentExecMode();
/**
* Get the value for the key 'scheduler.monitor.interval'. <br>
* The value is, e.g. 30 <br>
* @return The value of found property. (NotNull: if not found, exception but basically no way)
*/
String getSchedulerMonitorInterval();
/**
* Get the value for the key 'scheduler.monitor.interval' as {@link Integer}. <br>
* The value is, e.g. 30 <br>
* @return The value of found property. (NotNull: if not found, exception but basically no way)
* @throws NumberFormatException When the property is not integer.
*/
Integer getSchedulerMonitorIntervalAsInteger();
/**
* Get the value for the key 'online.help.base.link'. <br>
* The value is, e.g. http://fess.codelibs.org/{lang}/{version}/admin/ <br>
@ -4443,6 +4461,14 @@ public interface FessConfig extends FessEnv, org.codelibs.fess.mylasta.direction
return get(FessConfig.SCHEDULER_CONCURRENT_EXEC_MODE);
}
public String getSchedulerMonitorInterval() {
return get(FessConfig.SCHEDULER_MONITOR_INTERVAL);
}
public Integer getSchedulerMonitorIntervalAsInteger() {
return getAsInteger(FessConfig.SCHEDULER_MONITOR_INTERVAL);
}
public String getOnlineHelpBaseLink() {
return get(FessConfig.ONLINE_HELP_BASE_LINK);
}

View file

@ -35,6 +35,7 @@ import org.codelibs.fess.helper.DuplicateHostHelper;
import org.codelibs.fess.helper.FileTypeHelper;
import org.codelibs.fess.helper.IndexingHelper;
import org.codelibs.fess.helper.IntervalControlHelper;
import org.codelibs.fess.helper.JobHelper;
import org.codelibs.fess.helper.KeyMatchHelper;
import org.codelibs.fess.helper.LabelTypeHelper;
import org.codelibs.fess.helper.PathMappingHelper;
@ -100,7 +101,9 @@ public final class ComponentUtil {
private static final String WEB_API_MANAGER_FACTORY = "webApiManagerFactory";
private static final String JOB_HELPER = "processHelper";
private static final String PROCESS_HELPER = "processHelper";
private static final String JOB_HELPER = "jobHelper";
private static final String DUPLICATE_HOST_HELPER = "duplicateHostHelper";
@ -213,7 +216,11 @@ public final class ComponentUtil {
return getComponent(DUPLICATE_HOST_HELPER);
}
public static ProcessHelper getJobHelper() {
public static ProcessHelper getProcessHelper() {
return getComponent(PROCESS_HELPER);
}
public static JobHelper getJobHelper() {
return getComponent(JOB_HELPER);
}

View file

@ -18,6 +18,8 @@
<component name="activityHelper" class="org.codelibs.fess.helper.ActivityHelper">
</component>
<component name="jobHelper" class="org.codelibs.fess.helper.JobHelper">
</component>
<component name="labelTypeHelper" class="org.codelibs.fess.helper.LabelTypeHelper">
</component>
<component name="keyMatchHelper" class="org.codelibs.fess.helper.KeyMatchHelper">

View file

@ -322,6 +322,7 @@ mail.from.address = root@localhost
scheduler.target.name=
scheduler.job.class=org.codelibs.fess.app.job.ScriptExecutorJob
scheduler.concurrent.exec.mode=QUIT
scheduler.monitor.interval=30
# ----------------------------------------------------------
# OnlineHelp