1
2
3
4
5
6 package org.tailormap.api.scheduling;
7
8 import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
9 import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
10
11 import ch.rasc.sse.eventbus.SseEvent;
12 import ch.rasc.sse.eventbus.SseEventBus;
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import com.fasterxml.jackson.databind.ObjectMapper;
15 import java.lang.invoke.MethodHandles;
16 import java.time.Instant;
17 import java.time.OffsetDateTime;
18 import java.time.ZoneId;
19 import java.util.Map;
20 import java.util.UUID;
21 import org.quartz.DisallowConcurrentExecution;
22 import org.quartz.JobDataMap;
23 import org.quartz.JobDetail;
24 import org.quartz.JobExecutionContext;
25 import org.quartz.PersistJobDataAfterExecution;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import org.springframework.lang.NonNull;
29 import org.springframework.scheduling.quartz.QuartzJobBean;
30 import org.tailormap.api.admin.model.ServerSentEvent;
31 import org.tailormap.api.admin.model.TaskProgressEvent;
32
33
34 @DisallowConcurrentExecution
35 @PersistJobDataAfterExecution
36 public class PocTask extends QuartzJobBean implements Task {
37 private static final Logger logger =
38 LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
39
40 private String foo;
41 private String description;
42 private final SseEventBus eventBus;
43 private final ObjectMapper objectMapper;
44
45 public PocTask(SseEventBus eventBus, ObjectMapper objectMapper) {
46 this.eventBus = eventBus;
47 this.objectMapper = objectMapper;
48 }
49
50 @Override
51 protected void executeInternal(@NonNull JobExecutionContext context) {
52 final JobDetail jobDetail = context.getJobDetail();
53
54
55 final JobDataMap mergedJobDataMap = context.getMergedJobDataMap();
56
57
58 final JobDataMap jobDataMap = jobDetail.getJobDataMap();
59
60
61 logger.debug("foo: {}", getFoo());
62
63 logger.debug(
64 "executing POC task {}:{}, details: {}",
65 jobDetail.getKey().getGroup(),
66 jobDetail.getKey().getName(),
67 mergedJobDataMap.getWrappedMap());
68
69 try {
70 TaskProgressEvent progressEvent = new TaskProgressEvent()
71 .startedAt(OffsetDateTime.now(ZoneId.systemDefault()))
72 .type(getType().getValue())
73 .taskData(Map.of("jobKey", jobDetail.getKey().getName()))
74 .uuid(UUID.fromString(jobDetail.getKey().getName()));
75
76 for (int i = 0; i < 110; i += 10) {
77
78 long workingTime = (long) (Math.random() * 5000);
79 logger.debug("Working for {} ms", workingTime);
80 Thread.sleep(workingTime);
81 logger.debug("POC task is at {}%", i);
82 context.setResult("POC task is at %d%%".formatted(i));
83 taskProgress(progressEvent.progress(i).total(100));
84 }
85 } catch (InterruptedException e) {
86 logger.error("Thread interrupted", e);
87 }
88
89 int executions = (1 + (int) mergedJobDataMap.getOrDefault(EXECUTION_COUNT_KEY, 0));
90 jobDataMap.put(EXECUTION_COUNT_KEY, executions);
91 jobDataMap.put(EXECUTION_FINISHED_KEY, Instant.now());
92 jobDataMap.put(Task.LAST_RESULT_KEY, "POC task executed successfully");
93 context.setResult("POC task executed successfully");
94
95 setFoo("foo executed: " + executions);
96 }
97
98 @Override
99 public void taskProgress(TaskProgressEvent event) {
100 ServerSentEvent serverSentEvent =
101 new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
102 try {
103 eventBus.handleEvent(SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
104 } catch (JsonProcessingException e) {
105 logger.error("Error publishing poc task progress event", e);
106 }
107 }
108
109
110 public String getFoo() {
111 return foo;
112 }
113
114 public void setFoo(String foo) {
115 this.foo = foo;
116 }
117
118 @Override
119 public TaskType getType() {
120 return TaskType.POC;
121 }
122
123 @Override
124 public String getDescription() {
125 return description;
126 }
127
128 @Override
129 public void setDescription(String description) {
130 this.description = description;
131 }
132
133 }