package ca.uhn.fhir.jpa.subscription;

/*-
 * #%L
 * HAPI FHIR JPA Server
 * %%
 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.i18n.Msg;
import ca.uhn.fhir.jpa.api.dao.DaoRegistry;
import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.data.IResourceModifiedDao;
import ca.uhn.fhir.jpa.dao.tx.HapiTransactionService;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage;
import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessagePK;
import ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK;
import ca.uhn.fhir.jpa.model.entity.ResourceModifiedEntity;
import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedSubmitterSvc;
import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
import ca.uhn.fhir.model.primitive.IdDt;
import ca.uhn.fhir.rest.api.server.SystemRequestDetails;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Optional;

import static ca.uhn.fhir.jpa.model.entity.PersistedResourceModifiedMessageEntityPK.with;

/**
 * This implementer provides the capability to persist subscription messages for asynchronous submission
 * to the subscription processing pipeline with the purpose of offering a retry mechanism
 * upon submission failure (see {@link AsyncResourceModifiedSubmitterSvc}).
 *
 * <p>Only a payload-less JSON summary of each message is stored; the resource body is re-read from the
 * resource DAO when a persisted message is inflated.
 */
public class ResourceModifiedMessagePersistenceSvcImpl implements IResourceModifiedMessagePersistenceSvc {

    private static final Logger ourLog = LoggerFactory.getLogger(ResourceModifiedMessagePersistenceSvcImpl.class);

    private final FhirContext myFhirContext;

    private final IResourceModifiedDao myResourceModifiedDao;

    private final DaoRegistry myDaoRegistry;

    private final ObjectMapper myObjectMapper;

    private final HapiTransactionService myHapiTransactionService;

    /**
     * Creates the service. The Jackson {@link ObjectMapper} used for the payload-less summaries is
     * created here with default settings.
     *
     * @param theFhirContext            context used to resolve payload ids/types and to set payloads
     * @param theResourceModifiedDao    DAO backing the persisted-message table
     * @param theDaoRegistry            registry used to look up the resource DAO by resource type
     * @param theHapiTransactionService transaction service used by {@link #findAllOrderedByCreatedTime()}
     */
    public ResourceModifiedMessagePersistenceSvcImpl(
            FhirContext theFhirContext,
            IResourceModifiedDao theResourceModifiedDao,
            DaoRegistry theDaoRegistry,
            HapiTransactionService theHapiTransactionService) {
        myFhirContext = theFhirContext;
        myResourceModifiedDao = theResourceModifiedDao;
        myDaoRegistry = theDaoRegistry;
        myHapiTransactionService = theHapiTransactionService;
        myObjectMapper = new ObjectMapper();
    }

    /**
     * Returns the result of the DAO's {@code findAllOrderedByCreatedTime} query, executed through the
     * transaction service with a system request.
     */
    @Override
    public List<IPersistedResourceModifiedMessage> findAllOrderedByCreatedTime() {
        return myHapiTransactionService.withSystemRequest().execute(myResourceModifiedDao::findAllOrderedByCreatedTime);
    }

    /**
     * Converts the message into a {@link ResourceModifiedEntity} (payload-less summary plus id, version,
     * resource type and creation time) and saves it through the DAO.
     *
     * @param theMsg the message to persist
     * @return the entity returned by the DAO's {@code save}
     */
    @Override
    public IPersistedResourceModifiedMessage persist(ResourceModifiedMessage theMsg) {
        ResourceModifiedEntity resourceModifiedEntity = createEntityFrom(theMsg);
        return myResourceModifiedDao.save(resourceModifiedEntity);
    }

    /**
     * Rebuilds a message carrying its resource payload: the given message is first reduced to an entity
     * and then inflated by reading the referenced resource from its DAO.
     *
     * <p>Exceptions raised while reading the resource are propagated; see
     * {@link #inflatePersistedResourceModifiedMessageOrNull(ResourceModifiedMessage)} for the
     * non-throwing variant.
     *
     * @param theResourceModifiedMessage the message identifying the resource to load
     * @return a new message whose payload is the resource read from the DAO
     */
    @Override
    public ResourceModifiedMessage inflatePersistedResourceModifiedMessage(
            ResourceModifiedMessage theResourceModifiedMessage) {

        return inflateResourceModifiedMessageFromEntity(createEntityFrom(theResourceModifiedMessage));
    }

    /**
     * Same as {@link #inflatePersistedResourceModifiedMessage(ResourceModifiedMessage)} but never
     * propagates a failure: a {@link ResourceNotFoundException} is logged at WARN level, any other
     * exception at ERROR level, and in both cases an empty {@link Optional} is returned.
     *
     * @param theResourceModifiedMessage the message identifying the resource to load
     * @return the inflated message, or {@link Optional#empty()} if inflation failed
     */
    @Override
    public Optional<ResourceModifiedMessage> inflatePersistedResourceModifiedMessageOrNull(
            ResourceModifiedMessage theResourceModifiedMessage) {
        ResourceModifiedMessage inflatedResourceModifiedMessage = null;

        try {
            inflatedResourceModifiedMessage = inflatePersistedResourceModifiedMessage(theResourceModifiedMessage);
        } catch (ResourceNotFoundException e) {
            IdDt idDt = new IdDt(
                    theResourceModifiedMessage.getPayloadType(myFhirContext),
                    theResourceModifiedMessage.getPayloadId(),
                    theResourceModifiedMessage.getPayloadVersion());

            ourLog.warn("Scheduled submission will be ignored since resource {} cannot be found", idDt.getIdPart(), e);
        } catch (Exception ex) {
            // Deliberate best-effort: the caller only needs to know that inflation did not succeed.
            ourLog.error("Unknown error encountered on inflation of resources.", ex);
        }

        return Optional.ofNullable(inflatedResourceModifiedMessage);
    }

    /**
     * Deserializes the payload-less summary stored on the entity and sets the payload id from the
     * entity's resource type, pid and version. The resource itself is not read.
     *
     * @param thePersistedResourceModifiedMessage the persisted message; it is cast to
     *                                            {@link ResourceModifiedEntity}
     * @return the message without a resource payload
     */
    @Override
    public ResourceModifiedMessage createResourceModifiedMessageFromEntityWithoutInflation(
            IPersistedResourceModifiedMessage thePersistedResourceModifiedMessage) {
        // Single downcast; this implementation only works with its own entity type.
        ResourceModifiedEntity resourceModifiedEntity = (ResourceModifiedEntity) thePersistedResourceModifiedMessage;

        ResourceModifiedMessage resourceModifiedMessage =
                getPayloadLessMessageFromString(resourceModifiedEntity.getSummaryResourceModifiedMessage());

        IdDt resourceId = createIdDtFromResourceModifiedEntity(resourceModifiedEntity);
        resourceModifiedMessage.setPayloadId(resourceId);

        return resourceModifiedMessage;
    }

    /**
     * @return the value of the DAO's {@code count()}
     */
    @Override
    public long getMessagePersistedCount() {
        return myResourceModifiedDao.count();
    }

    /**
     * Removes the persisted message with the given primary key.
     *
     * @param theResourceModifiedPK the key; it is cast to {@link PersistedResourceModifiedMessageEntityPK}
     * @return {@code true} only when the DAO reports exactly one removed row
     */
    @Override
    public boolean deleteByPK(IPersistedResourceModifiedMessagePK theResourceModifiedPK) {
        int removedCount =
                myResourceModifiedDao.removeById((PersistedResourceModifiedMessageEntityPK) theResourceModifiedPK);

        return removedCount == 1;
    }

    /**
     * Deserializes the entity's payload-less summary, reads the referenced resource from the DAO for the
     * entity's resource type (using the partition id carried by the summary) and sets it as the new payload.
     *
     * @param theResourceModifiedEntity the persisted entity to inflate
     * @return the message with its payload populated
     */
    protected ResourceModifiedMessage inflateResourceModifiedMessageFromEntity(
            ResourceModifiedEntity theResourceModifiedEntity) {
        String resourceType = theResourceModifiedEntity.getResourceType();
        ResourceModifiedMessage retVal =
                getPayloadLessMessageFromString(theResourceModifiedEntity.getSummaryResourceModifiedMessage());
        SystemRequestDetails systemRequestDetails =
                new SystemRequestDetails().setRequestPartitionId(retVal.getPartitionId());

        IdDt resourceIdDt = createIdDtFromResourceModifiedEntity(theResourceModifiedEntity);
        // Wildcard instead of a raw type: the concrete resource class is not known at compile time.
        IFhirResourceDao<?> dao = myDaoRegistry.getResourceDao(resourceType);

        // NOTE(review): the trailing 'true' is presumably the "deleted OK" flag - confirm against IFhirResourceDao#read.
        IBaseResource iBaseResource = dao.read(resourceIdDt, systemRequestDetails, true);

        retVal.setNewPayload(myFhirContext, iBaseResource);

        return retVal;
    }

    /**
     * Builds a new, unsaved entity from the message: primary key from the payload id part and version,
     * the payload-less JSON summary, the resource type, and the current time as creation time.
     */
    ResourceModifiedEntity createEntityFrom(ResourceModifiedMessage theMsg) {
        IIdType theMsgId = theMsg.getPayloadId(myFhirContext);

        ResourceModifiedEntity resourceModifiedEntity = new ResourceModifiedEntity();
        resourceModifiedEntity.setResourceModifiedEntityPK(with(theMsgId.getIdPart(), theMsgId.getVersionIdPart()));

        String partialModifiedMessage = getPayloadLessMessageAsString(theMsg);
        resourceModifiedEntity.setSummaryResourceModifiedMessage(partialModifiedMessage);
        resourceModifiedEntity.setResourceType(theMsgId.getResourceType());
        resourceModifiedEntity.setCreatedTime(new Date());

        return resourceModifiedEntity;
    }

    /**
     * Deserializes a stored JSON summary into a {@link ResourceModifiedMessage}.
     *
     * @throws ConfigurationException (code 2334) if the JSON cannot be processed
     */
    private ResourceModifiedMessage getPayloadLessMessageFromString(String thePayloadLessMessage) {
        try {
            return myObjectMapper.readValue(thePayloadLessMessage, ResourceModifiedMessage.class);
        } catch (JsonProcessingException e) {
            throw new ConfigurationException(Msg.code(2334) + "Failed to json deserialize payloadless message", e);
        }
    }

    /**
     * Serializes a payload-less copy of the message to JSON.
     *
     * @throws ConfigurationException (code 2335) if the message cannot be serialized
     */
    private String getPayloadLessMessageAsString(ResourceModifiedMessage theMsg) {
        ResourceModifiedMessage tempMessage = new PayloadLessResourceModifiedMessage(theMsg);

        try {
            return myObjectMapper.writeValueAsString(tempMessage);
        } catch (JsonProcessingException e) {
            throw new ConfigurationException(Msg.code(2335) + "Failed to serialize empty ResourceModifiedMessage", e);
        }
    }

    /**
     * Builds an {@link IdDt} from the entity's resource type and the pid/version held in its primary key.
     */
    private IdDt createIdDtFromResourceModifiedEntity(ResourceModifiedEntity theResourceModifiedEntity) {
        String resourcePid =
                theResourceModifiedEntity.getResourceModifiedEntityPK().getResourcePid();
        String resourceVersion =
                theResourceModifiedEntity.getResourceModifiedEntityPK().getResourceVersion();
        String resourceType = theResourceModifiedEntity.getResourceType();

        return new IdDt(resourceType, resourcePid, resourceVersion);
    }

    /**
     * Copy of a {@link ResourceModifiedMessage} that carries the identifying and routing fields copied
     * in the constructor but not the resource payload; this is the form that gets serialized for storage.
     */
    private static class PayloadLessResourceModifiedMessage extends ResourceModifiedMessage {

        public PayloadLessResourceModifiedMessage(ResourceModifiedMessage theMsg) {
            this.myPayloadId = theMsg.getPayloadId();
            this.myPayloadVersion = theMsg.getPayloadVersion();
            setSubscriptionId(theMsg.getSubscriptionId());
            setMediaType(theMsg.getMediaType());
            setOperationType(theMsg.getOperationType());
            setPartitionId(theMsg.getPartitionId());
            setTransactionId(theMsg.getTransactionId());
            setMessageKey(theMsg.getMessageKeyOrNull());
            copyAdditionalPropertiesFrom(theMsg);
        }
    }
}