1
2
3
4
5
6 package org.tailormap.api.scheduling;
7
8 import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
9 import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
10
11 import ch.rasc.sse.eventbus.SseEvent;
12 import ch.rasc.sse.eventbus.SseEventBus;
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import com.fasterxml.jackson.databind.ObjectMapper;
15 import java.lang.invoke.MethodHandles;
16 import java.time.Instant;
17 import java.util.UUID;
18 import org.quartz.DisallowConcurrentExecution;
19 import org.quartz.JobDataMap;
20 import org.quartz.JobDetail;
21 import org.quartz.JobExecutionContext;
22 import org.quartz.PersistJobDataAfterExecution;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.springframework.lang.NonNull;
26 import org.springframework.scheduling.quartz.QuartzJobBean;
27 import org.tailormap.api.admin.model.ServerSentEvent;
28 import org.tailormap.api.admin.model.TaskProgressEvent;
29
30
31 @DisallowConcurrentExecution
32 @PersistJobDataAfterExecution
33 public class PocTask extends QuartzJobBean implements Task {
34 private static final Logger logger =
35 LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
36
37 private String foo;
38 private String description;
39 private final SseEventBus eventBus;
40 private final ObjectMapper objectMapper;
41
42 public PocTask(SseEventBus eventBus, ObjectMapper objectMapper) {
43 this.eventBus = eventBus;
44 this.objectMapper = objectMapper;
45 }
46
47 @Override
48 protected void executeInternal(@NonNull JobExecutionContext context) {
49 final JobDetail jobDetail = context.getJobDetail();
50
51
52 final JobDataMap mergedJobDataMap = context.getMergedJobDataMap();
53
54
55 final JobDataMap jobDataMap = jobDetail.getJobDataMap();
56
57
58 logger.debug("foo: {}", getFoo());
59
60 logger.debug(
61 "executing POC task {}:{}, details: {}",
62 jobDetail.getKey().getGroup(),
63 jobDetail.getKey().getName(),
64 mergedJobDataMap.getWrappedMap());
65
66 try {
67 for (int i = 0; i < 110; i += 10) {
68
69 long workingTime = (long) (Math.random() * 5000);
70 logger.debug("Working for {} ms", workingTime);
71 Thread.sleep(workingTime);
72 logger.debug("POC task is at {}%", i);
73 context.setResult("POC task is at %d%%".formatted(i));
74 taskProgress(
75 new TaskProgressEvent()
76 .type(TaskType.POC.getValue())
77 .uuid(UUID.fromString(jobDetail.getKey().getName()))
78 .progress(i)
79 .total(100));
80 }
81 } catch (InterruptedException e) {
82 logger.error("Thread interrupted", e);
83 }
84
85 int executions = (1 + (int) mergedJobDataMap.getOrDefault("executions", 0));
86 jobDataMap.put("executions", executions);
87 jobDataMap.put("lastExecutionFinished", Instant.now());
88 jobDataMap.put(Task.LAST_RESULT_KEY, "POC task executed successfully");
89 context.setResult("POC task executed successfully");
90
91 setFoo("foo executed: " + executions);
92 }
93
94 @Override
95 public void taskProgress(TaskProgressEvent event) {
96 ServerSentEvent serverSentEvent = new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
97 try {
98 eventBus.handleEvent(
99 SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
100 } catch (JsonProcessingException e) {
101 logger.error("Error publishing poc task progress event", e);
102 }
103 }
104
105
106 public String getFoo() {
107 return foo;
108 }
109
110 public void setFoo(String foo) {
111 this.foo = foo;
112 }
113
114 @Override
115 public TaskType getType() {
116 return TaskType.POC;
117 }
118
119 @Override
120 public String getDescription() {
121 return description;
122 }
123
124 @Override
125 public void setDescription(String description) {
126 this.description = description;
127 }
128
129 }