32.0.1 ETL Import (Extract/Transform/Load)

The Smile CDR ETL Import module can be used to load data into the CDR via an external flat file in CSV (or similar) format.

32.0.2 Importing CSV Data

Comma-Separated Value (CSV) data is a simple table-based format containing rows of data in named (or sometimes simply numbered) columns. It is commonly used to export data from databases and spreadsheets in a platform-neutral way.

CSV data is broken up into rows, where each line forms a separate record. For example, the following shows a simple extract of Patients from a system:

ID,FAMILYNAME,GIVENNAME,MIDDLENAME,BIRTHDATE,CITY
001,Smith,John,Edward,1920-01-22,LONDON
002,Fernie,Frances,,1967-12-11,PARIS

CSV data typically separates values using a comma, but sometimes a different character is used. Tab-separated and pipe-separated formats are popular alternatives, and both are supported by the ETL Import module. CSV data typically uses the first row to indicate the names of the columns. This is not mandatory, although it does make processing easier.
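To make the idea of named columns concrete, the following minimal sketch pairs each value in a data row with the column name at the same position in the header row, which is the shape of the input map handed to the mapping script described below. The function name toInputMap is hypothetical, and the sketch is deliberately naive: it splits on the delimiter and does not handle quoted values or delimiters embedded inside values, which a production CSV parser must handle.

```javascript
// Hypothetical helper, for illustration only: pairs header names with row values.
// Naive: does not handle quoted fields or delimiters embedded inside values.
function toInputMap(headerLine, rowLine, delimiter) {
    const names = headerLine.split(delimiter);
    const values = rowLine.split(delimiter);
    const map = {};
    names.forEach((name, i) => {
        // Missing trailing values are treated as empty strings
        map[name] = i < values.length ? values[i] : '';
    });
    return map;
}

// Comma-separated, using the sample extract above
const header = 'ID,FAMILYNAME,GIVENNAME,MIDDLENAME,BIRTHDATE,CITY';
const row = toInputMap(header, '002,Fernie,Frances,,1967-12-11,PARIS', ',');
console.log(row.GIVENNAME, JSON.stringify(row.MIDDLENAME)); // Frances ""

// Pipe- and tab-separated variants of the same data
const piped = toInputMap('ID|CITY', '001|LONDON', '|');
const tabbed = toInputMap('ID\tCITY', '001\tLONDON', '\t');
```

Note that the second sample row has no middle name, so its MIDDLENAME value is empty; mapping scripts generally need to account for empty columns like this.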

Importing CSV data involves three steps:

  1. Pushing data into the ETL Import module.
  2. Transforming this data into appropriate FHIR resources.
  3. Storing the data in a persistence module.

32.0.2.1 Creating a CSV-based ETL Import Module

The import module has several properties that should be set:

  • The import_source property should be set to CSV to specify that the imported data will be in CSV format.
  • A mapping.script should be specified (see below).
  • The csv_delimiter property should be set to the appropriate delimiter (typically ,). For tab, use \t.

32.0.2.2 Creating a Mapping Script

Mapping scripts for the ETL Import module are written in JavaScript. The mapping script must contain at least one function, with signature function handleEtlImportRow(inputMap, context) {}. This function is called once for each row in the CSV file. It should convert the row into one or more FHIR resources and then store them in the persistence module, or it may choose to ignore the row completely.
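Because the mapping function may ignore a row, one common pattern is to return early when required values are missing. The sketch below assumes that skipping such rows is the behaviour you want for your data; shouldSkipRow and its requiredColumns argument are hypothetical names, not part of the Smile CDR API. The helper only reads the columns you list explicitly.

```javascript
// Hypothetical helper: true when any of the listed columns is empty or whitespace-only.
function shouldSkipRow(inputMap, requiredColumns) {
    return requiredColumns.some((column) => {
        const value = inputMap[column];
        return value === undefined || value === null || String(value).trim() === '';
    });
}

// Inside a mapping script this would typically be used as an early return:
//
// function handleEtlImportRow(theInputMap, theContext) {
//     if (shouldSkipRow(theInputMap, ['ID', 'FAMILYNAME'])) {
//         return; // ignore this row; nothing is written to the CDR
//     }
//     // ...build and store resources...
// }
```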

32.0.3 Function: handleEtlImportRow(theInputMap, theContext)

The handleEtlImportRow method has two arguments:

  • theInputMap (argument 0) – A dictionary containing key-value pairs. Each key represents a column name, and each value represents the value for the given key in the current row. Note that this map is strict: any request for a key (i.e. column) that is not defined in the CSV file results in an error.
  • theContext (argument 1) – An object containing details about the current processing job. This object is of type CsvProcessorContext.
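When testing a mapping script outside Smile CDR, it can help to mimic the strictness of theInputMap so that a misspelled column name fails fast, as it would during an import. The sketch below is a hypothetical test double built on a standard JavaScript Proxy; it is not how Smile CDR implements the map, and its error message will differ from the one the CDR reports.

```javascript
// Hypothetical test double: behaves like a plain object, but throws when a
// column that was not present in the CSV row is requested.
function makeStrictInputMap(row) {
    return new Proxy(row, {
        get(target, key) {
            // Pass through non-string (symbol) keys and columns that exist
            if (typeof key !== 'string' || key in target) {
                return target[key];
            }
            throw new Error('Column not defined in CSV: ' + key);
        }
    });
}

const strictRow = makeStrictInputMap({ ID: '001', FAMILYNAME: 'Smith' });
const family = strictRow['FAMILYNAME']; // 'Smith'
// strictRow['FAMILY_NAME'] would throw: Column not defined in CSV: FAMILY_NAME
```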

32.0.3.1 Example

/*
 * CSV Mapping function - Expects input in the format:
 * ID,FAMILYNAME,GIVENNAME,MIDDLENAME,BIRTHDATE,CITY
 */
function handleEtlImportRow(theInputMap, theContext) {
    Log.info("Processing CSV row from file: " + theContext.filename);

    // Create a patient
    var patient = ResourceBuilder.build('Patient');
    patient.id = Uuid.newPlaceholderId();

    // Identifier
    patient.identifier[0].system = 'http://example.com/mrns';
    patient.identifier[0].value = theInputMap['ID'];

    // Name
    patient.name[0].family = theInputMap['FAMILYNAME'];
    patient.name[0].given[0] = theInputMap['GIVENNAME'];
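    // MIDDLENAME may be empty (as in row 002); FHIR does not permit empty string values
    if (theInputMap['MIDDLENAME']) {
        patient.name[0].given[1] = theInputMap['MIDDLENAME'];
    }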

    // DOB
    patient.birthDate = theInputMap['BIRTHDATE'];

    // Address
    patient.address[0].city = theInputMap['CITY'];

    // Build a transaction and process it
    var transaction = TransactionBuilder.newTransactionBuilder();
    transaction.create(patient);
    Fhir.transaction(transaction);
}

In this example, each row is converted to a Patient resource and then stored in the CDR. The Smile CDR JavaScript execution environment makes it easy to build and manipulate FHIR Resource objects using the FHIR ResourceBuilder.

The created resource is then added to a FHIR transaction using TransactionBuilder, and Fhir.transaction(...) processes that transaction to store the resource.

For a complete reference of APIs available for JavaScript conversion, see the Smile CDR JavaScript Execution Environment documentation.
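The example copies BIRTHDATE through unchanged, which works only because the sample extract already uses the FHIR date form YYYY-MM-DD (FHIR also accepts YYYY and YYYY-MM). If a source system exports dates differently, the mapping script is a natural place to convert them. The following sketch assumes, purely for illustration, a source format of DD/MM/YYYY; toFhirDate is a hypothetical helper, and its validation is a shape check only.

```javascript
// Hypothetical helper: converts DD/MM/YYYY to a FHIR date (YYYY-MM-DD).
// Values already in a FHIR date form (YYYY, YYYY-MM, YYYY-MM-DD) are returned unchanged.
// Shape check only: impossible days such as 31/02/2000 are not rejected.
const FHIR_DATE = /^\d{4}(-(0[1-9]|1[0-2])(-(0[1-9]|[12]\d|3[01]))?)?$/;
const DD_MM_YYYY = /^(\d{2})\/(\d{2})\/(\d{4})$/;

function toFhirDate(value) {
    const trimmed = String(value).trim();
    if (FHIR_DATE.test(trimmed)) {
        return trimmed;
    }
    const match = DD_MM_YYYY.exec(trimmed);
    if (match) {
        // Reorder day/month/year into year-month-day, then validate the result
        const candidate = match[3] + '-' + match[2] + '-' + match[1];
        if (FHIR_DATE.test(candidate)) {
            return candidate;
        }
    }
    throw new Error('Unrecognized date: ' + value);
}

// In the mapping function:  patient.birthDate = toFhirDate(theInputMap['BIRTHDATE']);
```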

32.0.4 Handling Multiple Import Formats

If you will be uploading multiple types of data, and the CSV format varies according to the type of data you are ingesting, it is highly recommended to combine your mapping logic into a single script that is able to handle all of these data types.

This does not mean that all logic needs to be combined into one large function: you can still split the logic into separate functions within the script, arranged however you like.

To tell the script which type of data it is processing, two properties are available to the script via the CsvProcessorContext object:

  • The filename indicates to the script the name of the file that is currently being processed. This filename is passed as a parameter to the JSON Admin API when the ETL job is invoked. If you are using smileutil to trigger the job, this parameter will be populated automatically.

  • The userJobType allows an arbitrary string token to be passed to the script. Smile CDR does not interpret this string, so it can use any format you like. If you are using smileutil to trigger the job, you can supply the userJobType as an argument to smileutil.

The following example shows a script that uses different processing logic depending on the user job type.

function handleEtlImportRow(theInputMap, theContext) {
    let jobType = theContext.userJobType;
    if (jobType === 'PATIENT') {
        handlePatientRow(theInputMap);
    } else if (jobType === 'OBSERVATION') {
        handleObservationRow(theInputMap);
    } else {
        throw 'Unknown job type: ' + jobType;
    }
}

function handlePatientRow(theInputMap) {
    let patient = ResourceBuilder.build('Patient');
    patient.name[0].family = theInputMap['FAMILY'];
    patient.name[0].given[0] = theInputMap['GIVEN'];
    // ...populate other elements...
    Fhir.create(patient);
}

function handleObservationRow(theInputMap) {
    let observation = ResourceBuilder.build('Observation');
    observation.code.coding[0].system = 'http://loinc.org';
    observation.code.coding[0].code = theInputMap['CODE']; // CODE is an illustrative column name
    // ...populate other elements...
    Fhir.create(observation);
}

The same type of logic can also be based on filenames:

if (theContext.filename.includes("patients")) {
    // handle patients
}
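The two approaches can also be combined, for example by preferring userJobType when it is supplied and falling back to the filename otherwise. The sketch below assumes that userJobType is empty or undefined when no value is passed, and that filenames contain "patients" or "observations"; resolveJobType and these naming conventions are hypothetical choices for illustration.

```javascript
// Hypothetical helper: decide which handler applies to the current file.
// Prefers an explicit userJobType; otherwise infers the type from the filename.
function resolveJobType(context) {
    if (context.userJobType) {
        return String(context.userJobType).toUpperCase();
    }
    const filename = String(context.filename || '').toLowerCase();
    if (filename.includes('patients')) {
        return 'PATIENT';
    }
    if (filename.includes('observations')) {
        return 'OBSERVATION';
    }
    throw new Error('Cannot determine job type for file: ' + context.filename);
}

// Usage inside handleEtlImportRow:  let jobType = resolveJobType(theContext);
```

Upper-casing the explicit value keeps the comparison in the dispatch function tolerant of callers who pass "patient" instead of "PATIENT".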

32.0.5 Initiating ETL Jobs

Once the module has been created, the ETL Import Endpoint on the JSON Admin API may be used to upload and process data.

See ETL Import Endpoint for details on how to use this endpoint. Note that this action must be invoked by a user with the ETL_IMPORT_PROCESS_FILE permission. This user does not require additional permissions in order to read and write FHIR data.

It is possible to invoke this endpoint using any standard REST client, such as Insomnia or curl.

The smileutil command also provides a simple command line mechanism for invoking this operation to initiate ETL jobs. See smileutil: Upload CSV Bulk Import File for information.

32.0.6 Performance and Concurrency

When ingesting large amounts of data, it is often desirable to perform multiple jobs in parallel. When designing your ingestion strategy, it is important to consider a few things:

  • Tuning the Parallelism Thread Count setting appropriately can improve your throughput. Generally a value of 2 to 3 times your available CPU core count yields the best results, but it is a good idea to try different values. Also make sure that your database connection pool is large enough not to become a bottleneck.

  • With a well-tuned thread count, a single ETL import job will use most of the available CPU processing power. Adding parallel jobs may not improve your overall rate of ingestion, and in some cases may actually decrease it. This is particularly acute when using multiple instances of the ETL Import module on the same Smile CDR node, since each will compete for CPU power. It is preferable to use a single ETL Import module and invoke it multiple times in parallel, since this ensures that the different jobs share a worker pool.

  • If you are transmitting large files, and especially if you are transmitting multiple large files in parallel, it is helpful to send these files in smaller batches. If you are using smileutil, adding an argument like --split-rows 10000 to your command line helps prevent HTTP invocations from timing out.
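The guidance above involves some simple arithmetic. The sketch below lists candidate Parallelism Thread Count values in the 2 to 3 times range for a given core count, and estimates how many HTTP invocations a file produces for a given --split-rows value, assuming each batch is sent as one invocation. The function names are hypothetical, and the candidates are only starting points for your own testing.

```javascript
// Hypothetical helpers for planning an ingestion run.

// Candidate thread counts from 2x to 3x the number of CPU cores (inclusive).
function candidateThreadCounts(cpuCores) {
    const candidates = [];
    for (let threads = 2 * cpuCores; threads <= 3 * cpuCores; threads++) {
        candidates.push(threads);
    }
    return candidates;
}

// Number of HTTP invocations when a file is split into batches of splitRows rows.
function invocationCount(totalRows, splitRows) {
    return Math.ceil(totalRows / splitRows);
}

// For example, a node with 4 cores and a 250,000-row file split at 10,000 rows:
const threads = candidateThreadCounts(4);     // [8, 9, 10, 11, 12]
const calls = invocationCount(250000, 10000); // 25
```

Use the core count of the Smile CDR node itself, not of the machine running smileutil; in Node.js, require('os').cpus().length reports the logical core count of the machine it runs on.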

32.0.7 Locking Rows

A common problem when ingesting CSV data in a multithreaded environment is that multiple rows can attempt to create/update the same backing object, which leads to failures because the FHIR server does not permit multiple threads to update the same resource at the same time.

To assist with this, the context object has a method called lock(mutexName) that can be used to lock individual rows.

The lock method simply acquires a mutex on a particular string, releasing this mutex only when the row has finished processing.

For example:

function handleEtlImportRow(inputMap, context) {
   var patient = ResourceBuilder.build('Patient');
   
   // Lock on the patient ID
   context.lock(inputMap['PATIENT_ID']);
   
   patient.id = inputMap['PATIENT_ID'];
   patient.name[0].family = inputMap['PATIENT_FAMILY'];
   
   Fhir.update(patient);
}
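Because lock(mutexName) locks on a plain string, rows for different resource types that happen to share an identifier (for example a patient and an encounter both keyed 123) would contend for the same mutex even though they do not conflict. One option, sketched below under the assumption that your IDs are unique only within a resource type, is to include the resource type in the mutex name. mutexNameFor is a hypothetical helper; only context.lock comes from the documented API.

```javascript
// Hypothetical helper: builds a mutex name such as "Patient/123", so that
// rows for different resource types never contend for the same lock.
function mutexNameFor(resourceType, id) {
    if (!id) {
        throw new Error('Cannot lock ' + resourceType + ' without an ID');
    }
    return resourceType + '/' + id;
}

// Usage inside handleEtlImportRow:
//     context.lock(mutexNameFor('Patient', inputMap['PATIENT_ID']));
```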

32.0.8 Creating a Hashing Script

When a CSV file is processed in parallel, rows are not necessarily processed in the order in which they appear in the file. If the order of rows is important, you can write a hashing script to preserve it. The hashing script must contain at least one function, with signature function hashEtlImportRow(inputMap, context) {}.

The hashEtlImportRow method has two arguments:

  • inputMap (argument 0) – A dictionary containing key-value pairs. Each key represents a column name, and each value represents the value for the given key in the current row. Note that this map is strict: any request for a key (i.e. column) that is not defined in the CSV file results in an error.
  • context (argument 1) – An object containing details about the current processing job.

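The value returned by this function determines which multiplexing queue a given row is placed in, in order to ensure that the data are processed in the correct sequence. Rows for which the function returns the same value (for example, the same patient ID in the example below) are placed in the same queue.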

32.0.8.1 Hashing Script Example

function hashEtlImportRow(theInputMap, theContext) {
    var filename = theContext.filename;
    if (filename.includes("PATIENTS_")) {
        return theInputMap['PATIENT_ID'];
    } else if (filename.includes("OBSERVATIONS_")) {
        return theInputMap['OBSERVATION_ID'];
    } else {
        throw 'Unexpected filename: ' + filename;
    }
}
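How Smile CDR maps the returned value onto its queues is not described here, but the general idea behind hash-based multiplexing can be modelled as follows: hash the key, take the remainder modulo the number of queues, and append the row to that queue. Rows with the same key always land in the same queue, so if each queue is consumed in order their relative order is preserved, while rows with different keys can still be processed in parallel. The hash function (a Java-style String.hashCode), the queue count, and all names here are assumptions made for this model.

```javascript
// Conceptual model only: Smile CDR's actual queue assignment may differ.
// Java-style String.hashCode, kept within 32-bit signed integer range.
function stringHash(key) {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
        hash = (31 * hash + key.charCodeAt(i)) | 0;
    }
    return hash;
}

// Pick a queue index in [0, queueCount) for a hash key (handles negative hashes).
function queueIndexFor(key, queueCount) {
    const hash = stringHash(String(key));
    return ((hash % queueCount) + queueCount) % queueCount;
}

// Distribute rows into queues using the PATIENT_ID column as the hash key.
function distribute(rows, queueCount) {
    const queues = Array.from({ length: queueCount }, () => []);
    for (const row of rows) {
        queues[queueIndexFor(row.PATIENT_ID, queueCount)].push(row);
    }
    return queues;
}

const queues = distribute([
    { PATIENT_ID: 'A', STEP: 1 },
    { PATIENT_ID: 'B', STEP: 1 },
    { PATIENT_ID: 'A', STEP: 2 },
], 4);
```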

32.0.9 Asynchronous Execution

By default, ETL jobs are executed synchronously across a thread pool. For example, if a client invokes an ETL job with 1000 rows and the Parallelism Thread Count (see above) is set to 10, each thread will process roughly 100 rows, and the HTTP client call will return when all 1000 rows have been processed.

An alternate processing mode called Asynchronous Mode can also be used. To enable this, set the Asynchronous Mode Enabled configuration to true.

With this setting, instead of performing all writes inline, the job is stored in a temporary database table within the FHIR Storage module, and the actual data ingestion happens asynchronously. This means that the client call returns quickly and the actual processing happens in the background as a batch job.

This has the following benefit:

  • The job will be executed in a cluster-aware fashion, meaning that it may be picked up by any free node on the cluster. This helps spread processing evenly across the cluster.

It also imposes the following restrictions:

  • The job must not execute any FHIR read operations.
  • The job must not be order-dependent, as the background job does not currently guarantee in-order processing (a future update may resolve this).
  • The job must not use a hashing script.

32.0.9.1 Tracking Asynchronous Jobs

When ETL import jobs are run asynchronously, the data import happens as a background process. To see the status of running background jobs, invoke the JSON Admin API Batch Job Fetch All Instances operation.

To see the current status of the most recent 20 bulk import jobs (including running, completed, and failed jobs), invoke a URL similar to the following:

http://localhost:9000/batch2-jobs/modules/persistence/names/bulkImportJob/jobs?count=20&start=0
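If you query this operation from a script, it can be convenient to build the URL from its parts. The sketch below uses the standard URL class (a global in modern Node.js and browsers); the base address, the module ID persistence and the job name bulkImportJob are taken from the example above and may differ in your installation. batchJobsUrl is a hypothetical helper, count and start are passed through unchanged (see the Batch Job Fetch All Instances documentation for their exact semantics), and authentication is not shown.

```javascript
// Hypothetical helper: builds a Batch Job Fetch All Instances URL.
// baseUrl should contain only the scheme, host and port, since the
// absolute path below replaces any path on the base URL.
function batchJobsUrl(baseUrl, moduleId, jobName, count, start) {
    const path = '/batch2-jobs/modules/' + encodeURIComponent(moduleId) +
        '/names/' + encodeURIComponent(jobName) + '/jobs';
    const url = new URL(path, baseUrl);
    url.searchParams.set('count', String(count));
    url.searchParams.set('start', String(start));
    return url.toString();
}

const latest = batchJobsUrl('http://localhost:9000', 'persistence', 'bulkImportJob', 20, 0);
// http://localhost:9000/batch2-jobs/modules/persistence/names/bulkImportJob/jobs?count=20&start=0
```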

32.0.10 Performance

See Performance Tuning for strategies to improve performance when using the ETL module.