PubSub Model
The design of the PubSub support in GLIDE aims to unify various nuances into a coherent interface, minimizing differences between Sharded, Cluster, and Standalone configurations.
Additionally, GLIDE is responsible for tracking topology changes in real time, ensuring the client remains subscribed regardless of any connectivity issues. Conceptually, the PubSub functionality can be divided into four actions: Subscribing, Publishing, Receiving, and Unsubscribing.
Subscribing
Subscribing in GLIDE differs from the canonical RESP3 protocol. To restore subscription state after a topology change or server disconnect, the subscription configuration is immutable and provided during client creation. Thus, while it is possible to use subscription commands such as SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE via the custom commands interface, using them alongside this model might lead to unpredictable behavior. The subscription configuration is applied to the servers using the following logic:
- Standalone mode: The subscriptions are applied to a random node, either a primary or one of the replicas.
- Cluster mode: For both Sharded and Non-sharded subscriptions, the Sharded semantics are used; the subscription is applied to the node holding the slot for the subscription’s channel/pattern.
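To make "the node holding the slot" concrete, here is a minimal sketch of how a channel name maps to a hash slot under the cluster specification used by Valkey and Redis OSS (CRC16 of the name, modulo 16384, with the hash-tag rule). The function name `hash_slot` is illustrative only and is not part of the GLIDE API; GLIDE performs this routing internally.

```python
import binascii

SLOT_COUNT = 16384  # number of hash slots in a cluster


def hash_slot(name: str) -> int:
    """Return the cluster hash slot for a key or channel name."""
    data = name.encode("utf-8")
    # Hash-tag rule: if the name contains "{...}" with a non-empty body,
    # only the text between the first "{" and the next "}" is hashed.
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end > start + 1:
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is CRC16-CCITT (XMODEM),
    # the CRC16 variant named by the cluster specification.
    return binascii.crc_hqx(data, 0) % SLOT_COUNT
```

Two channels that share a hash tag, such as `{chat}.room1` and `{chat}.room2`, hash to the same slot under this rule and would therefore be served by the same node.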
Publishing
Publishing functionality is unified into a single command method with an optional/default parameter for Sharded mode (applicable only for Cluster mode clients). The routing logic for this command is as follows:
- Standalone mode: The command is routed to a primary node or a replica if read-only mode is configured.
- Cluster mode: The command is routed to the server holding the slot for the command’s channel.
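As a rough illustration of what a single publish method with a sharded flag might translate to on the wire, the sketch below picks between the PUBLISH and SPUBLISH commands. The helper `build_publish_command` and its parameters are hypothetical and not part of GLIDE, and rejecting a sharded publish on a standalone client is an assumption made for illustration.

```python
from typing import List


def build_publish_command(channel: str, message: str, *,
                          sharded: bool = False,
                          cluster_mode: bool = False) -> List[str]:
    """Hypothetical helper: pick the wire command for a unified publish call."""
    if sharded and not cluster_mode:
        # Assumption: sharded publishing is meaningful only for cluster clients.
        raise ValueError("sharded publishing requires a cluster mode client")
    command = "SPUBLISH" if sharded else "PUBLISH"
    return [command, channel, message]
```

Keeping the flag optional with a non-sharded default lets the same call site work for both standalone and cluster clients.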
Receiving
There are three methods for receiving messages:
- Polling: A non-blocking method, typically named tryGetMessage. It returns the next available message or nothing if no messages are available.
- Async: An asynchronous method, returning a CompletableFuture, typically named getMessage.
- Callback: A user-provided callback function that receives the incoming message along with the user-provided context. Note that the callback code must be thread-safe for applicable languages.
The receiving method is selected during client creation as part of the subscription configuration. When the configuration includes a callback (and an optional context), incoming messages are passed to that callback as they arrive; in that case, calls to the polling and async methods are prohibited and will fail. When no callback is configured, incoming messages are held in an unbounded buffer until they are retrieved through the polling or async methods. The user should extract messages promptly so that the buffer does not grow without limit and strain memory.
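The interaction between the three receiving methods can be modeled with a small toy class. This is a sketch of the behavior described above, not GLIDE's implementation: the class `MessageInbox` and its method names are hypothetical, and a real client would feed `deliver` from the network layer.

```python
import asyncio
from collections import deque
from typing import Any, Callable, Optional


class MessageInbox:
    """Toy model of the three receiving modes; not GLIDE's implementation."""

    def __init__(self, callback: Optional[Callable[[Any, Any], None]] = None,
                 context: Any = None) -> None:
        self._callback = callback
        self._context = context
        self._buffer: deque = deque()   # unbounded: grows until drained
        self._waiters: deque = deque()  # futures created by get_message()

    def deliver(self, msg: Any) -> None:
        """Called when a message arrives from the server."""
        if self._callback is not None:
            self._callback(msg, self._context)
            return
        # Hand the message to a pending async waiter, otherwise buffer it.
        while self._waiters:
            fut = self._waiters.popleft()
            if not fut.done():
                fut.set_result(msg)
                return
        self._buffer.append(msg)

    def try_get_message(self) -> Optional[Any]:
        """Polling: return the next message, or None if nothing is buffered."""
        self._require_no_callback()
        return self._buffer.popleft() if self._buffer else None

    async def get_message(self) -> Any:
        """Async: wait until a message is available."""
        self._require_no_callback()
        if self._buffer:
            return self._buffer.popleft()
        fut = asyncio.get_running_loop().create_future()
        self._waiters.append(fut)
        return await fut

    def _require_no_callback(self) -> None:
        if self._callback is not None:
            raise RuntimeError(
                "polling/async methods are unavailable when a callback is configured")
```

Because the buffer has no upper bound, a consumer that stops calling `try_get_message` or `get_message` lets it grow for as long as messages keep arriving, which is the memory concern noted above.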
Unsubscribing
Since the subscription configuration is immutable and applied upon client creation, the model does not provide methods for unsubscribing during the client’s lifetime. Additionally, issuing commands such as UNSUBSCRIBE/PUNSUBSCRIBE/SUNSUBSCRIBE via the custom commands interface may lead to unpredictable behavior. Subscriptions will be removed from servers upon client closure, typically as a result of the client’s object destructor. Note that some languages, such as Python, might require an explicit call to a cleanup method, e.g.:
client.close()
Examples
Configuration with callback
Java:

MessageCallback callback = (msg, context) -> System.out.printf("Received %s, context %s\n", msg, context);

GlideClientConfiguration config = GlideClientConfiguration.builder()
    .address(NodeAddress.builder().port(6379).build())
    .requestTimeout(3000)
    // subscriptions are configured here
    .subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
        .subscription(EXACT, "ch1") // Listens for messages published to 'ch1' channel, in unsharded mode
        .subscription(EXACT, "ch2") // Listens for messages published to 'ch2' channel, in unsharded mode
        .subscription(PATTERN, "chat*") // Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
        // configure either a callback alone or a callback with a context
        .callback(callback)
        // .callback(callback, context)
        .build())
    .build();

try (var regularClient = GlideClient.createClient(config).get()) {
    // Do some work/wait - the callbacks will be dispatched on incoming messages
} // unsubscribe happens here

TODO: Add Example

Python:

def callback(msg: CoreCommands.PubSubMsg, context: Any):
    print(f"Received {msg}, context {context}\n")

# `context` is any user-defined object, assumed to be defined earlier
listening_config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)],
    pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(  # subscriptions are configured here
        channels_and_patterns={
            GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},  # Listens for messages published to 'ch1' and 'ch2' channels, in unsharded mode
            GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"},  # Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
        },
        callback=callback,
        context=context,
    ),
)

publishing_config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)]
)

listening_client = await GlideClient.create(listening_config)
publishing_client = await GlideClient.create(publishing_config)

# publish message on ch1 channel
await publishing_client.publish("Test message", "ch1")

# Do some work/wait - the callback will receive the "Test message" message

await listening_client.close()  # unsubscribe happens here

Go:

// Create a callback and context name
ctx := "example"
callback := func(message *PubSubMessage, context any) {
    fmt.Printf("Received %s, context %s\n", message.Message, context)
}

// define subscriptions
sConfig := NewStandaloneSubscriptionConfig().
    WithSubscription(ExactChannelMode, "ch1").
    WithSubscription(ExactChannelMode, "ch2").
    WithSubscription(PatternChannelMode, "chat*").
    WithCallback(callback, ctx)

// create configuration for subscriber
configforsub := NewGlideClientConfiguration().
    WithAddress(&NodeAddress{}).
    WithSubscriptionConfig(sConfig)

// create a client with a subscription
NewGlideClient(configforsub)

publisher, _ := NewGlideClient(NewGlideClientConfiguration().
    WithAddress(&NodeAddress{}))

publisher.Publish("ch1", "Hello from ch1")

// Do more work here

Configuration without callback
Java:

GlideClientConfiguration config = GlideClientConfiguration.builder()
    .address(NodeAddress.builder().port(6379).build())
    .requestTimeout(3000)
    // subscriptions are configured here
    .subscriptionConfiguration(StandaloneSubscriptionConfiguration.builder()
        .subscription(EXACT, Set.of("ch1", "ch2")) // multiple subscriptions can be set at a time
        .subscription(Map.of(PATTERN, Set.of("chat*"), EXACT, Set.of("ch1", "ch2"))) // or even all subscriptions at a time
        .build()) // no callback is configured
    .build();

try (var regularClient = GlideClient.createClient(config).get()) {
    Message polled = regularClient.tryGetPubSubMessage(); // sync, does not block
    Message awaited = regularClient.getPubSubMessage().get(); // async, waits for the next message
} // unsubscribe happens here

TODO: Add Example

Python:

listening_config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)],
    pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(  # subscriptions are configured here
        channels_and_patterns={
            GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},  # Listens for messages published to 'ch1' and 'ch2' channels, in unsharded mode
            GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"},  # Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
        },
        callback=None,  # no callback is configured
        context=None,
    ),
)

publishing_config = GlideClientConfiguration(
    [NodeAddress("localhost", 6379)]
)

listening_client = await GlideClient.create(listening_config)
publishing_client = await GlideClient.create(publishing_config)

# publish message on ch1 channel
await publishing_client.publish("Test message", "ch1")

# waits for "Test message" to arrive
message = await listening_client.get_pubsub_message()

# returns None since only one message was published
message = listening_client.try_get_pubsub_message()

await listening_client.close()  # unsubscribe happens here

Go:

// Create a signal channel to receive notifications of new messages
signalCh := make(chan struct{}, 1)

// Setup client and subscription config
sConfig := NewStandaloneSubscriptionConfig().
    WithSubscription(ExactChannelMode, "ch1").
    WithSubscription(ExactChannelMode, "ch2").
    WithSubscription(PatternChannelMode, "chat*")

client, _ := NewGlideClient(NewGlideClientConfiguration().
    WithAddress(&NodeAddress{}).
    WithSubscriptionConfig(sConfig))
defer client.Close()

// Get the message queue from the client
queue, _ := client.GetQueue()

// Register our signal channel
queue.RegisterSignalChannel(signalCh)
defer queue.UnregisterSignalChannel(signalCh)

// Create a context for cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start processing messages in a goroutine
go func() {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Receiver shutting down...")
            return
        case <-signalCh:
            // Process all available messages
            for {
                if msg := queue.Pop(); msg != nil {
                    fmt.Printf("Received message: %s on channel: %s\n", msg.Message, msg.Channel)
                } else {
                    break // No more messages
                }
            }
        }
    }
}()

// The main application can continue running...
// When it's time to stop, just call cancel() or close the client