001package ca.uhn.fhir.jpa.subscription;
002
003/*-
004 * #%L
005 * HAPI FHIR JPA Server
006 * %%
007 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.ConfigurationException;
024import ca.uhn.fhir.context.FhirContext;
025import ca.uhn.fhir.i18n.Msg;
026import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
027import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
028import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
029import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
030import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
031import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
032import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
033import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
034import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc;
035import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
036import ca.uhn.fhir.model.primitive.IdDt;
037import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
038import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
039import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
040import com.fasterxml.jackson.core.JsonProcessingException;
041import com.fasterxml.jackson.databind.ObjectMapper;
042import org.hl7.fhir.instance.model.api.IBaseResource;
043import org.hl7.fhir.instance.model.api.IIdType;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import java.util.Date;
048import java.util.List;
049import java.util.Optional;
050
051import static ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK.with;
052
053/**
054 * This implementer provides the capability to persist subscription messages for asynchronous submission
055 * to the subscription processing pipeline with the purpose of offering a retry mechanism
056 * upon submission failure (see @link {@link AsyncResourceModifiedSubmitterSvc}).
057 */
058public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModifiedMessagePersistenceSvc {
059
060        private final FhirContext myFhirContext;
061
062        private final IResourceModifiedDao myResourceModifiedDao;
063
064        private final DaoRegistry myDaoRegistry;
065
066        private final ObjectMapper myObjectMapper;
067
068        private final HapiTransactionService myHapiTransactionService;
069
070        private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedMessagePersistenceSvcImpl.class);
071
072        public ResourceModifiedMessagePersistenceSvcImpl(
073                        FhirContext theFhirContext,
074                        IResourceModifiedDao theResourceModifiedDao,
075                        DaoRegistry theDaoRegistry,
076                        HapiTransactionService theHapiTransactionService) {
077                myFhirContext = theFhirContext;
078                myResourceModifiedDao = theResourceModifiedDao;
079                myDaoRegistry = theDaoRegistry;
080                myHapiTransactionService = theHapiTransactionService;
081                myObjectMapper = new ObjectMapper();
082        }
083
084        @Override
085        public List<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime() {
086                return myHapiTransactionService.withSystemRequest().execute(myResourceModifiedDao::findAllOrderedByCreatedTime);
087        }
088
089        @Override
090        public IPersistedResourceModifiedMessage persist(ResourceModifiedMessage theMsg) {
091                ResourceModifiedEntity resourceModifiedEntity = createEntityFrom(theMsg);
092                return myResourceModifiedDao.save(resourceModifiedEntity);
093        }
094
095        @Override
096        public ResourceModifiedMessage inflatePersistedResourceModifiedMessage(
097                        ResourceModifiedMessage theResourceModifiedMessage) {
098
099                return inflateResourceModifiedMessageFromEntity(createEntityFrom(theResourceModifiedMessage));
100        }
101
102        @Override
103        public Optional<ResourceModifiedMessage> inflatePersistedResourceModifiedMessageOrNull(
104                        ResourceModifiedMessage theResourceModifiedMessage) {
105                ResourceModifiedMessage inflatedResourceModifiedMessage = null;
106
107                try {
108                        inflatedResourceModifiedMessage = inflatePersistedResourceModifiedMessage(theResourceModifiedMessage);
109                } catch (ResourceNotFoundException e) {
110                        IdDt idDt = new IdDt(
111                                        theResourceModifiedMessage.getPayloadType(myFhirContext),
112                                        theResourceModifiedMessage.getPayloadId(),
113                                        theResourceModifiedMessage.getPayloadVersion());
114
115                        ourLog.warn("Scheduled submission will be ignored since resource {} cannot be found", idDt.getIdPart(), e);
116                } catch (Exception ex) {
117                        ourLog.error("Unknown error encountered on inflation of resources.", ex);
118                }
119
120                return Optional.ofNullable(inflatedResourceModifiedMessage);
121        }
122
123        @Override
124        public ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation(
125                        IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
126                ResourceModifiedMessage resourceModifiedMessage = getPayloadLessMessageFromString(
127                                ((ResourceModifiedEntity) thePersistedResourceModifiedMessage).getSummaryResourceModifiedMessage());
128
129                IdDt resourceId =
130                                createIdDtFromResourceModifiedEntity((ResourceModifiedEntity) thePersistedResourceModifiedMessage);
131                resourceModifiedMessage.setPayloadId(resourceId);
132
133                return resourceModifiedMessage;
134        }
135
136        @Override
137        public long getMessagePersistedCount() {
138                return myResourceModifiedDao.count();
139        }
140
141        @Override
142        public boolean deleteByPK(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
143                int removedCount =
144                                myResourceModifiedDao.removeById((PersistedResourceModifiedMessageEntityPK) theResourceModifiedPK);
145
146                return removedCount == 1;
147        }
148
149        protected ResourceModifiedMessage inflateResourceModifiedMessageFromEntity(
150                        ResourceModifiedEntity theResourceModifiedEntity) {
151                String resourceType = theResourceModifiedEntity.getResourceType();
152                ResourceModifiedMessage retVal =
153                                getPayloadLessMessageFromString(theResourceModifiedEntity.getSummaryResourceModifiedMessage());
154                SystemRequestDetails systemRequestDetails =
155                                new SystemRequestDetails().setRequestPartitionId(retVal.getPartitionId());
156
157                IdDt resourceIdDt = createIdDtFromResourceModifiedEntity(theResourceModifiedEntity);
158                IFhirResourceDao dao = myDaoRegistry.getResourceDao(resourceType);
159
160                IBaseResource iBaseResource = dao.read(resourceIdDt, systemRequestDetails, true);
161
162                retVal.setNewPayload(myFhirContext, iBaseResource);
163
164                return retVal;
165        }
166
167        ResourceModifiedEntity createEntityFrom(ResourceModifiedMessage theMsg) {
168                IIdType theMsgId = theMsg.getPayloadId(myFhirContext);
169
170                ResourceModifiedEntity resourceModifiedEntity = new ResourceModifiedEntity();
171                resourceModifiedEntity.setResourceModifiedEntityPK(with(theMsgId.getIdPart(), theMsgId.getVersionIdPart()));
172
173                String partialModifiedMessage = getPayloadLessMessageAsString(theMsg);
174                resourceModifiedEntity.setSummaryResourceModifiedMessage(partialModifiedMessage);
175                resourceModifiedEntity.setResourceType(theMsgId.getResourceType());
176                resourceModifiedEntity.setCreatedTime(new Date());
177
178                return resourceModifiedEntity;
179        }
180
181        private ResourceModifiedMessage getPayloadLessMessageFromString(String thePayloadLessMessage) {
182                try {
183                        return myObjectMapper.readValue(thePayloadLessMessage, ResourceModifiedMessage.class);
184                } catch (JsonProcessingException e) {
185                        throw new ConfigurationException(Msg.code(2334) + "Failed to json deserialize payloadless  message", e);
186                }
187        }
188
189        private String getPayloadLessMessageAsString(ResourceModifiedMessage theMsg) {
190                ResourceModifiedMessage tempMessage = new PayloadLessResourceModifiedMessage(theMsg);
191
192                try {
193                        return myObjectMapper.writeValueAsString(tempMessage);
194                } catch (JsonProcessingException e) {
195                        throw new ConfigurationException(Msg.code(2335) + "Failed to serialize empty ResourceModifiedMessage", e);
196                }
197        }
198
199        private IdDt createIdDtFromResourceModifiedEntity(ResourceModifiedEntity theResourceModifiedEntity) {
200                String resourcePid =
201                                theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid();
202                String resourceVersion =
203                                theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion();
204                String resourceType = theResourceModifiedEntity.getResourceType();
205
206                return new IdDt(resourceType, resourcePid, resourceVersion);
207        }
208
209        private static class PayloadLessResourceModifiedMessage extends ResourceModifiedMessage {
210
211                public PayloadLessResourceModifiedMessage(ResourceModifiedMessage theMsg) {
212                        this.myPayloadId = theMsg.getPayloadId();
213                        this.myPayloadVersion = theMsg.getPayloadVersion();
214                        setSubscriptionId(theMsg.getSubscriptionId());
215                        setMediaType(theMsg.getMediaType());
216                        setOperationType(theMsg.getOperationType());
217                        setPartitionId(theMsg.getPartitionId());
218                        setTransactionId(theMsg.getTransactionId());
219                        setMessageKey(theMsg.getMessageKeyOrNull());
220                        copyAdditionalPropertiesFrom(theMsg);
221                }
222        }
223}