View Javadoc
1   /*
2    * Copyright (C) 2026 B3Partners B.V.
3    *
4    * SPDX-License-Identifier: MIT
5    */
6   package org.tailormap.api.service;
7   
8   import ch.rasc.sse.eventbus.SseEvent;
9   import ch.rasc.sse.eventbus.SseEventBus;
10  import jakarta.annotation.PostConstruct;
11  import java.io.File;
12  import java.io.IOException;
13  import java.io.UncheckedIOException;
14  import java.lang.invoke.MethodHandles;
15  import java.nio.charset.StandardCharsets;
16  import java.nio.file.Files;
17  import java.nio.file.Path;
18  import java.time.Instant;
19  import java.util.ArrayList;
20  import java.util.Comparator;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Objects;
24  import java.util.Set;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.stream.Stream;
28  import java.util.zip.ZipEntry;
29  import java.util.zip.ZipOutputStream;
30  import org.apache.commons.lang3.StringUtils;
31  import org.geotools.api.data.FeatureEvent;
32  import org.geotools.api.data.FileDataStore;
33  import org.geotools.api.data.Query;
34  import org.geotools.api.data.SimpleFeatureSource;
35  import org.geotools.api.data.SimpleFeatureStore;
36  import org.geotools.api.data.Transaction;
37  import org.geotools.api.feature.simple.SimpleFeatureType;
38  import org.geotools.api.filter.Filter;
39  import org.geotools.api.filter.FilterFactory;
40  import org.geotools.api.filter.sort.SortOrder;
41  import org.geotools.data.DataUtilities;
42  import org.geotools.data.DefaultTransaction;
43  import org.geotools.data.csv.CSVDataStoreFactory;
44  import org.geotools.data.geojson.store.GeoJSONDataStoreFactory;
45  import org.geotools.data.shapefile.ShapefileDumper;
46  import org.geotools.factory.CommonFactoryFinder;
47  import org.geotools.feature.SchemaException;
48  import org.geotools.geopkg.FeatureEntry;
49  import org.geotools.geopkg.GeoPackage;
50  import org.geotools.util.factory.GeoTools;
51  import org.jspecify.annotations.NonNull;
52  import org.jspecify.annotations.Nullable;
53  import org.slf4j.Logger;
54  import org.slf4j.LoggerFactory;
55  import org.springframework.beans.factory.annotation.Qualifier;
56  import org.springframework.beans.factory.annotation.Value;
57  import org.springframework.scheduling.annotation.Async;
58  import org.springframework.scheduling.annotation.Scheduled;
59  import org.springframework.stereotype.Service;
60  import org.springframework.transaction.annotation.Transactional;
61  import org.tailormap.api.controller.LayerExtractController;
62  import org.tailormap.api.geotools.collection.ProgressReportingFeatureCollection;
63  import org.tailormap.api.geotools.data.excel.ExcelDataStore;
64  import org.tailormap.api.geotools.data.excel.ExcelDataStoreFactory;
65  import org.tailormap.api.geotools.featuresources.FeatureSourceFactoryHelper;
66  import org.tailormap.api.persistence.TMFeatureType;
67  import org.tailormap.api.util.UUIDv7;
68  import org.tailormap.api.viewer.model.ServerSentEventResponse;
69  import tools.jackson.databind.SerializationFeature;
70  import tools.jackson.databind.json.JsonMapper;
71  
72  @Service
73  public class CreateLayerExtractService {
74    private static final Logger logger =
75        LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
76    private final SseEventBus eventBus;
77    private final JsonMapper jsonMapper;
78    private final FeatureSourceFactoryHelper featureSourceFactoryHelper;
79    private final FilterFactory ff = CommonFactoryFinder.getFilterFactory(GeoTools.getDefaultHints());
80  
81    private static final String EXTRACT_SUBDIRECTORY = "tm-extracts";
82    // we can safely use the tmp dir as a default here because we are running in a docker container without a shell so
83    // access is limited
84    // Base directory from config; actual export dir is <base>/tm-extracts
85    @Value("${tailormap-api.extract.location:#{systemProperties['java.io.tmpdir']}}")
86    private String exportFilesBaseLocation;
87  
88    private String exportFilesLocation;
89  
90    @Value("${tailormap-api.extract.cleanup-minutes:120}")
91    private int cleanupIntervalMinutes;
92  
93    @Value("#{T(java.lang.Math).max(1, ${tailormap-api.extract.progress-report-interval:100})}")
94    private int progressReportInterval;
95  
96    @Value("${tailormap-api.features.wfs_count_exact:false}")
97    private boolean exactWfsCounts;
98  
99    @PostConstruct
100   void initializeExtractDirectory() {
101     try {
102       Path exportRoot = Path.of(exportFilesBaseLocation, EXTRACT_SUBDIRECTORY);
103       Files.createDirectories(exportRoot);
104       this.exportFilesLocation = exportRoot.toRealPath().toString();
105       logger.info("Using extract output directory: {}", this.exportFilesLocation);
106     } catch (IOException e) {
107       throw new UncheckedIOException(
108           "Failed to initialize extract directory under base path: " + exportFilesBaseLocation, e);
109     }
110   }
111 
112   public CreateLayerExtractService(
113       @Qualifier("viewerSseEventBus") SseEventBus eventBus,
114       JsonMapper jsonMapper,
115       FeatureSourceFactoryHelper featureSourceFactoryHelper) {
116     this.eventBus = eventBus;
117     this.featureSourceFactoryHelper = featureSourceFactoryHelper;
118     // force unindented/single line output for SSE messages, because we may have set
119     // spring.jackson.serialization.indent_output=true for debugging/development/test
120     if (jsonMapper.isEnabled(SerializationFeature.INDENT_OUTPUT)) {
121       this.jsonMapper = jsonMapper
122           .rebuild()
123           .configure(SerializationFeature.INDENT_OUTPUT, false)
124           .build();
125     } else {
126       this.jsonMapper = jsonMapper;
127     }
128   }
129 
130   public String getExportFilesLocation() {
131     return exportFilesLocation;
132   }
133 
134   private void emitError(@NonNull String clientId, String details) {
135     eventBus.handleEvent(SseEvent.builder()
136         .addClientId(clientId)
137         .data(jsonMapper.writeValueAsString(new ServerSentEventResponse()
138             .eventType(ServerSentEventResponse.EventTypeEnum.EXTRACT_FAILED)
139             .id(UUIDv7.randomV7())
140             .details(Map.of(
141                 "message", "An error occurred during extract creation", "explanation", details))))
142         .build());
143   }
144 
145   public void emitProgress(
146       @NonNull String clientId,
147       @Nullable String fileId,
148       int progress,
149       boolean completed,
150       @Nullable String message) {
151     message = StringUtils.isBlank(message) ? "Extract task started" : message;
152     fileId = StringUtils.isBlank(fileId) ? "" : fileId;
153     logger.debug("Emitting progress {}% for client [{}], message: '{}'", progress, clientId, message);
154 
155     eventBus.handleEvent(SseEvent.builder()
156         .addClientId(clientId)
157         .data(jsonMapper.writeValueAsString(new ServerSentEventResponse()
158             .eventType(
159                 completed
160                     ? ServerSentEventResponse.EventTypeEnum.EXTRACT_COMPLETED
161                     : ServerSentEventResponse.EventTypeEnum.EXTRACT_PROGRESS)
162             .id(UUIDv7.randomV7())
163             .details(Map.of(
164                 "progress",
165                 progress,
166                 "message",
167                 completed ? "Extract task completed" : message,
168                 "downloadId",
169                 fileId))))
170         .build());
171   }
172 
173   /**
174    * Check that the sse client id is valid and exists.
175    *
176    * @param clientId the SSE client id
177    * @throws IllegalArgumentException when the SSE client id is invalid or not found on the event bus
178    */
179   public void validateClientId(@NonNull String clientId) throws IllegalArgumentException {
180     if (!clientId.matches("[A-Za-z0-9_-]+")) {
181       logger.warn("Invalid clientId for SSE connection: {}", clientId);
182       throw new IllegalArgumentException("Invalid clientId");
183     }
184 
185     // validate the given clientId is known on the event bus
186     this.eventBus.getAllClientIds().stream()
187         .filter(id -> Objects.equals(id, clientId))
188         .findFirst()
189         .ifPresentOrElse(id -> logger.debug("Validated clientId {}", id), () -> {
190           throw new IllegalArgumentException("No active subscription found for clientId " + clientId);
191         });
192   }
193 
194   /**
195    * Create a validated filename for an extract. The naming follows the pattern
196    * {@code "%s_%s_%s%s".formatted(sourceFT.getName(), clientId, UUIDv7.randomV7(), outputFormat.getExtension()) }
197    * where the first part is the source feature type name (this is cleaned from some characters), the second part is
198    * the SSE client id, the third part is a random UUIDv7 and the fourth part is the file extension based on the
199    * requested output format.
200    *
201    * @param clientId the SSE client id
202    * @param sourceFT the source featuretype for the extract
203    * @param outputFormat the required format of the extract
204    * @return the filename used to create an extract
205    * @throws IllegalArgumentException when the SSE clientId is invalid or not found on the event bus
206    */
207   public String createExtractFilename(
208       @NonNull String clientId,
209       @NonNull TMFeatureType sourceFT,
210       LayerExtractController.@NonNull ExtractOutputFormat outputFormat)
211       throws IllegalArgumentException {
212 
213     this.validateClientId(clientId);
214 
215     String cleanFTName = sourceFT.getName();
216     if (cleanFTName.contains(":")) {
217       // clip off the WFS namespace part
218       cleanFTName = cleanFTName.substring(cleanFTName.lastIndexOf(":") + 1);
219       // remove: '.' and '_' which are used as separators in the filename and could cause issues when parsing the
220       // filename later on
221       cleanFTName = cleanFTName.replaceAll("[._]", "");
222     }
223     return "%s_%s_%s%s".formatted(cleanFTName, clientId, UUIDv7.randomV7(), outputFormat.getExtension());
224   }
225 
226   @Async("extractTaskExecutor")
227   @Transactional
228   public void createLayerExtract(
229       @NonNull String clientId,
230       @NonNull TMFeatureType inputTmFeatureType,
231       @NonNull Set<String> attributes,
232       @Nullable Filter filter,
233       String sortBy,
234       SortOrder sortOrder,
235       LayerExtractController.@NonNull ExtractOutputFormat extractOutputFormat,
236       @NonNull String outputFileName) {
237 
238     this.emitProgress(clientId, outputFileName, 0, false, "Starting extract");
239 
240     switch (extractOutputFormat) {
241       case GEOPACKAGE ->
242         this.handleGeoPackage(
243             clientId, inputTmFeatureType, attributes, filter, sortBy, sortOrder, outputFileName);
244       case SHAPE ->
245         this.handleWithShapeDumper(
246             clientId, inputTmFeatureType, attributes, filter, sortBy, sortOrder, outputFileName);
247       case CSV, GEOJSON, XLSX ->
248         this.handleSingleFileFormats(
249             clientId,
250             inputTmFeatureType,
251             attributes,
252             filter,
253             sortBy,
254             sortOrder,
255             extractOutputFormat,
256             outputFileName);
257     }
258   }
259 
260   private void handleGeoPackage(
261       @NonNull String clientId,
262       @NonNull TMFeatureType inputTmFeatureType,
263       @NonNull Set<String> attributes,
264       Filter filter,
265       String sortBy,
266       SortOrder sortOrder,
267       @NonNull String outputFileName) {
268 
269     SimpleFeatureSource inputFeatureSource = null;
270     File outputFile;
271     try {
272       outputFile = getValidatedOutputFile(outputFileName);
273       if (!logger.isDebugEnabled()) {
274         // delete in production after JVM exit because the event bus will be reset when the JVM exits, and then
275         // we are unlikely to have a reference to the file anymore.
276         // In debug/development mode we want to keep the file for inspection.
277         outputFile.deleteOnExit();
278       }
279     } catch (IOException e) {
280       emitError(clientId, e.getMessage());
281       logger.error("Creating extract failed", e);
282       return;
283     }
284 
285     try (GeoPackage geopkg = new GeoPackage(outputFile)) {
286       geopkg.init();
287 
288       inputFeatureSource = featureSourceFactoryHelper.openGeoToolsFeatureSource(inputTmFeatureType);
289 
290       Query q = createQuery(inputFeatureSource, attributes, filter, sortBy, sortOrder);
291 
292       int featCount = getFeatureCount(inputFeatureSource, q);
293       if (featCount < 0) {
294         logger.warn("Could not determine feature count for extract, progress reporting will be inaccurate");
295       }
296       final boolean hasKnownFeatureCount = featCount > 0;
297 
298       SimpleFeatureType fType =
299           DataUtilities.createSubType(inputFeatureSource.getSchema(), attributes.toArray(new String[0]));
300 
301       FeatureEntry entry = new FeatureEntry();
302       entry.setTableName(fType.getTypeName());
303       entry.setDescription(fType.getTypeName());
304 
305       AtomicInteger lastProgress = new AtomicInteger(0);
306       geopkg.add(
307           entry,
308           new ProgressReportingFeatureCollection(
309               inputFeatureSource.getFeatures(q), progressReportInterval, processed -> {
310                 int progress = hasKnownFeatureCount ? (int) ((processed / (double) featCount) * 99) : 0;
311                 lastProgress.set(progress);
312                 String progressMessage = hasKnownFeatureCount
313                     ? "Extracting geopackage: %d/%d features processed"
314                         .formatted(processed, featCount)
315                     : "Extracting geopackage: %d features processed".formatted(processed);
316                 this.emitProgress(clientId, outputFileName, progress, false, progressMessage);
317               }));
318 
319       this.emitProgress(
320           clientId,
321           outputFileName,
322           Math.max(99, lastProgress.get()),
323           false,
324           "Extract geopackage created successfully");
325       geopkg.createSpatialIndex(entry);
326       geopkg.close();
327       this.emitProgress(clientId, outputFileName, 100, true, "Extract completed successfully");
328     } catch (SchemaException | IOException | IllegalArgumentException e) {
329       emitError(clientId, e.getMessage());
330       logger.error("Creating extract failed", e);
331     } finally {
332       if (inputFeatureSource != null) {
333         try {
334           inputFeatureSource.getDataStore().dispose();
335         } catch (Exception e) {
336           logger.warn("Error disposing datastore for feature source {}", inputFeatureSource.getName(), e);
337         }
338       }
339     }
340   }
341 
342   private void handleSingleFileFormats(
343       @NonNull String clientId,
344       @NonNull TMFeatureType inputTmFeatureType,
345       @NonNull Set<String> attributes,
346       Filter filter,
347       String sortBy,
348       SortOrder sortOrder,
349       LayerExtractController.@NonNull ExtractOutputFormat extractOutputFormat,
350       @NonNull String outputFileName) {
351 
352     SimpleFeatureSource inputFeatureSource = null;
353     FileDataStore outputDataStore = null;
354     try (Transaction outputTransaction = new DefaultTransaction("tailormap-extract-output")) {
355       inputFeatureSource = featureSourceFactoryHelper.openGeoToolsFeatureSource(inputTmFeatureType);
356 
357       Query q = createQuery(inputFeatureSource, attributes, filter, sortBy, sortOrder);
358 
359       int featCount = getFeatureCount(inputFeatureSource, q);
360 
361       if (extractOutputFormat == LayerExtractController.ExtractOutputFormat.XLSX
362           && featCount >= ExcelDataStore.getMaxRows()) {
363         this.emitError(
364             clientId,
365             "Extract result contains %d features, which exceeds the maximum of %d for Excel output format. Please refine your filter or choose a different output format."
366                 .formatted(featCount, ExcelDataStore.getMaxRows()));
367         logger.error(
368             "Extract result contains {} features, which exceeds the maximum of {} for Excel output format. Please refine your filter or choose a different output format.",
369             featCount,
370             ExcelDataStore.getMaxRows());
371         // nothing we can do now as we are in a background/async process, so we just return without creating an
372         // extract file.
373         // The client will receive no extract completed event, and we have already emitted an error message with
374         // details.
375         return;
376       }
377 
378       outputDataStore = this.getExtractDataStore(
379           extractOutputFormat, outputFileName, clientId, inputTmFeatureType.getName());
380       SimpleFeatureType fType =
381           DataUtilities.createSubType(inputFeatureSource.getSchema(), attributes.toArray(new String[0]));
382       outputDataStore.createSchema(fType);
383 
384       if (outputDataStore instanceof ExcelDataStore excelDataStore) {
385         excelDataStore.setEnableCellAutoSizing(featCount >= 0 && featCount < 1000);
386       }
387 
388       final AtomicInteger featsAdded = new AtomicInteger();
389       if (outputDataStore.getFeatureSource() instanceof SimpleFeatureStore featureStore) {
390         featureStore.setTransaction(outputTransaction);
391         featureStore.addFeatureListener(event -> {
392           if (event.getType().equals(FeatureEvent.Type.ADDED)) {
393             featsAdded.getAndIncrement();
394           }
395           if (featCount > 0) {
396             if (featsAdded.get() % progressReportInterval == 0) {
397               this.emitProgress(
398                   clientId,
399                   outputFileName,
400                   (int) ((featsAdded.doubleValue() / featCount) * 100),
401                   false,
402                   null);
403             }
404           }
405         });
406         featureStore.addFeatures(inputFeatureSource.getFeatures(q));
407         outputTransaction.commit();
408         outputDataStore.dispose();
409         this.emitProgress(clientId, outputFileName, 100, true, "Extract completed successfully");
410       } else {
411         outputDataStore.dispose();
412         this.emitError(clientId, "Output datastore is not a SimpleFeatureStore, cannot write features");
413         logger.error("Output datastore is not a SimpleFeatureStore, cannot write features");
414       }
415     } catch (IOException | SchemaException | IllegalArgumentException | NullPointerException e) {
416       emitError(clientId, e.getMessage());
417       logger.error("Creating extract failed", e);
418     } finally {
419       if (outputDataStore != null) {
420         outputDataStore.dispose();
421       }
422       if (inputFeatureSource != null) {
423         try {
424           inputFeatureSource.getDataStore().dispose();
425         } catch (Exception e) {
426           logger.warn("Error disposing datastore for feature source {}", inputFeatureSource.getName(), e);
427         }
428       }
429     }
430   }
431 
432   private File getValidatedOutputFile(String outputFileName) throws IOException {
433     Path exportRoot = Path.of(exportFilesLocation).toRealPath();
434     Path outputPath = exportRoot.resolve(outputFileName).normalize();
435     if (!outputPath.startsWith(exportRoot)) {
436       throw new IOException("Invalid file path");
437     }
438     Path createdFilePath = Files.createFile(outputPath).toRealPath();
439     if (!createdFilePath.startsWith(exportRoot)) {
440       throw new IOException("Invalid file path");
441     }
442     return createdFilePath.toFile();
443   }
444 
445   /**
446    * Create a writable GeoTools {@link FileDataStore} for the requested extract format. The format must be must be
447    * supported by a {@link FileDataStore} implementation, for example CSV, Excel or GeoJSON. For unsupported formats
448    * (for example Shapefile) a custom handling is used in the calling method.
449    *
450    * <p>The output file is validated to ensure it is created under the configured extract location.
451    *
452    * @param extractOutputFormat the requested extract output format
453    * @param outputFileName the target output filename
454    * @param clientId the SSE client id, used for error reporting
455    * @param typeName the source feature type name, used to derive format-specific metadata (for example Excel sheet
456    *     name)
457    * @return a newly created {@link FileDataStore} configured for the requested format
458    * @throws IOException when the output file path is invalid or the datastore cannot be created
459    */
460   private FileDataStore getExtractDataStore(
461       LayerExtractController.ExtractOutputFormat extractOutputFormat,
462       String outputFileName,
463       String clientId,
464       String typeName)
465       throws IOException {
466 
467     final File outputFile = getValidatedOutputFile(outputFileName);
468     if (!logger.isDebugEnabled()) {
469       // delete in production after JVM exit because the event bus will be reset when the JVM exits, and then we
470       // are unlikely to have a reference to the file anymore.
471       // In debug/development mode we want to keep the file for inspection.
472       outputFile.deleteOnExit();
473     }
474 
475     switch (extractOutputFormat) {
476       case CSV -> {
477         return (FileDataStore) new CSVDataStoreFactory()
478             .createNewDataStore(Map.of(
479                 CSVDataStoreFactory.FILE_PARAM.key,
480                 outputFile,
481                 CSVDataStoreFactory.STRATEGYP.key,
482                 CSVDataStoreFactory.WKT_STRATEGY,
483                 CSVDataStoreFactory.WKTP.key,
484                 "the_geom_wkt",
485                 CSVDataStoreFactory.WRITEPRJ.key,
486                 false,
487                 CSVDataStoreFactory.QUOTEALL.key,
488                 true));
489       }
490       case XLSX -> {
491         // replace any invalid characters such as /\?*[] with '_' and clip to 31 characters because Excel has
492         // limitations on sheet names. Also clip off any WFS namespace prefix in the type name, which is often
493         // separated by a ':' character, because ':' is not allowed in Excel sheet names.
494         typeName = typeName.contains(":")
495             ? typeName.substring(typeName.lastIndexOf(":") + 1).replaceAll("[\\\\/?*\\[\\]:]", "_")
496             : typeName.replaceAll("[\\\\/?*\\[\\]:]", "_");
497         typeName = typeName.substring(0, Math.min(typeName.length(), 31));
498         return (FileDataStore) new ExcelDataStoreFactory()
499             .createNewDataStore(Map.of(
500                 ExcelDataStoreFactory.FILE_PARAM.key,
501                 outputFile,
502                 ExcelDataStoreFactory.SHEET_PARAM.key,
503                 typeName));
504       }
505       case GEOJSON -> {
506         return (FileDataStore) new GeoJSONDataStoreFactory()
507             .createNewDataStore(Map.of(GeoJSONDataStoreFactory.FILE_PARAM.key, outputFile));
508       }
509       default -> {
510         // should never happen
511         emitError(clientId, "Unknown output format: " + extractOutputFormat);
512         logger.error("Unknown output format: {}", extractOutputFormat);
513         throw new IllegalArgumentException("Unknown output format: " + extractOutputFormat);
514       }
515     }
516   }
517 
518   private int getFeatureCount(SimpleFeatureSource source, Query query) throws IOException {
519     int count = source.getCount(query);
520     logger.debug("Filtered source counts {} features", count);
521     if (count < 0 && exactWfsCounts) {
522       count = source.getFeatures(query).size();
523     }
524     return count;
525   }
526 
527   private void handleWithShapeDumper(
528       @NonNull String clientId,
529       @NonNull TMFeatureType inputTmFeatureType,
530       @NonNull Set<String> attributes,
531       Filter filter,
532       String sortBy,
533       SortOrder sortOrder,
534       @NonNull String outputFileName) {
535     SimpleFeatureSource inputFeatureSource = null;
536     File outputDirectory = null;
537     try {
538       File outputFile = getValidatedOutputFile(outputFileName);
539       String baseName = outputFile
540           .getName()
541           .substring(
542               0,
543               outputFile
544                   .getName()
545                   .lastIndexOf(LayerExtractController.ExtractOutputFormat.SHAPE.getExtension()));
546       outputDirectory = outputFile
547           .getParentFile()
548           .toPath()
549           .resolve(baseName)
550           .toFile()
551           .getCanonicalFile();
552       if (!logger.isDebugEnabled()) {
553         // delete in production after JVM exit because the event bus will be reset when the JVM exits, and then
554         // we are unlikely to have a reference to the file anymore.
555         // In debug/development mode we want to keep the directory for inspection.
556         outputDirectory.deleteOnExit();
557       }
558       Files.createDirectories(outputDirectory.toPath());
559 
560       ShapefileDumper dumper = new ShapefileDumper(outputDirectory);
561       dumper.setCharset(StandardCharsets.UTF_8);
562       dumper.setEmptyShapefileAllowed(false);
563 
564       inputFeatureSource = featureSourceFactoryHelper.openGeoToolsFeatureSource(inputTmFeatureType);
565 
566       Query q = createQuery(inputFeatureSource, attributes, filter, sortBy, sortOrder);
567 
568       final int featCount = getFeatureCount(inputFeatureSource, q);
569       final boolean hasKnownFeatureCount = featCount > 0;
570 
571       AtomicInteger lastProgress = new AtomicInteger(0);
572 
573       dumper.dump(new ProgressReportingFeatureCollection(
574           inputFeatureSource.getFeatures(q), progressReportInterval, processed -> {
575             int progress = hasKnownFeatureCount ? (int) ((processed / (double) featCount) * 99) : 0;
576             lastProgress.set(progress);
577             String progressMessage = hasKnownFeatureCount
578                 ? "Extracting shapes: %d/%d features processed".formatted(processed, featCount)
579                 : "Extracting shapes: %d features processed".formatted(processed);
580             this.emitProgress(clientId, outputFileName, progress, false, progressMessage);
581           }));
582       this.emitProgress(
583           clientId,
584           outputFileName,
585           Math.max(99, lastProgress.get()),
586           false,
587           "Extract shapes dumped successfully");
588 
589       zipDirectory(outputDirectory.toPath(), outputFile.toPath());
590       this.emitProgress(clientId, outputFileName, 100, true, "Extract completed successfully");
591     } catch (IOException | IllegalArgumentException e) {
592       emitError(clientId, e.getMessage());
593       logger.error("Creating extract failed", e);
594     } finally {
595       if (outputDirectory != null) {
596         try {
597           deleteDirectoryRecursively(outputDirectory.toPath());
598         } catch (IOException e) {
599           logger.error("Failed to delete output directory {}", outputDirectory, e);
600         }
601       }
602       if (inputFeatureSource != null) {
603         try {
604           inputFeatureSource.getDataStore().dispose();
605         } catch (Exception e) {
606           logger.warn("Error disposing datastore for feature source {}", inputFeatureSource.getName(), e);
607         }
608       }
609     }
610   }
611 
612   private Query createQuery(
613       SimpleFeatureSource inputFeatureSource,
614       Set<String> attributes,
615       Filter filter,
616       String sortBy,
617       SortOrder sortOrder) {
618     Query q = new Query(inputFeatureSource.getName().toString());
619     if (!attributes.isEmpty()) {
620       q.setPropertyNames(attributes.toArray(new String[0]));
621     }
622 
623     if (filter != null) {
624       q.setFilter(filter);
625     }
626     if (!StringUtils.isBlank(sortBy)) {
627       q.setSortBy(ff.sort(sortBy, Objects.requireNonNullElse(sortOrder, SortOrder.ASCENDING)));
628     }
629     return q;
630   }
631 
632   /**
633    * Cleanup expired extract files. Filenames are created in {@link CreateLayerExtractService#createExtractFilename }
634    * and follow the pattern {@code "%s_%s_%s.%s".formatted(sourceFT.getName(), clientId, UUIDv7.randomV7(),
635    * outputFormat.getExtension()) }
636    */
637   @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.MINUTES, initialDelay = 15)
638   public void cleanupExpiredExtracts() {
639     logger.debug("Running expired extracts cleanup...");
640     List<FileWithAttributes> oldDownloadFilesOnDisk = new ArrayList<>();
641     Set<String> validClientIds = eventBus.getAllClientIds();
642 
643     // list download files in export location and delete those that are not bound to an active sse stream client
644     try (Stream<Path> stream = Files.walk(Path.of(exportFilesLocation))) {
645       stream.filter(Files::isRegularFile).forEach(path -> {
646         File file = path.toFile();
647         String filename = file.getName();
648         String[] parts = filename.split("[_.]", -1);
649         if (parts.length < 4) {
650           logger.warn("Unexpected file in extract location: {}", filename);
651           return;
652         }
653         String clientId = parts[1];
654         if (!validClientIds.contains(clientId)) {
655           if (!file.delete()) {
656             logger.error("Failed to delete unattached extract file {}", filename);
657           }
658         } else {
659           try {
660             Instant timestampPart = UUIDv7.timestampAsInstant(UUIDv7.fromString(parts[2]));
661             oldDownloadFilesOnDisk.add(new FileWithAttributes(file, timestampPart, clientId));
662           } catch (IllegalArgumentException ignored) {
663             // not a valid v7 uuid
664           }
665         }
666       });
667 
668       try (Stream<Path> paths = Files.walk(Path.of(exportFilesLocation))) {
669         paths.filter(Files::isDirectory).forEach(path -> {
670           File file = path.toFile();
671           String filename = file.getName();
672           String[] parts = filename.split("[_]", -1);
673           if (parts.length < 3) {
674             logger.warn("Unexpected directory in extract location: {}", filename);
675             return;
676           }
677           String clientId = parts[1];
678           if (!validClientIds.contains(clientId)) {
679             try {
680               deleteDirectoryRecursively(file.toPath());
681             } catch (IOException e) {
682               logger.error("Failed to delete unattached extract directory {}", filename);
683             }
684           } else {
685             try {
686               Instant timestampPart = UUIDv7.timestampAsInstant(UUIDv7.fromString(parts[2]));
687               oldDownloadFilesOnDisk.add(new FileWithAttributes(file, timestampPart, clientId));
688             } catch (IllegalArgumentException ignored) {
689               // not a valid v7 uuid
690             }
691           }
692         });
693       }
694 
695       // delete any files/directories are older than the cutoff
696       oldDownloadFilesOnDisk.stream()
697           .filter(f -> f.timestamp()
698               .isBefore(Instant.now().minusSeconds(TimeUnit.MINUTES.toSeconds(cleanupIntervalMinutes))))
699           .forEach(f -> {
700             if (f.file.isDirectory()) {
701               try {
702                 deleteDirectoryRecursively(f.file().toPath());
703               } catch (IOException ignored) {
704                 logger.warn("Failed to delete directory {}", f.file());
705               }
706             } else {
707               if (!f.file().delete()) {
708                 logger.error(
709                     "Failed to delete expired extract file {}",
710                     f.file().getName());
711               }
712             }
713           });
714     } catch (IOException e) {
715       logger.error("Error while cleaning up expired extracts", e);
716     }
717   }
718 
719   private void zipDirectory(Path sourceDir, Path zipFile) throws IOException {
720     try (ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(zipFile));
721         Stream<Path> pathStream = Files.walk(sourceDir)) {
722       pathStream.filter(Files::isRegularFile).forEach(path -> {
723         String entryName = sourceDir.relativize(path).toString().replace(File.separatorChar, '/');
724         try {
725           zos.putNextEntry(new ZipEntry(entryName));
726           Files.copy(path, zos);
727           zos.closeEntry();
728         } catch (IOException e) {
729           throw new RuntimeException("Failed to add file to zip: " + path, e);
730         }
731       });
732     } catch (RuntimeException e) {
733       if (e.getCause() instanceof IOException ioException) {
734         throw ioException;
735       }
736       throw e;
737     }
738   }
739 
740   private void deleteDirectoryRecursively(Path directory) throws IOException {
741     try (Stream<Path> paths = Files.walk(directory)) {
742       paths.sorted(Comparator.reverseOrder()).forEach(path -> {
743         try {
744           logger.debug("Deleting path {}", path);
745           Files.deleteIfExists(path);
746         } catch (IOException e) {
747           throw new RuntimeException("Failed to delete path: " + path, e);
748         }
749       });
750     } catch (RuntimeException e) {
751       if (e.getCause() instanceof IOException ioException) {
752         throw ioException;
753       }
754       throw e;
755     }
756   }
757 
758   private record FileWithAttributes(File file, Instant timestamp, String clientId) {}
759 }