001/*-
002 * #%L
003 * HAPI FHIR JPA Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.dao.data;
021
022import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
023import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
024import org.springframework.data.domain.Pageable;
025import org.springframework.data.jpa.repository.JpaRepository;
026import org.springframework.data.jpa.repository.Modifying;
027import org.springframework.data.jpa.repository.Query;
028import org.springframework.data.repository.query.Param;
029
030import java.util.Collection;
031import java.util.Date;
032import java.util.List;
033import java.util.Set;
034import java.util.stream.Stream;
035
036public interface IBatch2WorkChunkRepository
037                extends JpaRepository<Batch2WorkChunkEntity, String>, IHapiFhirJpaRepository {
038
039        // NOTE we need a stable sort so paging is reliable.
040        // Warning: mySequence is not unique - it is reset for every chunk.  So we also sort by myId.
041        @Query(
042                        "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC")
043        List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
044
045        /**
046         * A projection query to avoid fetching the CLOB over the wire.
047         * Otherwise, the same as fetchChunks.
048         */
049        @Query("SELECT new Batch2WorkChunkEntity("
050                        + "e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus,"
051                        + "e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime,"
052                        + "e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed, e.myWarningMessage,"
053                        + "e.myNextPollTime, e.myPollAttempts"
054                        + ") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC, e.myId ASC")
055        List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);
056
057        @Query(
058                        "SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
059        Set<WorkChunkStatusEnum> getDistinctStatusesForStep(
060                        @Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);
061
062        @Query(
063                        "SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :targetStepId ORDER BY e.mySequence ASC")
064        Stream<Batch2WorkChunkEntity> fetchChunksForStep(
065                        @Param("instanceId") String theInstanceId, @Param("targetStepId") String theTargetStepId);
066
067        @Modifying
068        @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, "
069                        + "e.myRecordsProcessed = :rp, e.myErrorCount = e.myErrorCount + :errorRetries, e.mySerializedData = null, e.mySerializedDataVc = null, "
070                        + "e.myWarningMessage = :warningMessage WHERE e.myId = :id")
071        void updateChunkStatusAndClearDataForEndSuccess(
072                        @Param("id") String theChunkId,
073                        @Param("et") Date theEndTime,
074                        @Param("rp") int theRecordsProcessed,
075                        @Param("errorRetries") int theErrorRetries,
076                        @Param("status") WorkChunkStatusEnum theInProgress,
077                        @Param("warningMessage") String theWarningMessage);
078
079        @Modifying
080        @Query(
081                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = :nextPollTime, e.myPollAttempts = e.myPollAttempts + 1 WHERE e.myId = :id AND e.myStatus IN(:states)")
082        int updateWorkChunkNextPollTime(
083                        @Param("id") String theChunkId,
084                        @Param("status") WorkChunkStatusEnum theStatus,
085                        @Param("states") Set<WorkChunkStatusEnum> theInitialStates,
086                        @Param("nextPollTime") Date theNextPollTime);
087
088        @Modifying
089        @Query(
090                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myNextPollTime = null WHERE e.myInstanceId = :instanceId AND e.myStatus IN(:states) AND e.myNextPollTime <= :pollTime")
091        int updateWorkChunksForPollWaiting(
092                        @Param("instanceId") String theInstanceId,
093                        @Param("pollTime") Date theTime,
094                        @Param("states") Set<WorkChunkStatusEnum> theInitialStates,
095                        @Param("status") WorkChunkStatusEnum theNewStatus);
096
097        @Modifying
098        @Query(
099                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.mySerializedData = null, e.mySerializedDataVc = null, e.myErrorMessage = :em WHERE e.myId IN(:ids)")
100        void updateAllChunksForInstanceStatusClearDataAndSetError(
101                        @Param("ids") List<String> theChunkIds,
102                        @Param("et") Date theEndTime,
103                        @Param("status") WorkChunkStatusEnum theInProgress,
104                        @Param("em") String theError);
105
106        @Modifying
107        @Query(
108                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myEndTime = :et, e.myErrorMessage = :em, e.myErrorCount = e.myErrorCount + 1 WHERE e.myId = :id")
109        int updateChunkStatusAndIncrementErrorCountForEndError(
110                        @Param("id") String theChunkId,
111                        @Param("et") Date theEndTime,
112                        @Param("em") String theErrorMessage,
113                        @Param("status") WorkChunkStatusEnum theInProgress);
114
115        @Modifying
116        @Query(
117                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :status, e.myStartTime = :st WHERE e.myId = :id AND e.myStatus IN :startStatuses")
118        int updateChunkStatusForStart(
119                        @Param("id") String theChunkId,
120                        @Param("st") Date theStartedTime,
121                        @Param("status") WorkChunkStatusEnum theInProgress,
122                        @Param("startStatuses") Collection<WorkChunkStatusEnum> theStartStatuses);
123
124        @Modifying
125        @Query("UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myId = :id AND e.myStatus = :oldStatus")
126        int updateChunkStatus(
127                        @Param("id") String theChunkId,
128                        @Param("oldStatus") WorkChunkStatusEnum theOldStatus,
129                        @Param("newStatus") WorkChunkStatusEnum theNewStatus);
130
131        @Modifying
132        @Query(
133                        "UPDATE Batch2WorkChunkEntity e SET e.myStatus = :newStatus WHERE e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus IN ( :oldStatuses )")
134        int updateAllChunksForStepWithStatus(
135                        @Param("instanceId") String theInstanceId,
136                        @Param("stepId") String theStepId,
137                        @Param("oldStatuses") List<WorkChunkStatusEnum> theOldStatuses,
138                        @Param("newStatus") WorkChunkStatusEnum theNewStatus);
139
140        @Modifying
141        @Query("DELETE FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId")
142        int deleteAllForInstance(@Param("instanceId") String theInstanceId);
143
144        @Query(
145                        "SELECT e.myId from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId AND e.myStatus = :status")
146        List<String> fetchAllChunkIdsForStepWithStatus(
147                        @Param("instanceId") String theInstanceId,
148                        @Param("stepId") String theStepId,
149                        @Param("status") WorkChunkStatusEnum theStatus);
150}