47.4.1 Camel Processors

 

Camel processors only require the Camel (CAMEL) module.

47.4.2 Channel Import Processor

 
  • Example URI: smile:camel/channelImportProcessor
  • Description: Performs similar functionality to the Channel Import Module. Note that this processor assumes that retry and failure processing will be handled by the Camel route. This functionality assumes the JavaScript functions used by ChannelImport are defined in the Camel Module. Note that even though this processor is called channelImportProcessor, it does not interact with a Channel Import Module in any way. It is provided to give users an option to migrate from the Channel Import Module to the Camel Module.
  • Input: ResourceOperationJsonMessage
  • Output: N/A

47.4.2.1 Example Route

The following routes fully replicate the functionality of the Channel Import Module when a ResourceOperationJsonMessage message is posted to the Kafka topic named channel-import-in-topic.

<routes xmlns="http://camel.apache.org/schema/spring">
	<!-- Entry point: consume incoming messages from Kafka and hand them to the shared processing route. -->
	<route id="channelImportProcessor">
		<from uri="kafka:channel-import-in-topic?brokers=localhost:9092"/>
		<to uri="direct:processMessage"/>
	</route>

	<!-- Shared processing route, used for both first attempts and retries. -->
	<route id="processResourceOperationMessage">
		<from uri="direct:processMessage"/>
		<!-- The listed FHIR server exceptions are retried: bump the retry count and re-queue the message. -->
		<onException>
			<exception>ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException</exception>
			<exception>ca.uhn.fhir.rest.server.exceptions.InvalidRequestException</exception>
			<exception>ca.uhn.fhir.rest.server.exceptions.InternalErrorException</exception>
			<handled>
				<constant>true</constant>
			</handled>
			<to uri="smile:camel/incrementRetryCountProcessor"/>
			<log logName="ca.cdr.camel" message="Error processing ${body}.  Sending to retry queue." loggingLevel="DEBUG"/>
			<to uri="kafka:test.retry?brokers=localhost:9092"/>
		</onException>
		<!-- Any other exception is not retried and goes straight to the failure route. -->
		<onException>
			<exception>java.lang.Exception</exception>
			<handled>
				<constant>true</constant>
			</handled>
			<to uri="direct:failureQueue"/>
		</onException>
		<log logName="ca.cdr.camel" message="Processing Resource Operation ${body}" loggingLevel="DEBUG"/>
		<to uri="smile:camel/channelImportProcessor"/>
	</route>

	<!-- Retry route: re-process messages from the retry topic until the retry limit is reached. -->
	<route id="retryResourceOperationMessage">
		<from uri="kafka:test.retry?brokers=localhost:9092"/>
		<!--
			The when and otherwise branches must be wrapped in a choice element so that
			exactly one of them runs per message: either the message is retried, or it is
			sent to the failure route.
		-->
		<choice>
			<when>
				<!-- Matches while the message's retryCount header is below 3. -->
				<jsonpath>$.headers[?(@.retryCount &lt; 3)]</jsonpath>
				<!-- Wait before retrying (the Camel delay value is in milliseconds). -->
				<delay>
					<constant>1000</constant>
				</delay>
				<log logName="ca.cdr.camel" message="Retrying ${body}" loggingLevel="DEBUG"/>
				<to uri="direct:processMessage"/>
			</when>
			<otherwise>
				<!-- Retry limit reached: give up and route to the failure queue. -->
				<to uri="direct:failureQueue"/>
			</otherwise>
		</choice>
	</route>

	<!-- Terminal route: log the failure and publish the message to the failed topic. -->
	<route id="failureQueue">
		<from uri="direct:failureQueue"/>
		<log logName="ca.cdr.camel" message="Error processing ${body}.  Sending to Failure queue." loggingLevel="DEBUG"/>
		<to uri="kafka:test.failed?brokers=localhost:9092"/>
	</route>
</routes>

47.4.3 Increment Retry Count Processor

 
  • Example URI: smile:camel/incrementRetryCountProcessor
  • Description: This increments the retry count in a BaseJsonMessage (e.g. on a ResourceOperationJsonMessage).
  • Input: BaseJsonMessage
  • Output: N/A

See the Example Route in channelImportProcessor above for an example of how it can be used.

47.4.3.1 Script

  • Example URI: smile:camel/script?function=myFunction&input=json&includeExchange=true

  • Description: This calls an arbitrary JavaScript function with the input from the route. The function can optionally receive the Camel Exchange as a second parameter.

    Parameters:

    • function: The JavaScript function to call (required)
    • input: Specifies the format to convert the input data to before calling the function (optional)
      • json: converts input message into a JSON document
      • hl7v2: converts input message into an HL7v2 message
      • If not specified, the input message will be passed as a String. (Note: there used to also be a bundle input type, but bundles are now handled automatically without specifying an input value.)
    • includeExchange: When equal to true, passes the Camel Exchange as a second parameter to the function (optional)
  • Input:

    • First parameter: JSON, IBaseBundle, HL7v2 message, or String (depending on configuration)
    • Second parameter (optional): Camel Exchange object - provides access to headers and properties of the exchange
      • exchange.headers: Provides read/write access to exchange message headers
      • exchange.properties: Provides read/write access to exchange properties
  • Output: The return value of the function will be passed to the next step in the route.

47.4.3.2 Example Routes

Below are example routes using the script processor with each of these input types:

<routes xmlns="http://camel.apache.org/schema/spring">
    <!-- No input parameter: the message is passed to processString as a String. -->
    <route id="string">
        <from uri="direct:stringStart"/>
        <to uri="smile:camel/script?function=processString"/>
        <to uri="mock:stringResult"/>
    </route>

    <!--
        Sets one exchange property and one header, then calls the script with
        includeExchange=true so the function receives the Camel Exchange as its
        second parameter.
    -->
    <route id="stringWithExchange">
        <from uri="direct:stringWithExchangeStart"/>
        <setProperty name="myProperty">
            <constant>original property value</constant>
        </setProperty>
        <setHeader name="myHeader">
            <constant>original header value</constant>
        </setHeader>
        <to uri="smile:camel/script?function=processStringWithExchange&amp;includeExchange=true"/>
        <to uri="mock:stringWithExchangeResult"/>
    </route>

    <!-- input=json: the message is converted into a JSON document before processJson is called. -->
    <route id="json">
        <from uri="direct:jsonStart"/>
        <to uri="smile:camel/script?function=processJson&amp;input=json"/>
        <to uri="mock:jsonResult"/>
    </route>

    <!--
        NOTE(review): the parameter description for this processor says the bundle
        input type is now handled automatically, so input=bundle may no longer be
        needed here. Confirm against the current processor before copying this route.
    -->
    <route id="bundle">
        <from uri="direct:bundleStart"/>
        <to uri="smile:camel/script?function=processBundle&amp;input=bundle"/>
        <to uri="mock:bundleResult"/>
    </route>

    <!-- input=hl7v2: the message is converted into an HL7v2 message before processHl7v2 is called. -->
    <route id="hl7v2">
        <from uri="direct:hl7v2Start"/>
        <to uri="smile:camel/script?function=processHl7v2&amp;input=hl7v2"/>
        <to uri="mock:hl7v2Result"/>
    </route>
</routes>

Here are some example JavaScript functions that would work with the above routes:

/**
 * Builds a Patient resource from a whitespace-separated "given family" string.
 *
 * Surrounding whitespace is trimmed first so that the first token is never an
 * empty string, and the family name is only assigned when a second token exists.
 *
 * @param {string} string - Raw text such as "John Smith".
 * @returns The Patient created through ResourceBuilder.
 */
function processString(string) {
   // Trim first: " John Smith".split(/\s+/) would otherwise start with "".
   var parts = string.trim().split(/\s+/);

   var patient = ResourceBuilder.build('Patient');
   patient.name[0].given[0] = parts[0];
   // Avoid assigning undefined when only one token was supplied.
   if (parts.length > 1) {
      patient.name[0].family = parts[1];
   }

   return patient;
}

/**
 * Shows how a script function can read and modify the Camel Exchange.
 *
 * Reads the current 'myProperty' property and 'myHeader' header, overwrites
 * both, adds one new property and one new header, and returns the message
 * joined with the two values that were read.
 *
 * @param message  - The route input (a String in the example route).
 * @param exchange - Exchange object exposing `properties` and `headers`.
 * @returns {string} "<message>, <original property>, <original header>"
 */
function processStringWithExchange(message, exchange) {
   var exchangeProps = exchange.properties;
   var exchangeHeaders = exchange.headers;

   // Capture the incoming values before they are overwritten below
   var priorProperty = exchangeProps['myProperty'];
   var priorHeader = exchangeHeaders['myHeader'];

   // Overwrite the existing property and header
   exchangeProps['myProperty'] = 'changed property value';
   exchangeHeaders['myHeader'] = 'changed header value';

   // Add a new property and a new header
   exchangeProps['newProperty'] = 'added property';
   exchangeHeaders['newHeader'] = 'added header';

   var withProperty = message + ", " + priorProperty;
   return withProperty + ", " + priorHeader;
}

/**
 * Builds a Patient from the `name` field of a JSON document and wraps it in a
 * transaction as a create operation.
 *
 * Surrounding whitespace in `name` is trimmed first so that the first token is
 * never an empty string, and the family name is only assigned when a second
 * token exists.
 *
 * @param json - JSON document with a string `name` property, e.g. { "name": "John Smith" }.
 * @returns The transaction builder holding the create operation for the Patient.
 */
function processJson(json) {
   // Create a patient
   // Trim first: " John Smith".split(/\s+/) would otherwise start with "".
   var parts = json.name.trim().split(/\s+/);

   var patient = ResourceBuilder.build('Patient');
   patient.name[0].given[0] = parts[0];
   // Avoid assigning undefined when only one token was supplied.
   if (parts.length > 1) {
      patient.name[0].family = parts[1];
   }

   // Add the patient to a transaction bundle
   var transaction = TransactionBuilder.newTransactionBuilder();
   transaction.create(patient);

   return transaction;
}

/**
 * Returns the family name from the first name of the first resource in a bundle.
 *
 * @param bundle - Bundle object exposing entryResources().
 * @returns The `family` value of the first resource's first name.
 */
function processBundle(bundle) {
   var firstResource = bundle.entryResources()[0];
   var primaryName = firstResource.name[0];
   return primaryName.family;
}

/**
 * Creates a Patient from the PID-5 name components of an ORU_R01 message and
 * wraps it in a transaction as a create operation.
 *
 * @param message - HL7v2 message addressable by path expressions.
 * @returns The transaction builder holding the create operation for the Patient.
 */
function processHl7v2(message) {
   // Pull the name components out of the first PID-5 repetition
   var familyName = message['/PATIENT_RESULT[0]/PATIENT/PID-5[0]-1'];
   var givenName = message['/PATIENT_RESULT[0]/PATIENT/PID-5[0]-2'];

   // Populate a new Patient with those components
   var pt = ResourceBuilder.build('Patient');
   var primaryName = pt.name[0];
   primaryName.family = familyName;
   primaryName.given[0] = givenName;

   // Queue the Patient as a create in a new transaction
   var tx = TransactionBuilder.newTransactionBuilder();
   tx.create(pt);
   return tx;
}