1
2
3
4
5
6 package org.tailormap.api.scheduling;
7
8 import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
9 import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
10
11 import ch.rasc.sse.eventbus.SseEvent;
12 import ch.rasc.sse.eventbus.SseEventBus;
13 import com.fasterxml.jackson.core.JsonProcessingException;
14 import com.fasterxml.jackson.databind.ObjectMapper;
15 import io.micrometer.core.annotation.Counted;
16 import io.micrometer.core.annotation.Timed;
17 import java.io.IOException;
18 import java.lang.invoke.MethodHandles;
19 import java.time.Instant;
20 import java.util.UUID;
21 import org.apache.solr.client.solrj.SolrClient;
22 import org.apache.solr.client.solrj.SolrServerException;
23 import org.apache.solr.common.SolrException;
24 import org.quartz.DisallowConcurrentExecution;
25 import org.quartz.JobDataMap;
26 import org.quartz.JobExecutionContext;
27 import org.quartz.JobExecutionException;
28 import org.quartz.PersistJobDataAfterExecution;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.beans.factory.annotation.Value;
33 import org.springframework.lang.NonNull;
34 import org.springframework.scheduling.quartz.QuartzJobBean;
35 import org.tailormap.api.admin.model.SearchIndexSummary;
36 import org.tailormap.api.admin.model.ServerSentEvent;
37 import org.tailormap.api.admin.model.TaskProgressEvent;
38 import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
39 import org.tailormap.api.persistence.SearchIndex;
40 import org.tailormap.api.persistence.TMFeatureType;
41 import org.tailormap.api.repository.FeatureTypeRepository;
42 import org.tailormap.api.repository.SearchIndexRepository;
43 import org.tailormap.api.solr.SolrHelper;
44 import org.tailormap.api.solr.SolrService;
45
46 @DisallowConcurrentExecution
47 @PersistJobDataAfterExecution
48 public class IndexTask extends QuartzJobBean implements Task {
49 public static final String INDEX_KEY = "indexId";
50 private static final Logger logger =
51 LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
52 private final FeatureSourceFactoryHelper featureSourceFactoryHelper;
53 private final FeatureTypeRepository featureTypeRepository;
54 private final SearchIndexRepository searchIndexRepository;
55 private final SolrService solrService;
56 private final SseEventBus eventBus;
57 private final ObjectMapper objectMapper;
58
59 @Value("${tailormap-api.solr-batch-size:1000}")
60 private int solrBatchSize;
61
62 @Value("${tailormap-api.solr-geometry-validation-rule:repairBuffer0}")
63 private String solrGeometryValidationRule;
64
65 private long indexId;
66 private String description;
67
68 public IndexTask(
69 @Autowired SearchIndexRepository searchIndexRepository,
70 @Autowired FeatureTypeRepository featureTypeRepository,
71 @Autowired FeatureSourceFactoryHelper featureSourceFactoryHelper,
72 @Autowired SolrService solrService,
73 @Autowired SseEventBus eventBus,
74 @Autowired ObjectMapper objectMapper) {
75
76 this.featureSourceFactoryHelper = featureSourceFactoryHelper;
77 this.solrService = solrService;
78 this.featureTypeRepository = featureTypeRepository;
79 this.searchIndexRepository = searchIndexRepository;
80 this.eventBus = eventBus;
81 this.objectMapper = objectMapper;
82 }
83
84 @Timed(value = "indexTask", description = "Time taken to execute index task")
85 @Counted(value = "indexTaskCount", description = "Number of times index task executed")
86 @Override
87 protected void executeInternal(@NonNull JobExecutionContext context) throws JobExecutionException {
88
89 final JobDataMap persistedJobData = context.getJobDetail().getJobDataMap();
90 logger.info(
91 "Start Executing IndexTask {} for index {}, described with '{}'",
92 context.getJobDetail().getKey(),
93 getIndexId(),
94 getDescription());
95
96 SearchIndex searchIndex = searchIndexRepository
97 .findById(getIndexId())
98 .orElseThrow(() -> new JobExecutionException("Search index not found"));
99
100 TMFeatureType indexingFT = featureTypeRepository
101 .findById(searchIndex.getFeatureTypeId())
102 .orElseThrow(() -> new JobExecutionException("Feature type for indexing not found"));
103
104 try (SolrClient solrClient = solrService.getSolrClientForIndexing();
105 SolrHelper solrHelper = new SolrHelper(solrClient)
106 .withBatchSize(solrBatchSize)
107 .withGeometryValidationRule(solrGeometryValidationRule)) {
108
109 persistedJobData.put(EXECUTION_FINISHED_KEY, null);
110 persistedJobData.put(LAST_RESULT_KEY, null);
111 searchIndex = searchIndexRepository.save(searchIndex.setStatus(SearchIndex.Status.INDEXING));
112
113 searchIndex = solrHelper.addFeatureTypeIndex(
114 searchIndex,
115 indexingFT,
116 featureSourceFactoryHelper,
117 searchIndexRepository,
118 this::taskProgress,
119 UUID.fromString(context.getTrigger().getJobKey().getName()));
120 searchIndex = searchIndexRepository.save(searchIndex.setStatus(SearchIndex.Status.INDEXED));
121 persistedJobData.put(
122 EXECUTION_COUNT_KEY,
123 (1 + (int) context.getMergedJobDataMap().getOrDefault(EXECUTION_COUNT_KEY, 0)));
124 persistedJobData.put(EXECUTION_FINISHED_KEY, Instant.now());
125 persistedJobData.put(LAST_RESULT_KEY, "Index task executed successfully");
126 context.setResult("Index task executed successfully");
127 } catch (UnsupportedOperationException | IOException | SolrServerException | SolrException e) {
128 logger.error("Error indexing", e);
129 persistedJobData.put(EXECUTION_FINISHED_KEY, null);
130 persistedJobData.put(
131 LAST_RESULT_KEY, "Index task failed with " + e.getMessage() + ". Check logs for details");
132 searchIndexRepository.save(searchIndex
133 .setStatus(SearchIndex.Status.ERROR)
134 .setSummary(new SearchIndexSummary().errorMessage(e.getMessage())));
135 context.setResult("Error indexing. Check logs for details.");
136 throw new JobExecutionException("Error indexing", e);
137 }
138 }
139
140 @Override
141 public void taskProgress(TaskProgressEvent event) {
142 ServerSentEvent serverSentEvent =
143 new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
144 try {
145 eventBus.handleEvent(SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
146 } catch (JsonProcessingException e) {
147 logger.error("Error publishing indexing task progress event", e);
148 }
149 }
150
151
152 @Override
153 public TaskType getType() {
154 return TaskType.INDEX;
155 }
156
157 public long getIndexId() {
158 return indexId;
159 }
160
161 public void setIndexId(long indexId) {
162 this.indexId = indexId;
163 }
164
165 @Override
166 public String getDescription() {
167 return description;
168 }
169
170 @Override
171 public void setDescription(String description) {
172 this.description = description;
173 }
174
175 }