Many health systems already have data stored in FHIR format that you might want to ingest into Smile CDR. Tools such as ETL Import and CSV Bulk Import exist, but they rely on a static data source, which is often not how you want the data ingested. Various mechanisms exist for pushing data from one system to another in real time; the Channel Import module provides a channel-based method of ingesting FHIR data into Smile CDR.
When a Channel Import module is added, it listens to a given named channel provided by a Message Broker. Whenever a message arrives on that channel, the Channel Import module will consume it, and process the message accordingly.
Some external publisher will publish messages into the channel, and the Channel Import module will consume them. The module supports functionality for retrying failed messages, which is documented in the Message Broker Management section.
To enable channel import on a Smile CDR instance, several modules must be used together. The following diagram shows how these modules relate to each other.
This diagram identifies the modules outlined below.
The Cluster Manager module contains the configuration used to connect to the selected message broker.
See the Message Broker documentation for information on how to select and configure a message broker. By default, an embedded Apache ActiveMQ server is used. This is acceptable for testing purposes, but an external broker should be used in production.
The FHIR Storage module is used as the destination for data coming in from the channel.
The Channel Import module subscribes to the channel defined in configuration. It processes incoming messages, and forwards the requests to the FHIR Storage module.
Channel Import currently supports the following formats for messages.
In order to publish valid messages to the Channel Import channel, they must follow the JSON structure of a ResourceOperationMessage.
For all message types, the shape of the request is the same, but the content of the internal payload changes.
The next section details how to correctly form messages of a given type. The discussion focuses on the content of the payload field of a given message. All the examples in this section will work if used in a tool such as kafkacat.
For payloads of type application/fhir+json or application/json+fhir, three operations are supported, which mirror the standard FHIR REST operations: POST, PUT, and DELETE. Since this module is specifically for importing data, it does not support the GET operation.
Here is an example of such a payload:
{
"payload": {
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"mediaType": "application/fhir+json",
"operationType": "UPDATE",
"payload": "{\"resourceType\":\"Patient\",\"id\":\"Patient/G123\",\"name\":[{\"family\":\"Sample\",\"given\":[\"Sample\",\"Samplington\"]}]}"
}
}
transactionId – An optional arbitrary transaction identifier.
mediaType – The content type of the payload. For FHIR data, this is set to application/fhir+json.
operationType – An OperationTypeEnum, consisting of either UPDATE/DELETE/CREATE. These map to the FHIR operations PUT/DELETE/POST.
payload – An IBaseResource, converted to a string. This is the primary content of the message.

If your data is not in FHIR format, but is JSON, Channel Import can ingest it with the help of the JavaScript Execution Environment. The user must define a mapping script to consume the JSON. Here is an example message showing how to correctly format a message that is arbitrary JSON:
{
"payload": {
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"mediaType": "application/json",
"payload": "{\"name\": \"Shallan Davar\"}"
}
}
In order for Channel Import to process this message, the JavaScript execution environment must have a handler defined for the function handleChannelImportJsonPayload. This example script will take the name field, split it on spaces, and create a patient object out of it.
function handleChannelImportJsonPayload(jsonObject) {
    var name = jsonObject.name;
    var parts = name.split(' ');
    // Create an empty patient
    var patient = ResourceBuilder.build('Patient');
    // Populate it with a name
    patient.name[0].given[0] = parts[0];
    patient.name[0].family = parts[1];
    // Create the transaction
    var transaction = TransactionBuilder.newTransactionBuilder();
    // Add the patient create operation to the transaction
    transaction.create(patient);
    // Execute the transaction
    Fhir.transaction(transaction);
}
The only parameter to the method is jsonObject, which is a GSON JsonObject. You may use any of the methods defined in the documentation for interacting with the JsonObject. All of the standard JavaScript APIs such as HTTP API, FHIR Model API, and Environment API are also available.
The execution of this script with this message would generate the following patient in the database:
{
"resourceType": "Patient",
"id": "3152",
"meta": {
"versionId": "1",
"lastUpdated": "2021-01-31T11:32:10.823-08:00"
},
"name": [
{
"family": "Davar",
"given": [
"Shallan"
]
}
]
}
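The example script assumes the name contains exactly two space-separated parts. As a minimal sketch of a more defensive approach, the hypothetical helper below (splitFullName is not part of Smile CDR) tolerates extra whitespace, single-word names, and multiple given names. It assumes the value can be treated as a plain string and that the last word is the family name, which will not hold for every naming convention.

```javascript
// Hypothetical helper, not part of Smile CDR: split a full name into
// given names and a family name without assuming exactly two parts.
// Assumption: the last whitespace-separated word is the family name.
function splitFullName(fullName) {
    var parts = String(fullName || '').trim().split(/\s+/).filter(function (part) {
        return part.length > 0;
    });
    if (parts.length === 0) {
        return null; // nothing usable; the caller decides how to report this
    }
    if (parts.length === 1) {
        return { given: [parts[0]], family: null };
    }
    return {
        given: parts.slice(0, parts.length - 1),
        family: parts[parts.length - 1]
    };
}

var shallan = splitFullName('Shallan Davar'); // { given: ['Shallan'], family: 'Davar' }
```

Inside a handler, the returned given and family values could then be assigned to the patient only when they are present.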
CSV processing via Channel Import is possible via a dependent ETL Import module. While configuring Channel Import, you may select a configured ETL Import module as a dependency. If this is done, any CSV files received by Channel Import will be forwarded to the ETL Import module for processing. Here is an example message that contains a CSV.
{
"payload": {
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"mediaType": "text/csv",
"payload": "id,name,nickname,birthdate\n1,Kaladin,Kal,1998-06-20\n2,Dalinar,Blackthorn,1960-01-14",
"attributes": {
"filename": "names.csv"
}
}
}
Upon reception of this message, the payload and the filename (as indicated in the attribute filename) will be forwarded to the ETL Import module. The documentation for ETL Import shows in great detail how to configure a handler for the incoming CSV.
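On the publishing side, the CSV text has to be embedded as a single JSON string, with line breaks escaped. The following is a sketch of how a publisher might assemble such a message in JavaScript. The helper name buildCsvMessage and the naive row joining are assumptions for illustration; real CSV data containing commas, quotes, or line breaks inside cells needs a proper CSV writer.

```javascript
// Hypothetical helper: wrap CSV text in the message shape shown above.
function buildCsvMessage(csvText, filename, transactionId) {
    return {
        payload: {
            transactionId: transactionId,
            mediaType: 'text/csv',
            payload: csvText,
            attributes: { filename: filename }
        }
    };
}

var rows = [
    ['id', 'name', 'nickname', 'birthdate'],
    ['1', 'Kaladin', 'Kal', '1998-06-20'],
    ['2', 'Dalinar', 'Blackthorn', '1960-01-14']
];
// Naive join: assumes no cell contains a comma, quote, or line break.
var csvText = rows.map(function (row) { return row.join(','); }).join('\n');

// JSON.stringify escapes the line breaks, so the CSV travels as one string.
var csvMessage = JSON.stringify(
    buildCsvMessage(csvText, 'names.csv', 'f31d9d2f-4857-4344-824a-74c124bbb21f')
);
```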
Channel Import supports ingestion of arbitrary string payloads, which can be transformed and processed via the same mechanism as arbitrary JSON payloads. This is an example string message, which uses the text/plain media type.
{
"payload": {
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"mediaType": "text/plain",
"payload": "Hello my name is Diederik"
}
}
In order to handle plaintext, you must define a JavaScript handler which can process it. The following is an example of such a script.
function handleChannelImportStringPayload(stringPayload) {
    var prefix = "Hello my name is ";
    var name = stringPayload.substring(prefix.length);
    // Create an empty patient
    var patient = ResourceBuilder.build('Patient');
    // Populate it with a name
    patient.name[0].given[0] = name;
    // Create the transaction
    var transaction = TransactionBuilder.newTransactionBuilder();
    // Add the patient create operation to the transaction
    transaction.create(patient);
    // Execute the transaction
    Fhir.transaction(transaction);
}
String processing should be considered a last resort, as it is difficult to do robustly.
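One reason is that a fixed-offset substring silently yields an empty or arbitrary name when the input does not start with the expected prefix. As a sketch of a slightly safer pattern, the hypothetical helper below (extractNameFromGreeting is an illustrative name, not a Smile CDR function) validates the shape of the string first and returns null when it does not match, so that a handler can skip or report the message instead of storing bad data.

```javascript
// Hypothetical helper: return the name from a greeting, or null when the
// payload does not have the expected shape.
function extractNameFromGreeting(stringPayload) {
    // Require the exact prefix, then capture the rest without
    // surrounding whitespace.
    var match = /^Hello my name is\s+(\S.*?)\s*$/.exec(String(stringPayload));
    return match ? match[1] : null;
}

var extracted = extractNameFromGreeting('Hello my name is Diederik'); // 'Diederik'
var rejected = extractNameFromGreeting('Hi there');                   // null
```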
HL7 v2.x processing via Channel Import is possible via a dependent HL7 v2.x Support: Inbound Messaging module. While configuring Channel Import, you may select a configured HL7 v2.x Inbound module as a dependency. If this is done, any HL7 v2.x messages received by Channel Import will be forwarded to the HL7 v2.x Inbound module for processing. Here is an example message that contains an HL7 v2.x message.
{
"payload": {
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"mediaType": "application/hl7-v2",
"payload": "MSH|^~\\&|||||||ADT^A01^ADT_A01|A001|T|2.5\rPID|||7000135^^^http://acme.org/mrns^MR~01238638267^^^http://acme.org/secondaryIds^SB||Smith^John^Q^Jr^^^L|||||||||||||||||||||||||\rPV1||||A03.0.1|||||||||||||||4736455^^^http://acme.org/visitNumbers^VN"
}
}
Upon reception of this message, the payload will be forwarded to the HL7 v2.x Inbound module. The documentation for HL7 v2.x Support: Inbound Messaging shows in great detail how to configure a handler for the incoming HL7 v2.x messages.
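Two details in the example are easy to get wrong when writing the JSON by hand: segments are separated by carriage returns, and the backslash in the MSH encoding characters must be escaped. The following sketch shows how a publisher might let JSON.stringify handle both. The helper name buildHl7Message and the shortened segments are assumptions for illustration.

```javascript
// Hypothetical helper: join HL7 v2.x segments with carriage returns and
// wrap the result in the message shape shown above.
function buildHl7Message(segments, transactionId) {
    return {
        payload: {
            transactionId: transactionId,
            mediaType: 'application/hl7-v2',
            payload: segments.join('\r')
        }
    };
}

var segments = [
    // In JavaScript source, '\\' is a single backslash character.
    'MSH|^~\\&|||||||ADT^A01^ADT_A01|A001|T|2.5',
    'PID|||7000135^^^http://acme.org/mrns^MR||Smith^John^Q^Jr^^^L'
];

// JSON.stringify escapes the backslash and the carriage return.
var hl7Json = JSON.stringify(
    buildHl7Message(segments, 'f31d9d2f-4857-4344-824a-74c124bbb21f')
);
```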
Lastly, CDA documents may be imported via Channel Import using the experimental CDA Exchange+ module. When configuring Channel Import, you may select a configured CDA Exchange+ module as a dependency. Once this is done, any CDA documents received will be sent to the CDA Exchange+ module for processing. The following is an example message that contains a CDA document (contents edited for brevity).
{
"payload": {
"transactionId": "a0c42c2b-43e9-4e09-8cf2-be3a7b95fb19",
"mediaType": "text/xml+cda",
"payload": "<?xml-stylesheet type=\"text/xsl\" href=\"CCDA.xsl\" alternate=\"no\" title=\"Allscripts Default\" ?><?xml-stylesheet type=\"text/xsl\" href=\"ccda.xsl\"?><ClinicalDocument xmlns=\"urn:hl7-org:v3\" xmlns:sdtc=\"urn:hl7-org:sdtc\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><realmCode code=\"US\"/><typeId root=\"2.16.840.1.113883.1.3\" extension=\"POCD_HD000040\"/><templateId root=\"2.16.840.1.113883.10.20.22.1.8\" extension=\"2015-08-01\"/><templateId root=\"2.16.840.1.113883.10.20.22.1.8\"/><templateId root=\"2.16.840.1.113883.3.3251.1.1\"/><templateId root=\"2.16.840.1.113883.10.20.22.1.1\" extension=\"2015-08-01\"/><templateId root=\"2.16.840.1.113883.10.20.22.1.1\"/><templateId root=\"1.3.6.1.4.1.19376.1.5.3.1.5.5.1\"/><templateId root=\"1.3.6.1.4.1.19376.1.5.3.1.5.5.2\"/><templateId root=\"2.16.840.1.113883.10.20.22.1.2\"/> . . . </ClinicalDocument>"
}
}
When a message of this type is received, its payload will be forwarded to the CDA Exchange+ module for importing. See the CDA Exchange+ documentation for information on how to configure this module.
If you are a user of ActiveMQ, you must define a message type when adding messages to a queue so the receiver knows how to handle them. This can be done in one of two ways: you can either set the JMS type of the message, or you can set a JMS property on the message.
If you are using the JMS type, you must set its value to ResourceOperationJsonMessage. If you are instead using a JMS property, you must set the property as the following key-value pair: {"SmileCdrJsonType": "ResourceOperationJsonMessage"}.
The Channel Import Troubleshooting Log can be helpful in diagnosing issues relating to Channel Import. Furthermore, if a message completely fails processing, the cause of the failure is stored along with the failed message.
The following examples will show how to form ResourceOperationMessages for different use cases you may have.
Posting the following message to the channel would result in the resource being ingested into the storage module.
{
"payload": {
"mediaType": "application/fhir+json",
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"operationType": "CREATE",
"payload": "{\"resourceType\":\"Patient\",\"name\":[{\"family\":\"Sample\",\"given\":[\"Sample\",\"Samplington\"]}]}"
}
}
Note that the operationType is set to CREATE, and the resource in the payload has no ID. This is acceptable because the server will ignore any ID in the resource during a create operation.
{
"payload": {
"mediaType": "application/fhir+json",
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"operationType": "UPDATE",
"payload": "{\"resourceType\":\"Patient\",\"id\":\"Patient/G123\",\"name\":[{\"family\":\"Sample\",\"given\":[\"Sample\",\"Samplington\"]}]}"
}
}
When using operationType UPDATE, the server will attempt to respect and preserve the ID of the resource. Use this method with caution, as it may conflict with existing resources on the server.
{
"payload": {
"mediaType": "application/fhir+json",
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"operationType": "DELETE",
"payload": "{\"resourceType\":\"Patient\",\"id\":\"Patient/G123\",\"name\":[{\"family\":\"Sample\",\"given\":[\"Sample\",\"Samplington\"]}]}"
}
}
When using operationType DELETE, the resource's ID must be present in the payload body. If the resource ID is known to the FHIR Storage module, it will be deleted.
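The three messages above differ only in operationType and in whether the resource carries an ID, so a publisher can build them with one small function. The following is a sketch under stated assumptions: buildFhirMessage is a hypothetical name, and the ID check for UPDATE is an assumption based on how FHIR PUT works (the text above only states the requirement for DELETE).

```javascript
// Hypothetical helper: build a message for a single FHIR resource.
function buildFhirMessage(operationType, resource, transactionId) {
    var allowed = ['CREATE', 'UPDATE', 'DELETE'];
    if (allowed.indexOf(operationType) === -1) {
        throw new Error('Unsupported operationType: ' + operationType);
    }
    // Assumption: treat a missing ID as an error for UPDATE as well as DELETE.
    if (operationType !== 'CREATE' && !resource.id) {
        throw new Error(operationType + ' requires a resource ID');
    }
    return {
        payload: {
            mediaType: 'application/fhir+json',
            transactionId: transactionId,
            operationType: operationType,
            // The resource is serialized to a string, not nested as an object.
            payload: JSON.stringify(resource)
        }
    };
}

var createMessage = JSON.stringify(buildFhirMessage(
    'CREATE',
    { resourceType: 'Patient', name: [{ family: 'Sample', given: ['Sample', 'Samplington'] }] },
    'f31d9d2f-4857-4344-824a-74c124bbb21f'
));
```

Serializing the resource with JSON.stringify avoids hand-escaping the quotes in the inner payload string.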
{
"payload": {
"mediaType": "application/fhir+json",
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"operationType": "CREATE",
"payload": "{\"resourceType\":\"Bundle\",\"id\":\"bundle-add-patients\",\"type\":\"transaction\",\"entry\":[{\"request\":{\"method\":\"POST\",\"url\":\"Patient\"},\"resource\":{\"resourceType\":\"Patient\",\"name\":[{\"family\":\"Samplefamily\",\"given\":[\"Sample\",\"Sample2\"]}]}},{\"request\":{\"method\":\"POST\",\"url\":\"Patient\"},\"resource\":{\"resourceType\":\"Patient\",\"name\":[{\"family\":\"Samplefamily\",\"given\":[\"Sample\",\"Updatedsamplename\"]}]}}]}"
}
}
When operating on a collection of resources, a Bundle resource with a Bundle.type value of transaction or batch can be provided as the payload body. The system will disregard the operationType and process the Bundle as a FHIR transaction or batch, respectively.
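As a sketch of how a publisher might produce such a payload body, the hypothetical helper below (buildCreateBundle is an illustrative name) wraps a list of resources in a transaction Bundle with one POST entry per resource. The resulting string is what goes in the payload field of the message.

```javascript
// Hypothetical helper: wrap resources in a transaction Bundle in which
// every entry is a POST (create) against the resource's own type.
function buildCreateBundle(resources) {
    return {
        resourceType: 'Bundle',
        type: 'transaction',
        entry: resources.map(function (resource) {
            return {
                request: { method: 'POST', url: resource.resourceType },
                resource: resource
            };
        })
    };
}

var bundlePayload = JSON.stringify(buildCreateBundle([
    { resourceType: 'Patient', name: [{ family: 'Samplefamily', given: ['Sample', 'Sample2'] }] },
    { resourceType: 'Patient', name: [{ family: 'Samplefamily', given: ['Sample', 'Updatedsamplename'] }] }
]));
```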
If partitioning (multitenancy) is enabled on the persistence module used by Channel Import, the partition ID must be set explicitly in the broker message. A message that arrives with this field will be routed to the correct partition.
Otherwise, an error occurs when attempting to import the resource.
The following is an example of a payload with partition details:
{
"payload": {
"transactionId": "f31d9d2f-4857-4344-824a-74c124bbb21f",
"mediaType": "application/fhir+json",
"operationType": "UPDATE",
"payload": "{\"resourceType\":\"Patient\",\"id\":\"Patient/G123\",\"name\":[{\"family\":\"Sample\",\"given\":[\"Sample\",\"Samplington\"]}]}",
"partitionId" : {
"partitionNames" : [ "DEFAULT" ]
}
}
}
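If messages are built programmatically, the partition details can be attached as a final step. The following is a minimal sketch; withPartition is a hypothetical helper name, and the field layout simply mirrors the example above.

```javascript
// Hypothetical helper: return a copy of a message with partition names set,
// leaving the original message untouched.
function withPartition(message, partitionNames) {
    var copy = JSON.parse(JSON.stringify(message)); // simple deep copy
    copy.payload.partitionId = { partitionNames: partitionNames };
    return copy;
}

var baseMessage = {
    payload: {
        transactionId: 'f31d9d2f-4857-4344-824a-74c124bbb21f',
        mediaType: 'application/fhir+json',
        operationType: 'CREATE',
        payload: JSON.stringify({ resourceType: 'Patient' })
    }
};

var partitionedMessage = withPartition(baseMessage, ['DEFAULT']);
```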