001/*- 002 * #%L 003 * HAPI FHIR Subscription Server 004 * %% 005 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 006 * %% 007 * Licensed under the Apache License, Version 2.0 (the "License"); 008 * you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 * #L% 019 */ 020package ca.uhn.fhir.jpa.subscription.submit.interceptor; 021 022import ca.uhn.fhir.context.FhirContext; 023import ca.uhn.fhir.context.FhirVersionEnum; 024import ca.uhn.fhir.i18n.Msg; 025import ca.uhn.fhir.interceptor.api.Hook; 026import ca.uhn.fhir.interceptor.api.Interceptor; 027import ca.uhn.fhir.interceptor.api.Pointcut; 028import ca.uhn.fhir.interceptor.model.RequestPartitionId; 029import ca.uhn.fhir.jpa.api.config.JpaStorageSettings; 030import ca.uhn.fhir.jpa.api.dao.DaoRegistry; 031import ca.uhn.fhir.jpa.api.dao.IFhirResourceDao; 032import ca.uhn.fhir.jpa.model.entity.StorageSettings; 033import ca.uhn.fhir.jpa.partition.IRequestPartitionHelperSvc; 034import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 035import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionMatchingStrategy; 036import ca.uhn.fhir.jpa.subscription.match.matcher.matching.SubscriptionStrategyEvaluator; 037import ca.uhn.fhir.jpa.subscription.match.registry.SubscriptionCanonicalizer; 038import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscription; 039import ca.uhn.fhir.jpa.subscription.model.CanonicalSubscriptionChannelType; 040import ca.uhn.fhir.parser.DataFormatException; 041import ca.uhn.fhir.rest.api.EncodingEnum; 042import ca.uhn.fhir.rest.api.server.IBundleProvider; 043import ca.uhn.fhir.rest.api.server.RequestDetails; 044import ca.uhn.fhir.rest.api.server.SystemRequestDetails; 045import ca.uhn.fhir.rest.param.UriParam; 046import ca.uhn.fhir.rest.server.exceptions.InternalErrorException; 047import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException; 048import ca.uhn.fhir.rest.server.exceptions.UnprocessableEntityException; 049import ca.uhn.fhir.util.HapiExtensions; 050import ca.uhn.fhir.util.SubscriptionUtil; 051import com.google.common.annotations.VisibleForTesting; 052import org.hl7.fhir.instance.model.api.IBaseResource; 053import org.hl7.fhir.r5.model.SubscriptionTopic; 054import org.springframework.beans.factory.annotation.Autowired; 055 056import java.net.URI; 057import java.net.URISyntaxException; 058import java.util.Optional; 059 060import static org.apache.commons.lang3.StringUtils.isBlank; 061 062@Interceptor 063public class SubscriptionValidatingInterceptor { 064 065 @Autowired 066 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 067 068 @Autowired 069 private DaoRegistry myDaoRegistry; 070 071 @Autowired 072 private StorageSettings myStorageSettings; 073 074 @Autowired 075 private SubscriptionStrategyEvaluator mySubscriptionStrategyEvaluator; 076 077 private FhirContext myFhirContext; 078 079 @Autowired 080 private IRequestPartitionHelperSvc myRequestPartitionHelperSvc; 081 082 @Autowired 083 private SubscriptionQueryValidator mySubscriptionQueryValidator; 084 085 @Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED) 086 public void resourcePreCreate( 087 IBaseResource theResource, RequestDetails theRequestDetails, RequestPartitionId theRequestPartitionId) { 088 validateSubmittedSubscription( 089 theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED); 090 } 091 092 @Hook(Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED) 093 public void resourceUpdated( 094 IBaseResource theOldResource, 095 IBaseResource theResource, 096 RequestDetails theRequestDetails, 097 RequestPartitionId theRequestPartitionId) { 098 validateSubmittedSubscription( 099 theResource, theRequestDetails, theRequestPartitionId, Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED); 100 } 101 102 @Autowired 103 public void setFhirContext(FhirContext theFhirContext) { 104 myFhirContext = theFhirContext; 105 } 106 107 @VisibleForTesting 108 void validateSubmittedSubscription( 109 IBaseResource theSubscription, 110 RequestDetails theRequestDetails, 111 RequestPartitionId theRequestPartitionId, 112 Pointcut thePointcut) { 113 if (Pointcut.STORAGE_PRESTORAGE_RESOURCE_CREATED != thePointcut 114 && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED != thePointcut) { 115 throw new UnprocessableEntityException(Msg.code(2267) 116 + "Expected Pointcut to be either STORAGE_PRESTORAGE_RESOURCE_CREATED or STORAGE_PRESTORAGE_RESOURCE_UPDATED but was: " 117 + thePointcut); 118 } 119 120 if (!"Subscription".equals(myFhirContext.getResourceType(theSubscription))) { 121 return; 122 } 123 124 CanonicalSubscription subscription; 125 try { 126 subscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); 127 } catch (InternalErrorException e) { 128 throw new UnprocessableEntityException(Msg.code(955) + e.getMessage()); 129 } 130 boolean finished = false; 131 if (subscription.getStatus() == null) { 132 throw new UnprocessableEntityException(Msg.code(8) 133 + "Can not process submitted Subscription - Subscription.status must be populated on this server"); 134 } 135 136 switch (subscription.getStatus()) { 137 case REQUESTED: 138 case ACTIVE: 139 break; 140 case ERROR: 141 case OFF: 142 case NULL: 143 finished = true; 144 break; 145 } 146 147 validatePermissions(theSubscription, subscription, theRequestDetails, theRequestPartitionId, thePointcut); 148 149 mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, null); 150 151 if (!finished) { 152 153 if (subscription.isTopicSubscription()) { 154 if (myFhirContext.getVersion().getVersion() 155 != FhirVersionEnum 156 .R4) { // In R4 topic subscriptions exist without a corresponidng SubscriptionTopic 157 // resource 158 Optional<IBaseResource> oTopic = findSubscriptionTopicByUrl(subscription.getTopic()); 159 if (!oTopic.isPresent()) { 160 throw new UnprocessableEntityException( 161 Msg.code(2322) + "No SubscriptionTopic exists with topic: " + subscription.getTopic()); 162 } 163 } 164 } else { 165 validateQuery(subscription.getCriteriaString(), "Subscription.criteria"); 166 167 if (subscription.getPayloadSearchCriteria() != null) { 168 validateQuery( 169 subscription.getPayloadSearchCriteria(), 170 "Subscription.extension(url='" + HapiExtensions.EXT_SUBSCRIPTION_PAYLOAD_SEARCH_CRITERIA 171 + "')"); 172 } 173 } 174 175 validateChannelType(subscription); 176 177 try { 178 SubscriptionMatchingStrategy strategy = mySubscriptionStrategyEvaluator.determineStrategy(subscription); 179 if (!(SubscriptionMatchingStrategy.IN_MEMORY == strategy) 180 && myStorageSettings.isOnlyAllowInMemorySubscriptions()) { 181 throw new InvalidRequestException( 182 Msg.code(2367) 183 + "This server is configured to only allow in-memory subscriptions. This subscription's criteria cannot be evaluated in-memory."); 184 } 185 mySubscriptionCanonicalizer.setMatchingStrategyTag(theSubscription, strategy); 186 } catch (InvalidRequestException | DataFormatException e) { 187 throw new UnprocessableEntityException(Msg.code(9) + "Invalid subscription criteria submitted: " 188 + subscription.getCriteriaString() + " " + e.getMessage()); 189 } 190 191 if (subscription.getChannelType() == null) { 192 throw new UnprocessableEntityException( 193 Msg.code(10) + "Subscription.channel.type must be populated on this server"); 194 } else if (subscription.getChannelType() == CanonicalSubscriptionChannelType.MESSAGE) { 195 validateMessageSubscriptionEndpoint(subscription.getEndpointUrl()); 196 } 197 } 198 } 199 200 protected void validatePermissions( 201 IBaseResource theSubscription, 202 CanonicalSubscription theCanonicalSubscription, 203 RequestDetails theRequestDetails, 204 RequestPartitionId theRequestPartitionId, 205 Pointcut thePointcut) { 206 // If the subscription has the cross partition tag 207 if (SubscriptionUtil.isCrossPartition(theSubscription) 208 && !(theRequestDetails instanceof SystemRequestDetails)) { 209 if (!myStorageSettings.isCrossPartitionSubscriptionEnabled()) { 210 throw new UnprocessableEntityException( 211 Msg.code(2009) + "Cross partition subscription is not enabled on this server"); 212 } 213 214 if (theRequestPartitionId == null && Pointcut.STORAGE_PRESTORAGE_RESOURCE_UPDATED == thePointcut) { 215 return; 216 } 217 218 // if we have a partition id already, we'll use that 219 // otherwise we might end up with READ and CREATE pointcuts 220 // returning conflicting partitions (say, all vs default) 221 RequestPartitionId toCheckPartitionId = theRequestPartitionId != null 222 ? theRequestPartitionId 223 : determinePartition(theRequestDetails, theSubscription); 224 225 if (!toCheckPartitionId.isDefaultPartition()) { 226 throw new UnprocessableEntityException( 227 Msg.code(2010) + "Cross partition subscription must be created on the default partition"); 228 } 229 } 230 } 231 232 private RequestPartitionId determinePartition(RequestDetails theRequestDetails, IBaseResource theResource) { 233 switch (theRequestDetails.getRestOperationType()) { 234 case CREATE: 235 return myRequestPartitionHelperSvc.determineCreatePartitionForRequest( 236 theRequestDetails, theResource, "Subscription"); 237 case UPDATE: 238 return myRequestPartitionHelperSvc.determineReadPartitionForRequestForRead( 239 theRequestDetails, "Subscription", theResource.getIdElement()); 240 default: 241 return null; 242 } 243 } 244 245 public void validateQuery(String theQuery, String theFieldName) { 246 mySubscriptionQueryValidator.validateCriteria(theQuery, theFieldName); 247 } 248 249 private Optional<IBaseResource> findSubscriptionTopicByUrl(String theCriteria) { 250 myDaoRegistry.getResourceDao("SubscriptionTopic"); 251 SearchParameterMap map = SearchParameterMap.newSynchronous(); 252 map.add(SubscriptionTopic.SP_URL, new UriParam(theCriteria)); 253 IFhirResourceDao subscriptionTopicDao = myDaoRegistry.getResourceDao("SubscriptionTopic"); 254 IBundleProvider search = subscriptionTopicDao.search(map, new SystemRequestDetails()); 255 return search.getResources(0, 1).stream().findFirst(); 256 } 257 258 public void validateMessageSubscriptionEndpoint(String theEndpointUrl) { 259 if (theEndpointUrl == null) { 260 throw new UnprocessableEntityException(Msg.code(16) + "No endpoint defined for message subscription"); 261 } 262 263 try { 264 URI uri = new URI(theEndpointUrl); 265 266 if (!"channel".equals(uri.getScheme())) { 267 throw new UnprocessableEntityException(Msg.code(17) 268 + "Only 'channel' protocol is supported for Subscriptions with channel type 'message'"); 269 } 270 String channelName = uri.getSchemeSpecificPart(); 271 if (isBlank(channelName)) { 272 throw new UnprocessableEntityException( 273 Msg.code(18) + "A channel name must appear after channel: in a message Subscription endpoint"); 274 } 275 } catch (URISyntaxException e) { 276 throw new UnprocessableEntityException( 277 Msg.code(19) + "Invalid subscription endpoint uri " + theEndpointUrl, e); 278 } 279 } 280 281 @SuppressWarnings("WeakerAccess") 282 protected void validateChannelType(CanonicalSubscription theSubscription) { 283 if (theSubscription.getChannelType() == null) { 284 throw new UnprocessableEntityException(Msg.code(20) + "Subscription.channel.type must be populated"); 285 } else if (theSubscription.getChannelType() == CanonicalSubscriptionChannelType.RESTHOOK) { 286 validateChannelPayload(theSubscription); 287 validateChannelEndpoint(theSubscription); 288 } 289 } 290 291 @SuppressWarnings("WeakerAccess") 292 protected void validateChannelEndpoint(CanonicalSubscription theResource) { 293 if (isBlank(theResource.getEndpointUrl())) { 294 throw new UnprocessableEntityException( 295 Msg.code(21) + "Rest-hook subscriptions must have Subscription.channel.endpoint defined"); 296 } 297 } 298 299 @SuppressWarnings("WeakerAccess") 300 protected void validateChannelPayload(CanonicalSubscription theResource) { 301 if (!isBlank(theResource.getPayloadString()) 302 && EncodingEnum.forContentType(theResource.getPayloadString()) == null) { 303 throw new UnprocessableEntityException(Msg.code(1985) + "Invalid value for Subscription.channel.payload: " 304 + theResource.getPayloadString()); 305 } 306 } 307 308 @SuppressWarnings("WeakerAccess") 309 @VisibleForTesting 310 public void setSubscriptionCanonicalizerForUnitTest(SubscriptionCanonicalizer theSubscriptionCanonicalizer) { 311 mySubscriptionCanonicalizer = theSubscriptionCanonicalizer; 312 } 313 314 @SuppressWarnings("WeakerAccess") 315 @VisibleForTesting 316 public void setDaoRegistryForUnitTest(DaoRegistry theDaoRegistry) { 317 myDaoRegistry = theDaoRegistry; 318 } 319 320 @VisibleForTesting 321 public void setStorageSettingsForUnitTest(JpaStorageSettings theStorageSettings) { 322 myStorageSettings = theStorageSettings; 323 } 324 325 @VisibleForTesting 326 public void setRequestPartitionHelperSvcForUnitTest(IRequestPartitionHelperSvc theRequestPartitionHelperSvc) { 327 myRequestPartitionHelperSvc = theRequestPartitionHelperSvc; 328 } 329 330 @VisibleForTesting 331 @SuppressWarnings("WeakerAccess") 332 public void setSubscriptionStrategyEvaluatorForUnitTest( 333 SubscriptionStrategyEvaluator theSubscriptionStrategyEvaluator) { 334 mySubscriptionStrategyEvaluator = theSubscriptionStrategyEvaluator; 335 mySubscriptionQueryValidator = new SubscriptionQueryValidator(myDaoRegistry, theSubscriptionStrategyEvaluator); 336 } 337}