/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.api.JobOperationResultJson;
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkCompletionEvent;
import ca.uhn.fhir.batch2.model.WorkChunkCreateEvent;
import ca.uhn.fhir.batch2.model.WorkChunkErrorEvent;
import ca.uhn.fhir.batch2.model.WorkChunkMetadata;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.models.JobInstanceFetchRequest;
import ca.uhn.fhir.interceptor.api.HookParams;
import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
import ca.uhn.fhir.interceptor.api.Pointcut;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkMetadataViewRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkMetadataView;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.util.Batch2JobDefinitionConstants;
import ca.uhn.fhir.util.Logs;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.persistence.EntityManager;
import jakarta.persistence.LockModeType;
import jakarta.persistence.Query;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static ca.uhn.fhir.batch2.coordinator.WorkChunkProcessor.MAX_CHUNK_ERROR_COUNT;
import static ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity.ERROR_MSG_MAX_LENGTH;
import static org.apache.commons.lang3.StringUtils.isBlank;

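/**
 * JPA-backed implementation of {@link IJobPersistence}, persisting Batch2 job instances and their
 * work chunks through Spring Data repositories. Chunks are created in READY (or GATE_WAITING for
 * gated jobs), move to QUEUED when enqueued for workers, to IN_PROGRESS when dequeued, and finish
 * as COMPLETED, ERRORED, or FAILED; POLL_WAITING chunks are returned to READY once their deadline
 * passes.
 */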
public class JpaJobPersistenceImpl implements IJobPersistence {
	private static final Logger ourLog = Logs.getBatchTroubleshootingLog();
	public static final String CREATE_TIME = "myCreateTime";

	private final IBatch2JobInstanceRepository myJobInstanceRepository;
	private final IBatch2WorkChunkRepository myWorkChunkRepository;
	private final IBatch2WorkChunkMetadataViewRepository myWorkChunkMetadataViewRepo;
	private final EntityManager myEntityManager;
	private final IHapiTransactionService myTransactionService;
	private final IInterceptorBroadcaster myInterceptorBroadcaster;

	/**
	 * Constructor
	 */
	public JpaJobPersistenceImpl(
			IBatch2JobInstanceRepository theJobInstanceRepository,
			IBatch2WorkChunkRepository theWorkChunkRepository,
			IBatch2WorkChunkMetadataViewRepository theWorkChunkMetadataViewRepo,
			IHapiTransactionService theTransactionService,
			EntityManager theEntityManager,
			IInterceptorBroadcaster theInterceptorBroadcaster) {
		Validate.notNull(theJobInstanceRepository, "theJobInstanceRepository");
		Validate.notNull(theWorkChunkRepository, "theWorkChunkRepository");
		myJobInstanceRepository = theJobInstanceRepository;
		myWorkChunkRepository = theWorkChunkRepository;
		myWorkChunkMetadataViewRepo = theWorkChunkMetadataViewRepo;
		myTransactionService = theTransactionService;
		myEntityManager = theEntityManager;
		myInterceptorBroadcaster = theInterceptorBroadcaster;
	}

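	/**
	 * Creates and persists a new work chunk with a random UUID, starting in either GATE_WAITING
	 * (for gated execution) or READY, as determined by {@link #getOnCreateStatus(WorkChunkCreateEvent)}.
	 */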
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public String onWorkChunkCreate(WorkChunkCreateEvent theBatchWorkChunk) {
		Batch2WorkChunkEntity entity = new Batch2WorkChunkEntity();
		entity.setId(UUID.randomUUID().toString());
		entity.setSequence(theBatchWorkChunk.sequence);
		entity.setJobDefinitionId(theBatchWorkChunk.jobDefinitionId);
		entity.setJobDefinitionVersion(theBatchWorkChunk.jobDefinitionVersion);
		entity.setTargetStepId(theBatchWorkChunk.targetStepId);
		entity.setInstanceId(theBatchWorkChunk.instanceId);
		entity.setSerializedData(theBatchWorkChunk.serializedData);
		entity.setCreateTime(new Date());
		entity.setStartTime(new Date());
		entity.setStatus(getOnCreateStatus(theBatchWorkChunk));

		ourLog.debug("Create work chunk {}/{}/{}", entity.getInstanceId(), entity.getId(), entity.getTargetStepId());
		ourLog.trace(
				"Create work chunk data {}/{}: {}", entity.getInstanceId(), entity.getId(), entity.getSerializedData());
		myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> myWorkChunkRepository.save(entity));

		return entity.getId();
	}

	/**
	 * Gets the initial onCreate state for the given work chunk.
	 * Gated job chunks start in GATE_WAITING; they are transitioned to READY during the maintenance
	 * pass once all chunks in the previous step are COMPLETED.
	 * Non-gated job chunks start in READY.
	 */
	private static WorkChunkStatusEnum getOnCreateStatus(WorkChunkCreateEvent theBatchWorkChunk) {
		if (theBatchWorkChunk.isGatedExecution) {
			return WorkChunkStatusEnum.GATE_WAITING;
		} else {
			return WorkChunkStatusEnum.READY;
		}
	}

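	/**
	 * Transitions a chunk to IN_PROGRESS when a worker dequeues it. A pessimistic write lock is
	 * taken on the chunk row first so that a concurrent maintenance pass cannot touch it; if no
	 * row is in an eligible prior state (QUEUED, ERRORED, or IN_PROGRESS), the chunk was already
	 * started and an empty Optional is returned.
	 */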
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public Optional<WorkChunk> onWorkChunkDequeue(String theChunkId) {
		// take a lock on the chunk id to ensure that the maintenance run isn't doing anything.
		Batch2WorkChunkEntity chunkLock =
				myEntityManager.find(Batch2WorkChunkEntity.class, theChunkId, LockModeType.PESSIMISTIC_WRITE);
		// detach from the persistence context to avoid stale data.
		myEntityManager.detach(chunkLock);

		// NOTE: Ideally, IN_PROGRESS wouldn't be allowed here, and restarting a failed chunk
		// probably shouldn't be allowed either. But how does a re-run happen if k8s kills a
		// processor mid-run?
		List<WorkChunkStatusEnum> priorStates =
				List.of(WorkChunkStatusEnum.QUEUED, WorkChunkStatusEnum.ERRORED, WorkChunkStatusEnum.IN_PROGRESS);
		int rowsModified = myWorkChunkRepository.updateChunkStatusForStart(
				theChunkId, new Date(), WorkChunkStatusEnum.IN_PROGRESS, priorStates);

		if (rowsModified == 0) {
			ourLog.info("Attempting to start chunk {} but it was already started.", theChunkId);
			return Optional.empty();
		} else {
			Optional<Batch2WorkChunkEntity> chunk = myWorkChunkRepository.findById(theChunkId);
			return chunk.map(this::toChunk);
		}
	}

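	/**
	 * Persists a new job instance. The instance must not already have an ID; a random UUID is
	 * assigned here, and any STORAGE_PRESTORAGE_BATCH_JOB_CREATE hooks are invoked before the
	 * entity is saved.
	 */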
	@Override
	@Transactional(propagation = Propagation.REQUIRED)
	public String storeNewInstance(JobInstance theInstance) {
		Validate.isTrue(isBlank(theInstance.getInstanceId()));

		invokePreStorageBatchHooks(theInstance);

		Batch2JobInstanceEntity entity = new Batch2JobInstanceEntity();
		entity.setId(UUID.randomUUID().toString());
		entity.setDefinitionId(theInstance.getJobDefinitionId());
		entity.setDefinitionVersion(theInstance.getJobDefinitionVersion());
		entity.setStatus(theInstance.getStatus());
		entity.setParams(theInstance.getParameters());
		entity.setCurrentGatedStepId(theInstance.getCurrentGatedStepId());
		entity.setFastTracking(theInstance.isFastTracking());
		entity.setCreateTime(new Date());
		entity.setStartTime(new Date());
		entity.setReport(theInstance.getReport());
		entity.setTriggeringUsername(theInstance.getTriggeringUsername());
		entity.setTriggeringClientId(theInstance.getTriggeringClientId());

		entity = myJobInstanceRepository.save(entity);
		return entity.getId();
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstances(
			String theJobDefinitionId, Set<StatusEnum> theStatuses, Date theCutoff, Pageable thePageable) {
		return toInstanceList(myJobInstanceRepository.findInstancesByJobIdAndStatusAndExpiry(
				theJobDefinitionId, theStatuses, theCutoff, thePageable));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstancesByJobDefinitionIdAndStatus(
			String theJobDefinitionId, Set<StatusEnum> theRequestedStatuses, int thePageSize, int thePageIndex) {
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
		return toInstanceList(myJobInstanceRepository.fetchInstancesByJobDefinitionIdAndStatus(
				theJobDefinitionId, theRequestedStatuses, pageRequest));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstancesByJobDefinitionId(
			String theJobDefinitionId, int thePageSize, int thePageIndex) {
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
		return toInstanceList(myJobInstanceRepository.findInstancesByJobDefinitionId(theJobDefinitionId, pageRequest));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public Page<JobInstance> fetchJobInstances(JobInstanceFetchRequest theRequest) {
		PageRequest pageRequest =
				PageRequest.of(theRequest.getPageStart(), theRequest.getBatchSize(), theRequest.getSort());

		String jobStatus = theRequest.getJobStatus();
		if (Objects.equals(jobStatus, "")) {
			Page<Batch2JobInstanceEntity> pageOfEntities = myJobInstanceRepository.findAll(pageRequest);
			return pageOfEntities.map(this::toInstance);
		}

		StatusEnum status = StatusEnum.valueOf(jobStatus);
		List<JobInstance> jobs = toInstanceList(myJobInstanceRepository.findInstancesByJobStatus(status, pageRequest));
		Integer jobsOfStatus = myJobInstanceRepository.findTotalJobsOfStatus(status);
		return new PageImpl<>(jobs, pageRequest, jobsOfStatus);
	}

	private List<JobInstance> toInstanceList(List<Batch2JobInstanceEntity> theInstancesByJobDefinitionId) {
		return theInstancesByJobDefinitionId.stream().map(this::toInstance).collect(Collectors.toList());
	}

	@Override
	@Nonnull
	public Optional<JobInstance> fetchInstance(String theInstanceId) {
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myJobInstanceRepository.findById(theInstanceId).map(this::toInstance));
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstances(FetchJobInstancesRequest theRequest, int thePage, int theBatchSize) {
		String definitionId = theRequest.getJobDefinition();
		String params = theRequest.getParameters();
		Set<StatusEnum> statuses = theRequest.getStatuses();

		Pageable pageable = PageRequest.of(thePage, theBatchSize);

		List<Batch2JobInstanceEntity> instanceEntities;

		if (statuses != null && !statuses.isEmpty()) {
			if (Batch2JobDefinitionConstants.BULK_EXPORT.equals(definitionId)) {
				String truncatedParams = originalRequestUrlTruncation(params);
				if (truncatedParams != null) {
					params = truncatedParams;
				}
			}
			instanceEntities = myJobInstanceRepository.findInstancesByJobIdParamsAndStatus(
					definitionId, params, statuses, pageable);
		} else {
			instanceEntities = myJobInstanceRepository.findInstancesByJobIdAndParams(definitionId, params, pageable);
		}
		return toInstanceList(instanceEntities);
	}

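	/**
	 * For bulk export jobs, strips the query string from the {@code originalRequestUrl} property
	 * of the serialized parameters JSON, so that instance lookups match regardless of the request
	 * parameters. Returns {@code null} if the JSON cannot be parsed.
	 */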
	private String originalRequestUrlTruncation(String theParams) {
		try {
			ObjectMapper mapper = new ObjectMapper();
			mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
			mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
			JsonNode rootNode = mapper.readTree(theParams);
			String originalUrl = "originalRequestUrl";

			if (rootNode instanceof ObjectNode) {
				ObjectNode objectNode = (ObjectNode) rootNode;

				if (objectNode.has(originalUrl)) {
					String url = objectNode.get(originalUrl).asText();
					if (url.contains("?")) {
						objectNode.put(originalUrl, url.split("\\?")[0]);
					}
				}
				return mapper.writeValueAsString(objectNode);
			}
		} catch (Exception e) {
			ourLog.info("Error truncating originalRequestUrl", e);
		}
		return null;
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchInstances(int thePageSize, int thePageIndex) {
		// default sort is myCreateTime ascending
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.ASC, CREATE_TIME);
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
						.map(this::toInstance)
						.collect(Collectors.toList()));
	}

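	/**
	 * Moves a chunk from READY to QUEUED immediately before it is submitted to the work channel.
	 * The number of rows updated (0 or 1) is passed to the callback, which lets the caller skip
	 * sending the message when the chunk was no longer in READY.
	 */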
	@Override
	public void enqueueWorkChunkForProcessing(String theChunkId, Consumer<Integer> theCallback) {
		int updated = myWorkChunkRepository.updateChunkStatus(
				theChunkId, WorkChunkStatusEnum.READY, WorkChunkStatusEnum.QUEUED);
		theCallback.accept(updated);
	}

	@Override
	public int updatePollWaitingChunksForJobIfReady(String theInstanceId) {
		return myWorkChunkRepository.updateWorkChunksForPollWaiting(
				theInstanceId,
				Date.from(Instant.now()),
				Set.of(WorkChunkStatusEnum.POLL_WAITING),
				WorkChunkStatusEnum.READY);
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public List<JobInstance> fetchRecentInstances(int thePageSize, int thePageIndex) {
		PageRequest pageRequest = PageRequest.of(thePageIndex, thePageSize, Sort.Direction.DESC, CREATE_TIME);
		return myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myJobInstanceRepository.findAll(pageRequest).stream()
						.map(this::toInstance)
						.collect(Collectors.toList()));
	}

	private WorkChunk toChunk(Batch2WorkChunkEntity theEntity) {
		return JobInstanceUtil.fromEntityToWorkChunk(theEntity);
	}

	private JobInstance toInstance(Batch2JobInstanceEntity theEntity) {
		return JobInstanceUtil.fromEntityToInstance(theEntity);
	}

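	/**
	 * Records a chunk error: increments the error count, stores the (truncated) error message, and
	 * moves the chunk to ERRORED. If the accumulated error count now exceeds MAX_CHUNK_ERROR_COUNT,
	 * a follow-up update marks the chunk FAILED instead, and the resulting status is returned.
	 */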
	@Override
	public WorkChunkStatusEnum onWorkChunkError(WorkChunkErrorEvent theParameters) {
		String chunkId = theParameters.getChunkId();
		String errorMessage = truncateErrorMessage(theParameters.getErrorMsg());

		return myTransactionService.withSystemRequestOnDefaultPartition().execute(() -> {
			int changeCount = myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
					chunkId, new Date(), errorMessage, WorkChunkStatusEnum.ERRORED);
			Validate.isTrue(changeCount > 0, "expected to change chunk matching %s", chunkId);

			Query query = myEntityManager.createQuery("update Batch2WorkChunkEntity "
					+ "set myStatus = :failed, "
					+ "myErrorMessage = CONCAT('Too many errors: ', CAST(myErrorCount as string), '. Last error msg was ', myErrorMessage) "
					+ "where myId = :chunkId and myErrorCount > :maxCount");
			query.setParameter("chunkId", chunkId);
			query.setParameter("failed", WorkChunkStatusEnum.FAILED);
			query.setParameter("maxCount", MAX_CHUNK_ERROR_COUNT);
			int failChangeCount = query.executeUpdate();

			if (failChangeCount > 0) {
				return WorkChunkStatusEnum.FAILED;
			} else {
				return WorkChunkStatusEnum.ERRORED;
			}
		});
	}

	@Override
	public void onWorkChunkPollDelay(String theChunkId, Date theDeadline) {
		int updated = myWorkChunkRepository.updateWorkChunkNextPollTime(
				theChunkId, WorkChunkStatusEnum.POLL_WAITING, Set.of(WorkChunkStatusEnum.IN_PROGRESS), theDeadline);

		if (updated != 1) {
			ourLog.warn("Expected to update the poll delay of exactly 1 work chunk, but updated {}", updated);
		}
	}

	@Override
	public void onWorkChunkFailed(String theChunkId, String theErrorMessage) {
		ourLog.info("Marking chunk {} as failed with message: {}", theChunkId, theErrorMessage);
		String errorMessage = truncateErrorMessage(theErrorMessage);
		myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myWorkChunkRepository.updateChunkStatusAndIncrementErrorCountForEndError(
						theChunkId, new Date(), errorMessage, WorkChunkStatusEnum.FAILED));
	}

	@Override
	public void onWorkChunkCompletion(WorkChunkCompletionEvent theEvent) {
		myTransactionService
				.withSystemRequestOnDefaultPartition()
				.execute(() -> myWorkChunkRepository.updateChunkStatusAndClearDataForEndSuccess(
						theEvent.getChunkId(),
						new Date(),
						theEvent.getRecordsProcessed(),
						theEvent.getRecoveredErrorCount(),
						WorkChunkStatusEnum.COMPLETED,
						theEvent.getRecoveredWarningMessage()));
	}

	@Nullable
	private static String truncateErrorMessage(String theErrorMessage) {
		String errorMessage;
		if (theErrorMessage != null && theErrorMessage.length() > ERROR_MSG_MAX_LENGTH) {
			ourLog.warn("Truncating error message that is too long to store in database: {}", theErrorMessage);
			errorMessage = theErrorMessage.substring(0, ERROR_MSG_MAX_LENGTH);
		} else {
			errorMessage = theErrorMessage;
		}
		return errorMessage;
	}

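	/**
	 * Bulk-updates the given chunks to the target status, clearing their serialized data and
	 * recording the (truncated) error message. The chunk IDs are processed in partitions of 100 to
	 * bound the size of the generated IN clauses. Must be called inside an active transaction.
	 */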
	@Override
	public void markWorkChunksWithStatusAndWipeData(
			String theInstanceId, List<String> theChunkIds, WorkChunkStatusEnum theStatus, String theErrorMessage) {
		assert TransactionSynchronizationManager.isActualTransactionActive();

		ourLog.debug("Marking all chunks for instance {} to status {}", theInstanceId, theStatus);
		String errorMessage = truncateErrorMessage(theErrorMessage);
		List<List<String>> listOfListOfIds = ListUtils.partition(theChunkIds, 100);
		for (List<String> idList : listOfListOfIds) {
			myWorkChunkRepository.updateAllChunksForInstanceStatusClearDataAndSetError(
					idList, new Date(), theStatus, errorMessage);
		}
	}

	@Override
	public Set<WorkChunkStatusEnum> getDistinctWorkChunkStatesForJobAndStep(
			String theInstanceId, String theCurrentStepId) {
		if (getRunningJob(theInstanceId) == null) {
			return Collections.emptySet();
		}
		return myWorkChunkRepository.getDistinctStatusesForStep(theInstanceId, theCurrentStepId);
	}

	private Batch2JobInstanceEntity getRunningJob(String theInstanceId) {
		Optional<Batch2JobInstanceEntity> instance = myJobInstanceRepository.findById(theInstanceId);
		if (instance.isEmpty()) {
			return null;
		}
		if (instance.get().getStatus().isEnded()) {
			return null;
		}
		return instance.get();
	}

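	/**
	 * Fetches one page of work chunks for the given instance, with or without their serialized
	 * data, and passes each converted {@link WorkChunk} to the consumer. Runs in a new
	 * transaction; used as the page supplier for {@link #fetchAllWorkChunksIterator(String, boolean)}.
	 */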
	private void fetchChunks(
			String theInstanceId,
			boolean theIncludeData,
			int thePageSize,
			int thePageIndex,
			Consumer<WorkChunk> theConsumer) {
		myTransactionService
				.withSystemRequestOnDefaultPartition()
				.withPropagation(Propagation.REQUIRES_NEW)
				.execute(() -> {
					List<Batch2WorkChunkEntity> chunks;
					if (theIncludeData) {
						chunks = myWorkChunkRepository.fetchChunks(
								PageRequest.of(thePageIndex, thePageSize), theInstanceId);
					} else {
						chunks = myWorkChunkRepository.fetchChunksNoData(
								PageRequest.of(thePageIndex, thePageSize), theInstanceId);
					}
					for (Batch2WorkChunkEntity chunk : chunks) {
						theConsumer.accept(toChunk(chunk));
					}
				});
	}

	@Override
	public void updateInstanceUpdateTime(String theInstanceId) {
		myJobInstanceRepository.updateInstanceUpdateTime(theInstanceId, new Date());
	}

	@Override
	public WorkChunk createWorkChunk(WorkChunk theWorkChunk) {
		if (theWorkChunk.getId() == null) {
			theWorkChunk.setId(UUID.randomUUID().toString());
		}
		return toChunk(myWorkChunkRepository.save(Batch2WorkChunkEntity.fromWorkChunk(theWorkChunk)));
	}

	/**
	 * Note: not {@code @Transactional}, because the transaction happens in a lambda that is called
	 * outside of this method's scope.
	 */
	@Override
	public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
		return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) ->
				fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
	}

	@Override
	public Stream<WorkChunk> fetchAllWorkChunksForStepStream(String theInstanceId, String theStepId) {
		return myWorkChunkRepository
				.fetchChunksForStep(theInstanceId, theStepId)
				.map(this::toChunk);
	}

	@Override
	public Page<WorkChunkMetadata> fetchAllWorkChunkMetadataForJobInStates(
			Pageable thePageable, String theInstanceId, Set<WorkChunkStatusEnum> theStates) {
		Page<Batch2WorkChunkMetadataView> page =
				myWorkChunkMetadataViewRepo.fetchWorkChunkMetadataForJobInStates(thePageable, theInstanceId, theStates);

		return page.map(Batch2WorkChunkMetadataView::toChunkMetadata);
	}

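	/**
	 * Loads the instance under a pessimistic write lock, converts it to the public
	 * {@link JobInstance} model, and applies the modifier callback. If the callback reports a
	 * change, the modified fields are copied back onto the managed entity so the flush persists
	 * them. Returns false if the instance does not exist or was not modified.
	 */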
	@Override
	public boolean updateInstance(String theInstanceId, JobInstanceUpdateCallback theModifier) {
		Batch2JobInstanceEntity instanceEntity =
				myEntityManager.find(Batch2JobInstanceEntity.class, theInstanceId, LockModeType.PESSIMISTIC_WRITE);
		if (instanceEntity == null) {
			ourLog.error("No instance found with Id {}", theInstanceId);
			return false;
		}
		// convert to JobInstance for the public api
		JobInstance jobInstance = JobInstanceUtil.fromEntityToInstance(instanceEntity);

		// run the modification callback
		boolean wasModified = theModifier.doUpdate(jobInstance);

		if (wasModified) {
			// copy the modified fields back onto the managed entity for the flush
			JobInstanceUtil.fromInstanceToEntity(jobInstance, instanceEntity);
		}

		return wasModified;
	}


	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public void deleteInstanceAndChunks(String theInstanceId) {
		ourLog.info("Deleting instance and chunks: {}", theInstanceId);
		myWorkChunkRepository.deleteAllForInstance(theInstanceId);
		myJobInstanceRepository.deleteById(theInstanceId);
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public void deleteChunksAndMarkInstanceAsChunksPurged(String theInstanceId) {
		ourLog.info("Deleting all chunks for instance ID: {}", theInstanceId);
		int updateCount = myJobInstanceRepository.updateWorkChunksPurgedTrue(theInstanceId);
		int deleteCount = myWorkChunkRepository.deleteAllForInstance(theInstanceId);
		ourLog.debug("Purged {} chunks and updated {} instance(s).", deleteCount, updateCount);
	}

	@Override
	public boolean markInstanceAsStatusWhenStatusIn(
			String theInstanceId, StatusEnum theStatusEnum, Set<StatusEnum> thePriorStates) {
		int recordsChanged =
				myJobInstanceRepository.updateInstanceStatusIfIn(theInstanceId, theStatusEnum, thePriorStates);
		ourLog.debug(
				"Update job {} to status {} if in status {}: {}",
				theInstanceId,
				theStatusEnum,
				thePriorStates,
				recordsChanged > 0);
		return recordsChanged > 0;
	}

	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public JobOperationResultJson cancelInstance(String theInstanceId) {
		int recordsChanged = myJobInstanceRepository.updateInstanceCancelled(theInstanceId, true);
		String operationString = "Cancel job instance " + theInstanceId;

		// wipmb For 6.8 - This is too detailed to be down here - this should be up at the API layer.
		// Replace with a boolean result or a ResourceNotFound exception, and build the message up at the UI.
		String messagePrefix = "Job instance <" + theInstanceId + ">";
		if (recordsChanged > 0) {
			return JobOperationResultJson.newSuccess(operationString, messagePrefix + " successfully cancelled.");
		} else {
			Optional<JobInstance> instance = fetchInstance(theInstanceId);
			if (instance.isPresent()) {
				return JobOperationResultJson.newFailure(
						operationString, messagePrefix + " was already cancelled. Nothing to do.");
			} else {
				return JobOperationResultJson.newFailure(operationString, messagePrefix + " not found.");
			}
		}
	}

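	/**
	 * Broadcasts the STORAGE_PRESTORAGE_BATCH_JOB_CREATE pointcut for a newly stored job instance,
	 * if any hooks are registered, using a {@link SystemRequestDetails} as the request context.
	 */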
	private void invokePreStorageBatchHooks(JobInstance theJobInstance) {
		if (myInterceptorBroadcaster.hasHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE)) {
			HookParams params = new HookParams()
					.add(JobInstance.class, theJobInstance)
					.add(RequestDetails.class, new SystemRequestDetails());

			myInterceptorBroadcaster.callHooks(Pointcut.STORAGE_PRESTORAGE_BATCH_JOB_CREATE, params);
		}
	}

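	/**
	 * Advances a gated job to the next step and releases that step's chunks. The step pointer is
	 * moved via {@code updateInstance} under a pessimistic lock, so a concurrent maintenance pass
	 * cannot advance the same instance twice; when this call wins, the step's GATE_WAITING (and
	 * legacy QUEUED) chunks are moved to READY, or to REDUCTION_READY for a reduction step.
	 */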
	@Override
	@Transactional(propagation = Propagation.REQUIRES_NEW)
	public boolean advanceJobStepAndUpdateChunkStatus(
			String theJobInstanceId, String theNextStepId, boolean theIsReductionStep) {
		boolean changed = updateInstance(theJobInstanceId, instance -> {
			if (instance.getCurrentGatedStepId().equals(theNextStepId)) {
				// someone else beat us here. No changes needed.
				return false;
			}
			ourLog.debug("Moving gated instance {} to the next step {}.", theJobInstanceId, theNextStepId);
			instance.setCurrentGatedStepId(theNextStepId);
			return true;
		});

		if (changed) {
			ourLog.debug(
					"Updating chunk status from GATE_WAITING to READY for gated instance {} in step {}.",
					theJobInstanceId,
					theNextStepId);
			WorkChunkStatusEnum nextStep =
					theIsReductionStep ? WorkChunkStatusEnum.REDUCTION_READY : WorkChunkStatusEnum.READY;
			// when we reach here, the current step id is equal to theNextStepId
			// Up to version 7.1, gated jobs' work chunks were created in status QUEUED but were never
			// actually put on the queue for the workers. To stay compatible with those jobs, QUEUED
			// chunks are transitioned here as well.
			// TODO: remove 'QUEUED' from the IN clause after 7.6.0.
			int numChanged = myWorkChunkRepository.updateAllChunksForStepWithStatus(
					theJobInstanceId,
					theNextStepId,
					List.of(WorkChunkStatusEnum.GATE_WAITING, WorkChunkStatusEnum.QUEUED),
					nextStep);
			ourLog.debug(
					"Updated {} chunks of gated instance {} in step {} from GATE_WAITING/QUEUED to {}.",
					numChanged,
					theJobInstanceId,
					theNextStepId,
					nextStep);
		}

		return changed;
	}
}