Pubsub Model

The design of the PubSub support in GLIDE aims to unify various nuances into a coherent interface, minimizing differences between Sharded, Cluster, and Standalone configurations.

Additionally, GLIDE is responsible for tracking topology changes in real time, ensuring the client remains subscribed regardless of any connectivity issues. Conceptually, the PubSub functionality can be divided into four actions: Subscribing, Publishing, Receiving, and Unsubscribing.

Subscribing in GLIDE differs from the canonical RESP3 protocol. To restore subscription state after a topology change or server disconnect, the subscription configuration is immutable and provided during client creation. Thus, while it is possible to use subscription commands such as SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE via the custom commands interface, using them alongside this model might lead to unpredictable behavior. The subscription configuration is applied to the servers using the following logic:

  • Standalone mode: The subscriptions are applied to a random node, either a primary or one of the replicas.
  • Cluster mode: For both Sharded and Non-sharded subscriptions, the Sharded semantics are used; the subscription is applied to the node holding the slot for the subscription’s channel/pattern.
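As a rough illustration of what "the node holding the slot" means, the sketch below computes a slot for a channel name. It assumes the standard cluster key-hashing scheme (CRC16-XMODEM modulo 16384, with `{...}` hash tags); the class and method names (`ChannelSlot`, `slotFor`) are hypothetical and not part of GLIDE, and how GLIDE hashes patterns internally is not covered here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of GLIDE: maps a channel name to a cluster hash slot. */
final class ChannelSlot {
    static final int SLOT_COUNT = 16384;

    /** CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection. */
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    /** Returns the slot (0..16383) for a channel, honoring a non-empty {hash tag} if present. */
    static int slotFor(String channel) {
        int open = channel.indexOf('{');
        int close = open >= 0 ? channel.indexOf('}', open + 1) : -1;
        // Only the text between the first '{' and the next '}' is hashed, unless it is empty.
        String hashed = close > open + 1 ? channel.substring(open + 1, close) : channel;
        return crc16(hashed.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }
}
```

For instance, `ChannelSlot.slotFor("foo")` evaluates to 12182, the same value a server reports for `CLUSTER KEYSLOT foo`, and `"{foo}.bar"` lands in the same slot because only the tag is hashed.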

Publishing functionality is unified into a single command method with an optional/default parameter for Sharded mode (applicable only for Cluster mode clients). The routing logic for this command is as follows:

  • Standalone mode: The command is routed to the primary node, or to a replica if read-only mode is configured.
  • Cluster mode: The command is routed to the server holding the slot for the command’s channel.
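The cluster-mode rule above amounts to a lookup from slot to owning node. The following is a minimal sketch of such a lookup, assuming the topology is known as contiguous slot ranges; `SlotTable`, `assign`, `ownerOf`, and the node names are hypothetical and do not reflect GLIDE's internal data structures.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical slot-ownership table: maps the first slot of each contiguous range to its owner. */
final class SlotTable {
    private final TreeMap<Integer, String> rangeStartToNode = new TreeMap<>();

    /** Declares that `node` owns the slots from `firstSlot` up to the next registered range start. */
    void assign(int firstSlot, String node) {
        rangeStartToNode.put(firstSlot, node);
    }

    /** Returns the node that currently owns `slot` (0..16383). */
    String ownerOf(int slot) {
        if (slot < 0 || slot > 16383) {
            throw new IllegalArgumentException("slot out of range: " + slot);
        }
        // The owning range is the one with the greatest start that is <= slot.
        Map.Entry<Integer, String> range = rangeStartToNode.floorEntry(slot);
        if (range == null) {
            throw new IllegalStateException("no owner known for slot " + slot);
        }
        return range.getValue();
    }
}
```

Because the owner is resolved on every lookup, reassigning a range after a topology change (for example, calling `assign` again with the same start slot and a new node) is enough for later commands and subscriptions to follow the slot to its new node.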

There are three methods for receiving messages:

  • Polling: A non-blocking method, typically named tryGetMessage. It returns the next available message or nothing if no messages are available.
  • Async: An asynchronous method, returning a CompletableFuture, typically named getMessage.
  • Callback: A user-provided callback function that receives the incoming message along with the user-provided context. Note that the callback code must be thread-safe for applicable languages.

The intended method is selected during client creation with the subscription configuration. When the configuration includes a callback (and an optional context), incoming messages are passed to that callback as they arrive; in that case, calls to the polling/async methods are prohibited and will fail. When no callback is configured, incoming messages are held in an unbounded buffer until they are retrieved through the polling/async methods. The user should extract incoming messages promptly to avoid straining the memory subsystem.
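The interplay between the three receiving methods can be modeled with a small in-memory class. This is only a sketch of the semantics described above, not GLIDE's implementation: `ReceiveModel` and `onIncoming` are hypothetical names, and messages are plain strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical model of the receive modes; the mode is fixed at construction time. */
final class ReceiveModel<C> {
    private final BiConsumer<String, C> callback; // null means polling/async mode
    private final C context;
    private final Queue<String> buffer = new ArrayDeque<>(); // unbounded buffer
    private final Queue<CompletableFuture<String>> waiters = new ArrayDeque<>();

    ReceiveModel(BiConsumer<String, C> callback, C context) {
        this.callback = callback;
        this.context = context;
    }

    /** Simulates a message arriving from the server. */
    void onIncoming(String msg) {
        if (callback != null) {
            callback.accept(msg, context); // the callback itself must be thread-safe
            return;
        }
        CompletableFuture<String> waiter;
        synchronized (this) {
            waiter = waiters.poll();
            if (waiter == null) {
                buffer.add(msg); // nobody is waiting, so keep it until it is extracted
                return;
            }
        }
        waiter.complete(msg); // hand the message to the oldest pending getMessage() call
    }

    /** Polling: returns the next buffered message, or null if none is available. */
    synchronized String tryGetMessage() {
        requireNoCallback();
        return buffer.poll();
    }

    /** Async: completes immediately if a message is buffered, otherwise on the next arrival. */
    synchronized CompletableFuture<String> getMessage() {
        requireNoCallback();
        String msg = buffer.poll();
        if (msg != null) {
            return CompletableFuture.completedFuture(msg);
        }
        CompletableFuture<String> pending = new CompletableFuture<>();
        waiters.add(pending);
        return pending;
    }

    private void requireNoCallback() {
        if (callback != null) {
            throw new IllegalStateException("polling/async retrieval is unavailable when a callback is configured");
        }
    }
}
```

In this model, a consumer that never calls `tryGetMessage` or `getMessage` lets the buffer grow without bound, which is the memory risk noted above.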

Since the subscription configuration is immutable and applied upon client creation, the model does not provide methods for unsubscribing during the client’s lifetime. Additionally, issuing commands such as UNSUBSCRIBE/PUNSUBSCRIBE/SUNSUBSCRIBE via the custom commands interface may lead to unpredictable behavior. Subscriptions will be removed from servers upon client closure, typically as a result of the client’s object destructor. Note that some languages, such as Python, might require an explicit call to a cleanup method, e.g.:

    client.close()

A Java client configured with a callback:

    // EXACT and PATTERN are the subscription-mode enum constants (assumed to be statically imported).
    MessageCallback callback =
        (msg, context) -> System.out.printf("Received %s, context %s\n", msg, context);

    GlideClientConfiguration config = GlideClientConfiguration.builder()
        .address(NodeAddress.builder().port(6379).build())
        .requestTimeout(3000)
        // subscriptions are configured here
        .subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
            .subscription(EXACT, "ch1")      // listens for messages published to the 'ch1' channel, in unsharded mode
            .subscription(EXACT, "ch2")      // listens for messages published to the 'ch2' channel, in unsharded mode
            .subscription(PATTERN, "chat*")  // listens for messages published to channels matching the 'chat*' glob pattern, in unsharded mode
            .callback(callback)              // register the callback alone...
            // .callback(callback, context)  // ...or together with a user-provided context object
            .build())
        .build();

    try (var regularClient = GlideClient.createClient(config).get()) {
        // Do some work/wait - the callbacks will be dispatched on incoming messages
    } // unsubscribe happens here

A Java client configured without a callback, using polling or async retrieval:

    GlideClientConfiguration config = GlideClientConfiguration.builder()
        .address(NodeAddress.builder().port(6379).build())
        .requestTimeout(3000)
        // subscriptions are configured here
        .subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
            .subscription(EXACT, Set.of("ch1", "ch2")) // multiple subscriptions can be set at a time
            .subscription(Map.of(PATTERN, Set.of("chat*"), EXACT, Set.of("ch1", "ch2")))
            // or even all subscriptions at a time
            .build()) // no callback is configured
        .build();

    try (var regularClient = GlideClient.createClient(config).get()) {
        Message polled = regularClient.tryGetPubSubMessage();     // sync, does not block
        Message awaited = regularClient.getPubSubMessage().get(); // async, waits for the next message
    } // unsubscribe happens here