6.13.1 Message Broker: Pulsar
Experimental

See the Message Broker page for a description of how message queues work by default in Smile CDR.

Apache Pulsar is a distributed messaging and streaming platform designed for high performance, low latency, and high availability. Smile CDR supports Pulsar as a message broker option for subscription processing, batch processing, MDM processing, and other asynchronous tasks that require a message broker.

Set module.clustermgr.config.messagebroker.type to PULSAR in the Cluster Manager to have Smile CDR use Pulsar instead of a JMS message broker or Kafka for subscription processing.

Smile CDR uses the following properties to configure Pulsar:

module.clustermgr.config.messagebroker.type=PULSAR
module.clustermgr.config.pulsar.service.url=pulsar://localhost:6650
module.clustermgr.config.pulsar.admin.url=http://localhost:8080
module.clustermgr.config.pulsar.ack_timeout_seconds=30
module.clustermgr.config.pulsar.receiver.queue.size=100
module.clustermgr.config.pulsar.validate_topics_exist_before_use=false

The values above for the module.clustermgr.config.pulsar.* properties are the defaults, so you don't need to include these properties in your config if you are using the default value.
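
As a rough illustration of how omitted properties fall back to the defaults listed above, the following sketch resolves an effective configuration from a java.util.Properties object. The class PulsarSettingsSketch and its resolve method are hypothetical names used only for this example; they are not part of Smile CDR, and the host broker.example.com in the usage note is a placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; not a Smile CDR class.
final class PulsarSettingsSketch {

    static final String PREFIX = "module.clustermgr.config.pulsar.";

    // Default values as listed in this section.
    static final Map<String, String> DEFAULTS = new LinkedHashMap<>();
    static {
        DEFAULTS.put("service.url", "pulsar://localhost:6650");
        DEFAULTS.put("admin.url", "http://localhost:8080");
        DEFAULTS.put("ack_timeout_seconds", "30");
        DEFAULTS.put("receiver.queue.size", "100");
        DEFAULTS.put("validate_topics_exist_before_use", "false");
    }

    // Returns every pulsar.* property, using the default when a key is absent.
    static Map<String, String> resolve(Properties theProperties) {
        Map<String, String> effective = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : DEFAULTS.entrySet()) {
            String key = PREFIX + entry.getKey();
            effective.put(key, theProperties.getProperty(key, entry.getValue()));
        }
        return effective;
    }
}
```

For example, a Properties object that sets only module.clustermgr.config.pulsar.service.url to pulsar://broker.example.com:6650 resolves to that URL plus the four remaining defaults.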

pulsar.service.url is the URL for the Pulsar service. The default is pulsar://localhost:6650 for a local Pulsar instance.

pulsar.admin.url is the URL for the Pulsar admin service. The default is http://localhost:8080 for a local Pulsar instance.

pulsar.ack_timeout_seconds specifies the timeout in seconds for acknowledging messages. The default is 30 seconds.

pulsar.receiver.queue.size specifies the size of the receiver queue. The default is 100.

pulsar.validate_topics_exist_before_use controls whether Smile CDR checks that a delivery topic exists before using it. The default is false. Set this to true if your Pulsar broker is configured to prevent new topics from being created automatically. When this property is true, Smile CDR prevents subscriptions from being created or updated if the delivery topic they depend on does not exist yet.
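
The effect of validate_topics_exist_before_use can be pictured as a simple guard. The sketch below is not Smile CDR's implementation: the class TopicExistenceGuardSketch, its method, and the topic names are hypothetical, and the set of existing topics stands in for whatever lookup the broker's admin service would provide.

```java
import java.util.Set;

// Hypothetical illustration of the documented behaviour; not Smile CDR code.
final class TopicExistenceGuardSketch {

    // Rejects a subscription whose delivery topic is missing, but only when
    // validation is switched on. With validation off, nothing is checked.
    static void checkDeliveryTopic(
            boolean theValidateTopicsExist,
            Set<String> theExistingTopics,
            String theDeliveryTopic) {
        if (theValidateTopicsExist && !theExistingTopics.contains(theDeliveryTopic)) {
            throw new IllegalStateException(
                "Delivery topic does not exist: " + theDeliveryTopic);
        }
    }
}
```

With validation off (the default), a missing topic passes the guard and the broker is left to create it, which only works if automatic topic creation is enabled on the broker.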

Like the JMS Message Broker and Kafka, Pulsar uses the Cluster Manager module consumers_per_matching_queue and consumers_per_delivery_queue properties to determine the number of consumers for each topic.

6.13.2 Pulsar Topic Names

For Pulsar brokers, the Smile CDR channels are synonymous with Pulsar topics. See Channel Names for details on how these topics will be named.

6.13.3 Customizing Pulsar Configuration with Interceptors

Smile CDR provides interceptor hooks that allow you to customize Pulsar configuration beyond what is available through the standard configuration properties. These interceptors give you direct access to the Pulsar client's ConsumerBuilder and ProducerBuilder objects before they are used to create consumers and producers.

6.13.3.1 Consumer Configuration

The PULSAR_CONSUMER_BUILDER interceptor hook allows you to modify the Pulsar ConsumerBuilder before it is used to create a consumer.

Example interceptor implementation:

import ca.cdr.api.fhir.interceptor.CdrHook;
import ca.cdr.api.fhir.interceptor.CdrPointcut;
import ca.uhn.fhir.interceptor.api.Interceptor;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;

@Interceptor
public class PulsarConsumerCustomizer {

    @CdrHook(CdrPointcut.PULSAR_CONSUMER_BUILDER)
    public void customizeConsumerBuilder(ConsumerBuilder<?> theConsumerBuilder) {
        // Customize the consumer builder before the consumer is created
        theConsumerBuilder
            // Deliver messages to the application even if decryption fails
            .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
            // Cap the combined receiver queue size across all partitions
            .maxTotalReceiverQueueSizeAcrossPartitions(50000)
            // Subscribe automatically to partitions added after startup
            .autoUpdatePartitions(true);
    }
}

6.13.3.2 Producer Configuration

The PULSAR_PRODUCER_BUILDER interceptor hook allows you to modify the Pulsar ProducerBuilder before it is used to create a producer.

Example interceptor implementation:

import ca.cdr.api.fhir.interceptor.CdrHook;
import ca.cdr.api.fhir.interceptor.CdrPointcut;
import ca.uhn.fhir.interceptor.api.Interceptor;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ProducerBuilder;

import java.util.concurrent.TimeUnit;

@Interceptor
public class PulsarProducerCustomizer {

    @CdrHook(CdrPointcut.PULSAR_PRODUCER_BUILDER)
    public void customizeProducerBuilder(ProducerBuilder<?> theProducerBuilder) {
        // Customize the producer builder before the producer is created
        theProducerBuilder
            // Compress message payloads with Zstandard
            .compressionType(CompressionType.ZSTD)
            // Block send calls instead of failing when the pending queue is full
            .blockIfQueueFull(true)
            // Batch messages: flush after 10 ms or 1000 messages, whichever comes first
            .enableBatching(true)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .batchingMaxMessages(1000);
    }
}

To use these interceptors, you need to register them with Smile CDR. See the Interceptors documentation for more information on how to register interceptors.

6.13.4 Dead Letter Queue

For Pulsar, the Dead Letter Queue (DLQ) is named PULSAR.DLQ. This is where messages that cannot be delivered after the configured number of retries will be sent. See Dead Letter Queues for more information.
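
The retry-then-dead-letter flow described above can be sketched in plain Java. This is a simplified model, not how Smile CDR or Pulsar implements redelivery: the class DeadLetterRoutingSketch, the deliver method, and the channel name in the usage note are hypothetical, and counting one initial attempt followed by theMaxRetries retries is an assumption made for this example.

```java
import java.util.function.Predicate;

// Hypothetical model of retry-then-DLQ routing; not Smile CDR code.
final class DeadLetterRoutingSketch {

    static final String DLQ_NAME = "PULSAR.DLQ";

    // Returns the name of the channel on which the message finally lands:
    // the original channel if an attempt succeeds, otherwise the DLQ.
    static String deliver(
            String theChannel,
            String theMessage,
            int theMaxRetries,
            Predicate<String> theDeliveryAttempt) {
        // Assumption: one initial attempt plus up to theMaxRetries retries.
        for (int attempt = 0; attempt <= theMaxRetries; attempt++) {
            if (theDeliveryAttempt.test(theMessage)) {
                return theChannel;
            }
        }
        return DLQ_NAME;
    }
}
```

Calling deliver("example-channel", "payload", 3, m -> false) models a message that never succeeds: it is attempted four times under the counting assumption above and is then routed to PULSAR.DLQ.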