如何使用来自C++的队列存储

小提示

尝试Microsoft Azure 存储资源管理器

Microsoft Azure 存储资源管理器 是一款免费的独立应用,来自 Microsoft,可用于在 Windows、macOS 和 Linux 上直观地处理 Azure 存储数据。

概述

本指南介绍如何使用 Azure 队列存储服务执行常见方案。 这些示例以C++编写,并使用 适用于C++的Azure存储客户端库。 涵盖的方案包括 插入速览获取删除 队列消息,以及 创建和删除队列

注释

本指南面向 C++ v1.0.0 及更高版本的 Azure 存储客户端库。 建议的版本是 Azure 存储客户端库 v2.2.0,可通过 NuGetGitHub 获取。

什么是队列存储?

Azure 队列存储是一项服务,用于存储大量可以通过 HTTP 或 HTTPS 通过经过身份验证的调用从世界上任何地方访问的消息。 单个队列消息的大小最多可为 64 KB,队列可以包含数以百万计的消息,消息总量取决于存储帐户的容量上限。 队列存储通常用于创建积压工作以异步处理。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储的所有访问都通过存储帐户完成。 有关存储帐户的详细信息,请参阅 存储帐户概述

  • 队列: 队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 以任意格式显示的消息,最大为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式: 队列使用以下 URL 格式进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    以下 URL 指向图示中的队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解详细信息,请参阅 “创建存储帐户”。

还可以使用 Azure PowerShellAzure CLI用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果目前不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅 使用 Azurite 模拟器进行本地 Azure 存储开发

创建 C++ 应用程序

本指南将使用可在C++应用程序中运行的存储功能。

为此,需要安装用于C++的 Azure 存储客户端库,并在 Azure 订阅中创建 Azure 存储帐户。

若要安装用于C++的 Azure 存储客户端库,可以使用以下方法:

.\vcpkg.exe install azure-storage-cpp

可以在 自述 文件中找到有关如何生成源代码并导出到 NuGet 的指南。

配置应用程序以访问队列存储

将以下 include 语句添加到要在其中使用 Azure 存储 API 访问队列的 C++ 文件的顶部:

#include <was/storage_account.h>
#include <was/queue.h>

设置 Azure 存储连接字符串

Azure 存储客户端使用存储连接字符串来存储用于访问数据管理服务的终结点和凭据。 在客户端应用程序中运行时,必须提供以下格式的存储连接字符串。使用您的存储帐户的名称和在Azure 门户中列出的存储帐户访问密钥,并设置AccountNameAccountKey值。 有关存储帐户和访问密钥的信息,请参阅 “关于 Azure 存储帐户”。 此示例演示如何声明一个静态字段以保存连接字符串:

// Define the connection-string with your values.
const utility::string_t storage_connection_string(U("DefaultEndpointsProtocol=https;AccountName=your_storage_account;AccountKey=your_storage_account_key"));

若要在本地 Windows 计算机中测试应用程序,可以使用 Azurite 存储模拟器。 Azurite 是一个实用工具,用于模拟本地开发计算机上的 Azure Blob 存储和队列存储。 以下示例演示如何声明一个静态字段来保存本地存储模拟器的连接字符串:

// Define the connection-string with Azurite.
const utility::string_t storage_connection_string(U("UseDevelopmentStorage=true;"));  

若要启动 Azurite,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

以下示例假定你已使用这两种方法之一来获取存储连接字符串。

检索连接字符串

可使用 cloud_storage_account 类来表示存储帐户信息。 若要从存储连接字符串中检索存储帐户信息,可以使用此方法 parse

// Retrieve storage account from connection string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

如何:创建队列

对象 cloud_queue_client 允许获取队列的引用对象。 以下代码创建一个 cloud_queue_client 对象。

// Retrieve storage account from connection string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create a queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

使用 cloud_queue_client 对象获取对要使用的队列的引用。 如果队列不存在,您可以创建队列。

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Create the queue if it doesn't already exist.
queue.create_if_not_exists();  

如何:将消息插入队列

若要将消息插入现有队列,请先创建新的 cloud_queue_message消息。 接下来,调用该方法 add_message 。 可以从字符串(采用 UTF-8 格式)或字节数组创建 A cloud_queue_message 。 下面是创建队列的代码(如果队列不存在),并插入消息 Hello, World

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Create the queue if it doesn't already exist.
queue.create_if_not_exists();

// Create a message and add it to the queue.
azure::storage::cloud_queue_message message1(U("Hello, World"));
queue.add_message(message1);  

如何查看下一条消息

可以通过调用 peek_message 该方法来查看队列前面的消息,而无需将其从队列中删除。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Peek at the next message.
azure::storage::cloud_queue_message peeked_message = queue.peek_message();

// Output the message content.
std::wcout << U("Peeked message content: ") << peeked_message.content_as_string() << std::endl;

如何:更改排队消息的内容

可以直接在队列中更改消息内容。 如果消息表示工作任务,则可以使用此功能更新工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作状态,并给客户端另一分钟以继续处理消息。 可以使用此方法跟踪队列消息上的多步骤工作流,而无需从头开始,如果处理步骤因硬件或软件故障而失败。 通常,也会保留重试计数,如果消息重试次数超过 n 次,则会将其删除。 这可以防止消息在每次处理时触发应用程序错误。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Get the message from the queue and update the message contents.
// The visibility timeout "0" means make it visible immediately.
// The visibility timeout "60" means the client can get another minute to continue
// working on the message.
azure::storage::cloud_queue_message changed_message = queue.get_message();

changed_message.set_content(U("Changed message"));
queue.update_message(changed_message, std::chrono::seconds(60), true);

// Output the message content.
std::wcout << U("Changed message content: ") << changed_message.content_as_string() << std::endl;  

如何:取消排队下一条消息

代码以两个步骤从队列中取出消息。 调用 get_message 时,将得到队列中的下一条消息。 对从此队列读取消息的任何其他代码而言,从 get_message 返回的消息将不可见。 要完成从队列中移除消息,还必须调用 delete_message。 删除消息的这两个步骤可确保如果代码由于硬件或软件故障而无法处理消息,则代码的另一个实例可以获取相同的消息,然后重试。 代码在处理完消息后立即调用 delete_message

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Get the next message.
azure::storage::cloud_queue_message dequeued_message = queue.get_message();
std::wcout << U("Dequeued message: ") << dequeued_message.content_as_string() << std::endl;

// Delete the message.
queue.delete_message(dequeued_message);

如何使用其他选项来出队列消息

可通过两种方式自定义队列中的消息检索。 首先,可以获取一批消息(最多 32 条)。 其次,你可以设置更长或更短的不可见性超时,从而给予代码更多或更少的时间来完整处理每条消息。 下面的代码示例使用 get_messages 该方法在一次调用中获取 20 条消息。 然后,它使用 for 循环处理每个消息。 它还将每条消息的不可见超时设置为 5 分钟。 请注意,所有消息的五分钟同时开始,因此在调用 get_messages后 5 分钟后,任何尚未删除的消息都将再次可见。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Dequeue some queue messages (maximum 32 at a time) and set their visibility timeout to
// 5 minutes (300 seconds).
azure::storage::queue_request_options options;
azure::storage::operation_context context;

// Retrieve 20 messages from the queue with a visibility timeout of 300 seconds.
std::vector<azure::storage::cloud_queue_message> messages = queue.get_messages(20, std::chrono::seconds(300), options, context);

for (auto it = messages.cbegin(); it != messages.cend(); ++it)
{
    // Display the contents of the message.
    std::wcout << U("Get: ") << it->content_as_string() << std::endl;
}

如何:获取队列长度

您可以获取队列中消息数量的估计值。 download_attributes 方法返回包括消息计数在内的队列属性。 该方法 approximate_message_count 获取队列中消息的大致数量。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// Fetch the queue attributes.
queue.download_attributes();

// Retrieve the cached approximate message count.
int cachedMessageCount = queue.approximate_message_count();

// Display number of messages.
std::wcout << U("Number of messages in queue: ") << cachedMessageCount << std::endl;  

如何:删除队列

若要删除队列及其包含的所有消息,请对队列对象调用 delete_queue_if_exists 方法。

// Retrieve storage account from connection-string.
azure::storage::cloud_storage_account storage_account = azure::storage::cloud_storage_account::parse(storage_connection_string);

// Create the queue client.
azure::storage::cloud_queue_client queue_client = storage_account.create_cloud_queue_client();

// Retrieve a reference to a queue.
azure::storage::cloud_queue queue = queue_client.get_queue_reference(U("my-sample-queue"));

// If the queue exists and delete it.
queue.delete_queue_if_exists();  

后续步骤

了解队列存储的基础知识后,请按照以下链接了解有关 Azure 存储的详细信息。