45.2.1 FHIR Storage Processors

 

FHIR Storage processors perform work using a FHIR Storage module.

45.2.2 Resource Operation Processor

 
  • Example URI: smile:persistence/resourceOperationProcessor
  • Description: Performs the operation represented by the input message. For example, if the Resource Operation is a Patient CREATE, the processor creates that patient in the specified Storage Module.
  • Input: ResourceOperationJsonMessage
  • Output: N/A

45.2.2.1 Example Route

Resource Operation messages posted to the resource-operation-message-in-topic will be processed by the module with id persistence.

<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="kafka:resource-operation-message-in-topic?brokers=localhost:9092"/>
        <to uri="smile:persistence/resourceOperationProcessor"/>
    </route>
</routes>

45.2.3 Transaction/Batch Bundle Processor

 

This processor accepts a FHIR Transaction or a FHIR Batch Bundle as input and submits it to a FHIR Storage module for processing. FHIR Transactions are commonly used as a mechanism for submitting and loading data into a FHIR repository. They are generally the most efficient way of accomplishing this task, especially at large scale.

  • Example URI: smile:persistence/bundleProcessor
  • Input: IBaseBundle — A FHIR Bundle resource containing a FHIR Transaction or Batch request.
  • Output: An IBaseBundle containing the transaction/batch response will be added to the exchange in the variable named response. This bundle contains details about the outcomes of the individual items in the transaction/batch.
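
As a minimal sketch of how the response variable might be consumed, the route below forwards the transaction/batch response to a second Kafka topic after processing. The topic name bundle-response-topic is hypothetical, and the sketch assumes that the response variable can be sent with Camel's variableSend attribute in the same way as other exchange variables.

```xml
<route>
	<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
	<!-- Process the Bundle; the outcome is stored in the exchange variable named "response" -->
	<to uri="smile:persistence/bundleProcessor"/>
	<!-- Assumption: bundle-response-topic is a hypothetical topic used only for illustration -->
	<to uri="kafka:bundle-response-topic?brokers=localhost:9092" variableSend="response"/>
</route>
```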

45.2.3.1 Processor Parameters

The following parameters are available to this processor:

  • partitionId – (optional) Specifies a hard-coded partition ID to use when processing the Bundle. Value should be an integer specifying the ID of the partition to use.
  • retries – (optional) Specifies a number of retries to attempt if the processing fails. A value of retries=2 means that if the initial attempt to process the transaction fails, it will be retried two more times before giving up and throwing an exception. This parameter can be useful when data being submitted is likely to have collisions, meaning concurrent processing is likely to be modifying the same resources. Value should be a positive integer, or 0 (which is the default). When this parameter is used, the following parameters may also be added:
    • retryDelayMin – (optional) Specifies the minimum amount of time to sleep before attempting a retry. Value should be an integer and is specified in milliseconds.
    • retryDelayMax – (optional) Specifies the maximum amount of time to sleep before attempting a retry. If this number is greater than the retryDelayMin value, a random amount of time will be chosen for each retry. This can be helpful when processing data with high numbers of collisions. Value should be an integer and is specified in milliseconds.
    • useBatchOnFinalAttempt – (optional) If the value useBatchOnFinalAttempt=true is specified, a submitted FHIR Transaction bundle will be converted into a FHIR Batch bundle prior to the final retry attempt. This can be useful if you are loading transaction bundles containing unrelated resources (i.e. resources which do not have references to each other within the Bundle) and want to ensure that the failure to process one resource does not prevent the loading of another. Value should be true or false (which is the default).
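  • logProgressInterval – (optional) If specified as a positive number, a log entry containing information about recent transaction/batch processing is written to the system log each time that number of bundles has been processed (e.g. logProgressInterval=100 logs after every 100 bundles).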
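
The parameters above are combined in the processor URI as query parameters. The following is a sketch only: the partition ID of 1 and the specific delay values are illustrative assumptions, not recommended settings. With these values, a failing bundle is attempted up to three times in total (the initial attempt plus two retries), with a random sleep of between 100 and 500 milliseconds before each retry.

```xml
<route>
	<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
	<!-- Assumed values: partitionId=1 and the retry delays are placeholders for illustration -->
	<!-- Note that "&" must be written as "&amp;" inside an XML attribute -->
	<to uri="smile:persistence/bundleProcessor?partitionId=1&amp;retries=2&amp;retryDelayMin=100&amp;retryDelayMax=500&amp;logProgressInterval=100"/>
</route>
```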

45.2.3.2 Example Route: Process from Kafka

Bundles posted to the bundle-in-topic will be processed by the module with id persistence. A log entry will be emitted to the system log after every 100 bundles have been processed.

Authentication parameters for the Kafka consumer (<from...>) are not included here and may be required, but several important parameters are included:

  • groupId=my-group-id – The Consumer Group ID must be specified in order to ensure that topic offsets are preserved between restarts of the system. The exact value can be any string, but it must never change or be reused by other applications.
  • maxPollRecords=1 – This setting indicates to Kafka that messages should be consumed one-by-one. This is important because by default Kafka uses large batches and expects that they will be processed very quickly, which is more appropriate for large numbers of very small payloads which require minimal processing each. This is not typically the case in a FHIR server, so a small poll size is chosen.
  • allowManualCommit=true and autoCommitEnable=false – These are required in order to use the kafkaManualCommit processor at the end of the flow. This processor ensures that the topic offset (the current position within the queue) is not advanced until the message has been successfully processed.
  • autoOffsetReset=earliest – This setting instructs the Kafka consumer to begin at the very beginning of the topic, consuming messages that were produced (added to the topic) before Smile CDR was started.

45.2.3.3 Example Route: Guaranteed Delivery

This route receives messages from a Kafka topic, where each message on the topic should contain a FHIR Transaction Bundle or a FHIR Batch Bundle. The messages are routed for transaction/batch processing on the FHIR Storage module named persistence.

This route uses a Kafka manual commit only after the message has been processed successfully, and does not have any error handler. This means that any failing messages will be retried indefinitely until processing the message succeeds.

<route>
	<from uri="kafka:bundle-in-topic?brokers=localhost:9092&amp;groupId=my-group-id&amp;maxPollRecords=1&amp;allowManualCommit=true&amp;autoCommitEnable=false&amp;autoOffsetReset=earliest"/>
	<to uri="smile:persistence/bundleProcessor?logProgressInterval=100"/>
	<to uri="smile:clustermgr/kafkaManualCommit" />
</route>

45.2.3.4 Example Route: Retry with Dead Letter Queue

The following example shows a route whose error handler retries a failing message up to three times (maximumRedeliveries="3"), waiting 250 milliseconds between attempts, and delivers the payload to a failure topic (i.e. a Dead Letter Queue) if the final processing attempt fails. Messages on the DLQ topic, named dlq-topic, will contain the failing FHIR transaction.

Note that the Kafka manual commit is only invoked after a message has been successfully processed. This means that if a long series of failing messages is encountered and no messages are successfully processed before the Smile CDR process is terminated, the consumer offset will not be advanced and these failing messages may be processed again by a Smile CDR process which is started later.

<route>
	<errorHandler>
		<deadLetterChannel deadLetterUri="kafka:dlq-topic?brokers=localhost:9092">
			<redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="250"/>
		</deadLetterChannel>
	</errorHandler>
	<from uri="kafka:delivery-topic-with-dlq?brokers=localhost:9092&amp;groupId=my-group-id&amp;maxPollRecords=1&amp;allowManualCommit=true&amp;autoCommitEnable=false&amp;autoOffsetReset=earliest"/>
	<to uri="smile:persistence/bundleProcessor?logProgressInterval=5"/>
	<to uri="smile:clustermgr/kafkaManualCommit" />
</route>

45.2.3.5 Example Route: Retry with Batch Processing Attempt

The following example shows a processor which retries twice, and then attempts the final processing using a FHIR Batch instead of a FHIR Transaction. The batch processing mode is slower than the transaction processing mode, but means that the system will process as many entries as it can. If any entries fail on the final attempt, then a new FHIR Batch Bundle containing the failing entries will be sent to the Kafka dlq-failed-entry-topic topic.

This kind of route can be used in cases where your FHIR transaction bundles contain a large number of unrelated resources, and it is therefore desirable to process as many of them as possible even if one or more of them is unprocessable. This route can result in these transaction bundles being partially processed, with the remaining partial bundle ending up in the Dead Letter Queue.

<route>
	<from uri="kafka:guaranteed-delivery-topic?brokers=localhost:9092&amp;groupId=my-group-id&amp;maxPollRecords=1&amp;allowManualCommit=true&amp;autoCommitEnable=false&amp;autoOffsetReset=earliest"/>
	<to uri="smile:persistence/bundleProcessor?logProgressInterval=5&amp;retries=3&amp;retryDelayMin=50&amp;retryDelayMax=100&amp;useBatchOnFinalAttempt=true"/>
	<when>
		<variable>failedEntryBundle</variable>
		<to uri="kafka:dlq-failed-entry-topic?brokers=localhost:9092" variableSend="failedEntryBundle"/>
	</when>
	<to uri="smile:clustermgr/kafkaManualCommit" />
</route>

45.2.4 FHIR NDJSON Converter

 
  • Example URI: smile:persistence/ndjsonToBundleProcessor
  • Description: This processor converts NDJSON strings into IBaseBundles. The optional parameter type can be used to specify the kind of bundle to create: transaction (default, if none specified), or batch. This bundle can then be passed to a bundleProcessor processor to persist the bundle to the repository. Additionally, if the optional parameter ensureHomogeneousResourceTypes is defined and set to true, NDJSON parsing will fail if resources of different types are encountered in the same NDJSON file.

45.2.4.1 Example Routes

Explicitly defining the output bundle as a transaction bundle.

<routes xmlns="http://camel.apache.org/schema/spring">
	<route id="my-route">
		<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
		<to uri="smile:persistence/ndjsonToBundleProcessor?type=transaction"/>
		<!-- to persist the result of the transformation -->
		<to uri="smile:persistence/bundleProcessor"/>
	</route>
</routes>

Failing processing if NDJSON source has multiple resource types when only one is expected.

<routes xmlns="http://camel.apache.org/schema/spring">
	<route id="my-route">
		<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
		<to uri="smile:persistence/ndjsonToBundleProcessor?ensureHomogeneousResourceTypes=true"/>
		<!-- to persist the result of the transformation -->
		<to uri="smile:persistence/bundleProcessor"/>
	</route>
</routes>
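
The two optional parameters can also be combined. The sketch below, which assumes a hypothetical topic named ndjson-in-topic and a hypothetical route id, creates a batch bundle instead of the default transaction bundle and rejects NDJSON input that mixes resource types.

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
	<route id="my-batch-route">
		<!-- Assumption: ndjson-in-topic is a hypothetical topic carrying NDJSON payloads -->
		<from uri="kafka:ndjson-in-topic?brokers=localhost:9092"/>
		<!-- Build a batch bundle and fail if more than one resource type is present -->
		<to uri="smile:persistence/ndjsonToBundleProcessor?type=batch&amp;ensureHomogeneousResourceTypes=true"/>
		<!-- to persist the result of the transformation -->
		<to uri="smile:persistence/bundleProcessor"/>
	</route>
</routes>
```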