如何通过 Python 使用 Azure 队列存储

概述

本文演示了使用 Azure 队列存储服务的常见方案。 涵盖的方案包括插入、速览、获取和删除队列消息。 还介绍了用于创建和删除队列的代码。

本文中的示例用 Python 编写,并使用 适用于 Python 的 Azure 队列存储客户端库。 有关队列的详细信息,请参阅 “后续步骤 ”部分。

什么是队列存储?

Azure 队列存储是一项服务,用于存储大量可以通过 HTTP 或 HTTPS 通过经过身份验证的调用从世界上任何地方访问的消息。 单个队列消息的大小最多可为 64 KB,队列可以包含数百万条消息,直至存储帐户的最大容量限制。 队列存储通常用于创建积压工作以异步处理。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储的所有访问都通过存储帐户完成。 有关存储帐户的详细信息,请参阅 存储帐户概述

  • 队列: 队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 以任意格式显示的消息,最大为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式: 队列使用以下 URL 格式进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    以下 URL 指向图示中的队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解详细信息,请参阅 “创建存储帐户”。

还可以使用 Azure PowerShellAzure CLI用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果目前不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅 使用 Azurite 模拟器进行本地 Azure 存储开发

下载并安装用于 Python 的 Azure 存储 SDK

用于 Python 的 Azure 存储 SDK 需要 Python v2.7、v3.3 或更高版本。

通过 PyPI 安装

若要通过 Python 包索引(PyPI)进行安装,请键入:

pip install azure-storage-queue

注释

如果要从适用于 Python v0.36 或更早版本的 Azure 存储 SDK 升级,请在安装最新包之前使用 pip uninstall azure-storage 卸载旧版 SDK。

有关替代安装方法,请参阅 用于 Python 的 Azure SDK

从 Azure 门户复制凭据

当示例应用程序向 Azure 存储发出请求时,必须对其进行授权。 若要对请求进行授权,请将存储帐户凭据以连接字符串形式添加到应用程序中。 若要查看存储帐户凭据,请按以下步骤操作:

  1. 登录到 Azure 门户

  2. 找到自己的存储帐户。

  3. 在存储帐户菜单窗格中的“安全性 + 网络”下,选择“访问密钥”。 在这里,可以查看帐户访问密钥以及每个密钥的完整连接字符串。

    显示访问密钥设置在 Azure 门户中的位置的屏幕截图

  4. 在“访问密钥”窗格中,选择“显示密钥” 。

  5. 在“key1”部分,找到“连接字符串”值。 选择“复制到剪贴板”图标来复制该连接字符串。 在下一部分,你要将此连接字符串值添加到某个环境变量。

    显示如何从 Azure 门户复制连接字符串的屏幕截图

配置存储连接字符串

在复制连接字符串后,请将其写入到运行该应用程序的本地计算机上的新环境变量。 若要设置环境变量,请打开控制台窗口,并遵照适用于操作系统的说明。 将 <yourconnectionstring> 替换为实际的连接字符串。

setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"

在 Windows 中添加环境变量后,必须启动命令窗口的新实例。

重新启动程序

添加环境变量后,重启需要读取环境变量的任何正在运行的程序。 例如,先重启开发环境或编辑器,然后再继续操作。

配置应用程序以访问队列存储

使用 QueueClient 对象可以处理队列。 在希望以编程方式访问 Azure 队列的任何 Python 文件顶部附近添加以下代码:

from azure.storage.queue import (
        QueueClient,
        BinaryBase64EncodePolicy,
        BinaryBase64DecodePolicy
)

import os, uuid

os 包支持检索环境变量。 uuid 包支持为队列名称生成唯一标识符。

创建队列

连接字符串是从前面设置的 AZURE_STORAGE_CONNECTION_STRING 环境变量中检索的。

以下代码使用存储连接字符串创建 QueueClient 对象。

# Retrieve the connection string from an environment
# variable named AZURE_STORAGE_CONNECTION_STRING
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")

# Create a unique name for the queue
q_name = "queue-" + str(uuid.uuid4())

# Instantiate a QueueClient object which will
# be used to create and manipulate the queue
print("Creating queue: " + q_name)
queue_client = QueueClient.from_connection_string(connect_str, q_name)

# Create the queue
queue_client.create_queue()

Azure 队列消息以文本的形式存储。 如果要存储二进制数据,请先设置 Base64 编码和解码函数,然后再将消息放入队列中。

创建客户端对象时配置 Base64 编码和解码函数。

# Setup Base64 encoding and decoding functions
base64_queue_client = QueueClient.from_connection_string(
                            conn_str=connect_str, queue_name=q_name,
                            message_encode_policy = BinaryBase64EncodePolicy(),
                            message_decode_policy = BinaryBase64DecodePolicy()
                        )

将消息插入队列

若要将消息插入队列,请使用 send_message 方法。

message = u"Hello World"
print("Adding message: " + message)
queue_client.send_message(message)

查看消息

可以通过调用 peek_messages 方法来查看消息,而无需将其从队列中删除。 默认情况下,此方法可查看单个消息。

# Peek at the first message
messages = queue_client.peek_messages()

for peeked_message in messages:
    print("Peeked message: " + peeked_message.content)

更改排队消息的内容

可以直接在队列中更改消息内容。 如果消息表示任务,则可以使用此功能更新任务的状态。

以下代码使用 update_message 方法更新消息。 可见性超时设置为 0,这意味着消息会立即显示并更新内容。

messages = queue_client.receive_messages()
list_result = next(messages)

message = queue_client.update_message(
        list_result.id, list_result.pop_receipt,
        visibility_timeout=0, content=u'Hello World Again')

print("Updated message to: " + message.content)

获取队列长度

您可以获取队列中消息数量的估计值。

get_queue_properties方法返回队列属性,包括 approximate_message_count.

properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print("Message count: " + str(count))

结果只是近似值,因为在服务响应请求后,可以添加或删除消息。

取消消息的排队

在两个步骤中从队列中删除消息。 如果代码无法处理消息,则此双重过程可确保可以获取相同的消息,然后重试。 成功处理消息后,调用 delete_message

调用 receive_messages时,默认情况下会获取队列中的下一条消息。 从 receive_messages 返回的消息对从此队列读取消息的任何其他代码都不可见。 默认情况下,此消息保持 30 秒不可见。 若要完成从队列中删除消息,还必须调用 delete_message

messages = queue_client.receive_messages()

for message in messages:
    print("Dequeueing message: " + message.content)
    queue_client.delete_message(message.id, message.pop_receipt)

可通过两种方式自定义队列中的消息检索。 首先,可以获取一批消息(最多 32 条)。 其次,你可以设置更长或更短的不可见性超时,从而给予代码更多或更少的时间来完整处理每条消息。

下面的代码示例使用 receive_messages 方法分批获取消息。 然后,它使用嵌套 for 循环处理每个批处理中的每个消息。 它还将每条消息的不可见超时设置为 5 分钟。

messages = queue_client.receive_messages(messages_per_page=5, visibility_timeout=5*60)

for msg_batch in messages.by_page():
   for msg in msg_batch:
      print("Batch dequeue message: " + msg.content)
      queue_client.delete_message(msg)

删除队列

若要删除队列及其中包含的所有消息,请调用 delete_queue 方法。

print("Deleting queue: " + queue_client.queue_name)
queue_client.delete_queue()

小提示

尝试Microsoft Azure 存储资源管理器

Microsoft Azure 存储资源管理器 是一款免费的独立应用,来自 Microsoft,可用于在 Windows、macOS 和 Linux 上直观地处理 Azure 存储数据。

后续步骤

了解队列存储的基础知识后,请按照以下链接了解详细信息。

有关使用已弃用的 Python 版本 2 SDK 的相关代码示例,请参阅 使用 Python 版本 2 的代码示例