49.10.1 Camel Recipes

This page provides additional examples showing how the Camel module can be used to integrate Smile CDR with external systems in ways not otherwise supported by Smile CDR, and to implement specialized workflows.

49.10.2 Load HL7 v2 Messages from AWS S3 Bucket

The following describes a Camel module configuration that enables loading HL7 v2 messages into Smile CDR from files downloaded from an S3 bucket. The same task could be accomplished by manually or programmatically downloading files from S3 to a local disk folder and then using the Smile CDR CLI hl7v2-transmit-flatfile command to load them into Smile CDR. The Camel module in this example replicates a similar set of steps in an automated fashion, but processes the files in memory (no disk folder needed) and includes handling for mapping errors.

The configuration for this example consists of three Camel routes, each consisting of a <route> XML definition with specific dependencies that need to be deployed and options that can be adjusted. When deploying this configuration, the three <route> elements would be copied into a single <routes> container element, which would then be pasted into the Camel Routes (Text) or Camel Routes (File) configuration value of the Camel module. Additionally, a Spring context config class would be configured as the Spring Context Config Class(es) value for the Camel module and used to instantiate the Java-based processor class dependencies.
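
For reference, the assembled configuration would take the following general shape, using the fixed-interval polling variant of the download route. The route bodies are elided here; each route is shown in full in the subsections below.

<routes>
	<route id="download_from_s3">
		<!-- ... -->
	</route>
	<route id="parse_hl7v2_send_to_kafka_partition">
		<!-- ... -->
	</route>
	<route id="hl7v2_from_kafka_to_persistence">
		<!-- ... -->
	</route>
</routes>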

The following subsections describe the three Camel routes and dependencies.

49.10.2.1 Download File from AWS S3 Bucket

This route depends on an AWS Storage Service Camel component, which will need to be added to Smile CDR. See the Adding Other Camel Components section for information about sourcing and deploying jar files for additional Camel components.
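
As a rough illustration of sourcing the component, the jar and its transitive dependencies can be pulled from Maven Central using coordinates along the following lines. The artifact name is the standard one for the aws2-s3 component; the version shown is a placeholder and must match the Camel version bundled with your Smile CDR release.

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-aws2-s3</artifactId>
	<!-- Placeholder version: use the Camel version bundled with your Smile CDR release -->
	<version>x.y.z</version>
</dependency>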

The AWS Storage Service component supports many different options. At minimum, the route will need to provide the following for the component:

  • The name of the S3 bucket
  • The AWS service region
  • Credential information needed to authorize access to the S3 bucket

49.10.2.1.1 Fixed Interval Polling Trigger

The sample below provides one variation of this route, configured to poll an S3 bucket named "sample-bucket-123" at 60-second intervals and download any files found.

<route id="download_from_s3">
	<!-- Check for incoming messages from S3 Bucket -->
	<from uri="aws2-s3://sample-bucket-123?region=us-east-2&amp;useDefaultCredentialsProvider=true&amp;delay=60000"/>
	<to uri="direct:parseFileAndSendToKafka" />
</route>

The route uses the AWS SDK's default credentials provider chain to authenticate download requests. When files are detected in the bucket, the route downloads each file into memory and passes the file contents to the next route via the built-in direct component using a Camel Exchange object. The Parse Files into HL7 v2 Messages and Submit to Kafka section below describes the route that implements the parseFileAndSendToKafka URI. Once the downstream process has confirmed via the exchange that the file has been received and committed, the file is deleted from the S3 bucket.
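
If the default credentials provider chain is not suitable for a given deployment, the component also accepts credentials directly as endpoint URI options. The following is a minimal sketch using the same bucket and region as above; the accessKey and secretKey values are placeholders, and the RAW() wrapper prevents Camel from URI-decoding them.

<route id="download_from_s3_static_creds">
	<!-- Check for incoming messages from S3 Bucket using explicit placeholder credentials -->
	<from uri="aws2-s3://sample-bucket-123?region=us-east-2&amp;accessKey=RAW(MYACCESSKEY)&amp;secretKey=RAW(MYSECRETKEY)&amp;delay=60000"/>
	<to uri="direct:parseFileAndSendToKafka" />
</route>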

49.10.2.1.2 External Event Trigger

The sample below provides an alternate implementation of this route, configured instead to wait for a message from an IBM MQ queue before downloading files. In this example, the message from IBM MQ contains the name of a sub-folder in the S3 bucket, and all files contained in that sub-folder will be downloaded and processed.

To facilitate the listening connection to IBM MQ, this route uses the JMS - IBM MQ Source Kamelet. See the Adding Other Camel Components section for information about sourcing and deploying jar and yaml files for Camel Kamelets.

<route id="mq_download_from_s3">
	<!-- Listen for incoming messages from IBM MQ -->
	<from uri="kamelet:jms-ibm-mq-source?channel=MY.CHANNEL.NAME&amp;destinationName=MY.QUEUE.NAME&amp;password=MYPASSWORD&amp;queueManager=MY.QUEUE.MANAGER&amp;serverName=localhost&amp;serverPort=1414&amp;username=MYUSERNAME"/>
	<!-- The message is assumed to contain the name of a directory in an S3 bucket.                        -->
	<!-- Append the message body to an AWS S3 request to download all files contained in the subdirectory. -->
	<pollEnrich>
		<simple>aws2-s3://sample-bucket-123?region=us-east-2&amp;useDefaultCredentialsProvider=true&amp;prefix=${bodyAs(String)}</simple>
	</pollEnrich>
	<to uri="direct:parseFileAndSendToKafka" />
</route>

49.10.2.2 Parse Files into HL7 v2 Messages and Submit to Kafka

This route uses the built-in Kafka component and a pair of custom Camel processor classes, which are described below. Note that the processor bean names must exactly match the names of the @Bean methods that create them in the Spring Context Config class.

<route id="parse_hl7v2_send_to_kafka_partition">
	<from uri="direct:parseFileAndSendToKafka" />
	<!-- Use a processor to parse the file contents into a collection of HL7 v2 message objects -->
	<to uri="bean:hl7v2TextParsingProcessor" />
	<!-- Send each HL7 v2 message object to the Kafka topic -->
	<split>
		<spel>#{body}</spel>
		<!-- Use processor to calculate and set a Kafka partition ID in the exchange header -->
		<!-- The line below can be removed if only using a single partition -->
		<to uri="bean:hl7v2PartitionCalculateProcessor" />
		<!-- Send to the hl7v2-in-topic topic in Kafka -->
		<to uri="kafka:hl7v2-in-topic?brokers=localhost:9092"/>
	</split>
</route>

49.10.2.2.1 Splitting an HL7 v2 File into Message Objects

The following Camel processor class parses text files containing HL7 v2 messages into HL7 v2 message objects using the same library as the Smile CDR CLI hl7v2-transmit-flatfile command.

/*-
 * #%L
 * Smile CDR - CDR
 * %%
 * Copyright (C) 2016 - 2025 Smile CDR, Inc.
 * %%
 * All rights reserved.
 * #L%
 */
package com.example.camel;

import ca.uhn.hl7v2.HapiContext;
import ca.uhn.hl7v2.util.Hl7InputStreamMessageIterator;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import java.io.StringReader;

/**
 * Processor used to convert String text containing HL7v2 messages (typically loaded from a file)
 * into an {@link Hl7InputStreamMessageIterator} class instance. This effectively parses out the HL7v2
 * String into a collection of HL7v2 message objects which can be iterated through for subsequent
 * processing.
 */
public class HL7v2TextParsingProcessor implements Processor {

   private final HapiContext myHapiContext;

   public HL7v2TextParsingProcessor(HapiContext theHapiContext) {
      myHapiContext = theHapiContext;
   }

   @Override
   public void process(Exchange exchange) throws Exception {

      // Parse the incoming HL7 v2 file into messages using the HAPI HL7 libraries
      org.apache.camel.Message message = exchange.getIn();
      StringReader hl7v2FileContentsReader = new StringReader(message.getBody(String.class));
      Hl7InputStreamMessageIterator messageIterator =
            new Hl7InputStreamMessageIterator(hl7v2FileContentsReader, myHapiContext);

      // Replace the message body with the Iterator object created above.
      message.setBody(messageIterator);
   }
}

49.10.2.2.2 Optional: Calculating a Kafka Partition Based on Patient ID Field

The following Camel processor class parses each HL7 v2 message object, extracting the PID segment from the message and then the first patient identifier from the PID segment. It then calculates a hash of the patient identifier and uses the hash to generate a Kafka partition ID between 0 and 4 (this assumes that the hl7v2-in-topic Kafka topic has at least 5 partitions). The Kafka partition ID is passed along as a header in the Camel exchange and is later used by the Kafka component to determine which partition to publish the message to. Since Kafka preserves message order within a partition, hashing on the patient identifier keeps all messages for a given patient in order while still spreading load across partitions.

/*-
 * #%L
 * Smile CDR - CDR
 * %%
 * Copyright (C) 2016 - 2025 Smile CDR, Inc.
 * %%
 * All rights reserved.
 * #L%
 */
package com.example.camel;

import ca.uhn.hl7v2.model.v25.segment.PID;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConstants;

/**
 * Sample processor used to calculate and set a Kafka partition ID in Camel Message header
 * for an HL7 v2 message based on Patient identifier. When the message is later published to a
 * Kafka topic using a Kafka Camel component, the PARTITION_KEY header will direct the component
 * to publish to the specified Kafka partition ID in the topic.
 */
public class HL7v2PartitionCalculateProcessor implements Processor {

   @Override
   public void process(Exchange exchange) throws Exception {

      // Parse the incoming message and extract PID segment
      org.apache.camel.Message message = exchange.getIn();
      ca.uhn.hl7v2.model.Message hapiMsg = message.getBody(ca.uhn.hl7v2.model.Message.class);

      PID pid = (PID) hapiMsg.get("PID");

      // Retrieve the first Patient Identifier and generate a partition ID between 0-4
      // based on a hash of the Identifier value.
      String patientId = pid.getPatientIdentifierList(0).getIDNumber().getValue();
      Integer partition = Math.abs(patientId.hashCode() % 5);

      // Add partition key to Kafka message header
      message.getHeaders().put(KafkaConstants.PARTITION_KEY, partition);
   }
}

49.10.2.2.3 Spring Context Config Class

The following class is used to instantiate the two processor classes above and make them accessible to Camel. This class would be configured in the Camel module's Spring Context Config Class(es) property with the value com.example.camel.HL7ToKafkaCamelAppCtx.

/*-
 * #%L
 * Smile CDR - CDR
 * %%
 * Copyright (C) 2016 - 2025 Smile CDR, Inc.
 * %%
 * All rights reserved.
 * #L%
 */
package com.example.camel;

import ca.uhn.hl7v2.DefaultHapiContext;
import ca.uhn.hl7v2.HapiContext;
import ca.uhn.hl7v2.parser.CanonicalModelClassFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Sample Spring Config Context class that can be used to create processor beans useful for
 * bulk ingestion of HL7v2 messages from a file into a FHIR repository.
 */
@Configuration
public class HL7ToKafkaCamelAppCtx {

   /**
    * This bean is used to calculate a Kafka partition ID for an HL7v2 message and set an
    * Exchange header that would be referenced by a downstream Kafka Camel component when
    * the message is published to a Kafka topic.
    */
   @Bean
   public HL7v2PartitionCalculateProcessor hl7v2PartitionCalculateProcessor() {
      return new HL7v2PartitionCalculateProcessor();
   }

   /**
    * This bean is used to convert HL7v2 text (e.g. loaded from a file) into a collection
    * of HL7v2 message objects.
    */
   @Bean
   public HL7v2TextParsingProcessor hl7v2TextParsingProcessor() {
      // Note: the HapiContext is deliberately not closed here, as the processor
      // retains and uses it for the lifetime of the application.
      HapiContext hapiContext = new DefaultHapiContext();
      hapiContext.setModelClassFactory(new CanonicalModelClassFactory("2.5"));
      return new HL7v2TextParsingProcessor(hapiContext);
   }
}

49.10.2.3 Consume HL7 v2 Message Objects from Kafka and Process in Parallel

This route performs the actual mapping of HL7 v2 messages to FHIR resources and creates or updates the resources in the FHIR repository. It uses the built-in Camel Kafka component to consume HL7 v2 messages from the hl7v2-in-topic topic partitions in parallel, and a built-in HL7v2 Inbound Module processor method to pass the messages to an existing HL7 V2 Listening Endpoint module, endpoint_hl7v2_in_v2. It also includes logic to handle any errors that occur when attempting to map an HL7 v2 message to FHIR.

Note that this example only uses the hl7v2ToFhirProcessor component method, so only the default HL7v2-to-FHIR mapping will be performed. See the HL7v2 Inbound Module documentation for other options if custom mapping is required.

<route id="hl7v2_from_kafka_to_persistence">
	<!-- Consume messages from the kafka hl7v2-in-topic topic as consumer group hl7v2-consume using 3 consumer threads. -->
	<!-- Remove the consumersCount= parameter if the topic has only a single partition. -->
	<from uri="kafka:hl7v2-in-topic?brokers=localhost:9092&amp;groupId=hl7v2-consume&amp;consumersCount=3"/>
	<!-- Perform default mapping -->
	<to uri="smile:endpoint_hl7v2_in_v2/hl7v2ToFhirProcessor"/>
	<!-- If mapping was successful, send to persistence module, -->
	<!-- otherwise send to kafka hl7v2-error-topic -->
	<choice>
		<when>
			<spel>#{body.isDoProcess() and !body.hasErrorIssues()}</spel>
			<split>
				<spel>#{body.bundles}</spel>
				<to uri="smile:persistence/bundleProcessor"/>
			</split>
		</when>
		<otherwise>
			<to uri="kafka:hl7v2-error-topic?brokers=localhost:9092"/>
		</otherwise>
	</choice>
</route>