Pub/Sub: Hello World
In this tutorial, you will build a simple, real-time notification system using the Pub/Sub (Publish/Subscribe) feature in Valkey GLIDE.
By the end, you will:
- Learn how to publish messages to Valkey channels and subscribe to them.
- Understand the different ways GLIDE can listen for messages.
Valkey Pub/Sub
Valkey’s Pub/Sub is a powerful messaging pattern that allows different parts of your application to communicate without being directly aware of each other. A publisher sends a message to a specific channel (like a radio station), and any subscribers tuned into that channel will receive the message instantly.
This is perfect for chat applications, live data feeds, and event notifications.
GLIDE makes this feature robust and easy to use. You define your subscriptions when you create the client, and GLIDE manages the subscription state for you. It ensures you stay subscribed, even if the connection temporarily drops and needs to be re-established.
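To make the publisher/channel/subscriber relationship concrete, here is a minimal in-memory sketch of the pattern. It is a toy model only: `ToyBroker` and the channel names are hypothetical, and nothing here talks to Valkey or uses GLIDE.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class ToyBroker:
    """Hypothetical in-memory stand-in for a Pub/Sub server (not Valkey itself)."""

    def __init__(self) -> None:
        # Maps each channel name to the handlers subscribed to it.
        self._subscribers: DefaultDict[str, List[Callable[[str, str], None]]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Callable[[str, str], None]) -> None:
        self._subscribers[channel].append(handler)

    def publish(self, channel: str, message: str) -> int:
        """Deliver the message to every current subscriber; return how many received it."""
        handlers = self._subscribers.get(channel, [])
        for handler in handlers:
            handler(channel, message)
        return len(handlers)


broker = ToyBroker()
inbox_a: List[str] = []
inbox_b: List[str] = []

# Two independent subscribers tune in to the same channel.
broker.subscribe("news", lambda channel, message: inbox_a.append(message))
broker.subscribe("news", lambda channel, message: inbox_b.append(message))

delivered = broker.publish("news", "hello")        # both subscribers receive it
missed = broker.publish("sports", "nobody home")   # no subscribers, so nothing is delivered

print(delivered, missed, inbox_a, inbox_b)
```

The publisher never references the subscribers directly. It only names a channel, and a message published to a channel with no subscribers is simply not delivered to anyone.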
Prerequisites
Before you begin, you’ll need a few things:
- Python: Valkey GLIDE supports Python 3.9 to 3.13.
- Valkey GLIDE: You’ll need the valkey-glide library. You can install it using pip:

    pip install valkey-glide

- Valkey Server: You need a running Valkey instance. The easiest way to get one is with Docker. Run this command in your terminal to start a standalone Valkey server:

    docker run -d --name standalone-valkey -p 6379:6379 valkey/valkey
Hello World
Let’s build a simple Hello World example that publishes a message to a channel and then listens for it.
- Create helloworld.py

    import asyncio
    from glide import GlideClient, NodeAddress, GlideClientConfiguration

    async def main():
        print("Starting Pub/Sub tutorial...")
        # Our code will go here

    if __name__ == "__main__":
        asyncio.run(main())

- Configure the Listening Client

Let’s configure a client to listen to two exact channels ('ch1' and 'ch2') and any channel matching the pattern 'chat*'.

Add this code inside your main function:

    # 1. Configure the listening client
    listening_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)],
        pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
            channels_and_patterns={
                # Listens for messages published to 'ch1' and 'ch2'
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                # Listens for messages to channels matching 'chat*'
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            callback=None,
            context=None,
        )
    )

We set callback=None because we want to manually fetch messages using an async method, rather than having GLIDE call a function for us.

- Configure the Publishing Client

Next, we configure a separate, regular client just for publishing messages. This client has no special subscription configuration.

Add this right after the listening_config:

    # 2. Configure the publishing client
    publishing_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)]
    )

- Create Clients and Publish a Message

Now, let’s create our clients and send our first message. They will be configured using the previously created configurations. We’ll use a try...finally block to make sure the clients are always closed properly.

    listening_client = None
    publishing_client = None
    try:
        # 3. Create the clients
        listening_client = await GlideClient.create(listening_config)
        publishing_client = await GlideClient.create(publishing_config)

        # 4. Publish a message to 'ch1'
        print("Publishing message to 'ch1'...")
        # The return value is the number of clients that received the message.
        subscribers_count = await publishing_client.publish("Hello from GLIDE!", "ch1")
        print(f"Message published to {subscribers_count} subscriber(s).")
    finally:
        # 7. Clean up clients
        if listening_client:
            await listening_client.close()
            print("Listening client closed.")
        if publishing_client:
            await publishing_client.close()
            print("Publishing client closed.")

A best practice with Pub/Sub is to use a dedicated client for your subscriptions. A client in subscription mode is only focused on listening and cannot run other commands. GLIDE enforces this by having you define subscriptions when you create the client.

- Receive the Message Asynchronously

The message is now on its way! When our listening_client receives it, the message is stored in an internal buffer, waiting for us to retrieve it.

To get the message, we use await listening_client.get_pubsub_message(). This is an asynchronous method that waits until a message is available and then returns it.

Add this code right after the publishing_client.publish call (inside the try block):

    # 5. Receive the message (asynchronously)
    print("Waiting to receive message...")
    message = await listening_client.get_pubsub_message()
    if message:
        print(f"Received message: '{message.message}' on channel: '{message.channel}'")
    else:
        print("No message received.")

- Check for More Messages (Non-blocking)

What if there are no more messages? If we call await listening_client.get_pubsub_message() again, our program will pause and wait forever (or until a new message arrives).

Instead, we can use listening_client.try_get_pubsub_message(). This is a non-blocking method. It returns a message only if one is immediately available in the buffer. Otherwise, it returns None right away.

Add this final piece of logic:

    # 6. Try to get another message (non-blocking)
    print("Trying to get another message...")
    message = listening_client.try_get_pubsub_message()
    if message:
        print(f"Found another message: '{message.message}'")
    else:
        print("No more messages in the buffer. Done.")
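The difference between the waiting read and the non-blocking read can be modelled with a plain asyncio.Queue. The sketch below is a stand-in for the idea, not GLIDE's implementation: `ToyInbox` and its method names are hypothetical, and no Valkey server is involved. It also shows one way to bound a waiting read with a timeout.

```python
import asyncio
from typing import List, Optional


class ToyInbox:
    """Hypothetical message buffer modelling the two retrieval styles."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def deliver(self, message: str) -> None:
        self._queue.put_nowait(message)

    async def get_message(self) -> str:
        # Suspends the caller until a message is available.
        return await self._queue.get()

    def try_get_message(self) -> Optional[str]:
        # Returns immediately: a buffered message if there is one, otherwise None.
        try:
            return self._queue.get_nowait()
        except asyncio.QueueEmpty:
            return None


async def demo() -> List[Optional[str]]:
    inbox = ToyInbox()
    inbox.deliver("first")

    results: List[Optional[str]] = []
    results.append(await inbox.get_message())  # already buffered, returns at once
    results.append(inbox.try_get_message())    # buffer is now empty, so None

    # A waiting read on an empty buffer would never return on its own,
    # so bound the wait with a timeout.
    try:
        results.append(await asyncio.wait_for(inbox.get_message(), timeout=0.1))
    except asyncio.TimeoutError:
        results.append("timed out")
    return results


results = asyncio.run(demo())
print(results)
```

Since get_pubsub_message() is awaited like any other coroutine, wrapping it in asyncio.wait_for should bound the wait in the same way. How GLIDE behaves when that pending call is cancelled has not been checked here, so treat that as an assumption to verify.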
When you run helloworld.py, you should see this output:

    python helloworld.py
    Starting Pub/Sub tutorial...
    Publishing message to 'ch1'...
    Message published to 1 subscriber(s).
    Waiting to receive message...
    Received message: 'Hello from GLIDE!' on channel: 'ch1'
    Trying to get another message...
    No more messages in the buffer. Done.
    Listening client closed.
    Publishing client closed.

The Complete Application

    import asyncio
    from glide import GlideClient, NodeAddress, GlideClientConfiguration

    async def main():
        print("Starting Pub/Sub tutorial...")

        # 1. Configure the listening client
        listening_config = GlideClientConfiguration(
            addresses=[NodeAddress("localhost", 6379)],
            pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
                channels_and_patterns={
                    # Listens for messages published to 'ch1' and 'ch2'
                    GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                    # Listens for messages to channels matching 'chat*'
                    GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
                },
                callback=None,
                context=None,
            )
        )

        # 2. Configure the publishing client
        publishing_config = GlideClientConfiguration(
            addresses=[NodeAddress("localhost", 6379)]
        )

        listening_client = None
        publishing_client = None
        try:
            # 3. Create the clients
            listening_client = await GlideClient.create(listening_config)
            publishing_client = await GlideClient.create(publishing_config)

            # 4. Publish a message to 'ch1'
            print("Publishing message to 'ch1'...")
            subscribers_count = await publishing_client.publish("Hello from GLIDE!", "ch1")
            print(f"Message published to {subscribers_count} subscriber(s).")

            # 5. Receive the message (asynchronously)
            print("Waiting to receive message...")
            message = await listening_client.get_pubsub_message()
            if message:
                print(f"Received message: '{message.message}' on channel: '{message.channel}'")
            else:
                print("No message received.")

            # 6. Try to get another message (non-blocking)
            print("Trying to get another message...")
            message = listening_client.try_get_pubsub_message()
            if message:
                print(f"Found another message: '{message.message}'")
            else:
                print("No more messages in the buffer. Done.")

        finally:
            # 7. Clean up clients
            if listening_client:
                await listening_client.close()
                print("Listening client closed.")
            if publishing_client:
                await publishing_client.close()
                print("Publishing client closed.")

    if __name__ == "__main__":
        asyncio.run(main())

Using Callbacks
In the previous example, we pulled messages by calling await listening_client.get_pubsub_message().
For many event-driven applications, it’s more efficient to have GLIDE push messages to your code as soon as they arrive.
We can do this by providing a callback function when configuring the client. GLIDE will automatically run this function for every message received.
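As a rough mental model of the two delivery modes, the sketch below contrasts buffering messages until the application pulls them with pushing each message into a callback together with a caller-supplied context value. `ToySubscriber` and `record` are hypothetical names for illustration only. This is not how GLIDE is implemented internally.

```python
from collections import deque
from typing import Any, Callable, Deque, List, Optional, Tuple


class ToySubscriber:
    """Hypothetical model of pull vs. push delivery."""

    def __init__(
        self,
        callback: Optional[Callable[[str, Any], None]] = None,
        context: Any = None,
    ) -> None:
        self._callback = callback
        self._context = context
        self.buffer: Deque[str] = deque()

    def on_message(self, message: str) -> None:
        if self._callback is not None:
            # Push mode: hand the message to user code right away.
            self._callback(message, self._context)
        else:
            # Pull mode: hold the message until the application asks for it.
            self.buffer.append(message)


seen: List[Tuple[Any, str]] = []


def record(message: str, context: Any) -> None:
    seen.append((context, message))


push_subscriber = ToySubscriber(callback=record, context="service-a")
pull_subscriber = ToySubscriber()

for subscriber in (push_subscriber, pull_subscriber):
    subscriber.on_message("hello")

print(seen)
print(list(pull_subscriber.buffer))
```

In this model a subscriber with a callback never fills its buffer, which mirrors the choice made at configuration time between passing a function and passing callback=None.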
- Define Your Callback Function

First, let’s define a function that will handle our messages. This function will be called by GLIDE (potentially from a background thread), so any code you write inside it should be thread-safe.

It must accept two arguments: the message object and the context you provide during configuration.

    from typing import Any
    from glide.async_commands.core import PubSubMsg

    def my_callback(msg: PubSubMsg, context: Any):
        """This function will be called by GLIDE for every message."""
        print(f"\n--- CALLBACK TRIGGERED ---")
        print(f"Received message: '{msg.message}' on channel '{msg.channel}'")
        print(f"Received context: {context}")
        print(f"--------------------------\n")

- Update Client Configuration

Now, let’s update our listening_config to use this callback. We’ll also define a simple string variable app_context to pass as the context. This context object can be any Python object you want; it’s useful for passing state or identifiers to your callback.

    # Define a context object to pass to the callback
    app_context = "my_notification_service_v1"

    # 1. Configure the listening client with a callback
    listening_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)],
        pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
            channels_and_patterns={
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            # Set the callback and context here (these replace callback=None and context=None)
            callback=my_callback,
            context=app_context,
        )
    )

- Run the Application

The main logic is now simpler. We don’t need to call get_pubsub_message anymore. We just publish a message and then wait for a moment. The my_callback function will be triggered automatically in the background.

    # 4. Publish a message to 'ch1'
    print("MAIN: Publishing message to 'ch1'...")
    subscribers_count = await publishing_client.publish("Hello from the callback!", "ch1")
    print(f"MAIN: Message published to {subscribers_count} subscriber(s).")

    # 5. Wait for the callback to fire
    # In a real app, this would be a long-running process
    # (e.g., a web server, or `while True: await asyncio.sleep(1)`)
    # We sleep here just to give the callback time to run.
    print("MAIN: Waiting for 1 second to receive message...")
    await asyncio.sleep(1)
    print("MAIN: Wait complete. Shutting down.")
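As an aside on thread safety: one common way to keep a callback safe when it may run outside the event loop's thread is to do no real work in it and only hand the message over to the loop. The sketch below shows that hand-off using the standard library alone. `make_threadsafe_callback` is a hypothetical helper, and the background thread merely simulates a library calling the callback. No GLIDE client is involved.

```python
import asyncio
import threading
from typing import Any, Callable, Tuple


def make_threadsafe_callback(
    loop: asyncio.AbstractEventLoop, queue: asyncio.Queue
) -> Callable[[Any, Any], None]:
    """Build a two-argument callback that is safe to call from any thread."""

    def callback(msg: Any, context: Any) -> None:
        # Only schedule the enqueue; it runs later on the event loop's own thread.
        loop.call_soon_threadsafe(queue.put_nowait, (context, msg))

    return callback


async def demo() -> Tuple[Any, Any]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    callback = make_threadsafe_callback(loop, queue)

    # Simulate a library invoking the callback from a background thread.
    worker = threading.Thread(target=callback, args=("hello", "demo-context"))
    worker.start()

    # The coroutine picks the message up on the event loop thread.
    item = await asyncio.wait_for(queue.get(), timeout=1)
    worker.join()
    return item


received = asyncio.run(demo())
print(received)
```

The returned function has the same (msg, context) shape as my_callback, so it could presumably be supplied as the callback in the subscription configuration. That combination has not been tested against GLIDE here. The tutorial's own my_callback only prints, so it does not need this hand-off.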
Running the callback version of the application should produce the following output:

    Starting Pub/Sub callback tutorial...
    MAIN: Publishing message to 'ch1'...

    --- CALLBACK TRIGGERED ---
    Received message: 'b'Hello from the callback!'' on channel 'b'ch1''
    Received context: my_notification_service_v1
    --------------------------

    MAIN: Message published to 1 subscriber(s).
    MAIN: Waiting for 1 second to receive message...
    MAIN: Wait complete. Shutting down.
    MAIN: Listening client closed.
    MAIN: Publishing client closed.

The Full Example

    import asyncio
    from glide import GlideClient, NodeAddress, GlideClientConfiguration
    from typing import Any
    from glide.async_commands.core import PubSubMsg

    def my_callback(msg: PubSubMsg, context: Any):
        """
        This function will be called by GLIDE for every message.
        """
        # Using newlines for clear output in the console
        print(f"\n--- CALLBACK TRIGGERED ---")
        print(f"Received message: '{msg.message}' on channel '{msg.channel}'")
        print(f"Received context: {context}")
        print(f"--------------------------\n")

    async def main():
        print("Starting Pub/Sub callback tutorial...")

        # Define a context object to pass to the callback
        app_context = "my_notification_service_v1"

        # 1. Configure the listening client with a callback
        listening_config = GlideClientConfiguration(
            addresses=[NodeAddress("localhost", 6379)],
            pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
                channels_and_patterns={
                    GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                    GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
                },
                # Set the callback and context here
                callback=my_callback,
                context=app_context,
            )
        )

        # 2. Configure the publishing client
        publishing_config = GlideClientConfiguration(
            addresses=[NodeAddress("localhost", 6379)]
        )

        listening_client = None
        publishing_client = None
        try:
            # 3. Create the clients
            listening_client = await GlideClient.create(listening_config)
            publishing_client = await GlideClient.create(publishing_config)

            # 4. Publish a message to 'ch1'
            print("MAIN: Publishing message to 'ch1'...")
            subscribers_count = await publishing_client.publish("Hello from the callback!", "ch1")
            print(f"MAIN: Message published to {subscribers_count} subscriber(s).")

            # 5. Wait for the callback to fire
            print("MAIN: Waiting for 1 second to receive message...")
            await asyncio.sleep(1)

            print("MAIN: Wait complete. Shutting down.")

        finally:
            # 6. Clean up clients
            if listening_client:
                await listening_client.close()
                print("MAIN: Listening client closed.")
            if publishing_client:
                await publishing_client.close()
                print("MAIN: Publishing client closed.")

    if __name__ == "__main__":
        asyncio.run(main())

Summary
Congratulations! You’ve successfully built a simple application using Valkey GLIDE’s Pub/Sub feature.
In this tutorial, you learned how to:
- Configure a dedicated listening client with specific channel and pattern subscriptions.
- Configure a separate publishing client to send messages.
- Use the asynchronous get_pubsub_message() to wait for and receive messages.
- Use the non-blocking try_get_pubsub_message() to check for messages without waiting.
- Use the event-driven callback hook to process messages as soon as they arrive.
This pattern is the foundation for building robust, decoupled applications.