// TaskManagerService.java
/*
* Copyright (C) 2024 B3Partners B.V.
*
* SPDX-License-Identifier: MIT
*/
package org.tailormap.api.scheduling;
import static io.sentry.quartz.SentryJobListener.SENTRY_SLUG_KEY;
import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.UUID;
import org.quartz.CronScheduleBuilder;
import org.quartz.DateBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.ObjectAlreadyExistsException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Service;
/**
 * Service that creates, reschedules, looks up and deletes Quartz jobs ("tasks") through the injected
 * {@link Scheduler}.
 */
@Service
public class TaskManagerService {
  private static final Logger logger =
      LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Delay, in seconds, before a newly created or rescheduled cron trigger starts. */
  private static final int CRON_START_DELAY_SECONDS = 90;

  private static final String SIMPLE_TRIGGER_SLUG_PREFIX = "monitor_slug_simple_trigger_";
  private static final String CRON_TRIGGER_SLUG_PREFIX = "monitor_slug_cron_trigger_";

  private final Scheduler scheduler;

  public TaskManagerService(@Autowired Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  /**
   * Create a one-time job and schedule it to start immediately.
   *
   * @param job the task class to create
   * @param jobData a map with job data, the {@code type} and {@code description} keys are mandatory
   * @return the task name, a randomly generated UUID
   * @throws SchedulerException if the job could not be scheduled
   */
  public UUID createTask(Class<? extends QuartzJobBean> job, TMJobDataMap jobData) throws SchedulerException {
    JobDetail jobDetail = buildJobDetail(job, jobData);
    JobKey jobKey = jobDetail.getKey();

    Trigger trigger = TriggerBuilder.newTrigger()
        .withIdentity(jobKey.getName(), jobKey.getGroup())
        .startNow()
        .withPriority(jobData.getPriority())
        .usingJobData(SENTRY_SLUG_KEY, SIMPLE_TRIGGER_SLUG_PREFIX + jobData.get(Task.TYPE_KEY))
        .forJob(jobDetail)
        .build();

    // replace=true: an existing job/trigger with the same key is overwritten
    scheduler.scheduleJob(jobDetail, Set.of(trigger), true);
    return UUID.fromString(jobKey.getName());
  }

  /**
   * Create a job and schedule it with a cron expression.
   *
   * @param job the task class to create
   * @param jobData a map with job data, the {@code type} and {@code description} keys are mandatory
   * @param cronExpression the cron expression
   * @return the task name, a UUID, or {@code null} when the scheduler reports that the job or trigger
   *     already exists
   * @throws SchedulerException if the job could not be scheduled
   */
  @Nullable public UUID createTask(Class<? extends QuartzJobBean> job, TMJobDataMap jobData, String cronExpression)
      throws SchedulerException {
    JobDetail jobDetail = buildJobDetail(job, jobData);
    JobKey jobKey = jobDetail.getKey();

    Trigger trigger = buildCronTrigger(
        jobKey, jobData.getPriority(), CRON_TRIGGER_SLUG_PREFIX + jobData.get(Task.TYPE_KEY), cronExpression);

    try {
      scheduler.scheduleJob(jobDetail, trigger);
    } catch (ObjectAlreadyExistsException ex) {
      logger.warn(
          "Job {} with trigger {} has not been added to scheduler as it already exists.",
          jobKey,
          trigger.getKey());
      return null;
    }
    return UUID.fromString(jobKey.getName());
  }

  /**
   * Reschedule a task using updated job data. The new job data is merged into the existing job data
   * and the job's trigger is replaced by a new cron trigger built from the merged data. Nothing is
   * changed when the job (or its trigger) does not exist in the scheduler.
   *
   * @param jobKey the job key
   * @param newJobData the new job data
   * @throws SchedulerException if the job could not be rescheduled, including when the job exists
   *     but has no trigger to reschedule
   */
  public void updateTask(JobKey jobKey, TMJobDataMap newJobData) throws SchedulerException {
    if (!scheduler.checkExists(jobKey)) {
      logger.warn("Job {} does not exist, nothing to update.", jobKey);
      return;
    }

    // there should only ever be one trigger for a job in TM
    var triggers = scheduler.getTriggersOfJob(jobKey);
    if (triggers.isEmpty()) {
      // report through the declared exception instead of an unchecked NoSuchElementException
      throw new SchedulerException("Job " + jobKey + " has no trigger to reschedule.");
    }
    Trigger oldTrigger = triggers.getFirst();
    if (!scheduler.checkExists(oldTrigger.getKey())) {
      logger.warn("Trigger {} of job {} no longer exists, nothing to update.", oldTrigger.getKey(), jobKey);
      return;
    }

    JobDetail jobDetail = scheduler.getJobDetail(jobKey);
    JobDataMap jobDataMap = jobDetail.getJobDataMap();
    jobDataMap.putAll(newJobData);

    Trigger newTrigger = buildCronTrigger(
        jobKey,
        jobDataMap.getInt(Task.PRIORITY_KEY),
        CRON_TRIGGER_SLUG_PREFIX + jobDataMap.get(Task.TYPE_KEY).toString(),
        jobDataMap.getString(Task.CRON_EXPRESSION_KEY));

    // store the updated job data first (replace existing), then swap the trigger
    scheduler.addJob(jobDetail, true, true);
    scheduler.rescheduleJob(oldTrigger.getKey(), newTrigger);
  }

  /**
   * Get the job key for a given type and uuid.
   *
   * @param jobType the type of the job, its value is used as the job group
   * @param uuid the uuid of the job, its string form is used as the job name
   * @return the job key, or {@code null} when no such job exists in the scheduler
   * @throws SchedulerException when the scheduler cannot be reached
   */
  @Nullable public JobKey getJobKey(TaskType jobType, UUID uuid) throws SchedulerException {
    logger.debug("Finding job key for task {}:{}", jobType, uuid);
    // a job key is fully determined by name and group, so ask for it directly
    // instead of listing every key in the group
    JobKey candidate = new JobKey(uuid.toString(), jobType.getValue());
    return scheduler.checkExists(candidate) ? candidate : null;
  }

  /**
   * Delete all tasks in a specific group.
   *
   * @param groupName the name of the group to delete tasks from
   * @throws SchedulerException if the scheduler cannot be reached or if there is an error deleting the tasks
   */
  public void deleteTasksByGroupName(String groupName) throws SchedulerException {
    logger.debug("Deleting tasks in group: {}", groupName);
    Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.groupEquals(groupName));
    for (JobKey jobKey : jobKeys) {
      logger.info("Deleting task: {}", jobKey);
      scheduler.deleteJob(jobKey);
    }
  }

  /**
   * Build a non-durable job detail named by a fresh random UUID, grouped by the {@code type} entry
   * of the job data.
   */
  private static JobDetail buildJobDetail(Class<? extends QuartzJobBean> job, TMJobDataMap jobData) {
    return JobBuilder.newJob(job)
        .withIdentity(new JobKey(
            UUID.randomUUID().toString(), jobData.get(Task.TYPE_KEY).toString()))
        .withDescription(jobData.getDescription())
        .usingJobData(new JobDataMap(jobData))
        .storeDurably(false)
        .build();
  }

  /**
   * Build a cron trigger that shares name and group with the given job key and starts after
   * {@link #CRON_START_DELAY_SECONDS} seconds.
   */
  private static Trigger buildCronTrigger(JobKey jobKey, int priority, String sentrySlug, String cronExpression) {
    return TriggerBuilder.newTrigger()
        .withIdentity(jobKey.getName(), jobKey.getGroup())
        .startAt(DateBuilder.futureDate(CRON_START_DELAY_SECONDS, DateBuilder.IntervalUnit.SECOND))
        .withPriority(priority)
        .usingJobData(SENTRY_SLUG_KEY, sentrySlug)
        .withSchedule(
            CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionFireAndProceed())
        .build();
  }
}