Pub/Sub: Hello World

In this tutorial, you will build a simple, real-time notification system using the Pub/Sub (Publish/Subscribe) feature in Valkey GLIDE.

By the end, you will:

  • Know how to subscribe to Valkey channels and publish messages to them.
  • Understand the different ways GLIDE can listen for messages.

Valkey’s Pub/Sub is a powerful messaging pattern that allows different parts of your application to communicate without being directly aware of each other. A publisher sends a message to a specific channel (like a radio station), and any subscribers tuned into that channel will receive the message instantly.

This is perfect for chat applications, live data feeds, and event notifications.
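
To make the publisher/subscriber relationship concrete, here is a minimal in-memory sketch of the pattern using only the Python standard library. It is a toy model, not how Valkey or GLIDE is implemented: `InMemoryBroker`, `subscribe`, `publish`, and the channel names are hypothetical and exist only for this illustration.

```python
import asyncio
from collections import defaultdict


class InMemoryBroker:
    """Toy stand-in for a Pub/Sub server; not part of Valkey or GLIDE."""

    def __init__(self) -> None:
        # Maps a channel name to the inboxes of its subscribers.
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        """Register a new subscriber and return its private inbox."""
        inbox: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(inbox)
        return inbox

    def publish(self, channel: str, message: str) -> int:
        """Deliver the message to every current subscriber of the channel."""
        inboxes = self._subscribers.get(channel, [])
        for inbox in inboxes:
            inbox.put_nowait(message)
        # Like PUBLISH, report how many subscribers received the message.
        return len(inboxes)


async def demo() -> tuple[int, int, list[str]]:
    broker = InMemoryBroker()
    alice = broker.subscribe("news")
    bob = broker.subscribe("news")

    delivered = broker.publish("news", "hello")       # two subscribers
    missed = broker.publish("sports", "nobody home")  # no subscribers
    received = [await alice.get(), await bob.get()]
    return delivered, missed, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The publisher never references a subscriber directly; it only names a channel. A message published to a channel with no subscribers is simply dropped, which mirrors the fire-and-forget nature of Pub/Sub.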

GLIDE makes this feature robust and easy to use. You define your subscriptions when you create the client, and GLIDE manages the subscription state for you. It ensures you stay subscribed, even if the connection temporarily drops and needs to be re-established.

Before you begin, you’ll need a few things:

  1. Python: Valkey GLIDE supports Python 3.9 to 3.13.
  2. Valkey GLIDE: You’ll need the valkey-glide library. You can install it using pip:
    Terminal window
    pip install valkey-glide
  3. Valkey Server: You need a running Valkey instance. The easiest way to get one is with Docker. Run this command in your terminal to start a standalone Valkey server:
    Terminal window
    docker run -d --name standalone-valkey -p 6379:6379 valkey/valkey

Let’s build a simple Hello World example that publishes a message to a channel and then retrieves it.

  1. Create helloworld.py

    import asyncio
    from glide import GlideClient, NodeAddress, GlideClientConfiguration

    async def main():
        print("Starting Pub/Sub tutorial...")
        # Our code will go here

    if __name__ == "__main__":
        asyncio.run(main())
  2. Configure the Listening Client

    Let’s configure a client to listen to two exact channels ('ch1' and 'ch2') and any channel matching the pattern 'chat*'.

    Add this code inside your main function:

    # 1. Configure the listening client
    listening_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)],
        pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
            channels_and_patterns={
                # Listens for messages published to 'ch1' and 'ch2'
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                # Listens for messages to channels matching 'chat*'
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            callback=None,
            context=None,
        )
    )

    We set callback=None because we want to manually fetch messages using an async method, rather than having GLIDE call a function for us.

  3. Configure the Publishing Client

    Next, we configure a separate, regular client just for publishing messages. This client has no special subscription configuration.

    Add this right after the listening_config:

    # 2. Configure the publishing client
    publishing_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)]
    )
  4. Create Clients and Publish a Message

    Now, let’s create our clients and send our first message. They will be configured using the previously created configurations. We’ll use a try...finally block to make sure the clients are always closed properly.

    listening_client = None
    publishing_client = None
    try:
        # 3. Create the clients
        listening_client = await GlideClient.create(listening_config)
        publishing_client = await GlideClient.create(publishing_config)

        # 4. Publish a message to 'ch1'
        print("Publishing message to 'ch1'...")
        # The return value is the number of clients that received the message.
        subscribers_count = await publishing_client.publish("Hello from GLIDE!", "ch1")
        print(f"Message published to {subscribers_count} subscriber(s).")
    finally:
        # 7. Clean up clients
        if listening_client:
            await listening_client.close()
            print("Listening client closed.")
        if publishing_client:
            await publishing_client.close()
            print("Publishing client closed.")

    A best practice with Pub/Sub is to use a dedicated client for your subscriptions. A client in subscription mode is only focused on listening and cannot run other commands. GLIDE enforces this by having you define subscriptions when you create the client.

  5. Receive the Message Asynchronously

    The message is now on its way! When our listening_client receives it, the message is stored in an internal buffer, waiting for us to retrieve it.

    To get the message, we use await listening_client.get_pubsub_message(). This is an asynchronous method that waits until a message is available and then returns it.

    Add this code right after the publishing_client.publish call (inside the try block):

    # 5. Receive the message (asynchronously)
    print("Waiting to receive message...")
    message = await listening_client.get_pubsub_message()
    if message:
        print(f"Received message: '{message.message}' on channel: '{message.channel}'")
    else:
        print("No message received.")
  6. Check for More Messages (Non-blocking)

    What if there are no more messages? If we call await listening_client.get_pubsub_message() again, our program will pause and wait forever (or until a new message arrives).

    Instead, we can use listening_client.try_get_pubsub_message(). This is a non-blocking method. It returns a message only if one is immediately available in the buffer. Otherwise, it returns None right away.

    Add this final piece of logic:

    # 6. Try to get another message (non-blocking)
    print("Trying to get another message...")
    message = listening_client.try_get_pubsub_message()
    if message:
        print(f"Found another message: '{message.message}'")
    else:
        print("No more messages in the buffer. Done.")
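
Between waiting forever and not waiting at all there is a middle ground: waiting with a deadline. The sketch below illustrates all three behaviours using a plain `asyncio.Queue` as a stand-in for the client's internal message buffer, so it runs without a server. The helper names `get_with_timeout` and `try_get` are hypothetical. The same `asyncio.wait_for` wrapper could in principle be applied to `get_pubsub_message()`, but how GLIDE reacts to the resulting cancellation is an assumption not verified here.

```python
import asyncio
from typing import Optional


async def get_with_timeout(inbox: asyncio.Queue, timeout: float) -> Optional[str]:
    """Wait up to `timeout` seconds for a message; return None if none arrives."""
    try:
        return await asyncio.wait_for(inbox.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


def try_get(inbox: asyncio.Queue) -> Optional[str]:
    """Non-blocking check: return a buffered message, or None right away."""
    try:
        return inbox.get_nowait()
    except asyncio.QueueEmpty:
        return None


async def demo() -> tuple:
    inbox: asyncio.Queue = asyncio.Queue()  # stand-in for the message buffer
    inbox.put_nowait("Hello from GLIDE!")

    first = await get_with_timeout(inbox, 0.5)   # a message is already buffered
    second = try_get(inbox)                      # buffer is now empty
    third = await get_with_timeout(inbox, 0.1)   # gives up after the deadline
    return first, second, third


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A bounded wait is often a reasonable default for scripts and tests, because a missing message then produces a clear result instead of a hung process.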

When you run the script, you should see this output:

python helloworld.py
Starting Pub/Sub tutorial...
Publishing message to 'ch1'...
Message published to 1 subscriber(s).
Waiting to receive message...
Received message: 'Hello from GLIDE!' on channel: 'ch1'
Trying to get another message...
No more messages in the buffer. Done.
Listening client closed.
Publishing client closed.
The Complete Application
import asyncio
from glide import GlideClient, NodeAddress, GlideClientConfiguration


async def main():
    print("Starting Pub/Sub tutorial...")

    # 1. Configure the listening client
    listening_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)],
        pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
            channels_and_patterns={
                # Listens for messages published to 'ch1' and 'ch2'
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                # Listens for messages to channels matching 'chat*'
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            callback=None,
            context=None,
        )
    )

    # 2. Configure the publishing client
    publishing_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)]
    )

    listening_client = None
    publishing_client = None
    try:
        # 3. Create the clients
        listening_client = await GlideClient.create(listening_config)
        publishing_client = await GlideClient.create(publishing_config)

        # 4. Publish a message to 'ch1'
        print("Publishing message to 'ch1'...")
        subscribers_count = await publishing_client.publish("Hello from GLIDE!", "ch1")
        print(f"Message published to {subscribers_count} subscriber(s).")

        # 5. Receive the message (asynchronously)
        print("Waiting to receive message...")
        message = await listening_client.get_pubsub_message()
        if message:
            print(f"Received message: '{message.message}' on channel: '{message.channel}'")
        else:
            print("No message received.")

        # 6. Try to get another message (non-blocking)
        print("Trying to get another message...")
        message = listening_client.try_get_pubsub_message()
        if message:
            print(f"Found another message: '{message.message}'")
        else:
            print("No more messages in the buffer. Done.")
    finally:
        # 7. Clean up clients
        if listening_client:
            await listening_client.close()
            print("Listening client closed.")
        if publishing_client:
            await publishing_client.close()
            print("Publishing client closed.")


if __name__ == "__main__":
    asyncio.run(main())
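
The listening client above subscribes to the exact channels 'ch1' and 'ch2' and to the pattern 'chat*', but the example only publishes to 'ch1'. To get a feel for which other channel names those subscriptions would cover, the sketch below approximates the matching with Python's `fnmatch` module. This is an approximation only: Valkey applies its own glob-style matching on the server, and the two differ in some edge cases (for example, character-class negation and escaping). The helper name `would_receive` is hypothetical.

```python
from fnmatch import fnmatchcase

# The subscriptions used in this tutorial.
EXACT_CHANNELS = {"ch1", "ch2"}
PATTERNS = {"chat*"}


def would_receive(channel: str) -> bool:
    """Approximate whether the tutorial's subscriptions cover a channel."""
    if channel in EXACT_CHANNELS:
        return True
    # fnmatchcase is case-sensitive, as channel names are.
    return any(fnmatchcase(channel, pattern) for pattern in PATTERNS)


if __name__ == "__main__":
    for name in ["ch1", "ch3", "chat", "chat:lobby", "Chat:lobby", "mychat"]:
        print(f"{name!r}: {would_receive(name)}")
```

Note that `*` also matches the empty string, so the bare channel name 'chat' is covered, while 'mychat' is not because the pattern is anchored at the start of the name.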

In the previous example, we pulled messages by calling await listening_client.get_pubsub_message(). For many event-driven applications, it’s more efficient to have GLIDE push messages to your code as soon as they arrive.

We can do this by providing a callback function when configuring the client. GLIDE will automatically run this function for every message received.

  1. Define Your Callback Function

    First, let’s define a function that will handle our messages. This function will be called by GLIDE (potentially from a background thread), so any code you write inside it should be thread-safe.

    It must accept two arguments: the message object and the context you provide during configuration.

    from typing import Any
    from glide.async_commands.core import PubSubMsg

    def my_callback(msg: PubSubMsg, context: Any):
        """
        This function will be called by GLIDE for every message.
        """
        print("\n--- CALLBACK TRIGGERED ---")
        print(f"Received message: '{msg.message}' on channel '{msg.channel}'")
        print(f"Received context: {context}")
        print("--------------------------\n")
  2. Update Client Configuration

    Now, let’s update our listening_config to use this callback. We’ll also define a simple string variable app_context to pass as the context. This context object can be any Python object you want—it’s useful for passing state or identifiers to your callback.

    # Define a context object to pass to the callback
    app_context = "my_notification_service_v1"

    # 1. Configure the listening client with a callback
    listening_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)],
        pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
            channels_and_patterns={
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            # Set the callback and context here (both were None in the first example)
            callback=my_callback,
            context=app_context,
        )
    )
  3. Run the Application

    The main logic is now simpler. We don’t need to call get_pubsub_message anymore. We just publish a message and then wait for a moment. The my_callback function will be triggered automatically in the background.

    # 4. Publish a message to 'ch1'
    print("MAIN: Publishing message to 'ch1'...")
    subscribers_count = await publishing_client.publish("Hello from the callback!", "ch1")
    print(f"MAIN: Message published to {subscribers_count} subscriber(s).")

    # 5. Wait for the callback to fire
    # In a real app, this would be a long-running process
    # (e.g., a web server, or `while True: await asyncio.sleep(1)`)
    # We sleep here just to give the callback time to run.
    print("MAIN: Waiting for 1 second to receive message...")
    await asyncio.sleep(1)
    print("MAIN: Wait complete. Shutting down.")
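
Step 1 notes that the callback may run on a background thread. One common way to keep such a callback thread-safe is to do no real work inside it and instead hand each message over to the asyncio event loop. The sketch below shows that hand-off with the standard library only; a plain `threading.Thread` simulates the library invoking the callback. The names `make_threadsafe_callback` and `inbox` are hypothetical, and whether GLIDE actually calls back from another thread in your setup is an assumption, not something this sketch verifies.

```python
import asyncio
import threading
from typing import Any, Callable


def make_threadsafe_callback(
    loop: asyncio.AbstractEventLoop, inbox: asyncio.Queue
) -> Callable[[Any, Any], None]:
    """Build a callback that forwards each message to `inbox` on `loop`."""

    def callback(msg: Any, context: Any) -> None:
        # Safe to call from any thread: the put itself runs on the loop's thread.
        loop.call_soon_threadsafe(inbox.put_nowait, (msg, context))

    return callback


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    inbox: asyncio.Queue = asyncio.Queue()
    callback = make_threadsafe_callback(loop, inbox)

    # Simulate a library invoking the callback from a background thread.
    worker = threading.Thread(
        target=callback,
        args=("Hello from the callback!", "my_notification_service_v1"),
    )
    worker.start()

    # Application code consumes messages on the event loop as usual.
    msg, context = await asyncio.wait_for(inbox.get(), timeout=1.0)
    worker.join()
    return msg, context


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With this arrangement the callback stays short, and all state is touched from a single thread, so the rest of the application needs no locks.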

Running the example should produce output like the following:

Starting Pub/Sub callback tutorial...
MAIN: Publishing message to 'ch1'...
--- CALLBACK TRIGGERED ---
Received message: 'b'Hello from the callback!'' on channel 'b'ch1''
Received context: my_notification_service_v1
--------------------------
MAIN: Message published to 1 subscriber(s).
MAIN: Waiting for 1 second to receive message...
MAIN: Wait complete. Shutting down.
MAIN: Listening client closed.
MAIN: Publishing client closed.
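
The b'...' wrappers in the output above suggest that the message and channel reach the callback as bytes objects. If you want plain text in your logs, a small helper along these lines could decode them first. This is a sketch that assumes the payloads are UTF-8 text; `to_text` is a hypothetical helper, not part of GLIDE.

```python
from typing import Union


def to_text(value: Union[bytes, str], encoding: str = "utf-8") -> str:
    """Return `value` as str, decoding it first if it is bytes."""
    if isinstance(value, bytes):
        # Replace undecodable bytes instead of raising inside a callback.
        return value.decode(encoding, errors="replace")
    return value


if __name__ == "__main__":
    print(f"Received message: '{to_text(b'Hello from the callback!')}'")
```

Inside the callback, this would be used as `to_text(msg.message)` and `to_text(msg.channel)`. Binary payloads that are not text should be left as bytes.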
The Full Example
import asyncio
from glide import GlideClient, NodeAddress, GlideClientConfiguration
from typing import Any
from glide.async_commands.core import PubSubMsg


def my_callback(msg: PubSubMsg, context: Any):
    """
    This function will be called by GLIDE for every message.
    """
    # Using newlines for clear output in the console
    print("\n--- CALLBACK TRIGGERED ---")
    print(f"Received message: '{msg.message}' on channel '{msg.channel}'")
    print(f"Received context: {context}")
    print("--------------------------\n")


async def main():
    print("Starting Pub/Sub callback tutorial...")

    # Define a context object to pass to the callback
    app_context = "my_notification_service_v1"

    # 1. Configure the listening client with a callback
    listening_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)],
        pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(
            channels_and_patterns={
                GlideClientConfiguration.PubSubChannelModes.Exact: {"ch1", "ch2"},
                GlideClientConfiguration.PubSubChannelModes.Pattern: {"chat*"}
            },
            # Set the callback and context here
            callback=my_callback,
            context=app_context,
        )
    )

    # 2. Configure the publishing client
    publishing_config = GlideClientConfiguration(
        addresses=[NodeAddress("localhost", 6379)]
    )

    listening_client = None
    publishing_client = None
    try:
        # 3. Create the clients
        listening_client = await GlideClient.create(listening_config)
        publishing_client = await GlideClient.create(publishing_config)

        # 4. Publish a message to 'ch1'
        print("MAIN: Publishing message to 'ch1'...")
        subscribers_count = await publishing_client.publish("Hello from the callback!", "ch1")
        print(f"MAIN: Message published to {subscribers_count} subscriber(s).")

        # 5. Wait for the callback to fire
        print("MAIN: Waiting for 1 second to receive message...")
        await asyncio.sleep(1)
        print("MAIN: Wait complete. Shutting down.")
    finally:
        # 6. Clean up clients
        if listening_client:
            await listening_client.close()
            print("MAIN: Listening client closed.")
        if publishing_client:
            await publishing_client.close()
            print("MAIN: Publishing client closed.")


if __name__ == "__main__":
    asyncio.run(main())

Congratulations! You’ve successfully built a simple application using Valkey GLIDE’s Pub/Sub feature.

In this tutorial, you learned how to:

  • Configure a dedicated listening client with specific channel and pattern subscriptions.
  • Configure a separate publishing client to send messages.
  • Use the asynchronous get_pubsub_message() to wait for and receive messages.
  • Use the non-blocking try_get_pubsub_message() to check for messages without waiting.
  • Use a callback to have GLIDE push messages to your code as they arrive, without polling.

This pattern is the foundation for building robust, decoupled applications.