1
2
3
4
5
6 package org.tailormap.api.scheduling;
7
8 import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
9 import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
10
11 import ch.rasc.sse.eventbus.SseEvent;
12 import ch.rasc.sse.eventbus.SseEventBus;
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import com.fasterxml.jackson.databind.ObjectMapper;
15 import io.micrometer.core.annotation.Timed;
16 import java.io.IOException;
17 import java.lang.invoke.MethodHandles;
18 import java.time.Instant;
19 import java.util.UUID;
20 import org.apache.solr.client.solrj.SolrClient;
21 import org.apache.solr.client.solrj.SolrServerException;
22 import org.apache.solr.common.SolrException;
23 import org.quartz.DisallowConcurrentExecution;
24 import org.quartz.JobDataMap;
25 import org.quartz.JobExecutionContext;
26 import org.quartz.JobExecutionException;
27 import org.quartz.PersistJobDataAfterExecution;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.beans.factory.annotation.Autowired;
31 import org.springframework.beans.factory.annotation.Value;
32 import org.springframework.lang.NonNull;
33 import org.springframework.scheduling.quartz.QuartzJobBean;
34 import org.tailormap.api.admin.model.ServerSentEvent;
35 import org.tailormap.api.admin.model.TaskProgressEvent;
36 import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
37 import org.tailormap.api.persistence.SearchIndex;
38 import org.tailormap.api.persistence.TMFeatureType;
39 import org.tailormap.api.repository.FeatureTypeRepository;
40 import org.tailormap.api.repository.SearchIndexRepository;
41 import org.tailormap.api.solr.SolrHelper;
42 import org.tailormap.api.solr.SolrService;
43
44 @DisallowConcurrentExecution
45 @PersistJobDataAfterExecution
46 public class IndexTask extends QuartzJobBean implements Task {
47 public static final String INDEX_KEY = "indexId";
48 private static final Logger logger =
49 LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
50 private final FeatureSourceFactoryHelper featureSourceFactoryHelper;
51 private final FeatureTypeRepository featureTypeRepository;
52 private final SearchIndexRepository searchIndexRepository;
53 private final SolrService solrService;
54 private final SseEventBus eventBus;
55 private final ObjectMapper objectMapper;
56
57 @Value("${tailormap-api.solr-batch-size:1000}")
58 private int solrBatchSize;
59
60 @Value("${tailormap-api.solr-geometry-validation-rule:repairBuffer0}")
61 private String solrGeometryValidationRule;
62
63 private long indexId;
64 private String description;
65
66 public IndexTask(
67 @Autowired SearchIndexRepository searchIndexRepository,
68 @Autowired FeatureTypeRepository featureTypeRepository,
69 @Autowired FeatureSourceFactoryHelper featureSourceFactoryHelper,
70 @Autowired SolrService solrService,
71 @Autowired SseEventBus eventBus,
72 @Autowired ObjectMapper objectMapper) {
73
74 this.featureSourceFactoryHelper = featureSourceFactoryHelper;
75 this.solrService = solrService;
76 this.featureTypeRepository = featureTypeRepository;
77 this.searchIndexRepository = searchIndexRepository;
78 this.eventBus = eventBus;
79 this.objectMapper = objectMapper;
80 }
81
82 @Timed(value = "indexTask", description = "Time taken to execute index task")
83 @Override
84 protected void executeInternal(@NonNull JobExecutionContext context)
85 throws JobExecutionException {
86
87 final JobDataMap persistedJobData = context.getJobDetail().getJobDataMap();
88
89 logger.info(
90 "Start Executing IndexTask {} for index {}, described with '{}'",
91 context.getJobDetail().getKey(),
92 getIndexId(),
93 getDescription());
94
95 SearchIndex searchIndex =
96 searchIndexRepository
97 .findById(getIndexId())
98 .orElseThrow(() -> new JobExecutionException("Search index not found"));
99
100 TMFeatureType indexingFT =
101 featureTypeRepository
102 .findById(searchIndex.getFeatureTypeId())
103 .orElseThrow(() -> new JobExecutionException("Feature type not found"));
104
105 try (SolrClient solrClient = solrService.getSolrClientForIndexing();
106 SolrHelper solrHelper =
107 new SolrHelper(solrClient)
108 .withBatchSize(solrBatchSize)
109 .withGeometryValidationRule(solrGeometryValidationRule)) {
110
111 searchIndex.setStatus(SearchIndex.Status.INDEXING);
112 searchIndex = searchIndexRepository.save(searchIndex);
113
114 searchIndex =
115 solrHelper.addFeatureTypeIndex(
116 searchIndex,
117 indexingFT,
118 featureSourceFactoryHelper,
119 searchIndexRepository,
120 this::taskProgress,
121 UUID.fromString(context.getTrigger().getJobKey().getName()));
122 searchIndex = searchIndex.setStatus(SearchIndex.Status.INDEXED);
123 searchIndexRepository.save(searchIndex);
124 persistedJobData.put(
125 "executions", (1 + (int) context.getMergedJobDataMap().getOrDefault("executions", 0)));
126 persistedJobData.put("lastExecutionFinished", Instant.now());
127 persistedJobData.put(Task.LAST_RESULT_KEY, "Index task executed successfully");
128 context.setResult("Index task executed successfully");
129 } catch (UnsupportedOperationException | IOException | SolrServerException | SolrException e) {
130 logger.error("Error indexing", e);
131 searchIndex.setStatus(SearchIndex.Status.ERROR).setComment(e.getMessage());
132 persistedJobData.put("lastExecutionFinished", null);
133 persistedJobData.put(
134 Task.LAST_RESULT_KEY,
135 "Index task failed with " + e.getMessage() + ". Check logs for details");
136 searchIndexRepository.save(searchIndex);
137 context.setResult("Error indexing. Check logs for details.");
138 throw new JobExecutionException("Error indexing", e);
139 }
140 }
141
142 @Override
143 public void taskProgress(TaskProgressEvent event) {
144 ServerSentEvent serverSentEvent = new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
145 try {
146 eventBus.handleEvent(
147 SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
148 } catch (JsonProcessingException e) {
149 logger.error("Error publishing indexing task progress event", e);
150 }
151 }
152
153
154 @Override
155 public TaskType getType() {
156 return TaskType.INDEX;
157 }
158
159 public long getIndexId() {
160 return indexId;
161 }
162
163 public void setIndexId(long indexId) {
164 this.indexId = indexId;
165 }
166
167 @Override
168 public String getDescription() {
169 return description;
170 }
171
172 @Override
173 public void setDescription(String description) {
174 this.description = description;
175 }
176
177 }