Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
Azure Cosmos DB — это полностью управляемая служба базы данных NoSQL, предоставляемая корпорацией Майкрософт. Это позволяет легко создавать глобально распределенные и высокомасштабируемые приложения. В этом руководстве описывается процесс создания приложения Java, использующего базу данных Azure Cosmos DB для NoSQL и реализующий обработчик канала изменений для обработки данных в режиме реального времени. Приложение Java взаимодействует с Azure Cosmos DB для NoSQL с помощью пакета SDK Java для Azure Cosmos DB версии 4.
Внимание
Это руководство применимо только к пакету SDK Java для Azure Cosmos DB версии 4. Дополнительные сведения см. в заметках о выпуске пакета SDK Java для Azure Cosmos DB версии 4, репозитории Maven, обработчике процессора потока изменений в Azure Cosmos DB и руководстве по устранению неполадок для пакета SDK Java для Azure Cosmos DB версии 4. Если сейчас вы используете более раннюю версию, чем версия 4, руководство Перевод приложения на использование пакета средств разработки Java для Azure Cosmos DB версии 4 поможет вам обновить его до версии 4.
Предварительные условия
Учетная запись Azure Cosmos DB: ее можно создать из портала Azure или использовать эмулятор Azure Cosmos DB также.
Среда разработки Java. Убедитесь, что на компьютере установлен пакет средств разработки Java (JDK) с по крайней мере 8 версиями.
Пакет SDK Java для Azure Cosmos DB версии 4: предоставляет необходимые функции для взаимодействия с Azure Cosmos DB.
Общие сведения
Канал изменений Azure Cosmos DB предоставляет интерфейс, управляемый событиями, для активации действий в ответ на вставку документов и имеет множество применений.
Работа по управлению событиями канала изменений выполняется в основном встроенной в пакет SDK библиотекой обработчика канала изменений. Эта библиотека достаточно мощная, чтобы распределять события канала обновлений между несколькими рабочими, если это требуется. Вам достаточно лишь предоставить библиотеке канала изменений обратный вызов.
В этом простом примере приложения Java демонстрируется обработка данных в режиме реального времени с помощью Azure Cosmos DB и обработчика канала изменений. Приложение вставляет примеры документов в контейнер для потока, чтобы имитировать поток данных. Обработчик канала изменений, привязанный к контейнеру ленты, обрабатывает входящие изменения и записывает содержимое документа. Обработчик автоматически управляет арендами для параллельной обработки.
Исходный код
Вы можете клонировать репозиторий пакета SDK и найти этот пример в SampleChangeFeedProcessor.java
:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/
Пошаговое руководство
Настройте
ChangeFeedProcessorOptions
в приложении Java с помощью Azure Cosmos DB и пакета SDK Java для Azure Cosmos DB версии 4.ChangeFeedProcessorOptions
предоставляет основные параметры для управления поведением обработчика канала изменений в процессе обработки данных.options = new ChangeFeedProcessorOptions(); options.setStartFromBeginning(false); options.setLeasePrefix("myChangeFeedDeploymentUnit"); options.setFeedPollDelay(Duration.ofSeconds(5)); options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
Инициализировать ChangeFeedProcessor с соответствующими конфигурациями, включая имя узла, контейнер потока данных, контейнер аренды и логику обработки данных. Метод start() инициирует обработку данных, обеспечивая одновременную и в реальном времени обработку входящих изменений данных из контейнера потока.
logger.info("Start Change Feed Processor on worker (handles changes asynchronously)"); ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder() .hostName("SampleHost_1") .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleChanges(handleChanges()) .options(options) .buildChangeFeedProcessor(); changeFeedProcessorInstance.start() .subscribeOn(Schedulers.boundedElastic()) .subscribe();
Укажите делегат, обрабатывающий входящие изменения данных с помощью
handleChanges()
метода. Метод обрабатывает полученные документы JsonNode из потока изменений. У разработчика есть два варианта для обработки документа JsonNode, предоставленного фидом изменений. Один из вариантов — работать с документом в виде JsonNode. Это отлично, особенно если у вас нет единой модели данных для всех документов. Второй вариант — преобразование JsonNode в POJO с той же структурой, что и JsonNode. Затем вы можете работать с POJO.private static Consumer<List<JsonNode>> handleChanges() { return (List<JsonNode> docs) -> { logger.info("Start handleChanges()"); for (JsonNode document : docs) { try { //Change Feed hands the document to you in the form of a JsonNode //As a developer you have two options for handling the JsonNode document provided to you by Change Feed //One option is to operate on the document in the form of a JsonNode, as shown below. This is great //especially if you do not have a single uniform data model for all documents. logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter() .writeValueAsString(document)); //You can also transform the JsonNode to a POJO having the same structure as the JsonNode, //as shown below. Then you can operate on the POJO. CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class); logger.info("id: " + pojo_doc.getId()); } catch (JsonProcessingException e) { e.printStackTrace(); } } isWorkCompleted = true; logger.info("End handleChanges()"); }; }
Создайте и запустите приложение Java. Приложение запускает процессор изменений, вставляет примеры документов в контейнер потока и обрабатывает входящие изменения.
Заключение
В этом руководстве вы узнали, как создать приложение Java с помощью пакета SDK Java для Azure Cosmos DB версии 4 , использующего базу данных Azure Cosmos DB для NoSQL, и использовать обработчик канала изменений для обработки данных в режиме реального времени. Это приложение можно расширить для обработки более сложных вариантов использования и создания надежных, масштабируемых и глобально распределенных приложений с помощью Azure Cosmos DB.
Дополнительные ресурсы
Следующие шаги
Теперь вы можете перейти к дополнительным сведениям об оценке канала изменений в следующих статьях: