View Javadoc
1   /*
2    * Copyright (C) 2024 B3Partners B.V.
3    *
4    * SPDX-License-Identifier: MIT
5    */
6   package org.tailormap.api.controller.admin;
7   
8   import static java.net.HttpURLConnection.HTTP_ACCEPTED;
9   import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
10  import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
11  
12  import com.fasterxml.jackson.databind.ObjectMapper;
13  import com.fasterxml.jackson.databind.node.ObjectNode;
14  import io.swagger.v3.oas.annotations.Operation;
15  import io.swagger.v3.oas.annotations.media.Content;
16  import io.swagger.v3.oas.annotations.media.Schema;
17  import io.swagger.v3.oas.annotations.responses.ApiResponse;
18  import java.lang.invoke.MethodHandles;
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Objects;
22  import java.util.UUID;
23  import org.quartz.CronTrigger;
24  import org.quartz.InterruptableJob;
25  import org.quartz.JobDataMap;
26  import org.quartz.JobDetail;
27  import org.quartz.JobKey;
28  import org.quartz.Scheduler;
29  import org.quartz.SchedulerException;
30  import org.quartz.Trigger;
31  import org.quartz.TriggerKey;
32  import org.quartz.TriggerUtils;
33  import org.quartz.impl.matchers.GroupMatcher;
34  import org.quartz.spi.OperableTrigger;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  import org.springframework.beans.factory.annotation.Autowired;
38  import org.springframework.http.HttpStatus;
39  import org.springframework.http.HttpStatusCode;
40  import org.springframework.http.MediaType;
41  import org.springframework.http.ResponseEntity;
42  import org.springframework.web.bind.annotation.DeleteMapping;
43  import org.springframework.web.bind.annotation.GetMapping;
44  import org.springframework.web.bind.annotation.PathVariable;
45  import org.springframework.web.bind.annotation.PutMapping;
46  import org.springframework.web.bind.annotation.RequestParam;
47  import org.springframework.web.bind.annotation.RestController;
48  import org.springframework.web.server.ResponseStatusException;
49  import org.tailormap.api.repository.SearchIndexRepository;
50  import org.tailormap.api.scheduling.Task;
51  import org.tailormap.api.scheduling.TaskManagerService;
52  import org.tailormap.api.scheduling.TaskType;
53  
54  /**
55   * Admin controller for controlling the task scheduler. Not to be used to create new tasks, adding tasks belongs in the
56   * domain of the specific controller or Spring Data REST API as that requires specific configuration information.
57   * Provides the following endpoints:
58   *
59   * <ul>
60   *   <li>{@link #list /admin/tasks} to list all tasks, optionally filtered by type
61   *   <li>{@link #details /admin/tasks/{type}/{uuid}} to get the details of a task
62   *   <li>{@link #startTask /admin/tasks/{type}/{uuid}/start} to start a task
63   *   <li>{@link #stopTask /admin/tasks/{type}/{uuid}/stop} to stop a task
64   *   <li>{@link #delete /admin/tasks/{type}/{uuid}} to delete a task
65   * </ul>
66   */
67  @RestController
68  public class TaskAdminController {
69    private static final Logger logger =
70        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
71  
72    private final Scheduler scheduler;
73    private final TaskManagerService taskManagerService;
74    private final SearchIndexRepository searchIndexRepository;
75  
76    public TaskAdminController(
77        @Autowired Scheduler scheduler,
78        @Autowired TaskManagerService taskManagerService,
79        @Autowired SearchIndexRepository searchIndexRepository) {
80      this.scheduler = scheduler;
81      this.taskManagerService = taskManagerService;
82      this.searchIndexRepository = searchIndexRepository;
83    }
84  
85    @Operation(summary = "List all tasks, optionally filtered by type", description = """
86  This will return a list of all tasks, optionally filtered by task type.
87  The state of the task is one of the Quartz Trigger states.
88  The state can be one of: NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED or null in error conditions.
89  """)
90    @GetMapping(path = "${tailormap-api.admin.base-path}/tasks", produces = MediaType.APPLICATION_JSON_VALUE)
91    @ApiResponse(
92        responseCode = "200",
93        description = "List of all tasks, this list may be empty",
94        content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(example = """
95  {"tasks":[
96  {"uuid":"6308d26e-fe1e-4268-bb28-20db2cd06914","type":"index", "state":"NORMAL", "interruptable": false},
97  {"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca8","type":"poc", "state": "BLOCKED", "interruptable": false},
98  {"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca9","type":"poc", "state": "PAUSED", "interruptable": false},
99  {"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca2","type":"poc", "state": "COMPLETE", "interruptable": false},
100 {"uuid":"d5ce9152-e90e-4b5a-b129-3b2366cabca3","type":"interuptablepoc", "state": "ERROR", "interruptable": true}
101 ]}
102 """)))
103   public ResponseEntity<Object> list(@RequestParam(required = false) String type) throws ResponseStatusException {
104     logger.debug("Listing all tasks (optional type filter: {})", (null == type ? "all" : type));
105     final List<ObjectNode> tasks = new ArrayList<>();
106 
107     final GroupMatcher<JobKey> groupMatcher =
108         (null == type ? GroupMatcher.anyGroup() : GroupMatcher.groupEquals(type));
109     try {
110       scheduler.getJobKeys(groupMatcher).stream()
111           .map(jobKey -> {
112             try {
113               return scheduler.getJobDetail(jobKey);
114             } catch (SchedulerException e) {
115               logger.error("Error getting task detail", e);
116               return null;
117             }
118           })
119           .filter(Objects::nonNull)
120           .forEach(jobDetail -> {
121             Trigger.TriggerState state;
122             try {
123               state = scheduler.getTriggerState(TriggerKey.triggerKey(
124                   jobDetail.getKey().getName(),
125                   jobDetail.getKey().getGroup()));
126             } catch (SchedulerException e) {
127               logger.error("Error getting task state", e);
128               state = null;
129             }
130             tasks.add(new ObjectMapper()
131                 .createObjectNode()
132                 .put(Task.TYPE_KEY, jobDetail.getKey().getGroup())
133                 .put(Task.UUID_KEY, jobDetail.getKey().getName())
134                 .put(
135                     Task.INTERRUPTABLE_KEY,
136                     InterruptableJob.class.isAssignableFrom(jobDetail.getJobClass()))
137                 .put(
138                     Task.DESCRIPTION_KEY,
139                     jobDetail.getJobDataMap().getString(Task.DESCRIPTION_KEY))
140                 .put(
141                     Task.LAST_RESULT_KEY,
142                     jobDetail.getJobDataMap().getString(Task.LAST_RESULT_KEY))
143                 .putPOJO(Task.STATE_KEY, state));
144           });
145     } catch (SchedulerException e) {
146       throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting tasks", e);
147     }
148 
149     return ResponseEntity.ok(new ObjectMapper()
150         .createObjectNode()
151         .set("tasks", new ObjectMapper().createArrayNode().addAll(tasks)));
152   }
153 
154   @Operation(summary = "List all details for a given task", description = """
155 This will return the details of the task, including the status, progress,
156 result and any other information.
157 """)
158   @GetMapping(
159       path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}",
160       produces = MediaType.APPLICATION_JSON_VALUE)
161   @ApiResponse(
162       responseCode = "404",
163       description = "Task not found",
164       content =
165           @Content(
166               mediaType = MediaType.APPLICATION_JSON_VALUE,
167               schema = @Schema(example = "{\"message\":\"Task not found\",\"code\":404}")))
168   @ApiResponse(
169       responseCode = "200",
170       description = """
171 Details of the task. The response content will vay according to the type of task,
172 the most common fields are listed in the Task interface.
173 """,
174       content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(example = """
175 {
176 "type":"poc",
177 "uuid":"6308d26e-fe1e-4268-bb28-20db2cd06914",
178 "interruptable":false,
179 "description":"This is a poc task",
180 "startTime":"2024-06-06T12:00:00Z",
181 "nextTime":"2024-06-06T12:00:00Z",
182 "state":"NORMAL",
183 "progress":"...",
184 "result":"...",
185 "message":"something is happening"
186 "jobData":{ "type":"poc", "description":"This is a poc task" }
187 }
188 """)))
189   public ResponseEntity<Object> details(@PathVariable TaskType type, @PathVariable UUID uuid)
190       throws ResponseStatusException {
191     logger.debug("Getting task details for {}:{}", type, uuid);
192 
193     try {
194       JobKey jobKey = taskManagerService.getJobKey(type, uuid);
195       if (null == jobKey) {
196         return handleTaskNotFound();
197       }
198 
199       JobDetail jobDetail = scheduler.getJobDetail(jobKey);
200       JobDataMap jobDataMap = jobDetail.getJobDataMap();
201 
202       /* there should be only one */
203       Trigger trigger = scheduler.getTriggersOfJob(jobDetail.getKey()).getFirst();
204       CronTrigger cron = ((CronTrigger) trigger);
205 
206       final Object[] result = new Object[1];
207       scheduler.getCurrentlyExecutingJobs().stream()
208           .filter(Objects::nonNull)
209           .filter(jobExecutionContext ->
210               jobExecutionContext.getJobDetail().getKey().equals(jobKey))
211           .forEach(jobExecutionContext -> {
212             logger.debug(
213                 "currently executing job {} with trigger {}.",
214                 jobExecutionContext.getJobDetail().getKey(),
215                 jobExecutionContext.getTrigger().getKey());
216 
217             result[0] = jobExecutionContext.getResult();
218           });
219 
220       return ResponseEntity.ok(new ObjectMapper()
221           .createObjectNode()
222           // immutable uuid, type and description
223           .put(Task.TYPE_KEY, jobDetail.getKey().getGroup())
224           .put(Task.UUID_KEY, jobDetail.getKey().getName())
225           .put(Task.INTERRUPTABLE_KEY, InterruptableJob.class.isAssignableFrom(jobDetail.getJobClass()))
226           .put(Task.DESCRIPTION_KEY, jobDataMap.getString(Task.DESCRIPTION_KEY))
227           .put(Task.CRON_EXPRESSION_KEY, cron.getCronExpression())
228           // TODO / XXX we could add a human-readable description of the cron expression using
229           // eg.
230           //   com.cronutils:cron-utils like:
231           //     CronParser cronParser = new
232           //         CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ));
233           //     CronDescriptor.instance(locale).describe(cronParser.parse(cronExpression));
234           //   this could also be done front-end using eg.
235           // https://www.npmjs.com/package/cronstrue
236           //   which has the advantage of knowing the required locale for the human
237           // .put("cronDescription", cron.getCronExpression())
238           .put("timezone", cron.getTimeZone().getID())
239           .putPOJO("startTime", trigger.getStartTime())
240           .putPOJO("lastTime", trigger.getPreviousFireTime())
241           .putPOJO("nextFireTimes", TriggerUtils.computeFireTimes((OperableTrigger) cron, null, 5))
242           .putPOJO(Task.STATE_KEY, scheduler.getTriggerState(trigger.getKey()))
243           .putPOJO("progress", result[0])
244           .put(Task.LAST_RESULT_KEY, jobDataMap.getString(Task.LAST_RESULT_KEY))
245           .putPOJO("jobData", jobDataMap));
246     } catch (SchedulerException e) {
247       throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
248     }
249   }
250 
251   @Operation(summary = "Start a task", description = "This will start the task if it is not already running")
252   @PutMapping(
253       path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}/start",
254       produces = MediaType.APPLICATION_JSON_VALUE)
255   @ApiResponse(
256       responseCode = "404",
257       description = "Task not found",
258       content =
259           @Content(
260               mediaType = MediaType.APPLICATION_JSON_VALUE,
261               schema = @Schema(example = "{\"message\":\"Task not found\",\"code\":404}")))
262   @ApiResponse(
263       responseCode = "202",
264       description = "Task is started",
265       content =
266           @Content(
267               mediaType = MediaType.APPLICATION_JSON_VALUE,
268               schema = @Schema(example = "{\"message\":\"Task starting accepted\",\"code\":202}")))
269   public ResponseEntity<Object> startTask(@PathVariable TaskType type, @PathVariable UUID uuid)
270       throws ResponseStatusException {
271     logger.debug("Starting task {}:{}", type, uuid);
272 
273     try {
274       JobKey jobKey = taskManagerService.getJobKey(type, uuid);
275       if (null == jobKey) {
276         return handleTaskNotFound();
277       }
278       scheduler.triggerJob(jobKey);
279       return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_ACCEPTED))
280           .body(new ObjectMapper().createObjectNode().put("message", "Task starting accepted"));
281 
282     } catch (SchedulerException e) {
283       throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
284     }
285   }
286 
287   @Operation(summary = "Stop a task irrevocably", description = """
288 This will stop a running task, if the task is not running, nothing will happen.
289 This can leave the application in an inconsistent state.
290 A task that is not interruptable cannot be stopped.
291 A stopped task cannot be restarted, it fire again depending on the schedule.
292 """)
293   @PutMapping(
294       path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}/stop",
295       produces = MediaType.APPLICATION_JSON_VALUE)
296   @ApiResponse(
297       responseCode = "404",
298       description = "Task not found",
299       content =
300           @Content(
301               mediaType = MediaType.APPLICATION_JSON_VALUE,
302               schema = @Schema(example = "{\"message\":\"Task not found\", \"code\":404}")))
303   @ApiResponse(
304       responseCode = "202",
305       description = "Task is stopping",
306       content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(example = """
307 {
308 "message":"Task stopping accepted".
309 "succes":true
310 }
311 """)))
312   @ApiResponse(
313       responseCode = "400",
314       description = "The task cannot be stopped as it does not implement the InterruptableJob interface.",
315       content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(example = """
316 { "message":"Task cannot be stopped" }
317 """)))
318   public ResponseEntity<Object> stopTask(@PathVariable TaskType type, @PathVariable UUID uuid)
319       throws ResponseStatusException {
320     logger.debug("Stopping task {}:{}", type, uuid);
321 
322     try {
323       JobKey jobKey = taskManagerService.getJobKey(type, uuid);
324       if (null == jobKey) {
325         return handleTaskNotFound();
326       }
327 
328       if (InterruptableJob.class.isAssignableFrom(
329           scheduler.getJobDetail(jobKey).getJobClass())) {
330         boolean interrupted = scheduler.interrupt(jobKey);
331         return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_ACCEPTED))
332             .body(new ObjectMapper()
333                 .createObjectNode()
334                 .put("message", "Task stopping accepted")
335                 .put("succes", interrupted));
336       } else {
337         return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_BAD_REQUEST))
338             .body(new ObjectMapper()
339                 .createObjectNode()
340                 .put("message", "Task cannot be stopped")
341                 .put("succes", false));
342       }
343 
344     } catch (SchedulerException e) {
345       throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
346     }
347   }
348 
349   @Operation(
350       summary = "Delete a task",
351       description = "This will remove the task from the scheduler and delete all information about the task")
352   @DeleteMapping(
353       path = "${tailormap-api.admin.base-path}/tasks/{type}/{uuid}",
354       produces = MediaType.APPLICATION_JSON_VALUE)
355   @ApiResponse(
356       responseCode = "404",
357       description = "Task not found",
358       content =
359           @Content(
360               mediaType = MediaType.APPLICATION_JSON_VALUE,
361               schema = @Schema(example = "{\"message\":\"Task not found\"}")))
362   @ApiResponse(responseCode = "204", description = "Task is deleted")
363   public ResponseEntity<Object> delete(@PathVariable TaskType type, @PathVariable UUID uuid)
364       throws ResponseStatusException {
365 
366     try {
367       JobKey jobKey = taskManagerService.getJobKey(type, uuid);
368       if (null == jobKey) {
369         return handleTaskNotFound();
370       }
371 
372       boolean succes = scheduler.deleteJob(jobKey);
373       logger.info("Task {}:{} deletion {}", type, uuid, (succes ? "succeeded" : "failed"));
374       // cleanup the schedule from the business objects
375       switch (type) {
376         case INDEX:
377           deleteScheduleFromSearchIndex(uuid);
378           break;
379         case PROMETHEUS_PING:
380         // no action required, as these are not managed in Tailormap
381         default:
382           break;
383       }
384       return ResponseEntity.noContent().build();
385     } catch (SchedulerException e) {
386       throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error getting task", e);
387     }
388   }
389 
390   private void deleteScheduleFromSearchIndex(UUID uuid) {
391     searchIndexRepository.findByTaskScheduleUuid(uuid).forEach(searchIndex -> {
392       searchIndex.setSchedule(null);
393       searchIndexRepository.save(searchIndex);
394     });
395   }
396 
397   private ResponseEntity<Object> handleTaskNotFound() {
398     return ResponseEntity.status(HttpStatusCode.valueOf(HTTP_NOT_FOUND))
399         .contentType(MediaType.APPLICATION_JSON)
400         .body(new ObjectMapper()
401             .createObjectNode()
402             .put("message", "Task not found")
403             .put("code", HTTP_NOT_FOUND));
404   }
405 }