View Javadoc
1   /*
2    * Copyright (C) 2024 B3Partners B.V.
3    *
4    * SPDX-License-Identifier: MIT
5    */
6   package org.tailormap.api.scheduling;
7   
8   import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
9   import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
10  
11  import ch.rasc.sse.eventbus.SseEvent;
12  import ch.rasc.sse.eventbus.SseEventBus;
13  import com.fasterxml.jackson.core.JsonProcessingException;
14  import com.fasterxml.jackson.databind.ObjectMapper;
15  import io.micrometer.core.annotation.Timed;
16  import java.io.IOException;
17  import java.lang.invoke.MethodHandles;
18  import java.time.Instant;
19  import java.util.UUID;
20  import org.apache.solr.client.solrj.SolrClient;
21  import org.apache.solr.client.solrj.SolrServerException;
22  import org.apache.solr.common.SolrException;
23  import org.quartz.DisallowConcurrentExecution;
24  import org.quartz.JobDataMap;
25  import org.quartz.JobExecutionContext;
26  import org.quartz.JobExecutionException;
27  import org.quartz.PersistJobDataAfterExecution;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  import org.springframework.beans.factory.annotation.Autowired;
31  import org.springframework.beans.factory.annotation.Value;
32  import org.springframework.lang.NonNull;
33  import org.springframework.scheduling.quartz.QuartzJobBean;
34  import org.tailormap.api.admin.model.ServerSentEvent;
35  import org.tailormap.api.admin.model.TaskProgressEvent;
36  import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
37  import org.tailormap.api.persistence.SearchIndex;
38  import org.tailormap.api.persistence.TMFeatureType;
39  import org.tailormap.api.repository.FeatureTypeRepository;
40  import org.tailormap.api.repository.SearchIndexRepository;
41  import org.tailormap.api.solr.SolrHelper;
42  import org.tailormap.api.solr.SolrService;
43  
44  @DisallowConcurrentExecution
45  @PersistJobDataAfterExecution
46  public class IndexTask extends QuartzJobBean implements Task {
47    public static final String INDEX_KEY = "indexId";
48    private static final Logger logger =
49        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
50    private final FeatureSourceFactoryHelper featureSourceFactoryHelper;
51    private final FeatureTypeRepository featureTypeRepository;
52    private final SearchIndexRepository searchIndexRepository;
53    private final SolrService solrService;
54    private final SseEventBus eventBus;
55    private final ObjectMapper objectMapper;
56  
57    @Value("${tailormap-api.solr-batch-size:1000}")
58    private int solrBatchSize;
59  
60    @Value("${tailormap-api.solr-geometry-validation-rule:repairBuffer0}")
61    private String solrGeometryValidationRule;
62  
63    private long indexId;
64    private String description;
65  
66    public IndexTask(
67        @Autowired SearchIndexRepository searchIndexRepository,
68        @Autowired FeatureTypeRepository featureTypeRepository,
69        @Autowired FeatureSourceFactoryHelper featureSourceFactoryHelper,
70        @Autowired SolrService solrService,
71        @Autowired SseEventBus eventBus,
72        @Autowired ObjectMapper objectMapper) {
73  
74      this.featureSourceFactoryHelper = featureSourceFactoryHelper;
75      this.solrService = solrService;
76      this.featureTypeRepository = featureTypeRepository;
77      this.searchIndexRepository = searchIndexRepository;
78      this.eventBus = eventBus;
79      this.objectMapper = objectMapper;
80    }
81  
82    @Timed(value = "indexTask", description = "Time taken to execute index task")
83    @Override
84    protected void executeInternal(@NonNull JobExecutionContext context)
85        throws JobExecutionException {
86  
87      final JobDataMap persistedJobData = context.getJobDetail().getJobDataMap();
88      // final long indexId = persistedJobData.getLong(INDEX_KEY);
89      logger.info(
90          "Start Executing IndexTask {} for index {}, described with '{}'",
91          context.getJobDetail().getKey(),
92          getIndexId(),
93          getDescription());
94  
95      SearchIndex searchIndex =
96          searchIndexRepository
97              .findById(getIndexId())
98              .orElseThrow(() -> new JobExecutionException("Search index not found"));
99  
100     TMFeatureType indexingFT =
101         featureTypeRepository
102             .findById(searchIndex.getFeatureTypeId())
103             .orElseThrow(() -> new JobExecutionException("Feature type not found"));
104 
105     try (SolrClient solrClient = solrService.getSolrClientForIndexing();
106         SolrHelper solrHelper =
107             new SolrHelper(solrClient)
108                 .withBatchSize(solrBatchSize)
109                 .withGeometryValidationRule(solrGeometryValidationRule)) {
110 
111       searchIndex.setStatus(SearchIndex.Status.INDEXING);
112       searchIndex = searchIndexRepository.save(searchIndex);
113 
114       searchIndex =
115           solrHelper.addFeatureTypeIndex(
116               searchIndex,
117               indexingFT,
118               featureSourceFactoryHelper,
119               searchIndexRepository,
120               this::taskProgress,
121               UUID.fromString(context.getTrigger().getJobKey().getName()));
122       searchIndex = searchIndex.setStatus(SearchIndex.Status.INDEXED);
123       searchIndexRepository.save(searchIndex);
124       persistedJobData.put(
125           "executions", (1 + (int) context.getMergedJobDataMap().getOrDefault("executions", 0)));
126       persistedJobData.put("lastExecutionFinished", Instant.now());
127       persistedJobData.put(Task.LAST_RESULT_KEY, "Index task executed successfully");
128       context.setResult("Index task executed successfully");
129     } catch (UnsupportedOperationException | IOException | SolrServerException | SolrException e) {
130       logger.error("Error indexing", e);
131       searchIndex.setStatus(SearchIndex.Status.ERROR).setComment(e.getMessage());
132       persistedJobData.put("lastExecutionFinished", null);
133       persistedJobData.put(
134           Task.LAST_RESULT_KEY,
135           "Index task failed with " + e.getMessage() + ". Check logs for details");
136       searchIndexRepository.save(searchIndex);
137       context.setResult("Error indexing. Check logs for details.");
138       throw new JobExecutionException("Error indexing", e);
139     }
140   }
141 
142   @Override
143   public void taskProgress(TaskProgressEvent event) {
144     ServerSentEvent serverSentEvent = new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
145     try {
146       eventBus.handleEvent(
147           SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
148     } catch (JsonProcessingException e) {
149       logger.error("Error publishing indexing task progress event", e);
150     }
151   }
152 
153   // <editor-fold desc="Getters and Setters">
154   @Override
155   public TaskType getType() {
156     return TaskType.INDEX;
157   }
158 
159   public long getIndexId() {
160     return indexId;
161   }
162 
163   public void setIndexId(long indexId) {
164     this.indexId = indexId;
165   }
166 
167   @Override
168   public String getDescription() {
169     return description;
170   }
171 
172   @Override
173   public void setDescription(String description) {
174     this.description = description;
175   }
176   // </editor-fold>
177 }