Message Broker: Pulsar
See the Message Broker page for a description of how message queues work by default in Smile CDR.
Apache Pulsar is a distributed messaging and streaming platform designed for high performance, low latency, and high availability. Smile CDR supports Pulsar as a message broker option for subscription processing, batch processing, MDM processing, and other asynchronous tasks that require a message broker.
Set module.clustermgr.config.messagebroker.type to PULSAR in the Cluster Manager to have Smile CDR use Pulsar instead of a JMS message broker or Kafka for subscription processing.
Smile CDR uses the following configuration properties for Pulsar configuration:
module.clustermgr.config.messagebroker.type=PULSAR
module.clustermgr.config.pulsar.service.url=pulsar://localhost:6650
module.clustermgr.config.pulsar.admin.url=http://localhost:8080
module.clustermgr.config.pulsar.ack_timeout_seconds=30
module.clustermgr.config.pulsar.receiver.queue.size=100
module.clustermgr.config.pulsar.validate_topics_exist_before_use=false
The values shown above for the module.clustermgr.config.pulsar.* properties are the defaults, so you can omit any property for which you are using the default value.
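The fallback behaviour described above can be pictured with a small, self-contained Java sketch. It is only an illustration built on java.util.Properties and is not how Smile CDR loads its configuration; the class name PulsarDefaultsSketch, its methods, and the host pulsar.example.com are hypothetical.

```java
import java.util.Properties;

// Illustrative only: models how omitted pulsar.* properties fall back to the documented defaults.
final class PulsarDefaultsSketch {

    static final String PREFIX = "module.clustermgr.config.pulsar.";

    // Default values as listed in this page.
    static Properties documentedDefaults() {
        Properties defaults = new Properties();
        defaults.setProperty(PREFIX + "service.url", "pulsar://localhost:6650");
        defaults.setProperty(PREFIX + "admin.url", "http://localhost:8080");
        defaults.setProperty(PREFIX + "ack_timeout_seconds", "30");
        defaults.setProperty(PREFIX + "receiver.queue.size", "100");
        defaults.setProperty(PREFIX + "validate_topics_exist_before_use", "false");
        return defaults;
    }

    // Layers explicitly configured values over the defaults; anything omitted resolves to its default.
    static Properties resolve(Properties explicit) {
        Properties resolved = new Properties(documentedDefaults());
        resolved.putAll(explicit);
        return resolved;
    }

    public static void main(String[] args) {
        Properties explicit = new Properties();
        // Hypothetical remote broker; only this one property is overridden.
        explicit.setProperty(PREFIX + "service.url", "pulsar://pulsar.example.com:6650");

        Properties resolved = resolve(explicit);
        System.out.println(resolved.getProperty(PREFIX + "service.url"));         // overridden value
        System.out.println(resolved.getProperty(PREFIX + "ack_timeout_seconds")); // documented default
    }
}
```

In this sketch only service.url is set explicitly, so every other lookup returns the documented default.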
pulsar.service.url is the URL for the Pulsar service. The default is pulsar://localhost:6650 for a local Pulsar instance.
pulsar.admin.url is the URL for the Pulsar admin service. The default is http://localhost:8080 for a local Pulsar instance.
pulsar.ack_timeout_seconds specifies the timeout in seconds for acknowledging messages. The default is 30 seconds.
pulsar.receiver.queue.size specifies the size of the receiver queue. The default is 100.
pulsar.validate_topics_exist_before_use controls whether Smile CDR checks that a topic exists before using it. Set this to true if your Pulsar broker is configured to prevent new topics from being created automatically. When this property is set to true, Smile CDR will prevent subscriptions from being created or updated if the delivery topic they depend on does not exist yet. The default is false.
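The rule applied by validate_topics_exist_before_use can be summarized as a simple decision. The following is a minimal sketch of that rule in plain Java, not Smile CDR's implementation; the class TopicValidationSketch, the method maySaveSubscription, and the topic names are all hypothetical.

```java
import java.util.Set;

// Illustrative only: models the documented rule, not Smile CDR's actual code.
final class TopicValidationSketch {

    // Returns true if a subscription that delivers to deliveryTopic may be created or updated.
    static boolean maySaveSubscription(boolean validateTopicsExist,
                                       Set<String> existingTopics,
                                       String deliveryTopic) {
        if (!validateTopicsExist) {
            // Validation disabled: the broker is expected to auto-create missing topics.
            return true;
        }
        // Validation enabled: the delivery topic must already exist on the broker.
        return existingTopics.contains(deliveryTopic);
    }

    public static void main(String[] args) {
        Set<String> existingTopics = Set.of("example.delivery.topic-a"); // hypothetical topic name

        System.out.println(maySaveSubscription(false, existingTopics, "example.delivery.topic-b"));
        System.out.println(maySaveSubscription(true, existingTopics, "example.delivery.topic-a"));
        System.out.println(maySaveSubscription(true, existingTopics, "example.delivery.topic-b"));
    }
}
```

With validation enabled, only the subscription whose delivery topic is already present is accepted; with validation disabled, every subscription is accepted.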
Like the JMS Message Broker and Kafka, Pulsar uses the Cluster Manager module's consumers_per_matching_queue and consumers_per_delivery_queue properties to determine the number of consumers for each topic.
For Pulsar brokers, the Smile CDR channels are synonymous with Pulsar topics. See Channel Names for details on how these topics will be named.
Smile CDR provides interceptor hooks that allow you to customize Pulsar configuration beyond what is available through the standard configuration properties. These interceptors give you direct access to the Pulsar client's ConsumerBuilder and ProducerBuilder objects before they are used to create consumers and producers.
The PULSAR_CONSUMER_BUILDER interceptor hook allows you to modify the Pulsar ConsumerBuilder before it is used to create a consumer.
Example interceptor implementation:
import ca.cdr.api.fhir.interceptor.CdrHook;
import ca.cdr.api.fhir.interceptor.CdrPointcut;
import ca.uhn.fhir.interceptor.api.Interceptor;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;

@Interceptor
public class PulsarConsumerCustomizer {

    @CdrHook(CdrPointcut.PULSAR_CONSUMER_BUILDER)
    public void customizeConsumerBuilder(ConsumerBuilder<?> theConsumerBuilder) {
        // Customize the consumer builder before Smile CDR creates the consumer
        theConsumerBuilder
            // Deliver messages to the application even if decryption fails
            .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
            // Cap the combined receiver queue size across all partitions
            .maxTotalReceiverQueueSizeAcrossPartitions(50000)
            // Pick up newly added partitions automatically
            .autoUpdatePartitions(true);
    }
}
The PULSAR_PRODUCER_BUILDER interceptor hook allows you to modify the Pulsar ProducerBuilder before it is used to create a producer.
Example interceptor implementation:
import ca.cdr.api.fhir.interceptor.CdrHook;
import ca.cdr.api.fhir.interceptor.CdrPointcut;
import ca.uhn.fhir.interceptor.api.Interceptor;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ProducerBuilder;

import java.util.concurrent.TimeUnit;

@Interceptor
public class PulsarProducerCustomizer {

    @CdrHook(CdrPointcut.PULSAR_PRODUCER_BUILDER)
    public void customizeProducerBuilder(ProducerBuilder<?> theProducerBuilder) {
        // Customize the producer builder before Smile CDR creates the producer
        theProducerBuilder
            // Compress message payloads with Zstandard
            .compressionType(CompressionType.ZSTD)
            // Block send calls instead of failing when the pending queue is full
            .blockIfQueueFull(true)
            // Batch messages: flush after 10 ms or 1000 messages
            .enableBatching(true)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .batchingMaxMessages(1000);
    }
}
To use these interceptors, you need to register them with Smile CDR. See the Interceptors documentation for more information on how to register interceptors.
For Pulsar, the Dead Letter Queue (DLQ) is named PULSAR.DLQ. This is where messages that cannot be delivered after the configured number of retries will be sent. See Dead Letter Queues for more information.