View Javadoc
1   /*
2    * Copyright (C) 2024 B3Partners B.V.
3    *
4    * SPDX-License-Identifier: MIT
5    */
6   package org.tailormap.api.solr;
7   
8   import jakarta.validation.constraints.NotNull;
9   import java.io.IOException;
10  import java.lang.invoke.MethodHandles;
11  import java.time.Duration;
12  import java.time.Instant;
13  import java.time.ZoneId;
14  import java.util.ArrayList;
15  import java.util.HashSet;
16  import java.util.List;
17  import java.util.Map;
18  import java.util.Set;
19  import org.apache.solr.client.solrj.SolrClient;
20  import org.apache.solr.client.solrj.SolrQuery;
21  import org.apache.solr.client.solrj.SolrServerException;
22  import org.apache.solr.client.solrj.request.schema.SchemaRequest;
23  import org.apache.solr.client.solrj.response.QueryResponse;
24  import org.apache.solr.client.solrj.response.UpdateResponse;
25  import org.apache.solr.client.solrj.response.schema.SchemaResponse;
26  import org.apache.solr.common.SolrDocumentList;
27  import org.apache.solr.common.SolrException;
28  import org.geotools.api.data.Query;
29  import org.geotools.api.data.SimpleFeatureSource;
30  import org.geotools.api.feature.simple.SimpleFeature;
31  import org.geotools.data.simple.SimpleFeatureCollection;
32  import org.geotools.data.simple.SimpleFeatureIterator;
33  import org.locationtech.jts.geom.Geometry;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
37  import org.tailormap.api.geotools.processing.GeometryProcessor;
38  import org.tailormap.api.persistence.SearchIndex;
39  import org.tailormap.api.persistence.TMFeatureType;
40  import org.tailormap.api.util.Constants;
41  import org.tailormap.api.viewer.model.SearchDocument;
42  import org.tailormap.api.viewer.model.SearchResponse;
43  
44  /**
45   * Solr utility/wrapper class. This class provides methods to add or update a full-text feature type
46   * index for a layer, find in the index for a layer, and clear the index for a layer. It also
47   * provides a method to close the Solr client as well as automatically closing the client when used
48   * in a try-with-resources.
49   */
50  public class SolrHelper implements AutoCloseable, Constants {
51    private final SolrClient solrClient;
52  
53    private static final Logger logger =
54        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
55  
56    public static final int SOLR_BATCH_SIZE = 1000;
57    // milliseconds
58    private static final int SOLR_TIMEOUT = 7000;
59  
60    /**
61     * Constructor
62     *
63     * @param solrClient the Solr client, this will be closed when this class is closed
64     */
65    public SolrHelper(@NotNull SolrClient solrClient) {
66      this.solrClient = solrClient;
67    }
68  
69    /**
70     * Add or update a feature type index for a layer.
71     *
72     * @param searchIndex the search index config
73     * @param tmFeatureType the feature type
74     * @throws UnsupportedOperationException if the operation is not supported, possibly because not
75     *     search field shave been defined
76     * @throws IOException if an I/O error occurs
77     * @throws SolrServerException if a Solr error occurs
78     */
79    @SuppressWarnings("FromTemporalAccessor")
80    public void addFeatureTypeIndex(
81        @NotNull SearchIndex searchIndex,
82        @NotNull TMFeatureType tmFeatureType,
83        @NotNull FeatureSourceFactoryHelper featureSourceFactoryHelper)
84        throws UnsupportedOperationException, IOException, SolrServerException {
85  
86      createSchemaIfNotExists();
87  
88      final Instant start = Instant.now();
89  
90      if (null == searchIndex.getSearchFieldsUsed()) {
91        logger.warn("No search fields configured for search index: {}", searchIndex.getName());
92        throw new UnsupportedOperationException(
93            "No search fields configured for search index: %s".formatted(searchIndex.getName()));
94      }
95  
96      // set fields while filtering out hidden fields
97      List<String> searchFields =
98          searchIndex.getSearchFieldsUsed().stream()
99              .filter(s -> !tmFeatureType.getSettings().getHideAttributes().contains(s))
100             .toList();
101     List<String> displayFields =
102         searchIndex.getSearchDisplayFieldsUsed().stream()
103             .filter(s -> !tmFeatureType.getSettings().getHideAttributes().contains(s))
104             .toList();
105 
106     searchIndex.setStatus(SearchIndex.Status.INDEXING);
107 
108     if (searchFields.isEmpty()) {
109       logger.info("No valid search fields configured for featuretype: {}", tmFeatureType.getName());
110       searchIndex.setStatus(SearchIndex.Status.ERROR);
111       throw new UnsupportedOperationException(
112           "No valid search fields configured for featuretype: %s"
113               .formatted(tmFeatureType.getName()));
114     }
115 
116     // add search and display properties to query
117     Set<String> propertyNames = new HashSet<>();
118     // always add primary key and default geometry to geotools query
119     propertyNames.add(tmFeatureType.getPrimaryKeyAttribute());
120     propertyNames.add(tmFeatureType.getDefaultGeometryAttribute());
121     propertyNames.addAll(searchFields);
122 
123     final boolean hasDisplayFields = !displayFields.isEmpty();
124     if (hasDisplayFields) {
125       propertyNames.addAll(displayFields);
126     }
127 
128     clearIndexForLayer(searchIndex.getId());
129 
130     logger.info(
131         "Indexing started for index id: {}, feature type: {}",
132         searchIndex.getId(),
133         tmFeatureType.getName());
134     // collect features to index
135     SimpleFeatureSource fs = featureSourceFactoryHelper.openGeoToolsFeatureSource(tmFeatureType);
136     Query q = new Query(fs.getName().toString());
137     // filter out any hidden properties (there should be none though)
138     tmFeatureType.getSettings().getHideAttributes().forEach(propertyNames::remove);
139     q.setPropertyNames(List.copyOf(propertyNames));
140     q.setStartIndex(0);
141     // TODO: make maxFeatures configurable? perhaps for WFS sources?
142     // q.setMaxFeatures(Integer.MAX_VALUE);
143     logger.trace("Indexing query: {}", q);
144     SimpleFeatureCollection simpleFeatureCollection = fs.getFeatures(q);
145     final int total = simpleFeatureCollection.size();
146     List<FeatureIndexingDocument> docsBatch = new ArrayList<>(SOLR_BATCH_SIZE);
147     // TODO this does not currently batch/page the feature source query, this doesn't seem to be an
148     //   issue for now but could be if the feature source is very large or slow e.g. WFS
149     UpdateResponse updateResponse;
150     try (SimpleFeatureIterator iterator = simpleFeatureCollection.features()) {
151       int indexCounter = 0;
152       while (iterator.hasNext()) {
153         indexCounter++;
154         SimpleFeature feature = iterator.next();
155         // note that this will create a unique document
156         FeatureIndexingDocument doc =
157             new FeatureIndexingDocument(feature.getID(), searchIndex.getId());
158         List<String> searchValues = new ArrayList<>();
159         List<String> displayValues = new ArrayList<>();
160         propertyNames.forEach(
161             propertyName -> {
162               Object value = feature.getAttribute(propertyName);
163               if (value != null) {
164                 if (value instanceof Geometry) {
165                   doc.setGeometry(GeometryProcessor.processGeometry(value, true, true, null));
166                 } else {
167                   // when display and/or search fields are configured, add the value to the search
168                   // and/or display field otherwise add the value to the search and display field
169                   if (searchFields.contains(propertyName)) {
170                     searchValues.add(value.toString());
171                   }
172                   if (hasDisplayFields) {
173                     if (displayFields.contains(propertyName)) {
174                       displayValues.add(value.toString());
175                     }
176                   }
177                 }
178               }
179             });
180         doc.setSearchFields(searchValues.toArray(new String[searchFields.size() + 2]));
181         doc.setDisplayFields(displayValues.toArray(new String[0]));
182         docsBatch.add(doc);
183         if (indexCounter % SOLR_BATCH_SIZE == 0) {
184           updateResponse = solrClient.addBeans(docsBatch);
185           logger.info(
186               "Added {} documents of {} to index, result status: {}",
187               indexCounter,
188               total,
189               updateResponse.getStatus());
190           docsBatch.clear();
191         }
192       }
193     } finally {
194       if (fs.getDataStore() != null) fs.getDataStore().dispose();
195     }
196 
197     if (!docsBatch.isEmpty()) {
198       updateResponse = solrClient.addBeans(docsBatch);
199       logger.info("Added last {} documents of {} to index", docsBatch.size(), total);
200       logger.debug("Update response status: {}", updateResponse.getStatus());
201     }
202     final Instant end = Instant.now();
203     Duration processTime = Duration.between(start, end).abs();
204     logger.info(
205         "Indexing finished for index id: {}, featuretype: {} at {} in {}",
206         searchIndex.getId(),
207         tmFeatureType.getName(),
208         end,
209         processTime);
210     searchIndex.setComment(
211         "Indexed %s features in %s.%s seconds, started at %s"
212             .formatted(total, processTime.getSeconds(), processTime.getNano(), start));
213 
214     searchIndex.setLastIndexed(end.atOffset(ZoneId.systemDefault().getRules().getOffset(end)));
215     searchIndex.setStatus(SearchIndex.Status.INDEXED);
216 
217     updateResponse = this.solrClient.commit();
218     logger.debug("Update response status: {}", updateResponse.getStatus());
219   }
220 
221   /**
222    * Clear the index for a layer.
223    *
224    * @param searchLayerId the layer id
225    * @throws IOException if an I/O error occurs
226    * @throws SolrServerException if a Solr error occurs
227    */
228   public void clearIndexForLayer(@NotNull Long searchLayerId)
229       throws IOException, SolrServerException {
230     QueryResponse response =
231         solrClient.query(
232             new SolrQuery("exists(query(" + SEARCH_LAYER + ":" + searchLayerId + "))"));
233     if (response.getResults().getNumFound() > 0) {
234       logger.info("Clearing index for searchLayer {}", searchLayerId);
235       UpdateResponse updateResponse = solrClient.deleteByQuery(SEARCH_LAYER + ":" + searchLayerId);
236       logger.debug("Update response status: {}", updateResponse.getStatus());
237       updateResponse = solrClient.commit();
238       logger.debug("Update response status: {}", updateResponse.getStatus());
239     } else {
240       logger.info("No index to clear for layer {}", searchLayerId);
241     }
242   }
243 
244   /**
245    * Search in the index for a layer. The given query is augmented to filter on the {@code
246    * solrLayerId}.
247    *
248    * @param searchIndex the search index
249    * @param solrQuery the query, when {@code null} or empty, the query is set to {@code *} (match
250    *     all)
251    * @param start the start index, starting at 0
252    * @param numResultsToReturn the number of results to return
253    * @return the documents
254    * @throws IOException if an I/O error occurs
255    * @throws SolrServerException if a Solr error occurs
256    */
257   public SearchResponse findInIndex(
258       @NotNull SearchIndex searchIndex, String solrQuery, int start, int numResultsToReturn)
259       throws IOException, SolrServerException, SolrException {
260     logger.info("Find in index for {}", searchIndex.getId());
261     if (null == solrQuery || solrQuery.isBlank()) {
262       solrQuery = "*";
263     }
264     // TODO We could escape special/syntax characters, but that also prevents using
265     //      keys like ~ and *
266     // solrQuery = ClientUtils.escapeQueryChars(solrQuery);
267 
268     final SolrQuery query =
269         new SolrQuery(INDEX_SEARCH_FIELD + ":" + solrQuery)
270             .setShowDebugInfo(logger.isDebugEnabled())
271             .setTimeAllowed(SOLR_TIMEOUT)
272             .setIncludeScore(true)
273             .setFields(SEARCH_ID_FIELD, INDEX_DISPLAY_FIELD, INDEX_GEOM_FIELD)
274             .addFilterQuery(SEARCH_LAYER + ":" + searchIndex.getId())
275             .setSort("score", SolrQuery.ORDER.desc)
276             .addSort(SEARCH_ID_FIELD, SolrQuery.ORDER.asc)
277             .setRows(numResultsToReturn)
278             .setStart(start);
279     query.set("q.op", "AND");
280     logger.debug("Solr query: {}", query);
281 
282     final QueryResponse response = solrClient.query(query);
283     logger.debug("response: {}", response);
284 
285     final SolrDocumentList solrDocumentList = response.getResults();
286     logger.debug("Found {} solr documents", solrDocumentList.getNumFound());
287     final SearchResponse searchResponse =
288         new SearchResponse()
289             .total(solrDocumentList.getNumFound())
290             .start(response.getResults().getStart())
291             .maxScore(solrDocumentList.getMaxScore());
292     response
293         .getResults()
294         .forEach(
295             solrDocument -> {
296               List<String> displayValues =
297                   solrDocument.getFieldValues(INDEX_DISPLAY_FIELD).stream()
298                       .map(Object::toString)
299                       .toList();
300               searchResponse.addDocumentsItem(
301                   new SearchDocument()
302                       .fid(solrDocument.getFieldValue(SEARCH_ID_FIELD).toString())
303                       .geometry(solrDocument.getFieldValue(INDEX_GEOM_FIELD).toString())
304                       .displayValues(displayValues));
305             });
306 
307     return searchResponse;
308   }
309 
310   /**
311    * Programmatically create (part of) the schema if it does not exist. Only checks for the
312    * existence of the search layer {@link Constants#SEARCH_LAYER}.
313    *
314    * @throws SolrServerException if a Solr error occurs
315    * @throws IOException if an I/O error occurs
316    */
317   private void createSchemaIfNotExists() throws SolrServerException, IOException {
318     SchemaRequest.Field fieldCheck = new SchemaRequest.Field(SEARCH_LAYER);
319     boolean schemaExists = true;
320     try {
321       SchemaResponse.FieldResponse isField = fieldCheck.process(solrClient);
322       logger.debug("Field type {} exists", isField.getField());
323     } catch (Exception e) {
324       logger.debug(e.getLocalizedMessage());
325       logger.info("Field type {} does not exist, creating it", SEARCH_LAYER);
326       schemaExists = false;
327     }
328 
329     if (schemaExists) {
330       return;
331     }
332 
333     logger.info("Creating Solr field type {}", SEARCH_LAYER);
334     SchemaRequest.AddField schemaRequest =
335         new SchemaRequest.AddField(
336             Map.of(
337                 "name", SEARCH_LAYER,
338                 "type", "string",
339                 "indexed", true,
340                 "stored", true,
341                 "multiValued", false,
342                 "required", true,
343                 "uninvertible", false));
344     schemaRequest.process(solrClient);
345 
346     logger.info("Creating Solr field type {}", INDEX_GEOM_FIELD);
347     // TODO https://b3partners.atlassian.net/browse/HTM-1091
348     //  this should be a spatial field type using ("type", "location_rpt")
349     //  but that requires some more work
350     SchemaRequest.AddField schemaRequestGeom =
351         new SchemaRequest.AddField(
352             Map.of(
353                 "name", INDEX_GEOM_FIELD,
354                 "type", "string",
355                 "indexed", false,
356                 "stored", true,
357                 "multiValued", false));
358     schemaRequestGeom.process(solrClient);
359   }
360 
361   /**
362    * Close the wrapped Solr client.
363    *
364    * @throws IOException if an I/O error occurs
365    */
366   @Override
367   public void close() throws IOException {
368     if (null != this.solrClient) this.solrClient.close();
369   }
370 }