View Javadoc
1   /*
2    * Copyright (C) 2024 B3Partners B.V.
3    *
4    * SPDX-License-Identifier: MIT
5    */
6   package org.tailormap.api.scheduling;
7   
8   import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
9   import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
10  
11  import ch.rasc.sse.eventbus.SseEvent;
12  import ch.rasc.sse.eventbus.SseEventBus;
13  import com.fasterxml.jackson.core.JsonProcessingException;
14  import com.fasterxml.jackson.databind.ObjectMapper;
15  import io.micrometer.core.annotation.Counted;
16  import io.micrometer.core.annotation.Timed;
17  import java.io.IOException;
18  import java.lang.invoke.MethodHandles;
19  import java.time.Instant;
20  import java.util.UUID;
21  import org.apache.solr.client.solrj.SolrClient;
22  import org.apache.solr.client.solrj.SolrServerException;
23  import org.apache.solr.common.SolrException;
24  import org.quartz.DisallowConcurrentExecution;
25  import org.quartz.JobDataMap;
26  import org.quartz.JobExecutionContext;
27  import org.quartz.JobExecutionException;
28  import org.quartz.PersistJobDataAfterExecution;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  import org.springframework.beans.factory.annotation.Autowired;
32  import org.springframework.beans.factory.annotation.Value;
33  import org.springframework.lang.NonNull;
34  import org.springframework.scheduling.quartz.QuartzJobBean;
35  import org.tailormap.api.admin.model.SearchIndexSummary;
36  import org.tailormap.api.admin.model.ServerSentEvent;
37  import org.tailormap.api.admin.model.TaskProgressEvent;
38  import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
39  import org.tailormap.api.persistence.SearchIndex;
40  import org.tailormap.api.persistence.TMFeatureType;
41  import org.tailormap.api.repository.FeatureTypeRepository;
42  import org.tailormap.api.repository.SearchIndexRepository;
43  import org.tailormap.api.solr.SolrHelper;
44  import org.tailormap.api.solr.SolrService;
45  
46  @DisallowConcurrentExecution
47  @PersistJobDataAfterExecution
48  public class IndexTask extends QuartzJobBean implements Task {
49    public static final String INDEX_KEY = "indexId";
50    private static final Logger logger =
51        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
52    private final FeatureSourceFactoryHelper featureSourceFactoryHelper;
53    private final FeatureTypeRepository featureTypeRepository;
54    private final SearchIndexRepository searchIndexRepository;
55    private final SolrService solrService;
56    private final SseEventBus eventBus;
57    private final ObjectMapper objectMapper;
58  
59    @Value("${tailormap-api.solr-batch-size:1000}")
60    private int solrBatchSize;
61  
62    @Value("${tailormap-api.solr-geometry-validation-rule:repairBuffer0}")
63    private String solrGeometryValidationRule;
64  
65    private long indexId;
66    private String description;
67  
68    public IndexTask(
69        @Autowired SearchIndexRepository searchIndexRepository,
70        @Autowired FeatureTypeRepository featureTypeRepository,
71        @Autowired FeatureSourceFactoryHelper featureSourceFactoryHelper,
72        @Autowired SolrService solrService,
73        @Autowired SseEventBus eventBus,
74        @Autowired ObjectMapper objectMapper) {
75  
76      this.featureSourceFactoryHelper = featureSourceFactoryHelper;
77      this.solrService = solrService;
78      this.featureTypeRepository = featureTypeRepository;
79      this.searchIndexRepository = searchIndexRepository;
80      this.eventBus = eventBus;
81      this.objectMapper = objectMapper;
82    }
83  
84    @Timed(value = "indexTask", description = "Time taken to execute index task")
85    @Counted(value = "indexTaskCount", description = "Number of times index task executed")
86    @Override
87    protected void executeInternal(@NonNull JobExecutionContext context) throws JobExecutionException {
88  
89      final JobDataMap persistedJobData = context.getJobDetail().getJobDataMap();
90      logger.info(
91          "Start Executing IndexTask {} for index {}, described with '{}'",
92          context.getJobDetail().getKey(),
93          getIndexId(),
94          getDescription());
95  
96      SearchIndex searchIndex = searchIndexRepository
97          .findById(getIndexId())
98          .orElseThrow(() -> new JobExecutionException("Search index not found"));
99  
100     TMFeatureType indexingFT = featureTypeRepository
101         .findById(searchIndex.getFeatureTypeId())
102         .orElseThrow(() -> new JobExecutionException("Feature type for indexing not found"));
103 
104     try (SolrClient solrClient = solrService.getSolrClientForIndexing();
105         SolrHelper solrHelper = new SolrHelper(solrClient)
106             .withBatchSize(solrBatchSize)
107             .withGeometryValidationRule(solrGeometryValidationRule)) {
108 
109       persistedJobData.put(EXECUTION_FINISHED_KEY, null);
110       persistedJobData.put(LAST_RESULT_KEY, null);
111       searchIndex = searchIndexRepository.save(searchIndex.setStatus(SearchIndex.Status.INDEXING));
112 
113       searchIndex = solrHelper.addFeatureTypeIndex(
114           searchIndex,
115           indexingFT,
116           featureSourceFactoryHelper,
117           searchIndexRepository,
118           this::taskProgress,
119           UUID.fromString(context.getTrigger().getJobKey().getName()));
120       searchIndex = searchIndexRepository.save(searchIndex.setStatus(SearchIndex.Status.INDEXED));
121       persistedJobData.put(
122           EXECUTION_COUNT_KEY,
123           (1 + (int) context.getMergedJobDataMap().getOrDefault(EXECUTION_COUNT_KEY, 0)));
124       persistedJobData.put(EXECUTION_FINISHED_KEY, Instant.now());
125       persistedJobData.put(LAST_RESULT_KEY, "Index task executed successfully");
126       context.setResult("Index task executed successfully");
127     } catch (UnsupportedOperationException | IOException | SolrServerException | SolrException e) {
128       logger.error("Error indexing", e);
129       persistedJobData.put(EXECUTION_FINISHED_KEY, null);
130       persistedJobData.put(
131           LAST_RESULT_KEY, "Index task failed with " + e.getMessage() + ". Check logs for details");
132       searchIndexRepository.save(searchIndex
133           .setStatus(SearchIndex.Status.ERROR)
134           .setSummary(new SearchIndexSummary().errorMessage(e.getMessage())));
135       context.setResult("Error indexing. Check logs for details.");
136       throw new JobExecutionException("Error indexing", e);
137     }
138   }
139 
140   @Override
141   public void taskProgress(TaskProgressEvent event) {
142     ServerSentEvent serverSentEvent =
143         new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
144     try {
145       eventBus.handleEvent(SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
146     } catch (JsonProcessingException e) {
147       logger.error("Error publishing indexing task progress event", e);
148     }
149   }
150 
151   // <editor-fold desc="Getters and Setters">
152   @Override
153   public TaskType getType() {
154     return TaskType.INDEX;
155   }
156 
157   public long getIndexId() {
158     return indexId;
159   }
160 
161   public void setIndexId(long indexId) {
162     this.indexId = indexId;
163   }
164 
165   @Override
166   public String getDescription() {
167     return description;
168   }
169 
170   @Override
171   public void setDescription(String description) {
172     this.description = description;
173   }
174   // </editor-fold>
175 }