概述
本文演示了使用 Azure 队列存储服务的常见方案。 涵盖的方案包括插入、速览、获取和删除队列消息。 还介绍了用于创建和删除队列的代码。
本文中的示例用 Python 编写,并使用 适用于 Python 的 Azure 队列存储客户端库。 有关队列的详细信息,请参阅 “后续步骤 ”部分。
什么是队列存储?
Azure 队列存储是一项服务,用于存储大量可以通过 HTTP 或 HTTPS 通过经过身份验证的调用从世界上任何地方访问的消息。 单个队列消息的大小最多可为 64 KB,队列可以包含数百万条消息,直至存储帐户的最大容量限制。 队列存储通常用于创建积压工作以异步处理。
队列服务概念
Azure 队列服务包含以下组件:
存储帐户: 对 Azure 存储的所有访问都通过存储帐户完成。 有关存储帐户的详细信息,请参阅 存储帐户概述。
队列: 队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据。
消息: 以任意格式显示的消息,最大为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。
URL 格式: 队列使用以下 URL 格式进行寻址:http://
<storage account>.queue.core.windows.net/<queue>以下 URL 指向图示中的队列:
http://myaccount.queue.core.windows.net/incoming-orders
创建 Azure 存储帐户
创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解详细信息,请参阅 “创建存储帐户”。
还可以使用 Azure PowerShell、 Azure CLI 或 用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。
如果目前不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅 使用 Azurite 模拟器进行本地 Azure 存储开发。
下载并安装用于 Python 的 Azure 存储 SDK
用于 Python 的 Azure 存储 SDK 需要 Python v2.7、v3.3 或更高版本。
通过 PyPI 安装
若要通过 Python 包索引(PyPI)进行安装,请键入:
pip install azure-storage-queue
注释
如果要从适用于 Python v0.36 或更早版本的 Azure 存储 SDK 升级,请在安装最新包之前使用 pip uninstall azure-storage 卸载旧版 SDK。
有关替代安装方法,请参阅 用于 Python 的 Azure SDK。
从 Azure 门户复制凭据
当示例应用程序向 Azure 存储发出请求时,必须对其进行授权。 若要对请求进行授权,请将存储帐户凭据以连接字符串形式添加到应用程序中。 若要查看存储帐户凭据,请按以下步骤操作:
登录到 Azure 门户。
找到自己的存储帐户。
在存储帐户菜单窗格中的“安全性 + 网络”下,选择“访问密钥”。 在这里,可以查看帐户访问密钥以及每个密钥的完整连接字符串。
在“访问密钥”窗格中,选择“显示密钥” 。
在“key1”部分,找到“连接字符串”值。 选择“复制到剪贴板”图标来复制该连接字符串。 在下一部分,你要将此连接字符串值添加到某个环境变量。
配置存储连接字符串
在复制连接字符串后,请将其写入到运行该应用程序的本地计算机上的新环境变量。 若要设置环境变量,请打开控制台窗口,并遵照适用于操作系统的说明。 将 <yourconnectionstring> 替换为实际的连接字符串。
setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"
在 Windows 中添加环境变量后,必须启动命令窗口的新实例。
重新启动程序
添加环境变量后,重启需要读取环境变量的任何正在运行的程序。 例如,先重启开发环境或编辑器,然后再继续操作。
配置应用程序以访问队列存储
使用 QueueClient 对象可以处理队列。 在希望以编程方式访问 Azure 队列的任何 Python 文件顶部附近添加以下代码:
from azure.storage.queue import (
QueueClient,
BinaryBase64EncodePolicy,
BinaryBase64DecodePolicy
)
import os, uuid
os 包支持检索环境变量。
uuid 包支持为队列名称生成唯一标识符。
创建队列
连接字符串是从前面设置的 AZURE_STORAGE_CONNECTION_STRING 环境变量中检索的。
以下代码使用存储连接字符串创建 QueueClient 对象。
# Retrieve the connection string from an environment
# variable named AZURE_STORAGE_CONNECTION_STRING
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
# Create a unique name for the queue
q_name = "queue-" + str(uuid.uuid4())
# Instantiate a QueueClient object which will
# be used to create and manipulate the queue
print("Creating queue: " + q_name)
queue_client = QueueClient.from_connection_string(connect_str, q_name)
# Create the queue
queue_client.create_queue()
Azure 队列消息以文本的形式存储。 如果要存储二进制数据,请先设置 Base64 编码和解码函数,然后再将消息放入队列中。
创建客户端对象时配置 Base64 编码和解码函数。
# Setup Base64 encoding and decoding functions
base64_queue_client = QueueClient.from_connection_string(
conn_str=connect_str, queue_name=q_name,
message_encode_policy = BinaryBase64EncodePolicy(),
message_decode_policy = BinaryBase64DecodePolicy()
)
将消息插入队列
若要将消息插入队列,请使用 send_message 方法。
message = u"Hello World"
print("Adding message: " + message)
queue_client.send_message(message)
查看消息
可以通过调用 peek_messages 方法来查看消息,而无需将其从队列中删除。 默认情况下,此方法可查看单个消息。
# Peek at the first message
messages = queue_client.peek_messages()
for peeked_message in messages:
print("Peeked message: " + peeked_message.content)
更改排队消息的内容
可以直接在队列中更改消息内容。 如果消息表示任务,则可以使用此功能更新任务的状态。
以下代码使用 update_message 方法更新消息。 可见性超时设置为 0,这意味着消息会立即显示并更新内容。
messages = queue_client.receive_messages()
list_result = next(messages)
message = queue_client.update_message(
list_result.id, list_result.pop_receipt,
visibility_timeout=0, content=u'Hello World Again')
print("Updated message to: " + message.content)
获取队列长度
您可以获取队列中消息数量的估计值。
get_queue_properties方法返回队列属性,包括 approximate_message_count.
properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print("Message count: " + str(count))
结果只是近似值,因为在服务响应请求后,可以添加或删除消息。
取消消息的排队
在两个步骤中从队列中删除消息。 如果代码无法处理消息,则此双重过程可确保可以获取相同的消息,然后重试。 成功处理消息后,调用 delete_message。
调用 receive_messages时,默认情况下会获取队列中的下一条消息。 从 receive_messages 返回的消息对从此队列读取消息的任何其他代码都不可见。 默认情况下,此消息保持 30 秒不可见。 若要完成从队列中删除消息,还必须调用 delete_message。
messages = queue_client.receive_messages()
for message in messages:
print("Dequeueing message: " + message.content)
queue_client.delete_message(message.id, message.pop_receipt)
可通过两种方式自定义队列中的消息检索。 首先,可以获取一批消息(最多 32 条)。 其次,你可以设置更长或更短的不可见性超时,从而给予代码更多或更少的时间来完整处理每条消息。
下面的代码示例使用 receive_messages 方法分批获取消息。 然后,它使用嵌套 for 循环处理每个批处理中的每个消息。 它还将每条消息的不可见超时设置为 5 分钟。
messages = queue_client.receive_messages(messages_per_page=5, visibility_timeout=5*60)
for msg_batch in messages.by_page():
for msg in msg_batch:
print("Batch dequeue message: " + msg.content)
queue_client.delete_message(msg)
删除队列
若要删除队列及其中包含的所有消息,请调用 delete_queue 方法。
print("Deleting queue: " + queue_client.queue_name)
queue_client.delete_queue()
小提示
尝试Microsoft Azure 存储资源管理器
Microsoft Azure 存储资源管理器 是一款免费的独立应用,来自 Microsoft,可用于在 Windows、macOS 和 Linux 上直观地处理 Azure 存储数据。
后续步骤
了解队列存储的基础知识后,请按照以下链接了解详细信息。
有关使用已弃用的 Python 版本 2 SDK 的相关代码示例,请参阅 使用 Python 版本 2 的代码示例。