001/*-
002 * #%L
003 * HAPI FHIR Subscription Server
004 * %%
005 * Copyright (C) 2014 - 2024 Smile CDR, Inc.
006 * %%
007 * Licensed under the Apache License, Version 2.0 (the "License");
008 * you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *      http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 * #L%
019 */
020package ca.uhn.fhir.jpa.subscription.submit.interceptor;
021
022import ca.uhn.fhir.jpa.subscription.async.AsyncResourceModifiedProcessingSchedulerSvc;
023import ca.uhn.fhir.jpa.subscription.channel.api.PayloadTooLargeException;
024import ca.uhn.fhir.jpa.subscription.match.matcher.matching.IResourceModifiedConsumer;
025import ca.uhn.fhir.jpa.subscription.model.ResourceModifiedMessage;
026import org.springframework.beans.factory.annotation.Autowired;
027import org.springframework.messaging.MessageDeliveryException;
028import org.springframework.transaction.support.TransactionSynchronizationAdapter;
029import org.springframework.transaction.support.TransactionSynchronizationManager;
030
031/**
032 * The purpose of this interceptor is to synchronously submit ResourceModifiedMessage to the
033 * subscription processing pipeline, ie, as part of processing the operation on a resource.
034 * It is meant to replace the SubscriptionMatcherInterceptor in integrated tests where
035 * scheduling is disabled.  See {@link AsyncResourceModifiedProcessingSchedulerSvc}
036 * for further details on asynchronous submissions.
037 */
038public class SynchronousSubscriptionMatcherInterceptor extends SubscriptionMatcherInterceptor {
039
040        @Autowired
041        private IResourceModifiedConsumer myResourceModifiedConsumer;
042
043        @Override
044        protected void processResourceModifiedMessage(ResourceModifiedMessage theResourceModifiedMessage) {
045                if (TransactionSynchronizationManager.isSynchronizationActive()) {
046                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
047                                @Override
048                                public int getOrder() {
049                                        return 0;
050                                }
051
052                                @Override
053                                public void afterCommit() {
054                                        doSubmitResourceModified(theResourceModifiedMessage);
055                                }
056                        });
057                } else {
058                        doSubmitResourceModified(theResourceModifiedMessage);
059                }
060        }
061
062        /**
063         * Submit the message through the broker channel to the matcher.
064         *
065         * Note: most of our integrated tests for subscription assume we can successfully inflate the message and therefore
066         * does not run with an actual database to persist the data. In these cases, submitting the complete message (i.e.
067         * with payload) is OK. However, there are a few tests that do not assume it and do run with an actual DB. For them,
068         * we should null out the payload body before submitting. This try-catch block only covers the case where the
069         * payload is too large, which is enough for now. However, for better practice we might want to consider splitting
070         * this interceptor into two, each for tests with/without DB connection.
071         * @param theResourceModifiedMessage
072         */
073        private void doSubmitResourceModified(ResourceModifiedMessage theResourceModifiedMessage) {
074                try {
075                        myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
076                } catch (MessageDeliveryException e) {
077                        if (e.getCause() instanceof PayloadTooLargeException) {
078                                theResourceModifiedMessage.setPayloadToNull();
079                                myResourceModifiedConsumer.submitResourceModified(theResourceModifiedMessage);
080                        }
081                }
082        }
083}