View Javadoc
1   /*
2    * Copyright (C) 2026 B3Partners B.V.
3    *
4    * SPDX-License-Identifier: MIT
5    */
6   package org.tailormap.api.service;
7   
8   import ch.rasc.sse.eventbus.SseEvent;
9   import ch.rasc.sse.eventbus.SseEventBus;
10  import jakarta.annotation.PostConstruct;
11  import java.io.File;
12  import java.io.IOException;
13  import java.io.UncheckedIOException;
14  import java.lang.invoke.MethodHandles;
15  import java.nio.charset.StandardCharsets;
16  import java.nio.file.Files;
17  import java.nio.file.Path;
18  import java.time.Instant;
19  import java.util.ArrayList;
20  import java.util.Comparator;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Objects;
24  import java.util.Set;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.stream.Stream;
28  import java.util.zip.ZipEntry;
29  import java.util.zip.ZipOutputStream;
30  import org.apache.commons.lang3.StringUtils;
31  import org.geotools.api.data.FeatureEvent;
32  import org.geotools.api.data.FileDataStore;
33  import org.geotools.api.data.Query;
34  import org.geotools.api.data.SimpleFeatureSource;
35  import org.geotools.api.data.SimpleFeatureStore;
36  import org.geotools.api.data.Transaction;
37  import org.geotools.api.feature.simple.SimpleFeatureType;
38  import org.geotools.api.filter.Filter;
39  import org.geotools.api.filter.FilterFactory;
40  import org.geotools.api.filter.sort.SortOrder;
41  import org.geotools.data.DataUtilities;
42  import org.geotools.data.DefaultTransaction;
43  import org.geotools.data.csv.CSVDataStoreFactory;
44  import org.geotools.data.geojson.store.GeoJSONDataStoreFactory;
45  import org.geotools.data.shapefile.ShapefileDumper;
46  import org.geotools.factory.CommonFactoryFinder;
47  import org.geotools.feature.SchemaException;
48  import org.geotools.geopkg.FeatureEntry;
49  import org.geotools.geopkg.GeoPackage;
50  import org.geotools.util.factory.GeoTools;
51  import org.jspecify.annotations.NonNull;
52  import org.jspecify.annotations.Nullable;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  import org.springframework.beans.factory.annotation.Qualifier;
56  import org.springframework.beans.factory.annotation.Value;
57  import org.springframework.scheduling.annotation.Async;
58  import org.springframework.scheduling.annotation.Scheduled;
59  import org.springframework.stereotype.Service;
60  import org.springframework.transaction.annotation.Transactional;
61  import org.tailormap.api.controller.LayerExtractController;
62  import org.tailormap.api.geotools.collection.ProgressReportingFeatureCollection;
63  import org.tailormap.api.geotools.data.excel.ExcelDataStore;
64  import org.tailormap.api.geotools.data.excel.ExcelDataStoreFactory;
65  import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
66  import org.tailormap.api.persistence.TMFeatureType;
67  import org.tailormap.api.util.UUIDv7;
68  import org.tailormap.api.viewer.model.ServerSentEventResponse;
69  import tools.jackson.databind.SerializationFeature;
70  import tools.jackson.databind.json.JsonMapper;
71  
72  @Service
73  public class CreateLayerExtractService {
74    private static final Logger logger =
75        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
76    private final SseEventBus eventBus;
77    private final JsonMapper jsonMapper;
78    private final FeatureSourceFactoryHelper featureSourceFactoryHelper;
79    private final FilterFactory ff = CommonFactoryFinder.getFilterFactory(GeoTools.getDefaultHints());
80  
81    private static final String EXTRACT_SUBDIRECTORY = "tm-extracts";
82    // we can safely use the tmp dir as a default here because we are running in a docker container without a shell so
83    // access is limited
84    // Base directory from config; actual export dir is <base>/tm-extracts
85    @Value("${tailormap-api.extract.location:#{systemProperties['java.io.tmpdir']}}")
86    private String exportFilesBaseLocation;
87  
88    private String exportFilesLocation;
89  
90    @Value("${tailormap-api.extract.cleanup-minutes:120}")
91    private int cleanupIntervalMinutes;
92  
93    @Value("#{T(java.lang.Math).max(1, ${tailormap-api.extract.progress-report-interval:100})}")
94    private int progressReportInterval;
95  
96    @Value("${tailormap-api.features.wfs_count_exact:false}")
97    private boolean exactWfsCounts;
98  
99    @PostConstruct
100   void initializeExtractDirectory() {
101     try {
102       Path exportRoot = Path.of(exportFilesBaseLocation, EXTRACT_SUBDIRECTORY);
103       Files.createDirectories(exportRoot);
104       this.exportFilesLocation = exportRoot.toRealPath().toString();
105       logger.info("Using extract output directory: {}", this.exportFilesLocation);
106     } catch (IOException e) {
107       throw new UncheckedIOException(
108           "Failed to initialize extract directory under base path: " + exportFilesBaseLocation, e);
109     }
110   }
111 
112   public CreateLayerExtractService(
113       @Qualifier("viewerSseEventBus") SseEventBus eventBus,
114       JsonMapper jsonMapper,
115       FeatureSourceFactoryHelper featureSourceFactoryHelper) {
116     this.eventBus = eventBus;
117     this.featureSourceFactoryHelper = featureSourceFactoryHelper;
118     // force unindented/single line output for SSE messages, because we may have set
119     // spring.jackson.serialization.indent_output=true for debugging/development/test
120     if (jsonMapper.isEnabled(SerializationFeature.INDENT_OUTPUT)) {
121       this.jsonMapper = jsonMapper
122           .rebuild()
123           .configure(SerializationFeature.INDENT_OUTPUT, false)
124           .build();
125     } else {
126       this.jsonMapper = jsonMapper;
127     }
128   }
129 
130   public String getExportFilesLocation() {
131     return exportFilesLocation;
132   }
133 
134   private void emitError(@NonNull String clientId, String details) {
135     eventBus.handleEvent(SseEvent.builder()
136         .addClientId(clientId)
137         .data(jsonMapper.writeValueAsString(new ServerSentEventResponse()
138             .eventType(ServerSentEventResponse.EventTypeEnum.EXTRACT_FAILED)
139             .id(UUIDv7.randomV7())
140             .details(Map.of(
141                 "message", "An error occurred during extract creation", "explanation", details))))
142         .build());
143   }
144 
145   public void emitProgress(
146       @NonNull String clientId,
147       @Nullable String fileId,
148       int progress,
149       boolean completed,
150       @Nullable String message) {
151     message = StringUtils.isBlank(message) ? "Extract task started" : message;
152     fileId = StringUtils.isBlank(fileId) ? "" : fileId;
153     logger.debug("Emitting progress {}% for client [{}], message: '{}'", progress, clientId, message);
154 
155     eventBus.handleEvent(SseEvent.builder()
156         .addClientId(clientId)
157         .data(jsonMapper.writeValueAsString(new ServerSentEventResponse()
158             .eventType(
159                 completed
160                     ? ServerSentEventResponse.EventTypeEnum.EXTRACT_COMPLETED
161                     : ServerSentEventResponse.EventTypeEnum.EXTRACT_PROGRESS)
162             .id(UUIDv7.randomV7())
163             .details(Map.of(
164                 "progress",
165                 progress,
166                 "message",
167                 completed ? "Extract task completed" : message,
168                 "downloadId",
169                 fileId))))
170         .build());
171   }
172 
173   /**
174    * Check that the sse client id is valid and exists.
175    *
176    * @param clientId the SSE client id
177    * @throws IllegalArgumentException when the SSE client id is invalid or not found on the event bus
178    */
179   public void validateClientId(@NonNull String clientId) throws IllegalArgumentException {
180     if (!clientId.matches("[A-Za-z0-9_-]+")) {
181       logger.warn("Invalid clientId for SSE connection: {}", clientId);
182       throw new IllegalArgumentException("Invalid clientId");
183     }
184 
185     // validate the given clientId is known on the event bus
186     this.eventBus.getAllClientIds().stream()
187         .filter(id -> Objects.equals(id, clientId))
188         .findFirst()
189         .ifPresentOrElse(id -> logger.debug("Validated clientId {}", id), () -> {
190           throw new IllegalArgumentException("No active subscription found for clientId " + clientId);
191         });
192   }
193 
194   /**
195    * Create a validated filename for an extract. The naming follows the pattern
196    * {@code "%s_%s_%s%s".formatted(sourceFT.getName(), clientId, UUIDv7.randomV7(), outputFormat.getExtension()) }
197    * where the first part is the source feature type name (this is cleaned from some characters), the second part is
198    * the SSE client id, the third part is a random UUIDv7 and the fourth part is the file extension based on the
199    * requested output format.
200    *
201    * @param clientId the SSE client id
202    * @param sourceFT the source featuretype for the extract
203    * @param outputFormat the required format of the extract
204    * @return the filename used to create an extract
205    * @throws IllegalArgumentException when the SSE clientId is invalid or not found on the event bus
206    */
207   public String createExtractFilename(
208       @NonNull String clientId,
209       @NonNull TMFeatureType sourceFT,
210       LayerExtractController.@NonNull ExtractOutputFormat outputFormat)
211       throws IllegalArgumentException {
212 
213     this.validateClientId(clientId);
214 
215     String cleanFTName = sourceFT.getName();
216     if (cleanFTName.contains(":")) {
217       // clip off the WFS namespace part
218       cleanFTName = cleanFTName.substring(cleanFTName.lastIndexOf(":") + 1);
219       // remove: '.' and '_' which are used as separators in the filename and could cause issues when parsing the
220       // filename later on
221       cleanFTName = cleanFTName.replaceAll("[._]", "");
222     }
223     return "%s_%s_%s%s".formatted(cleanFTName, clientId, UUIDv7.randomV7(), outputFormat.getExtension());
224   }
225 
226   @Async("extractTaskExecutor")
227   @Transactional
228   public void createLayerExtract(
229       @NonNull String clientId,
230       @NonNull TMFeatureType inputTmFeatureType,
231       @NonNull Set<String> attributes,
232       @Nullable Filter filter,
233       String sortBy,
234       SortOrder sortOrder,
235       LayerExtractController.@NonNull ExtractOutputFormat extractOutputFormat,
236       @NonNull String outputFileName) {
237 
238     this.emitProgress(clientId, outputFileName, 0, false, "Starting extract");
239 
240     switch (extractOutputFormat) {
241       case GEOPACKAGE ->
242         this.handleGeoPackage(
243             clientId, inputTmFeatureType, attributes, filter, sortBy, sortOrder, outputFileName);
244       case SHAPE ->
245         this.handleWithShapeDumper(
246             clientId, inputTmFeatureType, attributes, filter, sortBy, sortOrder, outputFileName);
247       case CSV, GEOJSON, XLSX ->
248         this.handleSingleFileFormats(
249             clientId,
250             inputTmFeatureType,
251             attributes,
252             filter,
253             sortBy,
254             sortOrder,
255             extractOutputFormat,
256             outputFileName);
257     }
258   }
259 
260   private void handleGeoPackage(
261       @NonNull String clientId,
262       @NonNull TMFeatureType inputTmFeatureType,
263       @NonNull Set<String> attributes,
264       Filter filter,
265       String sortBy,
266       SortOrder sortOrder,
267       @NonNull String outputFileName) {
268 
269     SimpleFeatureSource inputFeatureSource = null;
270     File outputFile;
271     try {
272       outputFile = getValidatedOutputFile(outputFileName);
273       if (!logger.isDebugEnabled()) {
274         // delete in production after JVM exit because the event bus will be reset when the JVM exits, and then
275         // we are unlikely to have a reference to the file anymore.
276         // In debug/development mode we want to keep the file for inspection.
277         outputFile.deleteOnExit();
278       }
279     } catch (IOException e) {
280       emitError(clientId, e.getMessage());
281       logger.error("Creating extract failed", e);
282       return;
283     }
284 
285     try (GeoPackage geopkg = new GeoPackage(outputFile)) {
286       geopkg.init();
287 
288       inputFeatureSource = featureSourceFactoryHelper.openGeoToolsFeatureSource(inputTmFeatureType);
289 
290       Query q = createQuery(inputFeatureSource, attributes, filter, sortBy, sortOrder);
291 
292       int featCount = getFeatureCount(inputFeatureSource, q);
293       if (featCount < 0) {
294         logger.warn("Could not determine feature count for extract, progress reporting will be inaccurate");
295       }
296       final boolean hasKnownFeatureCount = featCount > 0;
297 
298       SimpleFeatureType fType =
299           DataUtilities.createSubType(inputFeatureSource.getSchema(), attributes.toArray(new String[0]));
300 
301       FeatureEntry entry = new FeatureEntry();
302       entry.setTableName(fType.getTypeName());
303       entry.setDescription(fType.getTypeName());
304 
305       AtomicInteger lastProgress = new AtomicInteger(0);
306       geopkg.add(
307           entry,
308           new ProgressReportingFeatureCollection(
309               inputFeatureSource.getFeatures(q), progressReportInterval, processed -> {
310                 int progress = hasKnownFeatureCount ? (int) ((processed / (double) featCount) * 99) : 0;
311                 lastProgress.set(progress);
312                 String progressMessage = hasKnownFeatureCount
313                     ? "Extracting geopackage: %d/%d features processed"
314                         .formatted(processed, featCount)
315                     : "Extracting geopackage: %d features processed".formatted(processed);
316                 this.emitProgress(clientId, outputFileName, progress, false, progressMessage);
317               }));
318 
319       this.emitProgress(
320           clientId,
321           outputFileName,
322           Math.max(99, lastProgress.get()),
323           false,
324           "Extract geopackage created successfully");
325       geopkg.createSpatialIndex(entry);
326       geopkg.close();
327       this.emitProgress(clientId, outputFileName, 100, true, "Extract completed successfully");
328     } catch (SchemaException | IOException | IllegalArgumentException e) {
329       emitError(clientId, e.getMessage());
330       logger.error("Creating extract failed", e);
331     } finally {
332       if (inputFeatureSource != null) {
333         try {
334           inputFeatureSource.getDataStore().dispose();
335         } catch (Exception e) {
336           logger.warn("Error disposing datastore for feature source {}", inputFeatureSource.getName(), e);
337         }
338       }
339     }
340   }
341 
342   private void handleSingleFileFormats(
343       @NonNull String clientId,
344       @NonNull TMFeatureType inputTmFeatureType,
345       @NonNull Set<String> attributes,
346       Filter filter,
347       String sortBy,
348       SortOrder sortOrder,
349       LayerExtractController.@NonNull ExtractOutputFormat extractOutputFormat,
350       @NonNull String outputFileName) {
351 
352     SimpleFeatureSource inputFeatureSource = null;
353     FileDataStore outputDataStore = null;
354     try (Transaction outputTransaction = new DefaultTransaction("tailormap-extract-output")) {
355       inputFeatureSource = featureSourceFactoryHelper.openGeoToolsFeatureSource(inputTmFeatureType);
356 
357       Query q = createQuery(inputFeatureSource, attributes, filter, sortBy, sortOrder);
358 
359       int featCount = getFeatureCount(inputFeatureSource, q);
360 
361       if (extractOutputFormat == LayerExtractController.ExtractOutputFormat.XLSX
362           && featCount >= ExcelDataStore.getMaxRows()) {
363         this.emitError(
364             clientId,
365             "Extract result contains %d features, which exceeds the maximum of %d for Excel output format. Please refine your filter or choose a different output format."
366                 .formatted(featCount, ExcelDataStore.getMaxRows()));
367         logger.error(
368             "Extract result contains {} features, which exceeds the maximum of {} for Excel output format. Please refine your filter or choose a different output format.",
369             featCount,
370             ExcelDataStore.getMaxRows());
371         // nothing we can do now as we are in a background/async process, so we just return without creating an
372         // extract file.
373         // The client will receive no extract completed event, and we have already emitted an error message with
374         // details.
375         return;
376       }
377 
378       outputDataStore = this.getExtractDataStore(
379           extractOutputFormat, outputFileName, clientId, inputTmFeatureType.getName());
380       SimpleFeatureType fType =
381           DataUtilities.createSubType(inputFeatureSource.getSchema(), attributes.toArray(new String[0]));
382       outputDataStore.createSchema(fType);
383 
384       final AtomicInteger featsAdded = new AtomicInteger();
385       if (outputDataStore.getFeatureSource() instanceof SimpleFeatureStore featureStore) {
386         featureStore.setTransaction(outputTransaction);
387         featureStore.addFeatureListener(event -> {
388           if (event.getType().equals(FeatureEvent.Type.ADDED)) {
389             featsAdded.getAndIncrement();
390           }
391           if (featCount > 0) {
392             if (featsAdded.get() % progressReportInterval == 0) {
393               this.emitProgress(
394                   clientId,
395                   outputFileName,
396                   (int) ((featsAdded.doubleValue() / featCount) * 100),
397                   false,
398                   null);
399             }
400           }
401         });
402         featureStore.addFeatures(inputFeatureSource.getFeatures(q));
403         outputTransaction.commit();
404         outputDataStore.dispose();
405         this.emitProgress(clientId, outputFileName, 100, true, "Extract completed successfully");
406       } else {
407         outputDataStore.dispose();
408         this.emitError(clientId, "Output datastore is not a SimpleFeatureStore, cannot write features");
409         logger.error("Output datastore is not a SimpleFeatureStore, cannot write features");
410       }
411     } catch (IOException | SchemaException | IllegalArgumentException | NullPointerException e) {
412       emitError(clientId, e.getMessage());
413       logger.error("Creating extract failed", e);
414     } finally {
415       if (outputDataStore != null) {
416         outputDataStore.dispose();
417       }
418       if (inputFeatureSource != null) {
419         try {
420           inputFeatureSource.getDataStore().dispose();
421         } catch (Exception e) {
422           logger.warn("Error disposing datastore for feature source {}", inputFeatureSource.getName(), e);
423         }
424       }
425     }
426   }
427 
428   private File getValidatedOutputFile(String outputFileName) throws IOException {
429     Path exportRoot = Path.of(exportFilesLocation).toRealPath();
430     Path outputPath = exportRoot.resolve(outputFileName).normalize();
431     if (!outputPath.startsWith(exportRoot)) {
432       throw new IOException("Invalid file path");
433     }
434     Path createdFilePath = Files.createFile(outputPath).toRealPath();
435     if (!createdFilePath.startsWith(exportRoot)) {
436       throw new IOException("Invalid file path");
437     }
438     return createdFilePath.toFile();
439   }
440 
441   /**
442    * Create a writable GeoTools {@link FileDataStore} for the requested extract format. The format must be must be
443    * supported by a {@link FileDataStore} implementation, for example CSV, Excel or GeoJSON. For unsupported formats
444    * (for example Shapefile) a custom handling is used in the calling method.
445    *
446    * <p>The output file is validated to ensure it is created under the configured extract location.
447    *
448    * @param extractOutputFormat the requested extract output format
449    * @param outputFileName the target output filename
450    * @param clientId the SSE client id, used for error reporting
451    * @param typeName the source feature type name, used to derive format-specific metadata (for example Excel sheet
452    *     name)
453    * @return a newly created {@link FileDataStore} configured for the requested format
454    * @throws IOException when the output file path is invalid or the datastore cannot be created
455    */
456   private FileDataStore getExtractDataStore(
457       LayerExtractController.ExtractOutputFormat extractOutputFormat,
458       String outputFileName,
459       String clientId,
460       String typeName)
461       throws IOException {
462 
463     final File outputFile = getValidatedOutputFile(outputFileName);
464     if (!logger.isDebugEnabled()) {
465       outputFile.deleteOnExit();
466     }
467 
468     return switch (extractOutputFormat) {
469       case CSV ->
470         (FileDataStore) new CSVDataStoreFactory()
471             .createNewDataStore(Map.of(
472                 CSVDataStoreFactory.FILE_PARAM.key,
473                 outputFile,
474                 CSVDataStoreFactory.STRATEGYP.key,
475                 CSVDataStoreFactory.WKT_STRATEGY,
476                 CSVDataStoreFactory.WKTP.key,
477                 "the_geom_wkt",
478                 CSVDataStoreFactory.WRITEPRJ.key,
479                 false,
480                 CSVDataStoreFactory.QUOTEALL.key,
481                 true));
482       case XLSX -> {
483         String processedTypeName = typeName.contains(":")
484             ? typeName.substring(typeName.lastIndexOf(":") + 1).replaceAll("[\\\\/?*\\[\\]:]", "_")
485             : typeName.replaceAll("[\\\\/?*\\[\\]:]", "_");
486         processedTypeName = processedTypeName.substring(0, Math.min(processedTypeName.length(), 31));
487         yield (FileDataStore) new ExcelDataStoreFactory()
488             .createNewDataStore(Map.of(
489                 ExcelDataStoreFactory.FILE_PARAM.key,
490                 outputFile,
491                 ExcelDataStoreFactory.SHEET_PARAM.key,
492                 processedTypeName));
493       }
494       case GEOJSON ->
495         (FileDataStore) new GeoJSONDataStoreFactory()
496             .createNewDataStore(Map.of(GeoJSONDataStoreFactory.FILE_PARAM.key, outputFile));
497       default -> {
498         emitError(clientId, "Unknown output format: " + extractOutputFormat);
499         logger.error("Unknown output format: {}", extractOutputFormat);
500         throw new IllegalArgumentException("Unknown output format: " + extractOutputFormat);
501       }
502     };
503   }
504 
505   private int getFeatureCount(SimpleFeatureSource source, Query query) throws IOException {
506     int count = source.getCount(query);
507     logger.debug("Filtered source counts {} features", count);
508     if (count < 0 && exactWfsCounts) {
509       count = source.getFeatures(query).size();
510     }
511     return count;
512   }
513 
514   private void handleWithShapeDumper(
515       @NonNull String clientId,
516       @NonNull TMFeatureType inputTmFeatureType,
517       @NonNull Set<String> attributes,
518       Filter filter,
519       String sortBy,
520       SortOrder sortOrder,
521       @NonNull String outputFileName) {
522     SimpleFeatureSource inputFeatureSource = null;
523     File outputDirectory = null;
524     try {
525       File outputFile = getValidatedOutputFile(outputFileName);
526       String baseName = outputFile
527           .getName()
528           .substring(
529               0,
530               outputFile
531                   .getName()
532                   .lastIndexOf(LayerExtractController.ExtractOutputFormat.SHAPE.getExtension()));
533       outputDirectory = outputFile
534           .getParentFile()
535           .toPath()
536           .resolve(baseName)
537           .toFile()
538           .getCanonicalFile();
539       if (!logger.isDebugEnabled()) {
540         // delete in production after JVM exit because the event bus will be reset when the JVM exits, and then
541         // we are unlikely to have a reference to the file anymore.
542         // In debug/development mode we want to keep the directory for inspection.
543         outputDirectory.deleteOnExit();
544       }
545       Files.createDirectories(outputDirectory.toPath());
546 
547       ShapefileDumper dumper = new ShapefileDumper(outputDirectory);
548       dumper.setCharset(StandardCharsets.UTF_8);
549       dumper.setEmptyShapefileAllowed(false);
550 
551       inputFeatureSource = featureSourceFactoryHelper.openGeoToolsFeatureSource(inputTmFeatureType);
552 
553       Query q = createQuery(inputFeatureSource, attributes, filter, sortBy, sortOrder);
554 
555       final int featCount = getFeatureCount(inputFeatureSource, q);
556       final boolean hasKnownFeatureCount = featCount > 0;
557 
558       AtomicInteger lastProgress = new AtomicInteger(0);
559 
560       dumper.dump(new ProgressReportingFeatureCollection(
561           inputFeatureSource.getFeatures(q), progressReportInterval, processed -> {
562             int progress = hasKnownFeatureCount ? (int) ((processed / (double) featCount) * 99) : 0;
563             lastProgress.set(progress);
564             String progressMessage = hasKnownFeatureCount
565                 ? "Extracting shapes: %d/%d features processed".formatted(processed, featCount)
566                 : "Extracting shapes: %d features processed".formatted(processed);
567             this.emitProgress(clientId, outputFileName, progress, false, progressMessage);
568           }));
569       this.emitProgress(
570           clientId,
571           outputFileName,
572           Math.max(99, lastProgress.get()),
573           false,
574           "Extract shapes dumped successfully");
575 
576       zipDirectory(outputDirectory.toPath(), outputFile.toPath());
577       this.emitProgress(clientId, outputFileName, 100, true, "Extract completed successfully");
578     } catch (IOException | IllegalArgumentException e) {
579       emitError(clientId, e.getMessage());
580       logger.error("Creating extract failed", e);
581     } finally {
582       if (outputDirectory != null) {
583         try {
584           deleteDirectoryRecursively(outputDirectory.toPath());
585         } catch (IOException e) {
586           logger.error("Failed to delete output directory {}", outputDirectory, e);
587         }
588       }
589       if (inputFeatureSource != null) {
590         try {
591           inputFeatureSource.getDataStore().dispose();
592         } catch (Exception e) {
593           logger.warn("Error disposing datastore for feature source {}", inputFeatureSource.getName(), e);
594         }
595       }
596     }
597   }
598 
599   private Query createQuery(
600       SimpleFeatureSource inputFeatureSource,
601       Set<String> attributes,
602       Filter filter,
603       String sortBy,
604       SortOrder sortOrder) {
605     Query q = new Query(inputFeatureSource.getName().toString());
606     if (!attributes.isEmpty()) {
607       q.setPropertyNames(attributes.toArray(new String[0]));
608     }
609 
610     if (filter != null) {
611       q.setFilter(filter);
612     }
613     if (!StringUtils.isBlank(sortBy)) {
614       q.setSortBy(ff.sort(sortBy, Objects.requireNonNullElse(sortOrder, SortOrder.ASCENDING)));
615     }
616     return q;
617   }
618 
619   /**
620    * Cleanup expired extract files. Filenames are created in {@link CreateLayerExtractService#createExtractFilename }
621    * and follow the pattern {@code "%s_%s_%s.%s".formatted(sourceFT.getName(), clientId, UUIDv7.randomV7(),
622    * outputFormat.getExtension()) }
623    */
624   @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.MINUTES, initialDelay = 15)
625   public void cleanupExpiredExtracts() {
626     logger.debug("Running expired extracts cleanup in {}", exportFilesLocation);
627     List<FileWithAttributes> oldDownloadFilesOnDisk = new ArrayList<>();
628     Set<String> validClientIds = eventBus.getAllClientIds();
629 
630     // list download files in export location and delete those that are not bound to an active sse stream client
631     try (Stream<Path> stream = Files.walk(Path.of(exportFilesLocation))) {
632       stream.filter(Files::isRegularFile).forEach(path -> {
633         File file = path.toFile();
634         String filename = file.getName();
635         String[] parts = filename.split("[_.]", -1);
636         if (parts.length < 4) {
637           logger.warn("Unexpected file in extract location: {}", filename);
638           return;
639         }
640         String clientId = parts[1];
641         if (!validClientIds.contains(clientId)) {
642           if (!file.delete()) {
643             logger.error("Failed to delete unattached extract file {}", filename);
644           }
645         } else {
646           try {
647             Instant timestampPart = UUIDv7.timestampAsInstant(UUIDv7.fromString(parts[2]));
648             oldDownloadFilesOnDisk.add(new FileWithAttributes(file, timestampPart, clientId));
649           } catch (IllegalArgumentException ignored) {
650             // not a valid v7 uuid
651           }
652         }
653       });
654 
655       try (Stream<Path> paths = Files.walk(Path.of(exportFilesLocation))) {
656         paths.filter(Files::isDirectory)
657             .filter(path -> !path.equals(Path.of(exportFilesLocation)))
658             .forEach(path -> {
659               logger.debug("Checking directory {} for expired extracts", path);
660               File file = path.toFile();
661               String filename = file.getName();
662               String[] parts = filename.split("[_]", -1);
663               if (parts.length < 3) {
664                 logger.warn("Unexpected directory in extract location: {}", filename);
665                 return;
666               }
667               String clientId = parts[1];
668               if (!validClientIds.contains(clientId)) {
669                 try {
670                   deleteDirectoryRecursively(file.toPath());
671                 } catch (IOException e) {
672                   logger.error("Failed to delete unattached extract directory {}", filename);
673                 }
674               } else {
675                 try {
676                   Instant timestampPart = UUIDv7.timestampAsInstant(UUIDv7.fromString(parts[2]));
677                   oldDownloadFilesOnDisk.add(new FileWithAttributes(file, timestampPart, clientId));
678                 } catch (IllegalArgumentException ignored) {
679                   // not a valid v7 uuid
680                 }
681               }
682             });
683       }
684 
685       // delete any files/directories are older than the cutoff
686       oldDownloadFilesOnDisk.stream()
687           .filter(f -> f.timestamp()
688               .isBefore(Instant.now().minusSeconds(TimeUnit.MINUTES.toSeconds(cleanupIntervalMinutes))))
689           .forEach(f -> {
690             if (f.file.isDirectory()) {
691               try {
692                 deleteDirectoryRecursively(f.file().toPath());
693               } catch (IOException ignored) {
694                 logger.warn("Failed to delete directory {}", f.file());
695               }
696             } else {
697               if (!f.file().delete()) {
698                 logger.error(
699                     "Failed to delete expired extract file {}",
700                     f.file().getName());
701               }
702             }
703           });
704     } catch (IOException e) {
705       logger.error("Error while cleaning up expired extracts", e);
706     }
707   }
708 
709   private void zipDirectory(Path sourceDir, Path zipFile) throws IOException {
710     try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipFile));
711         Stream<Path> pathStream = Files.walk(sourceDir)) {
712       pathStream.filter(Files::isRegularFile).forEach(path -> {
713         String entryName = sourceDir.relativize(path).toString().replace(File.separatorChar, '/');
714         try {
715           zos.putNextEntry(new ZipEntry(entryName));
716           Files.copy(path, zos);
717           zos.closeEntry();
718         } catch (IOException e) {
719           throw new RuntimeException("Failed to add file to zip: " + path, e);
720         }
721       });
722     } catch (RuntimeException e) {
723       if (e.getCause() instanceof IOException ioException) {
724         throw ioException;
725       }
726       throw e;
727     }
728   }
729 
730   private void deleteDirectoryRecursively(Path directory) throws IOException {
731     try (Stream<Path> paths = Files.walk(directory)) {
732       paths.sorted(Comparator.reverseOrder()).forEach(path -> {
733         try {
734           logger.debug("Deleting path {}", path);
735           Files.deleteIfExists(path);
736         } catch (IOException e) {
737           throw new RuntimeException("Failed to delete path: " + path, e);
738         }
739       });
740     } catch (RuntimeException e) {
741       if (e.getCause() instanceof IOException ioException) {
742         throw ioException;
743       }
744       throw e;
745     }
746   }
747 
748   private record FileWithAttributes(File file, Instant timestamp, String clientId) {}
749 }