001package ca.uhn.fhir.jpa.subscription.async; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2024 Smile CDR, Inc. 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.jpa.model.entity.IPersistedResourceModifiedMessage; 024import ca.uhn.fhir.subscription.api.IResourceModifiedConsumerWithRetries; 025import ca.uhn.fhir.subscription.api.IResourceModifiedMessagePersistenceSvc; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import java.util.List; 030 031/** 032 * The purpose of this service is to submit messages to the processing pipeline for which previous attempts at 033 * submission has failed. See also {@link AsyncResourceModifiedProcessingSchedulerSvc} and {@link IResourceModifiedMessagePersistenceSvc}. 034 * 035 */ 036public class AsyncResourceModifiedSubmitterSvc { 037 private static final Logger ourLog = LoggerFactory.getLogger(AsyncResourceModifiedSubmitterSvc.class); 038 039 private final IResourceModifiedMessagePersistenceSvc myResourceModifiedMessagePersistenceSvc; 040 private final IResourceModifiedConsumerWithRetries myResourceModifiedConsumer; 041 042 public AsyncResourceModifiedSubmitterSvc( 043 IResourceModifiedMessagePersistenceSvc theResourceModifiedMessagePersistenceSvc, 044 IResourceModifiedConsumerWithRetries theResourceModifiedConsumer) { 045 myResourceModifiedMessagePersistenceSvc = theResourceModifiedMessagePersistenceSvc; 046 myResourceModifiedConsumer = theResourceModifiedConsumer; 047 } 048 049 public void runDeliveryPass() { 050 051 List<IPersistedResourceModifiedMessage> allPersistedResourceModifiedMessages = 052 myResourceModifiedMessagePersistenceSvc.findAllOrderedByCreatedTime(); 053 ourLog.debug( 054 "Attempting to submit {} resources to consumer channel.", allPersistedResourceModifiedMessages.size()); 055 056 for (IPersistedResourceModifiedMessage persistedResourceModifiedMessage : 057 allPersistedResourceModifiedMessages) { 058 059 boolean wasProcessed = 060 myResourceModifiedConsumer.submitPersisedResourceModifiedMessage(persistedResourceModifiedMessage); 061 062 if (!wasProcessed) { 063 break; 064 } 065 } 066 } 067}