IndexTask.java
/*
* Copyright (C) 2024 B3Partners B.V.
*
* SPDX-License-Identifier: MIT
*/
package org.tailormap.api.scheduling;
import static ch.rasc.sse.eventbus.SseEvent.DEFAULT_EVENT;
import static org.tailormap.api.admin.model.ServerSentEvent.EventTypeEnum.TASK_PROGRESS;
import ch.rasc.sse.eventbus.SseEvent;
import ch.rasc.sse.eventbus.SseEventBus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.UUID;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrException;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.tailormap.api.admin.model.SearchIndexSummary;
import org.tailormap.api.admin.model.ServerSentEvent;
import org.tailormap.api.admin.model.TaskProgressEvent;
import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
import org.tailormap.api.persistence.SearchIndex;
import org.tailormap.api.persistence.TMFeatureType;
import org.tailormap.api.repository.FeatureTypeRepository;
import org.tailormap.api.repository.SearchIndexRepository;
import org.tailormap.api.solr.SolrHelper;
import org.tailormap.api.solr.SolrService;
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class IndexTask extends QuartzJobBean implements Task {
public static final String INDEX_KEY = "indexId";
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final FeatureSourceFactoryHelper featureSourceFactoryHelper;
private final FeatureTypeRepository featureTypeRepository;
private final SearchIndexRepository searchIndexRepository;
private final SolrService solrService;
private final SseEventBus eventBus;
private final ObjectMapper objectMapper;
@Value("${tailormap-api.solr-batch-size:1000}")
private int solrBatchSize;
@Value("${tailormap-api.solr-geometry-validation-rule:repairBuffer0}")
private String solrGeometryValidationRule;
private long indexId;
private String description;
public IndexTask(
@Autowired SearchIndexRepository searchIndexRepository,
@Autowired FeatureTypeRepository featureTypeRepository,
@Autowired FeatureSourceFactoryHelper featureSourceFactoryHelper,
@Autowired SolrService solrService,
@Autowired SseEventBus eventBus,
@Autowired ObjectMapper objectMapper) {
this.featureSourceFactoryHelper = featureSourceFactoryHelper;
this.solrService = solrService;
this.featureTypeRepository = featureTypeRepository;
this.searchIndexRepository = searchIndexRepository;
this.eventBus = eventBus;
this.objectMapper = objectMapper;
}
@Timed(value = "indexTask", description = "Time taken to execute index task")
@Counted(value = "indexTaskCount", description = "Number of times index task executed")
@Override
protected void executeInternal(@NonNull JobExecutionContext context) throws JobExecutionException {
final JobDataMap persistedJobData = context.getJobDetail().getJobDataMap();
logger.info(
"Start Executing IndexTask {} for index {}, described with '{}'",
context.getJobDetail().getKey(),
getIndexId(),
getDescription());
SearchIndex searchIndex = searchIndexRepository
.findById(getIndexId())
.orElseThrow(() -> new JobExecutionException("Search index not found"));
TMFeatureType indexingFT = featureTypeRepository
.findById(searchIndex.getFeatureTypeId())
.orElseThrow(() -> new JobExecutionException("Feature type for indexing not found"));
try (SolrClient solrClient = solrService.getSolrClientForIndexing();
SolrHelper solrHelper = new SolrHelper(solrClient)
.withBatchSize(solrBatchSize)
.withGeometryValidationRule(solrGeometryValidationRule)) {
persistedJobData.put(EXECUTION_FINISHED_KEY, null);
persistedJobData.put(LAST_RESULT_KEY, null);
searchIndex = searchIndexRepository.save(searchIndex.setStatus(SearchIndex.Status.INDEXING));
searchIndex = solrHelper.addFeatureTypeIndex(
searchIndex,
indexingFT,
featureSourceFactoryHelper,
searchIndexRepository,
this::taskProgress,
UUID.fromString(context.getTrigger().getJobKey().getName()));
searchIndex = searchIndexRepository.save(searchIndex.setStatus(SearchIndex.Status.INDEXED));
persistedJobData.put(
EXECUTION_COUNT_KEY,
(1 + (int) context.getMergedJobDataMap().getOrDefault(EXECUTION_COUNT_KEY, 0)));
persistedJobData.put(EXECUTION_FINISHED_KEY, Instant.now());
persistedJobData.put(LAST_RESULT_KEY, "Index task executed successfully");
context.setResult("Index task executed successfully");
} catch (UnsupportedOperationException | IOException | SolrServerException | SolrException e) {
logger.error("Error indexing", e);
persistedJobData.put(EXECUTION_FINISHED_KEY, null);
persistedJobData.put(
LAST_RESULT_KEY, "Index task failed with " + e.getMessage() + ". Check logs for details");
searchIndexRepository.save(searchIndex
.setStatus(SearchIndex.Status.ERROR)
.setSummary(new SearchIndexSummary().errorMessage(e.getMessage())));
context.setResult("Error indexing. Check logs for details.");
throw new JobExecutionException("Error indexing", e);
}
}
@Override
public void taskProgress(TaskProgressEvent event) {
ServerSentEvent serverSentEvent =
new ServerSentEvent().eventType(TASK_PROGRESS).details(event);
try {
eventBus.handleEvent(SseEvent.of(DEFAULT_EVENT, objectMapper.writeValueAsString(serverSentEvent)));
} catch (JsonProcessingException e) {
logger.error("Error publishing indexing task progress event", e);
}
}
// <editor-fold desc="Getters and Setters">
@Override
public TaskType getType() {
return TaskType.INDEX;
}
public long getIndexId() {
return indexId;
}
public void setIndexId(long indexId) {
this.indexId = indexId;
}
@Override
public String getDescription() {
return description;
}
@Override
public void setDescription(String description) {
this.description = description;
}
// </editor-fold>
}