TaskAdminController.java
/*
* Copyright (C) 2024 B3Partners B.V.
*
* SPDX-License-Identifier: MIT
*/
package org.tailormap.api.controller.admin;
import static java.net.HttpURLConnection.HTTP_ACCEPTED;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.quartz.CronTrigger;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.quartz.TriggerUtils;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.spi.OperableTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import org.tailormap.api.repository.SearchIndexRepository;
import org.tailormap.api.scheduling.Task;
import org.tailormap.api.scheduling.TaskManagerService;
import org.tailormap.api.scheduling.TaskType;
/**
* Admin controller for controlling the task scheduler. Not to be used to create new tasks, adding tasks belongs in the
* domain of the specific controller or Spring Data REST API as that requires specific configuration information.
* Provides the following endpoints:
*
* <ul>
* <li>{@link #list /admin/tasks} to list all tasks, optionally filtered by type
* <li>{@link #details /admin/tasks/{type}/{uuid}} to get the details of a task
* <li>{@link #startTask /admin/tasks/{type}/{uuid}/start} to start a task
* <li>{@link #stopTask /admin/tasks/{type}/{uuid}/stop} to stop a task
* <li>{@link #delete /admin/tasks/{type}/{uuid}} to delete a task
* </ul>
*/
@RestController
public class TaskAdminController {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Scheduler scheduler;
private final TaskManagerService taskManagerService;
private final SearchIndexRepository searchIndexRepository;
public TaskAdminController(
@Autowired Scheduler scheduler,
@Autowired TaskManagerService taskManagerService,
@Autowired SearchIndexRepository searchIndexRepository) {
this.scheduler = scheduler;
this.taskManagerService = taskManagerService;
this.searchIndexRepository = searchIndexRepository;
}
@Operation(
summary = "List all tasks, optionally filtered by type",
description =
"""
This will return a list of all tasks, optionally filtered by task type.
The state of the task is one of the Quartz Trigger states.
The state can be one of: NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED or null in error conditions.
""")
@GetMapping(path = "${tailormap-api.admin.base-path}/tasks", produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponse(
responseCode = "200",
description = "List of all tasks, this list may be empty",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema =
@Schema(
example =
"""
{"tasks":[
{"uuid":"6308d26e-fe1e-4268-bb28-20db2cd06914","type":"index", "state":"NORMAL", "interruptable": false},
{"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca8","type":"poc", "state": "BLOCKED", "interruptable": false},
{"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca9","type":"poc", "state": "PAUSED", "interruptable": false},
{"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca2","type":"poc", "state": "COMPLETE", "interruptable": false},
{"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca3","type":"interuptablepoc", "state": "ERROR", "interruptable": true}
]}
""")))
public ResponseEntity<Object> list(@RequestParam(required = false) String type) throws ResponseStatusException {
logger.debug("Listing all tasks (optional type filter: {})", (null == type ? "all" : type));
final List<ObjectNode> tasks = new ArrayList<>();
final GroupMatcher<JobKey> groupMatcher =
(null == type ? GroupMatcher.anyGroup() : GroupMatcher.groupEquals(type));
try {
scheduler.getJobKeys(groupMatcher).stream()
.map(jobKey -> {
try {
return scheduler.getJobDetail(jobKey);
} catch (SchedulerException e) {
logger.error("Error getting task detail", e);
return null;
}
})
.filter(Objects::nonNull)
.forEach(jobDetail -> {
Trigger.TriggerState state;
try {
state = scheduler.getTriggerState(TriggerKey.triggerKey(
jobDetail.getKey().getName(),
jobDetail.getKey().getGroup()));
} catch (SchedulerException e) {
logger.error("Error getting task state", e);
state = null;
}
tasks.add(new ObjectMapper()
.createObjectNode()
.put(Task.TYPE_KEY, jobDetail.getKey().getGroup())
.put(Task.UUID_KEY, jobDetail.getKey().getName())
.put(
Task.INTERRUPTABLE_KEY,
InterruptableJob.class.isAssignableFrom(jobDetail.getJobClass()))
.put(
Task.DESCRIPTION_KEY,
jobDetail.getJobDataMap().getString(Task.DESCRIPTION_KEY))
.put(
Task.LAST_RESULT_KEY,
jobDetail.getJobDataMap().getString(Task.LAST_RESULT_KEY))
.putPOJO(Task.STATE_KEY, state));
});
} catch (SchedulerException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting tasks", e);
}
return ResponseEntity.ok(new ObjectMapper()
.createObjectNode()
.set("tasks", new ObjectMapper().createArrayNode().addAll(tasks)));
}
@Operation(
summary = "List all details for a given task",
description =
"""
This will return the details of the task, including the status, progress,
result and any other information.
""")
@GetMapping(
path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}",
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponse(
responseCode = "404",
description = "Task not found",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema = @Schema(example = "{\"message\":\"Task not found\",\"code\":404}")))
@ApiResponse(
responseCode = "200",
description =
"""
Details of the task. The response content will vay according to the type of task,
the most common fields are listed in the Task interface.
""",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema =
@Schema(
example =
"""
{
"type":"poc",
"uuid":"6308d26e-fe1e-4268-bb28-20db2cd06914",
"interruptable":false,
"description":"This is a poc task",
"startTime":"2024-06-06T12:00:00Z",
"nextTime":"2024-06-06T12:00:00Z",
"state":"NORMAL",
"progress":"...",
"result":"...",
"message":"something is happening"
"jobData":{ "type":"poc", "description":"This is a poc task" }
}
""")))
public ResponseEntity<Object> details(@PathVariable TaskType type, @PathVariable UUID uuid)
throws ResponseStatusException {
logger.debug("Getting task details for {}:{}", type, uuid);
try {
JobKey jobKey = taskManagerService.getJobKey(type, uuid);
if (null == jobKey) {
return handleTaskNotFound();
}
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
JobDataMap jobDataMap = jobDetail.getJobDataMap();
/* there should be only one */
Trigger trigger = scheduler.getTriggersOfJob(jobDetail.getKey()).getFirst();
CronTrigger cron = ((CronTrigger) trigger);
final Object[] result = new Object[1];
scheduler.getCurrentlyExecutingJobs().stream()
.filter(Objects::nonNull)
.filter(jobExecutionContext ->
jobExecutionContext.getJobDetail().getKey().equals(jobKey))
.forEach(jobExecutionContext -> {
logger.debug(
"currently executing job {} with trigger {}.",
jobExecutionContext.getJobDetail().getKey(),
jobExecutionContext.getTrigger().getKey());
result[0] = jobExecutionContext.getResult();
});
return ResponseEntity.ok(new ObjectMapper()
.createObjectNode()
// immutable uuid, type and description
.put(Task.TYPE_KEY, jobDetail.getKey().getGroup())
.put(Task.UUID_KEY, jobDetail.getKey().getName())
.put(Task.INTERRUPTABLE_KEY, InterruptableJob.class.isAssignableFrom(jobDetail.getJobClass()))
.put(Task.DESCRIPTION_KEY, jobDataMap.getString(Task.DESCRIPTION_KEY))
.put(Task.CRON_EXPRESSION_KEY, cron.getCronExpression())
// TODO / XXX we could add a human-readable description of the cron expression using
// eg.
// com.cronutils:cron-utils like:
// CronParser cronParser = new
// CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
// CronDescriptor.instance(locale).describe(cronParser.parse(cronExpression));
// this could also be done front-end using eg.
// https://www.npmjs.com/package/cronstrue
// which has the advantage of knowing the required locale for the human
// .put("cronDescription", cron.getCronExpression())
.put("timezone", cron.getTimeZone().getID())
.putPOJO("startTime", trigger.getStartTime())
.putPOJO("lastTime", trigger.getPreviousFireTime())
.putPOJO("nextFireTimes", TriggerUtils.computeFireTimes((OperableTrigger) cron, null, 5))
.putPOJO(Task.STATE_KEY, scheduler.getTriggerState(trigger.getKey()))
.putPOJO("progress", result[0])
.put(Task.LAST_RESULT_KEY, jobDataMap.getString(Task.LAST_RESULT_KEY))
.putPOJO("jobData", jobDataMap));
} catch (SchedulerException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
}
}
@Operation(summary = "Start a task", description = "This will start the task if it is not already running")
@PutMapping(
path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}/start",
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponse(
responseCode = "404",
description = "Task not found",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema = @Schema(example = "{\"message\":\"Task not found\",\"code\":404}")))
@ApiResponse(
responseCode = "202",
description = "Task is started",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema = @Schema(example = "{\"message\":\"Task starting accepted\",\"code\":202}")))
public ResponseEntity<Object> startTask(@PathVariable TaskType type, @PathVariable UUID uuid)
throws ResponseStatusException {
logger.debug("Starting task {}:{}", type, uuid);
try {
JobKey jobKey = taskManagerService.getJobKey(type, uuid);
if (null == jobKey) {
return handleTaskNotFound();
}
scheduler.triggerJob(jobKey);
return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_ACCEPTED))
.body(new ObjectMapper().createObjectNode().put("message", "Task starting accepted"));
} catch (SchedulerException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
}
}
@Operation(
summary = "Stop a task irrevocably",
description =
"""
This will stop a running task, if the task is not running, nothing will happen.
This can leave the application in an inconsistent state.
A task that is not interruptable cannot be stopped.
A stopped task cannot be restarted, it fire again depending on the schedule.
""")
@PutMapping(
path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}/stop",
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponse(
responseCode = "404",
description = "Task not found",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema = @Schema(example = "{\"message\":\"Task not found\", \"code\":404}")))
@ApiResponse(
responseCode = "202",
description = "Task is stopping",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema = @Schema(example = """
{
"message":"Task stopping accepted".
"succes":true
}
""")))
@ApiResponse(
responseCode = "400",
description = "The task cannot be stopped as it does not implement the InterruptableJob interface.",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema = @Schema(example = """
{ "message":"Task cannot be stopped" }
""")))
public ResponseEntity<Object> stopTask(@PathVariable TaskType type, @PathVariable UUID uuid)
throws ResponseStatusException {
logger.debug("Stopping task {}:{}", type, uuid);
try {
JobKey jobKey = taskManagerService.getJobKey(type, uuid);
if (null == jobKey) {
return handleTaskNotFound();
}
if (InterruptableJob.class.isAssignableFrom(
scheduler.getJobDetail(jobKey).getJobClass())) {
boolean interrupted = scheduler.interrupt(jobKey);
return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_ACCEPTED))
.body(new ObjectMapper()
.createObjectNode()
.put("message", "Task stopping accepted")
.put("succes", interrupted));
} else {
return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_BAD_REQUEST))
.body(new ObjectMapper()
.createObjectNode()
.put("message", "Task cannot be stopped")
.put("succes", false));
}
} catch (SchedulerException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
}
}
@Operation(
summary = "Delete a task",
description = "This will remove the task from the scheduler and delete all information about the task")
@DeleteMapping(
path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}",
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponse(
responseCode = "404",
description = "Task not found",
content =
@Content(
mediaType = MediaType.APPLICATION_JSON_VALUE,
schema = @Schema(example = "{\"message\":\"Task not found\"}")))
@ApiResponse(responseCode = "204", description = "Task is deleted")
public ResponseEntity<Object> delete(@PathVariable TaskType type, @PathVariable UUID uuid)
throws ResponseStatusException {
try {
JobKey jobKey = taskManagerService.getJobKey(type, uuid);
if (null == jobKey) {
return handleTaskNotFound();
}
boolean succes = scheduler.deleteJob(jobKey);
logger.info("Task {}:{} deletion {}", type, uuid, (succes ? "succeeded" : "failed"));
// cleanup the schedule from the business objects
switch (type) {
case INDEX:
deleteScheduleFromSearchIndex(uuid);
break;
case PROMETHEUS_PING:
// no action required, as these are not managed in Tailormap
default:
break;
}
return ResponseEntity.noContent().build();
} catch (SchedulerException e) {
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
}
}
private void deleteScheduleFromSearchIndex(UUID uuid) {
searchIndexRepository.findByTaskScheduleUuid(uuid).forEach(searchIndex -> {
searchIndex.setSchedule(null);
searchIndexRepository.save(searchIndex);
});
}
private ResponseEntity<Object> handleTaskNotFound() {
return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_NOT_FOUND))
.contentType(MediaType.APPLICATION_JSON)
.body(new ObjectMapper()
.createObjectNode()
.put("message", "Task not found")
.put("code", HTTP_NOT_FOUND));
}
}