Поделиться через


Spring Cloud Stream с Azure Service Bus

В этой статье показано, как использовать Spring Cloud Stream Binder для отправки сообщений и получения сообщений от Service Bus queues и topics.

Azure предоставляет асинхронную платформу обмена сообщениями с именем Azure Service Bus ("Service Bus") на основе стандарта Advanced Message Queueing Protocol 1.0 ("AMQP 1.0"). Service Bus можно использовать в диапазоне поддерживаемых платформ Azure.

Предварительные условия

  • Подписка Azure — создать бесплатно.

  • Java Development Kit (JDK) версии 8 или более поздней.

  • Apache Maven версии 3.2 или более поздней.

  • cURL или подобная служебная HTTP-программа, с помощью которой можно протестировать функциональные возможности.

  • Очередь или тема для Azure Service Bus. Если у вас нет, создайте очередь Service Bus или создайте раздел Service Bus.

  • Приложение Spring Boot. Если у вас его нет, создайте проект Maven, используя Spring Initializr. Обязательно выберите Maven Project и в разделе Dependencies добавьте зависимости Spring Web и Azure поддержка и выберите Java версии 8 или более поздней.

Примечание.

Чтобы предоставить вашей учетной записи доступ к ресурсам Azure Service Bus, назначьте роль Azure Service Bus Data Sender и Azure Service Bus Data Receiver для учетной записи Microsoft Entra, которую вы используете в настоящее время. Дополнительные сведения о предоставлении ролей доступа см. в статьях Назначение ролей Azure с помощью портала Azure и Аутентификация и авторизация приложения с помощью Microsoft Entra ID для доступа к сущностям Azure Service Bus.

Внимание

Для выполнения действий, описанных в этой статье, требуется spring Boot версии 2.5 или более поздней.

Отправка и получение сообщений от Azure Service Bus

С помощью очереди или темы для Azure Service Bus можно отправлять и получать сообщения, используя Spring Cloud Azure Stream Binder Service Bus.

Чтобы установить модуль Stream Binder Azure Service Bus Spring Cloud, добавьте следующие зависимости в файл pom.xml:

  • Список компонентов (BOM) Spring Cloud для Azure

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.1.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Примечание.

    Если вы используете Spring Boot 4.0.x, обязательно установите версию, указанную в spring-cloud-azure-dependencies, на значение 7.1.0.

    Если вы используете Spring Boot 3.5.x, обязательно задайте версию spring-cloud-azure-dependencies на 6.2.0.

    Если вы используете Spring Boot 3.1.x-3.5.x, обязательно установите для нее версию spring-cloud-azure-dependencies на 5.25.0.

    Если вы используете Spring Boot 2.x, обязательно установите версию spring-cloud-azure-dependencies на 4.20.0.

    Эта ведомость материалов (BOM) должна быть сконфигурирована в <dependencyManagement> разделе файла pom.xml. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию.

    Дополнительные сведения о версии, используемой для этого BOM, можно найти в разделе Какую версию Spring Cloud Azure мне следует использовать?.

  • Артефакт Service Bus Stream Binder Spring Cloud Azure:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Написать код для приложения

Чтобы настроить ваше приложение для использования очереди или топика Service Bus для отправки и получения сообщений, выполните следующие шаги.

  1. Настройте учетные данные Service Bus в файле конфигурации application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    В следующей таблице описаны поля в конфигурации:

    Поле Описание
    spring.cloud.azure.servicebus.namespace Укажите пространство имен, полученное в Service Bus на портале Azure.
    spring.cloud.stream.bindings.consume-in-0.destination Укажите очередь Service Bus или раздел Service Bus, который вы использовали в этом руководстве.
    spring.cloud.stream.bindings.supply-out-0.destination Укажите то же значение, которое использовалось для назначения ввода.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Укажите, следует ли автоматически урегулировать сообщения. Если задано значение false, будет добавлен заголовок сообщения Checkpointer, чтобы разработчики могли вручную урегулировать сообщения.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Укажите тип сущности для выходной привязки, может быть queue или topic.
    spring.cloud.function.definition Укажите, какой функциональный bean-компонент нужно привязать к внешним назначениям, предоставляемым привязками.
    spring.cloud.stream.poller.fixed-delay Укажите фиксированную задержку для опроса по умолчанию в миллисекундах. Значение по умолчанию — 1000 L. Мы рекомендуем использовать значение 60000.
    spring.cloud.stream.poller.initial-delay Укажите начальную задержку для периодических триггеров. Значение по умолчанию — 0.
  2. Измените файл класса запуска, чтобы отобразить следующее содержимое.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Совет

    В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует DefaultAzureCredential, который предоставляет библиотека идентификации Azure, чтобы помочь вам получить учетные данные без каких-либо изменений кода.

    DefaultAzureCredential поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.

    Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в разделе Аутентификация Azure в средах разработки на Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. Дополнительные сведения см. в разделе Управляемые удостоверения для ресурсов Azure.

  3. Запустите приложение. Сообщения, как показано в следующем примере, будут размещены в журнале приложений:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Следующие шаги