FHIR Storage processors perform work using a FHIR Storage module.
smile:persistence/resourceOperationProcessor
Resource Operation messages posted to the resource-operation-message-in-topic topic will be processed by the module with ID persistence.
<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="kafka:resource-operation-message-in-topic?brokers=localhost:9092"/>
        <to uri="smile:persistence/resourceOperationProcessor"/>
    </route>
</routes>
smile:persistence/bundleProcessor
This processor accepts a FHIR Transaction or a FHIR Batch Bundle as input and submits it to a FHIR Storage module for processing. FHIR Transactions are commonly used as a mechanism for submitting and loading data into a FHIR repository. They are generally the most efficient way of accomplishing this task, especially at large scale.
The output of this processor is the FHIR Bundle returned as the response. This bundle contains details about the outcomes of the individual items in the transaction/batch.
The following parameters are available to this processor:
retries
Specifying retries=2 means that if the initial attempt to process the transaction fails, it will be retried two more times before giving up and throwing an exception. This parameter can be useful when the data being submitted is likely to have collisions, meaning that concurrent processing is likely to be modifying the same resources. The value should be a positive integer, or 0 (which is the default). When this parameter is used, the following parameters may also be added:
useBatchOnFinalAttempt
If useBatchOnFinalAttempt=true is specified, a submitted FHIR Transaction bundle will be converted into a FHIR Batch bundle prior to the final retry attempt. This can be useful if you are loading transaction bundles containing unrelated resources (i.e. resources which do not have references to each other within the Bundle) and want to ensure that the failure to process one resource does not prevent the loading of another. The value should be true or false (which is the default).
Bundles posted to the bundle-in-topic topic will be processed by the module with ID persistence. A log entry will be emitted to the system log after every 100 bundles have been processed.
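For illustration, a minimal FHIR Transaction Bundle of the kind this processor accepts might look like the following (shown in the FHIR XML representation; the Patient content is a hypothetical example, not taken from the product documentation):

```xml
<!-- A minimal FHIR Transaction Bundle: a single entry that POSTs a new
     Patient resource. Submitting this to the bundleProcessor produces a
     transaction-response Bundle with one response entry per request entry. -->
<Bundle xmlns="http://hl7.org/fhir">
    <type value="transaction"/>
    <entry>
        <resource>
            <Patient>
                <name>
                    <family value="Simpson"/>
                    <given value="Homer"/>
                </name>
            </Patient>
        </resource>
        <request>
            <method value="POST"/>
            <url value="Patient"/>
        </request>
    </entry>
</Bundle>
```

The response Bundle returned by the processor has type transaction-response, with one response entry (carrying an outcome status such as 201 Created) for each request entry in the submitted Bundle.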
Authentication parameters for the Kafka consumer (<from...>) are not included here and may be required, but several important parameters are included:
groupId=my-group-id | The Consumer Group ID must be specified in order to ensure that topic offsets are preserved between restarts of the system. The exact value can be any string, but it must never change or be reused by other applications. |
maxPollRecords=1 | This setting indicates to Kafka that messages should be consumed one-by-one. This is important because by default Kafka uses large batches and expects that they will be processed very quickly, which is more appropriate for large numbers of very small payloads that each require minimal processing. This is not typically the case in a FHIR server, so a small poll size is chosen. |
allowManualCommit=true autoCommitEnable=false | These are required in order to use the kafkaManualCommit processor at the end of the flow. This processor ensures that the topic offset (the current position within the queue) is not advanced until the message has been successfully processed. |
autoOffsetReset=earliest | This setting instructs the Kafka consumer to begin at the very beginning of the topic, consuming messages that were produced (added to the topic) before Smile CDR was started. |
This route receives messages from a Kafka topic, where each message on the topic should contain a FHIR Transaction Bundle or a FHIR Batch Bundle. The messages are routed for transaction/batch processing on the FHIR Storage module named persistence.
This route performs a Kafka manual commit only after the message has been processed successfully, and it does not have any error handler. This means that any failing message will be retried indefinitely until processing succeeds.
<route>
    <from uri="kafka:bundle-in-topic?brokers=localhost:9092&amp;groupId=my-group-id&amp;maxPollRecords=1&amp;allowManualCommit=true&amp;autoCommitEnable=false&amp;autoOffsetReset=earliest"/>
    <to uri="smile:persistence/bundleProcessor?logProgressInterval=100"/>
    <to uri="smile:clustermgr/kafkaManualCommit"/>
</route>
The following example shows a route which retries failed messages (three redeliveries in this example) and delivers the payload to a failure topic (i.e. a Dead Letter Queue) if the final processing attempt fails. Messages on the DLQ topic, named dlq-topic, will contain the failing FHIR transaction.
Note that the Kafka manual commit is only invoked after a message has been processed successfully. This means that if a long series of failing messages is encountered and no messages are successfully processed before the Smile CDR process is terminated, the consumer offset will not be incremented, and these failing messages may be processed again by a Smile CDR process that is started later.
<route>
    <errorHandler>
        <deadLetterChannel deadLetterUri="kafka:dlq-topic?brokers=localhost:9092">
            <redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="250"/>
        </deadLetterChannel>
    </errorHandler>
    <from uri="kafka:delivery-topic-with-dlq?brokers=localhost:9092&amp;groupId=my-group-id&amp;maxPollRecords=1&amp;allowManualCommit=true&amp;autoCommitEnable=false&amp;autoOffsetReset=earliest"/>
    <to uri="smile:persistence/bundleProcessor?logProgressInterval=5"/>
    <to uri="smile:clustermgr/kafkaManualCommit"/>
</route>
The following example shows a processor which retries failed transactions (up to three retries, via retries=3) and attempts the final processing using a FHIR Batch instead of a FHIR Transaction. Batch processing mode is slower than transaction processing mode, but it means that the system will process as many entries as it can. If any entries fail on the final attempt, a new FHIR Batch Bundle containing the failing entries will be sent to the Kafka dlq-failed-entry-topic topic.
This kind of route can be used in cases where your FHIR transaction bundles contain a large number of unrelated resources, and it is therefore desirable to process as many of them as possible even if one or more of them is unprocessable. This route can result in these transaction bundles being partially processed, with the remaining partial bundle ending up in the Dead Letter Queue.
<route>
    <from uri="kafka:guaranteed-delivery-topic?brokers=localhost:9092&amp;groupId=my-group-id&amp;maxPollRecords=1&amp;allowManualCommit=true&amp;autoCommitEnable=false&amp;autoOffsetReset=earliest"/>
    <to uri="smile:persistence/bundleProcessor?logProgressInterval=5&amp;retries=3&amp;retryDelayMin=50&amp;retryDelayMax=100&amp;useBatchOnFinalAttempt=true"/>
    <filter>
        <variable>failedEntryBundle</variable>
        <to uri="kafka:dlq-failed-entry-topic?brokers=localhost:9092" variableSend="failedEntryBundle"/>
    </filter>
    <to uri="smile:clustermgr/kafkaManualCommit"/>
</route>
smile:persistence/ndjsonToBundleProcessor
This processor accepts NDJSON (newline-delimited JSON) input and converts it into FHIR Bundles (IBaseBundles). The optional parameter type can be used to specify the kind of bundle to create: transaction (the default, if none is specified) or batch. This bundle can then be passed to a bundleProcessor processor to persist the bundle to the repository.
Additionally, if the optional parameter ensureHomogeneousResourceTypes is defined and set to true, NDJSON parsing will fail if resources of different types are encountered in the same NDJSON file.
Explicitly defining the output bundle as a transaction bundle.
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="my-route">
        <from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
        <to uri="smile:persistence/ndjsonToBundleProcessor?type=transaction"/>
        <!-- to persist the result of the transformation -->
        <to uri="smile:persistence/bundleProcessor"/>
    </route>
</routes>
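For reference, the NDJSON payload consumed by a route like this contains one JSON-encoded FHIR resource per line. A hypothetical two-line example (the resource content is illustrative only):

```json
{"resourceType":"Patient","id":"pat-1","name":[{"family":"Simpson"}]}
{"resourceType":"Observation","id":"obs-1","status":"final","code":{"text":"Heart rate"}}
```

Each line becomes one entry in the resulting Bundle. Note that a mixed-type payload such as this one would be rejected if ensureHomogeneousResourceTypes=true were set.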
Failing processing if NDJSON source has multiple resource types when only one is expected.
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="my-route">
        <from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
        <to uri="smile:persistence/ndjsonToBundleProcessor?ensureHomogeneousResourceTypes=true"/>
        <!-- to persist the result of the transformation -->
        <to uri="smile:persistence/bundleProcessor"/>
    </route>
</routes>