如何通过 PHP 使用队列存储

小提示

试用 Microsoft Azure 存储资源管理器

Microsoft Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。

本指南介绍如何使用 Azure 队列存储服务执行常见方案。 这些示例通过用于 PHP Azure 存储客户端库的类编写。 涵盖的方案包括插入、速览、获取和删除队列消息,以及创建和删除队列。

什么是队列存储?

Azure 队列存储是一项服务,用于存储大量可以通过 HTTP 或 HTTPS 通过经过身份验证的调用从世界上任何地方访问的消息。 单个队列消息的大小最多可为 64 KB,队列可以包含数以百万计的消息,消息总量取决于存储帐户的容量上限。 队列存储通常用于创建积压工作以异步处理。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储的所有访问都通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述

  • 队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式: 队列可以通过使用以下 URL 格式进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    以下 URL 指向图示中的队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可以使用 Azure PowerShellAzure CLI或用于 .NET的 azure 存储资源提供程序创建 Azure 存储帐户。

如果目前不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅 使用 Azurite 模拟器进行本地 Azure 存储开发

创建 PHP 应用程序

创建访问 Azure 队列存储的 PHP 应用程序的唯一要求是在代码中引用用于 PHP Azure 存储客户端库中的类。 可以使用任何开发工具创建应用程序,包括记事本。

本指南使用队列存储服务功能,这些功能可以在 PHP 应用程序本地调用,或在 Azure 中的 Web 应用程序内运行的代码中调用。

获取 Azure 客户端库

通过 Composer 安装

  1. 在项目的根目录中创建名为 composer.json 的文件,并向其添加以下代码:

    {
      "require": {
        "microsoft/azure-storage-queue": "*"
      }
    }
    
  2. 在项目根目录中下载 composer.phar

  3. 打开命令提示符并在项目根目录中运行以下命令:

    php composer.phar install
    

或者转到 GitHub 上的 Azure 存储 PHP 客户端库 克隆源代码。

配置应用程序以访问队列存储

若要使用 Azure 队列存储的 API,需要:

  1. 使用 require_once 语句引用自动加载文件。
  2. 引用可能使用的任何类。

以下示例演示如何包含自动加载程序文件并引用 QueueRestProxy 类。

require_once 'vendor/autoload.php';
use MicrosoftAzure\Storage\Queue\QueueRestProxy;

在以下示例中,require_once 语句始终显示,但只引用运行该示例所需的类。

设置 Azure 存储连接

若要实例化 Azure 队列存储客户端,必须先具有有效的连接字符串。 队列存储连接字符串的格式如下所示。

若要访问实时服务,

DefaultEndpointsProtocol=[http|https];AccountName=[yourAccount];AccountKey=[yourKey]

若要访问模拟器存储,请执行以下作:

UseDevelopmentStorage=true

若要创建 Azure 队列存储客户端,需要使用 QueueRestProxy 类。 可以使用以下任一技术:

  • 将连接字符串直接传递给它。
  • 使用 Web 应用中的环境变量来存储连接字符串。 有关配置连接字符串的文档,请参阅 Azure Web 应用配置设置 文档。

对于此处概述的示例,直接传递连接字符串。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";
$queueClient = QueueRestProxy::createQueueService($connectionString);

创建队列

QueueRestProxy 对象允许使用 CreateQueue 方法创建队列。 创建队列时,可以在队列上设置选项,但不需要这样做。 此示例演示如何在队列上设置元数据。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\CreateQueueOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// OPTIONAL: Set queue metadata.
$createQueueOptions = new CreateQueueOptions();
$createQueueOptions->addMetaData("key1", "value1");
$createQueueOptions->addMetaData("key2", "value2");

try    {
    // Create queue.
    $queueClient->createQueue("myqueue", $createQueueOptions);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

注释

不应依赖元数据键的区分大小写。 所有密钥都以小写形式从服务中读取。

向队列添加消息

若要将消息添加到队列,请使用 QueueRestProxy->createMessage。 该方法采用队列名称、消息文本和消息选项(可选)。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\CreateMessageOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Create message.
    $queueClient->createMessage("myqueue", "Hello, World");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

查看下一条消息

可以通过调用 QueueRestProxy->peekMessages来查看队列前面的一个或多个消息,而无需将其从队列中删除。 默认情况下,peekMessage 方法返回单个消息,但可以使用 PeekMessagesOptions->setNumberOfMessages 方法更改该值。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\PeekMessagesOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// OPTIONAL: Set peek message options.
$message_options = new PeekMessagesOptions();
$message_options->setNumberOfMessages(1); // Default value is 1.

try    {
    $peekMessagesResult = $queueClient->peekMessages("myqueue", $message_options);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

$messages = $peekMessagesResult->getQueueMessages();

// View messages.
$messageCount = count($messages);
if($messageCount <= 0){
    echo "There are no messages.<br />";
}
else{
    foreach($messages as $message)    {
        echo "Peeked message:<br />";
        echo "Message Id: ".$message->getMessageId()."<br />";
        echo "Date: ".date_format($message->getInsertionDate(), 'Y-m-d')."<br />";
        echo "Message text: ".$message->getMessageText()."<br /><br />";
    }
}

从队列中取出下一条消息

代码在两个步骤内从队列中删除消息。 首先,调用 QueueRestProxy->listMessages,这会使消息对从队列读取的任何其他代码不可见。 默认情况下,此消息保持 30 秒不可见。 (如果此时间段内未删除该消息,则会再次在队列上显示该消息。若要完成从队列中删除消息,必须调用 QueueRestProxy->deleteMessage。 删除消息的这两个步骤可确保当代码因硬件或软件故障而无法处理消息时,代码的另一个实例可以获取相同的消息,然后重试。 代码在处理完消息后立即调用 deleteMessage

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// Get message.
$listMessagesResult = $queueClient->listMessages("myqueue");
$messages = $listMessagesResult->getQueueMessages();
$message = $messages[0];

/* ---------------------
    Process message.
   --------------------- */

// Get message ID and pop receipt.
$messageId = $message->getMessageId();
$popReceipt = $message->getPopReceipt();

try    {
    // Delete message.
    $queueClient->deleteMessage("myqueue", $messageId, $popReceipt);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

更改排队消息的内容

可以通过调用 QueueRestProxy->updateMessage直接在队列中更改消息的内容。 如果消息表示工作任务,则可以使用此功能更新工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作状态,并给客户端另一分钟以继续处理消息。 可以使用此方法跟踪队列消息上的多步骤工作流,而无需从头开始,如果处理步骤因硬件或软件故障而失败。 通常,你也会保留重试计数,如果消息重试次数超过 n 次,则会将其删除。 这可以防止在处理消息时触发应用程序错误。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Get message.
$listMessagesResult = $queueClient->listMessages("myqueue");
$messages = $listMessagesResult->getQueueMessages();
$message = $messages[0];

// Define new message properties.
$new_message_text = "New message text.";
$new_visibility_timeout = 5; // Measured in seconds.

// Get message ID and pop receipt.
$messageId = $message->getMessageId();
$popReceipt = $message->getPopReceipt();

try    {
    // Update message.
    $queueClient->updateMessage("myqueue",
                                $messageId,
                                $popReceipt,
                                $new_message_text,
                                $new_visibility_timeout);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

处理出队消息的其他选项

可通过两种方式自定义队列中的消息检索。 首先,可以获取一批消息(最多 32 条)。 其次,您可以设置更长或更短的可见性超时,从而为代码完全处理每条消息提供更多或更少的时间。 下面的代码示例使用 getMessages 方法在一次调用中获取 16 条消息。 然后使用 for 循环处理每个消息。 它还将每条消息的不可见超时设置为 5 分钟。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\ListMessagesOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// Set list message options.
$message_options = new ListMessagesOptions();
$message_options->setVisibilityTimeoutInSeconds(300);
$message_options->setNumberOfMessages(16);

// Get messages.
try{
    $listMessagesResult = $queueClient->listMessages("myqueue",
                                                     $message_options);
    $messages = $listMessagesResult->getQueueMessages();

    foreach($messages as $message){

        /* ---------------------
            Process message.
        --------------------- */

        // Get message Id and pop receipt.
        $messageId = $message->getMessageId();
        $popReceipt = $message->getPopReceipt();

        // Delete message.
        $queueClient->deleteMessage("myqueue", $messageId, $popReceipt);
    }
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

获取队列长度

您可以获取队列中消息数量的估计值。 QueueRestProxy->getQueueMetadata 方法检索有关队列的元数据。 对返回的对象调用 getApproximateMessageCount 方法可提供队列中消息数的计数。 计数只是近似值,因为在队列存储响应请求后,可以添加或删除消息。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Get queue metadata.
    $queue_metadata = $queueClient->getQueueMetadata("myqueue");
    $approx_msg_count = $queue_metadata->getApproximateMessageCount();
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

echo $approx_msg_count;

删除队列

若要删除队列及其中的所有消息,请调用 QueueRestProxy->deleteQueue 方法。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Delete queue.
    $queueClient->deleteQueue("myqueue");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

后续步骤

了解 Azure 队列存储的基础知识后,请按照以下链接了解更复杂的存储任务:

有关详细信息,请参阅 PHP 开发人员中心