Поделиться через


Асинхронное программирование в пакете SDK Azure для Java

В этой статье описывается асинхронная модель программирования в пакете SDK Azure для Java.

Изначально пакет SDK Azure содержал только неблокирующие асинхронные API для взаимодействия со службами Azure. Эти API позволяют использовать пакет SDK Azure для эффективного создания масштабируемых приложений, использующих системные ресурсы. Однако пакет SDK Azure для Java также содержит синхронные клиенты для обеспечения более широкой аудитории, а также делает наши клиентские библиотеки доступными для пользователей, не знакомых с асинхронным программированием. (См. «Доступность» в руководстве по проектированию SDK для Azure.) Таким образом, все клиентские библиотеки Java в SDK для Azure Java предлагают как асинхронные, так и синхронные клиентские средства. Однако рекомендуется использовать асинхронные клиенты для рабочих систем, чтобы максимально увеличить использование системных ресурсов.

Реактивные потоки

Если ознакомиться с разделом асинхронного клиента службы в Руководстве по проектированию SDK для Java от Azure, вы заметите, что вместо использования типов асинхронных API, предоставляемых Java 8, наши API используют реактивные типы. Почему мы предпочли реактивные типы типам, которые доступны в JDK?

Java 8 представила такие функции, как Streams, Лямбда-выражения и CompletableFuture. Эти функции предоставляют множество возможностей, но имеют некоторые ограничения.

CompletableFuture предоставляет неблокирующие функции, основанные на обратных вызовах, а интерфейс CompletionStage позволял легко композиционировать ряд асинхронных операций. Лямбда-интерфейсы делают эти API на основе push-уведомлений более читаемыми. Потоки предоставляют операции функционального стиля для обработки коллекции элементов данных. Однако потоки синхронны и не могут использоваться повторно. CompletableFuture позволяет выполнять один запрос, обеспечивает поддержку обратного вызова и ожидает один ответ. Однако многим облачным службам требуется возможность потоковой передачи данных — например, Центры событий.

Реактивные потоки могут помочь преодолеть эти ограничения путем потоковой передачи элементов из источника на подписчика. Когда подписчик запрашивает данные из источника, источник отправляет любое количество результатов обратно. Ему не нужно отправлять все одновременно. Передача происходит за период времени, когда источник имеет данные для отправки.

В этой модели подписчик регистрирует обработчики событий для обработки данных при поступлении. Эти взаимодействия на основе push-уведомлений уведомляют подписчика с помощью различных сигналов:

  • Вызов onSubscribe() указывает, что передача данных начинается.
  • Вызов onError() указывает, что произошла ошибка, которая также отмечает конец передачи данных.
  • Вызов onComplete() указывает на успешное завершение передачи данных.

В отличие от Java Streams, реактивные потоки обрабатывают ошибки как события первого класса. Реактивные потоки имеют выделенный канал, через который источник может передавать любые ошибки подписчику. Кроме того, реактивные потоки позволяют подписчику согласовывать скорость передачи данных для преобразования этих потоков в комбинированную модель передачи-получения.

Спецификация Reactive Streams предоставляет стандарт для способа передачи данных. На высоком уровне спецификация определяет следующие четыре интерфейса и задает правила по реализации этих интерфейсов.

  • Издатель — источник потока данных.
  • Подписчик является потребителем потока данных.
  • Подписка управляет состоянием передачи данных между издателем и подписчиком.
  • Обработчик является издателем и подписчиком.

Существуют некоторые известные библиотеки Java, которые предоставляют реализацию этой спецификации, например RxJava, Akka Streams, Vert.x и Project Reactor.

Пакет SDK Azure для Java внедрил Project Reactor для предоставления своих асинхронных API. Основной фактор, определяющий это решение, заключался в обеспечении плавной интеграции с Spring Webflux, которая также использует Project Reactor. Еще один фактор, влияющий на выбор Project Reactor вместо RxJava, было то, что Project Reactor использует Java 8, но RxJava в то время оставался на Java 7. Project Reactor также предлагает широкий набор операторов, которые являются составными и позволяют писать декларативный код для создания конвейеров обработки данных. Еще одна хорошая вещь о Project Reactor заключается в том, что он имеет адаптеры для преобразования типов Реактора Проекта в другие популярные типы реализаций.

Сравнение API синхронных и асинхронных операций

Мы обсудили синхронные клиенты и параметры асинхронных клиентов. В таблице ниже приведены сведения о том, как выглядят API, разработанные с помощью следующих параметров:

Тип API Нет значения Одно значение Несколько значений
Стандартная версия Java — синхронные API void T Iterable<T>
Стандартный Java — асинхронные API CompletableFuture<Void> CompletableFuture<T> CompletableFuture<List<T>>
Интерфейсы реактивных потоков Publisher<Void> Publisher<T> Publisher<T>
Реализация Project Reactor для Реактивных потоков Mono<Void> Mono<T> Flux<T>

Для полноты стоит отметить, что Java 9 представил класс Flow , включающий четыре реактивных интерфейса потоков. Однако этот класс не включает ни одну реализацию.

Использование асинхронных API в пакете SDK Azure для Java

Спецификация реактивных потоков не различает типы издателей. В спецификации реактивных потоков издатели просто создают ноль или больше элементов данных. Во многих случаях существует полезное различие между издателем, производящим не более одного элемента данных, и тем, кто создает ноль или более. В облачных API это различие указывает, возвращает ли запрос однозначный ответ или коллекцию. Project Reactor предоставляет два типа, чтобы сделать это различие - Mono и Flux. API, который возвращает Mono, будет содержать ответ не более чем с одним значением, а API, который возвращает Flux, будет содержать ответ с нулем или более значениями.

Например, предположим, что вы используете ConfigurationAsyncClient для получения конфигурации, хранящейся с помощью службы конфигурации приложений Azure. (Дополнительные сведения см. в статье "Что такое конфигурация приложений Azure?.)

При создании ConfigurationAsyncClient и вызове getConfigurationSetting() на клиенте возвращается Mono, указывающее, что ответ содержит одно значение. Однако вызов этого метода не делает ничего. Клиент еще не запросил службу конфигурации приложений Azure. На этом этапе Mono<ConfigurationSetting> возвращаемый ЭТИМ API является просто сборкой конвейера обработки данных. Это означает, что необходимая настройка для использования данных завершена. Чтобы действительно инициировать передачу данных (то есть отправить запрос сервису и получить ответ), необходимо подписаться на возвращенный Mono объект. Таким образом, при работе с этими реактивными потоками необходимо помнить о необходимости вызова subscribe(), потому что ничего не произойдет, пока вы этого не сделаете.

В следующем примере показано, как подписаться на Mono и вывести значение конфигурации на консоль.

ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
    .connectionString("<your connection string>")
    .buildAsyncClient();

asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
    config -> System.out.println("Config value: " + config.getValue()),
    ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
    () -> System.out.println("Successfully retrieved configuration setting"));

System.out.println("Done");

Обратите внимание, что после вызова getConfigurationSetting() на клиенте пример кода подписывается на результат и предоставляет три отдельных лямбда-выражения. Первая лямбда обрабатывает данные, полученные от службы, и запускается при успешном ответе. Вторая лямбда-функция активируется в случае ошибки при получении конфигурации. Третий лямбда-код вызывается при завершении потока данных, что означает, что из этого потока больше элементов данных не ожидается.

Замечание

Как и во всех асинхронных программированиях, после создания подписки выполнение выполняется как обычно. Если нет ничего, чтобы сохранить программу активной и выполняемой, она может завершиться до завершения асинхронной операции. Основной поток, который вызвал subscribe(), не будет ждать, пока вы выполните сетевой запрос в конфигурации приложений Azure и получите ответ. В рабочих системах можно продолжать обрабатывать что-то другое, но в этом примере можно добавить небольшую задержку, вызвав Thread.sleep() или используя асинхронную CountDownLatch операцию, чтобы обеспечить возможность завершения асинхронной операции.

Как показано в следующем примере, API, возвращающие Flux, также следуют аналогичному принципу. Разница заключается в том, что первый обратный вызов, предоставленный subscribe() методу, вызывается несколько раз для каждого элемента данных в ответе. Ошибка или обратный вызов завершения вызываются ровно один раз и считаются сигналами терминала. Другие обратные вызовы не вызываются, если один из этих сигналов получен от издателя.

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(
    event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

Обратная прессура

Что происходит, когда источник создает данные быстрее, чем подписчик может обрабатывать? Подписчик может перегружаться данными, что может привести к ошибкам вне памяти. Подписчику нужен способ обратного обмена данными с издателем, чтобы замедлить работу, когда он не может оставаться в курсе. По умолчанию при вызове subscribe() на Flux, как показано в примере выше, подписчик запрашивает неограниченный поток данных, указывая издателю отправлять данные как можно быстрее. Это поведение не всегда желательно, и подписчику может потребоваться контролировать скорость публикации через "обратное давление". Обратное давление позволяет подписчику контролировать поток данных. Подписчик запрашивает ограниченное количество элементов данных, которые они могут обрабатывать. После завершения обработки этих элементов подписчик может запросить больше. Используя обратное давление, вы можете преобразовать push-модель передачи данных в push-pull модель.

В следующем примере показано, как управлять скоростью получения событий потребителем Центров событий:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.request(1); // request another event when the subscriber is ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Когда подписчик впервые устанавливает связь с издателем, издатель передает подписчику экземпляр Subscription, который управляет состоянием передачи данных. Это Subscription среда, с помощью которой подписчик может применить противодавление, вызвав request(), чтобы указать, сколько дополнительных элементов данных он может обрабатывать.

Если подписчик запрашивает несколько элементов данных при каждом вызове onNext(), request(10) например, издатель отправляет следующие 10 элементов немедленно, если они доступны или когда они становятся доступными. Эти элементы накапливаются в буфере у подписчика, и так как каждый onNext() вызов запросит еще 10, задержка продолжает расти, пока либо у издателя заканчиваются элементы данных для отправки, либо буфер подписчика переполняется, что приводит к ошибкам нехватки памяти.

Отмена подписки

Подписка управляет состоянием передачи данных между издателем и подписчиком. Подписка активна, пока издатель не завершит передачу всех данных подписчику или подписчику больше не заинтересован в получении данных. Существует несколько способов отмены подписки, как показано ниже.

В следующем примере подписка отменяется путем удаления подписчика:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

Disposable disposable = asyncClient.receive().subscribe(
    partitionEvent -> {
        Long num = partitionEvent.getData().getSequenceNumber()
        System.out.println("Sequence number of received event: " + num);
    },
    ex -> System.out.println("Error receiving events: " + ex.getMessage()),
    () -> System.out.println("Successfully completed receiving all events"));

// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();

В следующем примере подписка отменяется путем вызова метода cancel() на Subscription:

EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
    .connectionString("<your connection string>")
    .consumerGroup("<your consumer group>")
    .buildAsyncConsumerClient();

asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request 1 data element to begin with
    }

    @Override
    public void onNext(PartitionEvent partitionEvent) {
        System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
        this.subscription.cancel(); // Cancels the subscription. No further event is received.
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error receiving events: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Successfully completed receiving all events")
    }
});

Заключение

Потоки являются дорогостоящими ресурсами, которые не следует тратить на ожидание ответов от удаленных вызовов служб. По мере увеличения внедрения архитектур микрослужб необходимость масштабирования и эффективного использования ресурсов становится жизненно важной. Асинхронные API являются благоприятными при наличии операций, связанных с сетью. Пакет SDK Azure для Java предлагает широкий набор API для асинхронных операций, которые помогут максимально повысить эффективность системных ресурсов. Настоятельно рекомендуем попробовать наши асинхронные клиенты.

См. Какой оператор мне нужен? в Справочнике по Реактору 3 для получения дополнительной информации об операторах, которые лучше всего подходят для ваших конкретных задач.

Дальнейшие шаги

Теперь, когда вы лучше понимаете различные концепции асинхронного программирования, важно узнать, как итерировать результаты. Дополнительные сведения о лучших стратегиях итерации и работе с разбиением на страницы см. в статье "Пагинация и итерация в SDK Azure для Java".