Smile provides a number of custom processors that can be used to call Smile module services.
A list of Smile processors by module can be found below.
Persistence processors require the FHIR Storage (PERSISTENCE_ALL
) module.
smile:persistence/resourceOperationProcessor
Resource Operations messages posted to the resource-operation-message-in-topic will get processed by the module with id persistence
.
<routes xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="kafka:resource-operation-message-in-topic?brokers=localhost:9092"/>
<to uri="smile:persistence/resourceOperationProcessor"/>
</route>
</routes>
smile:persistence/bundleProcessor
partitionId
can be used to specify the partition to use for the bundle transaction.Bundles posted to the bundle-in-topic will get processed by the module with id persistence
.
<routes xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
<to uri="smile:persistence/bundleProcessor"/>
</route>
</routes>
with partitionId
parameter:
<routes xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
<to uri="smile:persistence/bundleProcessor?partitionId=1"/>
</route>
</routes>
smile:persistence/ndjsonToBundleProcessor
IBaseBundles
. The optional parameter type
can be used to specify the kind of bundle to create: transaction
(default, if none specified), or batch
. This bundle can then be passed to a bundleProcessor
processor to persist the bundle to the repository.
Additionally, if the optional parameter ensureHomogeneousResourceTypes
is defined and set to true
, NDJSON parsing will fail if resources of different types are encountered in the same NDJSON file.Explicitly defining the output bundle as a transaction bundle.
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="my-route">
<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
<to uri="smile:persistence/ndjsonToBundleProcessor?type=transaction"/>
<!-- to persist the result of the transformation -->
<to uri="smile:persistence/bundleProcessor"/>
</route>
</routes>
Failing processing if NDJSON source has multiple resource types when only one is expected.
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="my-route">
<from uri="kafka:bundle-in-topic?brokers=localhost:9092"/>
<to uri="smile:persistence/ndjsonToBundleProcessor?ensureHomogeneousResourceTypes=true"/>
<!-- to persist the result of the transformation -->
<to uri="smile:persistence/bundleProcessor"/>
</route>
</routes>
ETL Importer Processors require the ETL Importer (ETL_IMPORTER
) module.
smile:etl_importer/csvProcessor
Use the steps below to send a .csv
file to a csvProcessor
using Camel's File Component.
.csv
file using the sample data from the documentation.<routes xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="file:[directory-from-step-3-above]?fileName=[filename-from-step-3-above.csv]&noop=true"/>
<convertBodyTo type="String"/>
<to uri="smile:etl_importer/csvProcessor"/>
</route>
</routes>
The following route reads a multi-line CSV String (including headers) from the Kafka etl-in-topic
topic and sends it to be processed by an ETL Import Module with an id of etl_importer
.
<routes xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="kafka:etl-in-topic?brokers=localhost:9092"/>
<to uri="smile:etl_importer/csvProcessor"/>
</route>
</routes>
Camel processors only require the Camel (CAMEL
) module.
smile:camel/channelImportProcessor
The following routes fully replicate the functionality in the Channel Import Module when a ResourceOperationJsonMessage message is posted to the kafka topic called channel-import-in-topic.
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="channelImportProcessor">
<from uri="kafka:channel-import-in-topic?brokers=localhost:9092"/>
<to uri="direct:processMessage"/>
</route>
<route id="processResourceOperationMessage">
<from uri="direct:processMessage"/>
<onException>
<exception>ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException</exception>
<exception>ca.uhn.fhir.rest.server.exceptions.InvalidRequestException</exception>
<exception>ca.uhn.fhir.rest.server.exceptions.InternalErrorException</exception>
<handled>
<constant>true</constant>
</handled>
<to uri="smile:camel/incrementRetryCountProcessor"/>
<log logName="ca.cdr.camel" message="Error processing ${body}. Sending to retry queue." loggingLevel="DEBUG"/>
<to uri="kafka:test.retry?brokers=localhost:9092"/>
</onException>
<onException>
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<to uri="direct:failureQueue"/>
</onException>
<log logName="ca.cdr.camel" message="Processing Resource Operation ${body}" loggingLevel="DEBUG"/>
<to uri="smile:camel/channelImportProcessor"/>
</route>
<route id="retryResourceOperationMessage">
<from uri="kafka:test.retry?brokers=localhost:9092"/>
<when>
<jsonpath>$.headers[?(@.retryCount < 3)]</jsonpath>
<delay>
<constant>1000</constant>
</delay>
<log logName="ca.cdr.camel" message="Retrying ${body}" loggingLevel="DEBUG"/>
<to uri="direct:processMessage"/>
</when>
<otherwise>
<to uri="direct:failureQueue"/>
</otherwise>
</route>
<route id="failureQueue">
<from uri="direct:failureQueue"/>
<log logName="ca.cdr.camel" message="Error processing ${body}. Sending to Failure queue." loggingLevel="DEBUG"/>
<to uri="kafka:test.failed?brokers=localhost:9092"/>
</route>
</routes>
smile:camel/incrementRetryCountProcessor
See the Example Route in channelImportProcessor above for an example of how it can be used.
smile:camel/script?function=myFunction&input=bundle
input
parameter will convert the input data before calling the function. If no input parameter is specified, the input will be passed to the function as a String. The following input formats are currently supported:
json
the input message will be converted into a Json document before calling the functionbundle
the input message will be converted into a FHIR bundle before calling the functionhl7v2
the input message will be converted into an HL7v2 message before calling the functionProcessor scripts can access the documented JavaScript Execution Environment APIs. However in cases where a Camel module does NOT have a FHIR Storage module dependency configured, the Fhir API will NOT be available to its processor scripts.
The return value of the function will be passed to the next step in the route.
All functions called by this processor must be defined in the Camel Module.
Below are example routes using the script processor with each of these input types:
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="string">
<from uri="direct:stringStart"/>
<to uri="smile:camel/script?function=processString"/>
<to uri="mock:stringResult"/>
</route>
<route id="json">
<from uri="direct:jsonStart"/>
<to uri="smile:camel/script?function=processJson&input=json"/>
<to uri="mock:jsonResult"/>
</route>
<route id="bundle">
<from uri="direct:bundleStart"/>
<to uri="smile:camel/script?function=processBundle&input=bundle"/>
<to uri="mock:bundleResult"/>
</route>
<route id="hl7v2">
<from uri="direct:hl7v2Start"/>
<to uri="smile:camel/script?function=processHl7v2&input=hl7v2"/>
<to uri="mock:hl7v2Result"/>
</route>
</routes>
Here are some example JavaScript functions that would work with the above routes:
function processString(string) {
var parts = string.split(/\s+/)
var patient = ResourceBuilder.build('Patient');
patient.name[0].given[0] = parts[0];
patient.name[0].family = parts[1];
return patient;
}
function processJson(json) {
// Create a patient
var parts = json.name.split(/\s+/)
var patient = ResourceBuilder.build('Patient');
patient.name[0].given[0] = parts[0];
patient.name[0].family = parts[1];
// Add the patient to a transaction bundle
var transaction = TransactionBuilder.newTransactionBuilder();
transaction.create(patient);
return transaction;
}
function processBundle(bundle) {
var resources = bundle.entryResources();
return resources[0].name[0].family;
}
function processHl7v2(message) {
// Create a patient from an ORU_R01 message
var patient = ResourceBuilder.build('Patient');
patient.name[0].family = message['/PATIENT_RESULT[0]/PATIENT/PID-5[0]-1'];
patient.name[0].given[0] = message['/PATIENT_RESULT[0]/PATIENT/PID-5[0]-2'];
// Add the patient to a transaction bundle
var transaction = TransactionBuilder.newTransactionBuilder();
transaction.create(patient);
return transaction;
}
Cluster Manager module is available to all other modules, so no specific dependency needs to be configured to use it.
Smile Camel broker is both a consumer and a processor, which means that it can participate in a route as a consumer (from)
or producer (to)
endpoint.
Example from
URI: <from uri="smile:clustermgr/broker?topic=?my-kafka-from-topic">
Description: Takes messages from indicated topic, wraps them in a Camel Exchange and sends them to the following route node.
Example to
URI: <to uri="smile:clustermgr/broker?topic=?my-kafka-to-topic">
Description: Sends received Camel Exchange
message to defined Smile internal topic.
The Smile broker allows to reference topics simply by the topic name, which allows replacing a route like the following:
<route>
<from uri="kafka:v2-in-topic?brokers=localhost:9092&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL" />
<to uri="smile:hl7v2/hl7v2ToFhirProcessor" />
<to uri="kafka:bundle-out-topic/brokers=localhost:9092&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL" />
</route>
by the simpler definition:
<route>
<from uri="smile:clustermgr/broker?topic=v2-in-topic" />
<to uri="smile:persistence/bundleProcessor" />
<to uri="smile:clustermgr/broker?topic=bundle-out-topic" />
</route>
When transaction logging is globally enabled, a new transaction log can be started for a route by adding a smile:camel/txLogStart
processor before the first node in which a transaction log step should be added.
After a new transaction log is created for a route, Smile processors will automatically add a log step for each subsequent node (unless explicitly disabled by the txLogStep
parameter). This will occur regardless of whether a node executes successfully or if a node fails with an Exception.
Once a new transaction log is created for a route, custom processors have two alternatives to add transaction log steps:
smile:camel/txLogAddStep
processor after the node which output logging is desired.ICamelProcessorTxLogHelper
(IHl7V2CamelProcessorTxLogHelper
for Hl7V2 processors) bean which provides transaction logging functionality.Transaction log and log steps are accumulated during route execution in a CamelExchangeRequestDetails
object which is stored in the exchange-request
Camel Exchange Property. After a route is terminated (either successfully or from an Exception) the transaction log steps inside the CamelExchangeRequestDetails
are automatically persisted.
Hl7V2 intermediate route processors will add log steps if the intermediate_logging_enabled property is set to true
.
smile:camel/txLogStart
smile:camel/txLogAddStep
Logging Parameter | Used In | Function | Default Value |
---|---|---|---|
txLogStep | Smile processors | (false value) disable automatic logging for processor | true |
showMsgBody | Smile processors | (false value) avoid adding message body to log step | true |
rawMsg | Smile Hl7V2 processors | (false value) disable raw input message body to log step | true |
loggerUri | All processors | Used to override the processor uri identifier in the transaction log steps[^1] | NONE |
[^1]: If your route has more than one processor with the same URI, (which could be the case when using multiple smile:camel/txLogAddStep
processors), the automatic detection of the previous processor label could fail, which would result in a logging step with an empty URI. To avoid transaction log steps with empty URIs, the desired logging step URI can be specified using this parameter.
The following route produces one transaction log with six associated log steps as commented:
<route>
<from uri="direct:start" />
<!-- following processor doesn't log because no tx log initiated -->
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPreConvertScriptProcessor" />
<!-- initiate tx logging -->
<to uri="smile:camel/txLogStart" />
<!-- log step with raw input msg (step 0) + step for processed msg (step 1) -->
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPreConvertMapperBeanProcessor" />
<to uri="bean:customBeanProcessor1" />
<!-- tx log requested for previous step not including msg body (step 2) -->
<to uri="smile:camel/txLogAddStep?showMsgBody=false" />
<!-- automatic tx log (step 3) -->
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPreConvertInterceptorProcessor?txLogStep=false" />
<!-- "automatic" tx log, no body (step 4) -->
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirProcessor" />
<!-- custom processor with no tx log step requested -->
<to uri="bean:customBeanProcessor2" />
<!-- "automatic" tx log, no body (step 5) -->
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPostConvertScriptProcessor?showMsgBody=false" />
<!-- "automatic" tx log show body (step 6) -->
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPostConvertInterceptorProcessor" />
<to uri="mock:result" />
</route>
The following custom Camel processor adds an eagerly generated transaction log step if a transaction log was created earlier within the route:
/**
* Custom processor showing transaction log step addition
*/
public class SampleCustomProcessorAddingTxLogSteps implements Processor {
@Autowired
private ICamelProcessorTxLogHelper myTransactionLogHelper;
@Override
public void process(Exchange exchange) throws Exception {
// if transaction log was already started, adds provided step to it, otherwise do nothing
// steps are eagerly generated
myTransactionLogHelper.addStepIfTxLogActive(exchange, buildTxLogSteps(exchange));
}
public List<TransactionLogStepJson> buildTxLogSteps(Exchange theExchange) {
TransactionLogStepJson step = new TransactionLogStepJson();
step.setInitialTimestamp(new Date());
step.setType(TransactionLogStepTypeEnum.PROCESSING);
step.setOutcome(TransactionLogOutcomeEnum.SUCCESS);
step.setBody(theExchange.getMessage().getBody(String.class));
step.setBodyType(TransactionLogBodyTypeEnum.JSON);
// a string you provide to identify your processor in the transaction log step
step.setRequestUrl(this.getClass().getSimpleName());
return List.of(step);
}
}
The following custom Camel processor adds a lazily generated transaction log step if a transaction log was created earlier within the route:
/**
* Custom processor showing lazily generated transaction log step addition
*/
public class SampleCustomProcessorAddingTxLogStepsLazily implements Processor, ITxLogStepsProvider {
@Autowired
private ICamelProcessorTxLogHelper myTransactionLogHelper;
@Override
public void process(Exchange exchange) throws Exception {
// if transaction log was already started, adds provided step to it, otherwise do nothing
myTransactionLogHelper.addStepIfTxLogActive(exchange, this);
}
// this method is invoked lazily. Useful when steps generation involve heavy process
@Override
public List<TransactionLogStepJson> provideTxLogSteps(Exchange theExchange) {
TransactionLogStepJson step = new TransactionLogStepJson();
step.setInitialTimestamp(new Date());
step.setType(TransactionLogStepTypeEnum.PROCESSING);
step.setOutcome(TransactionLogOutcomeEnum.SUCCESS);
step.setBody(theExchange.getMessage().getBody(String.class));
step.setBodyType(TransactionLogBodyTypeEnum.JSON);
// a string you provide to identify your processor in the transaction log step
step.setRequestUrl(this.getClass().getSimpleName());
return List.of(step);
}
}
Hl7v2 Inbound Processors require the HL7 v2.x Listening Endpoint (ENDPOINT_HL7V2_IN
) module. These processors execute each message transformation step of the HL7 v2.x listening endpoint module.
Each Hl7v2 processor has the specifications below:
String
, or an Hl7v2ToFhirConversionResultJsonA description of each Hl7v2 processor is provided below:
Executes the logic inside the Javascript onPreConvertHl7V2ToFhir
callback function.
This processor is for backwards compatibility to support custom Mapper Bean Types.
For pre-convert and post-convert customizations, Javascript callbacks (onPreConvertHl7V2ToFhir
, onPostConvertHl7V2ToFhir
),
or Java interceptors (HL7V2IN_PRE_HL7V2_TO_FHIR_MAPPING_PROCESSING
, HL7V2IN_POST_HL7V2_TO_FHIR_MAPPING_PROCESSING
) should be used instead.
Executes the logic inside the specified custom mapper bean class.
Executes the logic inside specified by HL7V2IN_PRE_HL7V2_TO_FHIR_MAPPING_PROCESSING
interceptor.
Translates an incoming HL7V2 message into FHIR transaction Bundles using the Smile generic mapper. These Bundles are placed in the bundles
field of the outputted Hl7v2ToFhirConversionResultJson.
Executes the logic inside the JavascriptonPostConvertHl7V2ToFhir
callback function.
Executes the logic inside specified by theHL7V2IN_POST_HL7V2_TO_FHIR_MAPPING_PROCESSING
interceptor.
The following route receives an HL7v2 message from a kafka topic called in-topic and passes it to the pre-convert customization points, the generic Smile Hl7v2-to-FHIR mapper, and the post-convert customization points of the HL7 v2.x listening endpoint module.
After the last Hl7v2 processor has finished executing (i.e. hl7v2ToFhirPostConvertInterceptorProcessor
)
Camel is used check to make sure the doProcess
flag is still true and no errors occurred during the conversion process.
If this check succeeds, the converted bundles
of the Hl7v2ToFhirConversionResultJson are persisted using the bundleProcessor
.
If the checks are not successful, the entire Hl7v2ToFhirConversionResultJson is posted to the kafka error-topic.
<routes xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="kafka:in-topic?brokers=localhost:9092"/>
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPreConvertScriptProcessor"/>
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPreConvertMapperBeanProcessor"/>
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPreConvertInterceptorProcessor"/>
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirProcessor"/>
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPostConvertScriptProcessor"/>
<to uri="smile:endpoint_hl7v2_in/hl7v2ToFhirPostConvertInterceptorProcessor"/>
<choice>
<when>
<spel>#{body.isDoProcess() and !body.hasErrorIssues()}</spel>
<split>
<spel>#{body.bundles}</spel>
<to uri="smile:persistence/bundleProcessor"/>
</split>
</when>
<otherwise>
<to uri="kafka:error-topic?brokers=localhost:9092"/>
</otherwise>
</choice>
</route>
</routes>
CDA Exchange+ processors require that the CDA Exchange+ (CDA_EXCHANGE_PLUS
) module is running. These processors execute each individual step of the CDA Exchange+ import transformation CDA Exchange+.
Each CDA Exchange+ processor has the specifications below:
String
, or an CdaToFhirConversionResultJsonA description of each CDA Exchange+ processor is provided below:
Executes the logic found in the pre-import script method (onPreImportCDA
) that is configured in the CDA Exchange+ module.
The resulting changes are set in the modifiableDocument
property on the CdaToFhirConversionResultJson.
Invokes the pointcut CDA_PRE_IMPORT
, executing any interceptor code that is configured in the CDA Exchange+ module.
The resulting changes are set in the modifiableDocument
property on the CdaToFhirConversionResultJson.
Translates an incoming CDA Exchange document into a FHIR transaction Bundle using the CDA Exchange+ engine. This Bundle is placed in the bundle
field of the outputted CdaToFhirConversionResultJson.
Executes the logic found in the post-import script method (onPostImportCDA
) that is configured in the CDA Exchange+ module.
The resulting changes to the bundle are set in the bundle
property on the CdaToFhirConversionResultJson.
Invokes the pointcut CDA_POST_IMPORT
, executing any interceptor code that is configured in the CDA Exchange+ module.
The resulting changes to the bundle are set in the bundle
property on the CdaToFhirConversionResultJson.
<route>
<from uri="kafka:in-topic?brokers=localhost:9092"/>
<to uri="smile:cda_exchange_plus/cdaToFhirPreConvertScriptProcessor"/>
<to uri="smile:cda_exchange_plus/cdaToFhirPreConvertInterceptorProcessor"/>
<to uri="smile:cda_exchange_plus/cdaToFhirProcessor"/>
<to uri="smile:cda_exchange_plus/cdaToFhirPostConvertScriptProcessor"/>
<to uri="smile:cda_exchange_plus/cdaToFhirPostConvertInterceptorProcessor"/>
<choice>
<when>
<spel>#{body.isDoProcess()}</spel>
<setBody>
<spel>#{body.bundle}</spel>
</setBody>
<to uri="smile:persistence/bundleProcessor"/>
</when>
</choice>
</route>