View Javadoc
1   /*
2    * Copyright (C) 2024 B3Partners B.V.
3    *
4    * SPDX-License-Identifier: MIT
5    */
6   package org.tailormap.api.scheduling;
7   
8   import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
9   import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
10  
11  import ch.rasc.sse.eventbus.SseEvent;
12  import ch.rasc.sse.eventbus.SseEventBus;
13  import com.fasterxml.jackson.core.JsonProcessingException;
14  import com.fasterxml.jackson.databind.ObjectMapper;
15  import java.lang.invoke.MethodHandles;
16  import java.time.Instant;
17  import java.time.OffsetDateTime;
18  import java.time.ZoneId;
19  import java.util.Map;
20  import java.util.UUID;
21  import org.quartz.DisallowConcurrentExecution;
22  import org.quartz.JobDataMap;
23  import org.quartz.JobDetail;
24  import org.quartz.JobExecutionContext;
25  import org.quartz.PersistJobDataAfterExecution;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  import org.springframework.lang.NonNull;
29  import org.springframework.scheduling.quartz.QuartzJobBean;
30  import org.tailormap.api.admin.model.ServerSentEvent;
31  import org.tailormap.api.admin.model.TaskProgressEvent;
32  
33  /** POC task for testing purposes. */
34  @DisallowConcurrentExecution
35  @PersistJobDataAfterExecution
36  public class PocTask extends QuartzJobBean implements Task {
37    private static final Logger logger =
38        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
39  
40    private String foo;
41    private String description;
42    private final SseEventBus eventBus;
43    private final ObjectMapper objectMapper;
44  
45    public PocTask(SseEventBus eventBus, ObjectMapper objectMapper) {
46      this.eventBus = eventBus;
47      this.objectMapper = objectMapper;
48    }
49  
50    @Override
51    protected void executeInternal(@NonNull JobExecutionContext context) {
52      final JobDetail jobDetail = context.getJobDetail();
53  
54      // NOTE: This immutable map is a snapshot of the job data maps at the time of the job execution.
55      final JobDataMap mergedJobDataMap = context.getMergedJobDataMap();
56  
57      // NOTE: This map is mutable and can be used to store job data.
58      final JobDataMap jobDataMap = jobDetail.getJobDataMap();
59  
60      // this.foo is set through QuartzJobBean
61      logger.debug("foo: {}", getFoo());
62  
63      logger.debug(
64          "executing POC task {}:{}, details: {}",
65          jobDetail.getKey().getGroup(),
66          jobDetail.getKey().getName(),
67          mergedJobDataMap.getWrappedMap());
68  
69      try {
70        TaskProgressEvent progressEvent = new TaskProgressEvent()
71            .startedAt(OffsetDateTime.now(ZoneId.systemDefault()))
72            .type(getType().getValue())
73            .taskData(Map.of("jobKey", jobDetail.getKey().getName()))
74            .uuid(UUID.fromString(jobDetail.getKey().getName()));
75  
76        for (int i = 0; i < 110; i += 10) {
77          // Simulate some work for a random period of time
78          long workingTime = (long) (Math.random() * 5000);
79          logger.debug("Working for {} ms", workingTime);
80          Thread.sleep(workingTime);
81          logger.debug("POC task is at {}%", i);
82          context.setResult("POC task is at %d%%".formatted(i));
83          taskProgress(progressEvent.progress(i).total(100));
84        }
85      } catch (InterruptedException e) {
86        logger.error("Thread interrupted", e);
87      }
88  
89      int executions = (1 + (int) mergedJobDataMap.getOrDefault(EXECUTION_COUNT_KEY, 0));
90      jobDataMap.put(EXECUTION_COUNT_KEY, executions);
91      jobDataMap.put(EXECUTION_FINISHED_KEY, Instant.now());
92      jobDataMap.put(Task.LAST_RESULT_KEY, "POC task executed successfully");
93      context.setResult("POC task executed successfully");
94  
95      setFoo("foo executed: " + executions);
96    }
97  
98    @Override
99    public void taskProgress(TaskProgressEvent event) {
100     ServerSentEvent serverSentEvent =
101         new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
102     try {
103       eventBus.handleEvent(SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
104     } catch (JsonProcessingException e) {
105       logger.error("Error publishing poc task progress event", e);
106     }
107   }
108 
109   // <editor-fold desc="Getters and Setters">
110   public String getFoo() {
111     return foo;
112   }
113 
114   public void setFoo(String foo) {
115     this.foo = foo;
116   }
117 
118   @Override
119   public TaskType getType() {
120     return TaskType.POC;
121   }
122 
123   @Override
124   public String getDescription() {
125     return description;
126   }
127 
128   @Override
129   public void setDescription(String description) {
130     this.description = description;
131   }
132   // </editor-fold>
133 }