Skip to content

Cluster Batch

Home / python / cluster_batch

glide.async_commands.batch.ClusterBatch

Bases: BaseBatch

Batch implementation for cluster GlideClusterClient. Batches allow the execution of a group of commands in a single step.

Batch Response

An array of command responses is returned by the client exec command, in the order they were given. Each element in the array represents a command given to the ClusterBatch. The response for each command depends on the executed Valkey command. Specific response types are documented alongside each method.

Parameters:

Name Type Description Default
is_atomic bool

Determines whether the batch is atomic or non-atomic. If True, the batch will be executed as an atomic transaction. If False, the batch will be executed as a non-atomic pipeline.

required

See Valkey Transactions (Atomic Batches) and Valkey Pipelines (Non-Atomic Batches) for details.

Examples:

Atomic Batch - Transaction in a Cluster:
>>> transaction = ClusterBatch(is_atomic=True)  # Atomic (Transaction)
>>> transaction.set("key", "value")
>>> transaction.get("key")
>>> result = await client.exec(transaction, false)
>>> print(result)
[OK, b"value"]
Non-Atomic Batch - Pipeline in a Cluster:
>>> pipeline = ClusterBatch(is_atomic=False)  # Non-Atomic (Pipeline)
>>> pipeline.set("key1", "value1")
>>> pipeline.set("key2", "value2")
>>> pipeline.get("key1")
>>> pipeline.get("key2")
>>> result = await client.exec(pipeline, false)
>>> print(result)
[OK, OK, b"value1", b"value2"]
Source code in glide/async_commands/batch.py
5439
5440
5441
5442
5443
5444
5445
5446
5447
5448
5449
5450
5451
5452
5453
5454
5455
5456
5457
5458
5459
5460
5461
5462
5463
5464
5465
5466
5467
5468
5469
5470
5471
5472
5473
5474
5475
5476
5477
5478
5479
5480
5481
5482
5483
5484
5485
5486
5487
5488
5489
5490
5491
5492
5493
5494
5495
5496
5497
5498
5499
5500
5501
5502
5503
5504
5505
5506
5507
5508
5509
5510
5511
5512
5513
5514
5515
5516
5517
5518
5519
5520
5521
5522
5523
5524
5525
5526
5527
5528
5529
5530
5531
5532
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
5544
5545
5546
5547
5548
5549
5550
5551
5552
5553
5554
5555
5556
5557
5558
5559
5560
5561
5562
5563
5564
5565
5566
5567
5568
5569
5570
5571
class ClusterBatch(BaseBatch):
    """
    Batch implementation for cluster GlideClusterClient. Batches allow the execution of a group
    of commands in a single step.

    Batch Response:
        An ``array`` of command responses is returned by the client ``exec`` command,
        in the order they were given. Each element in the array represents a command given to the ClusterBatch.
        The response for each command depends on the executed Valkey command.
        Specific response types are documented alongside each method.

    Args:
        is_atomic (bool): Determines whether the batch is atomic or non-atomic. If ``True``,
            the batch will be executed as an atomic transaction. If ``False``,
            the batch will be executed as a non-atomic pipeline.

    See [Valkey Transactions (Atomic Batches)](https://valkey.io/topics/transactions/) and [Valkey Pipelines (Non-Atomic Batches)](https://valkey.io/topics/pipelining/) for details.

    Examples:
        ### Atomic Batch - Transaction in a Cluster:
        >>> transaction = ClusterBatch(is_atomic=True)  # Atomic (Transaction)
        >>> transaction.set("key", "value")
        >>> transaction.get("key")
        >>> result = await client.exec(transaction, false)
        >>> print(result)
        [OK, b"value"]

        ### Non-Atomic Batch - Pipeline in a Cluster:
        >>> pipeline = ClusterBatch(is_atomic=False)  # Non-Atomic (Pipeline)
        >>> pipeline.set("key1", "value1")
        >>> pipeline.set("key2", "value2")
        >>> pipeline.get("key1")
        >>> pipeline.get("key2")
        >>> result = await client.exec(pipeline, false)
        >>> print(result)
        [OK, OK, b"value1", b"value2"]

    """

    def copy(
        self,
        source: TEncodable,
        destination: TEncodable,
        replace: Optional[bool] = None,
    ) -> "ClusterBatch":
        """
        Copies the value stored at the `source` to the `destination` key. When `replace` is True,
        removes the `destination` key first if it already exists, otherwise performs no action.

        See [valkey.io](https://valkey.io/commands/copy) for more details.

        Args:
            source (TEncodable): The key to the source value.
            destination (TEncodable): The key where the value should be copied to.
            replace (Optional[bool]): If the destination key should be removed before copying the value to it.

        Command response:
            bool: True if the source was copied.

            Otherwise, return False.

        Since: Valkey version 6.2.0.
        """
        args = [source, destination]
        if replace is not None:
            args.append("REPLACE")

        return self.append_command(RequestType.Copy, args)

    def publish(
        self, message: str, channel: str, sharded: bool = False
    ) -> "ClusterBatch":
        """
        Publish a message on pubsub channel.
        This command aggregates PUBLISH and SPUBLISH commands functionalities.
        The mode is selected using the 'sharded' parameter

        See [PUBLISH](https://valkey.io/commands/publish) and [SPUBLISH](https://valkey.io/commands/spublish)
        for more details.

        Args:
            message (str): Message to publish
            channel (str): Channel to publish the message on.
            sharded (bool): Use sharded pubsub mode. Available since Valkey version 7.0.

        Returns:
            int: Number of subscriptions in that shard that received the message.
        """
        return self.append_command(
            RequestType.SPublish if sharded else RequestType.Publish, [channel, message]
        )

    def pubsub_shardchannels(
        self, pattern: Optional[TEncodable] = None
    ) -> "ClusterBatch":
        """
        Lists the currently active shard channels.

        See [valkey.io](https://valkey.io/commands/pubsub-shardchannels) for details.

        Args:
            pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
                If not provided, all active shard channels are returned.

        Command response:
            List[bytes]: A list of currently active shard channels matching the given pattern.

            If no pattern is specified, all active shard channels are returned.
        """
        command_args = [pattern] if pattern is not None else []
        return self.append_command(RequestType.PubSubShardChannels, command_args)

    def pubsub_shardnumsub(
        self, channels: Optional[List[TEncodable]] = None
    ) -> "ClusterBatch":
        """
        Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.

        Note:
            It is valid to call this command without channels. In this case, it will just return an empty map.

        See [valkey.io](https://valkey.io/commands/pubsub-shardnumsub) for details.

        Args:
            channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers.
                If not provided, returns an empty map.

        Command response:
            Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
        """
        return self.append_command(
            RequestType.PubSubShardNumSub, channels if channels else []
        )

copy(source, destination, replace=None)

Copies the value stored at the source to the destination key. When replace is True, removes the destination key first if it already exists, otherwise performs no action.

See valkey.io for more details.

Parameters:

Name Type Description Default
source TEncodable

The key to the source value.

required
destination TEncodable

The key where the value should be copied to.

required
replace Optional[bool]

If the destination key should be removed before copying the value to it.

None
Command response

bool: True if the source was copied.

Otherwise, return False.

Since: Valkey version 6.2.0.

Source code in glide/async_commands/batch.py
5478
5479
5480
5481
5482
5483
5484
5485
5486
5487
5488
5489
5490
5491
5492
5493
5494
5495
5496
5497
5498
5499
5500
5501
5502
5503
5504
5505
5506
def copy(
    self,
    source: TEncodable,
    destination: TEncodable,
    replace: Optional[bool] = None,
) -> "ClusterBatch":
    """
    Copies the value stored at the `source` to the `destination` key. When `replace` is True,
    removes the `destination` key first if it already exists, otherwise performs no action.

    See [valkey.io](https://valkey.io/commands/copy) for more details.

    Args:
        source (TEncodable): The key to the source value.
        destination (TEncodable): The key where the value should be copied to.
        replace (Optional[bool]): If the destination key should be removed before copying the value to it.

    Command response:
        bool: True if the source was copied.

        Otherwise, return False.

    Since: Valkey version 6.2.0.
    """
    args = [source, destination]
    if replace is not None:
        args.append("REPLACE")

    return self.append_command(RequestType.Copy, args)

publish(message, channel, sharded=False)

Publish a message on pubsub channel. This command aggregates PUBLISH and SPUBLISH commands functionalities. The mode is selected using the 'sharded' parameter

See PUBLISH and SPUBLISH for more details.

Parameters:

Name Type Description Default
message str

Message to publish

required
channel str

Channel to publish the message on.

required
sharded bool

Use sharded pubsub mode. Available since Valkey version 7.0.

False

Returns:

Name Type Description
int ClusterBatch

Number of subscriptions in that shard that received the message.

Source code in glide/async_commands/batch.py
5508
5509
5510
5511
5512
5513
5514
5515
5516
5517
5518
5519
5520
5521
5522
5523
5524
5525
5526
5527
5528
5529
def publish(
    self, message: str, channel: str, sharded: bool = False
) -> "ClusterBatch":
    """
    Publish a message on pubsub channel.
    This command aggregates PUBLISH and SPUBLISH commands functionalities.
    The mode is selected using the 'sharded' parameter

    See [PUBLISH](https://valkey.io/commands/publish) and [SPUBLISH](https://valkey.io/commands/spublish)
    for more details.

    Args:
        message (str): Message to publish
        channel (str): Channel to publish the message on.
        sharded (bool): Use sharded pubsub mode. Available since Valkey version 7.0.

    Returns:
        int: Number of subscriptions in that shard that received the message.
    """
    return self.append_command(
        RequestType.SPublish if sharded else RequestType.Publish, [channel, message]
    )

pubsub_shardchannels(pattern=None)

Lists the currently active shard channels.

See valkey.io for details.

Parameters:

Name Type Description Default
pattern Optional[TEncodable]

A glob-style pattern to match active shard channels. If not provided, all active shard channels are returned.

None
Command response

List[bytes]: A list of currently active shard channels matching the given pattern.

If no pattern is specified, all active shard channels are returned.

Source code in glide/async_commands/batch.py
5531
5532
5533
5534
5535
5536
5537
5538
5539
5540
5541
5542
5543
5544
5545
5546
5547
5548
5549
def pubsub_shardchannels(
    self, pattern: Optional[TEncodable] = None
) -> "ClusterBatch":
    """
    Lists the currently active shard channels.

    See [valkey.io](https://valkey.io/commands/pubsub-shardchannels) for details.

    Args:
        pattern (Optional[TEncodable]): A glob-style pattern to match active shard channels.
            If not provided, all active shard channels are returned.

    Command response:
        List[bytes]: A list of currently active shard channels matching the given pattern.

        If no pattern is specified, all active shard channels are returned.
    """
    command_args = [pattern] if pattern is not None else []
    return self.append_command(RequestType.PubSubShardChannels, command_args)

pubsub_shardnumsub(channels=None)

Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.

Note

It is valid to call this command without channels. In this case, it will just return an empty map.

See valkey.io for details.

Parameters:

Name Type Description Default
channels Optional[List[str]]

The list of shard channels to query for the number of subscribers. If not provided, returns an empty map.

None
Command response

Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.

Source code in glide/async_commands/batch.py
5551
5552
5553
5554
5555
5556
5557
5558
5559
5560
5561
5562
5563
5564
5565
5566
5567
5568
5569
5570
5571
def pubsub_shardnumsub(
    self, channels: Optional[List[TEncodable]] = None
) -> "ClusterBatch":
    """
    Returns the number of subscribers (exclusive of clients subscribed to patterns) for the specified shard channels.

    Note:
        It is valid to call this command without channels. In this case, it will just return an empty map.

    See [valkey.io](https://valkey.io/commands/pubsub-shardnumsub) for details.

    Args:
        channels (Optional[List[str]]): The list of shard channels to query for the number of subscribers.
            If not provided, returns an empty map.

    Command response:
        Mapping[bytes, int]: A map where keys are the shard channel names and values are the number of subscribers.
    """
    return self.append_command(
        RequestType.PubSubShardNumSub, channels if channels else []
    )