Поделиться через


Spring Cloud Stream с Azure Event Hubs

В этом руководстве показано, как отправлять и получать сообщения с помощью Azure Event Hubs и Spring Cloud Stream Binder Eventhubs в приложении Spring Boot.

Предварительные условия

Примечание.

Чтобы предоставить учетной записи доступ к ресурсам, в Azure Event Hubs назначьте роль Azure Event Hubs Data Receiver и Azure Event Hubs Data Sender учетной записи Microsoft Entra, которую вы используете. Затем в учетной записи Azure Storage назначьте роль Storage Blob Data Contributor учетной записи Microsoft Entra, которую вы в данный момент используете. Дополнительные сведения о предоставлении ролей доступа см. в статье Назначение ролей Azure с помощью портала Azure и Авторизация доступа к ресурсам Центров событий с помощью Microsoft Entra ID.

Внимание

Для выполнения действий, описанных в этом руководстве, требуется Spring Boot версии 2.5 или более поздней.

Отправка и получение сообщений из Azure Event Hubs

С помощью учетной записи Azure Storage и концентратора событий Azure можно отправлять и получать сообщения с помощью Центров событий Spring Cloud Azure Stream Binder.

Чтобы установить модуль Центров событий Stream Binder Azure Spring Cloud, добавьте следующие зависимости в файл pom.xml:

  • Список компонентов (BOM) Spring Cloud для Azure

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>7.1.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Примечание.

    Если вы используете Spring Boot 4.0.x, обязательно установите версию, указанную в spring-cloud-azure-dependencies, на значение 7.1.0.

    Если вы используете Spring Boot 3.5.x, обязательно задайте версию spring-cloud-azure-dependencies на 6.2.0.

    Если вы используете Spring Boot 3.1.x-3.5.x, обязательно установите для нее версию spring-cloud-azure-dependencies на 5.25.0.

    Если вы используете Spring Boot 2.x, обязательно установите версию spring-cloud-azure-dependencies на 4.20.0.

    Эта ведомость материалов (BOM) должна быть сконфигурирована в <dependencyManagement> разделе файла pom.xml. Это гарантирует, что все зависимости Spring Cloud Azure используют одну и ту же версию.

    Дополнительные сведения о версии, используемой для этого BOM, можно найти в разделе Какую версию Spring Cloud Azure мне следует использовать?.

  • Артефакт Центров событий Stream Binder в Spring Cloud Azure:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

Программирование приложения

Выполните следующие действия, чтобы настроить приложение для создания и использования сообщений с помощью Azure Event Hubs.

  1. Настройте учетные данные концентратора событий, добавив следующие свойства в файл application.properties .

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    В следующей таблице описаны поля в конфигурации:

    Поле Описание
    spring.cloud.azure.eventhubs.namespace Укажите пространство имен, полученное в концентраторе событий на портале Azure.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Укажите учетную запись хранения, созданную в этом руководстве.
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Укажите контейнер учетной записи хранения.
    spring.cloud.stream.bindings.consume-in-0.destination Укажите концентратор событий, используемый в этом руководстве.
    spring.cloud.stream.bindings.consume-in-0.group Укажите группы потребителей в экземпляре Event Hubs.
    spring.cloud.stream.bindings.supply-out-0.destination Укажите тот же концентратор событий, который использовался в этом руководстве.
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode Укажите MANUAL.
    spring.cloud.function.definition Укажите, какой функциональный bean-компонент нужно привязать к внешним точкам назначения, раскрываемым привязками.
    spring.cloud.stream.poller.initial-delay Укажите начальную задержку для периодических триггеров. Значение по умолчанию — 0.
    spring.cloud.stream.poller.fixed-delay Укажите фиксированную задержку для опросчика по умолчанию в миллисекундах. Значение по умолчанию — 1000 L.
  2. Измените файл класса запуска, чтобы отобразить следующее содержимое.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Совет

    В этом руководстве нет операций проверки подлинности в конфигурациях или коде. Однако для подключения к службам Azure требуется проверка подлинности. Чтобы завершить проверку подлинности, необходимо использовать удостоверение Azure. Spring Cloud Azure использует DefaultAzureCredential, который предоставляет библиотека идентификации Azure, чтобы помочь вам получить учетные данные без каких-либо изменений кода.

    DefaultAzureCredential поддерживает несколько методов проверки подлинности и определяет, какой метод следует использовать во время выполнения. Этот подход позволяет приложению использовать различные методы проверки подлинности в разных средах (например, локальных и рабочих средах), не реализуя код, зависящий от среды. Дополнительные сведения см. в разделе DefaultAzureCredential.

    Для выполнения проверки подлинности в локальных средах разработки можно использовать Azure CLI, Visual Studio Code, PowerShell или другие методы. Дополнительные сведения см. в разделе Аутентификация Azure в средах разработки на Java. Чтобы завершить проверку подлинности в средах размещения Azure, рекомендуется использовать управляемое удостоверение, назначаемое пользователем. Дополнительные сведения см. в разделе Управляемые удостоверения для ресурсов Azure.

  3. Запустите приложение. Такие сообщения будут размещены в журнале приложений, как показано в следующем примере выходных данных:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

Развертывание в Azure Spring Apps

Теперь, когда у вас есть приложение Spring Boot, работающее локально, пришло время переместить его в рабочую среду. Azure Spring Apps упрощает развертывание приложений Spring Boot для Azure без каких-либо изменений кода. Эта служба управляет инфраструктурой приложений Spring, благодаря чему разработчики могут сосредоточиться на коде. Azure Spring Apps обеспечивает управление жизненным циклом с помощью комплексного мониторинга и диагностики, управления конфигурацией, обнаружения служб, интеграции CI/CD, развертывания по схеме blue-green и многого другого. Чтобы развернуть ваше приложение в Azure Spring Apps, см. статью Развертывание первого приложения в Azure Spring Apps.

Следующие шаги