Интеграция поддержки Apache Kafka Connect в Центрах событий Azure
Apache Kafka Connect — это платформа для подключения, импорта и экспорта данных в любую внешнюю систему или из нее, например MySQL, HDFS и файловую систему, через кластер Kafka. В этом руководстве описано использование платформы Kafka Connect с Центрами событий.
В этом руководстве описана интеграция Kafka Connect с Центрами событий и развертывание соединителей FileStreamSource и FileStreamSink. Хотя эти соединители не предназначены для использования в рабочей среде, они демонстрируют комплексный сценарий Kafka Connect, в котором Центры событий Azure выступает в качестве брокера Kafka.
Этот пример можно найти на сайте GitHub.
При работе с этим руководством вы выполните следующие задачи:
- Создание пространства имен Центров событий
- Клонирование примера проекта
- Настройка Kafka Connect для Центров событий
- Выполнение Kafka Connect
- Создание соединителей
Предварительные требования
Для работы с этим пошаговым руководством выполните следующие предварительные требования:
- Подписка Azure. Если ее нет, создайте бесплатную учетную запись.
- Git;
- Linux/MacOS
- Выпуск Kafka (версии 1.1.1, Scala версии 2.11) доступен на сайте kafka.apache.org
- Ознакомьтесь со статьей Центры событий Azure для Apache Kafka (предварительная версия).
Создание пространства имен Центров событий
Для отправки и получения данных из любой службы Центров событий требуется пространство имен Центров событий. См. раздел Создание концентратора событий для получения инструкций по созданию пространства имен и концентратора событий. Получите строку подключения Центров событий и полное доменное имя (FQDN) для последующего использования. Инструкции см. в статье Get an Event Hubs connection string (Получение строки подключения для Центров событий).
Клонирование примера проекта
Клонируйте репозиторий Центров событий Azure и перейдите к руководствам или вложенной папке подключения:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git cd azure-event-hubs-for-kafka/tutorials/connect
Настройка Kafka Connect для Центров событий
Минимальная перенастройка необходима при перенаправлении пропускной способности Kafka в Центры событий. Следующий пример connect-distributed.properties показывает, как настроить Connect для проверки подлинности и обмена данных с конечной точкой Kafka в Центрах событий:
# e.g. namespace.servicebus.windows.net:9093 bootstrap.servers=:9093 group.id=connect-cluster-group # connect internal topic names, auto-created if not exists config.storage.topic=connect-cluster-configs offset.storage.topic=connect-cluster-offsets status.storage.topic=connect-cluster-status # internal topic replication factors - auto 3x replication in Azure Storage config.storage.replication.factor=1 offset.storage.replication.factor=1 status.storage.replication.factor=1 rest.advertised.host.name=connect offset.flush.interval.ms=10000 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # required EH Kafka security settings security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password=""; producer.security.protocol=SASL_SSL producer.sasl.mechanism=PLAIN producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password=""; consumer.security.protocol=SASL_SSL consumer.sasl.mechanism=PLAIN consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password=""; plugin.path=/libs # path to the libs directory within the Kafka release
Замените строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=»$ConnectionString» password=»Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX»;
Выполнение Kafka Connect
На этом этапе рабочая роль Kafka Connect запускается локально в распределенном режиме с помощью Центров событий для поддержания состояния кластера.
- Сохраните файл connect-distributed.properties , указанный выше, локально. Не забудьте заменить все значения в фигурных скобках.
- Перейдите к расположению выпуска Kafka на своем компьютере.
- Выполните ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties . REST API рабочей роли Connect готов к взаимодействию при появлении ‘INFO Finished starting connectors and tasks’ .
Kafka Connect использует API Kafka AdminClient для автоматического создания разделов с рекомендуемыми конфигурациями, включая сжатие. Быстрая проверка пространства имен на портале Azure показывает, что внутренние разделы рабочей роли Connect были созданы автоматически.
Внутренние разделы Kafka Connect должны использовать сжатия. Команда Центров событий не несет ответственности за исправление неправильных конфигураций в случае ненадлежащей настройки внутренних разделов Connect.
Создание соединителей
В этом разделе показано использование соединителей FileStreamSource и FileStreamSink.
-
Создайте каталог для файлов входных и выходных данных.
mkdir ~/connect-quickstart
seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
curl -s -X POST -H "Content-Type: application/json" --data '/connect-quickstart/input.txt">>' http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/file-source/status
curl -X POST -H "Content-Type: application/json" --data '/connect-quickstart/output.txt">>' http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/file-sink/status
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Очистка
Kafka Connect создает разделы Центров событий для хранения конфигураций, смещений и состояния, которые сохраняются даже после отключения кластера Connect. Если этого не требуется, рекомендуется удалить эти разделы. Вы также можете удалить connect-quickstart Центры событий, созданные в рамках этого пошагового руководства.
Дальнейшие действия
Дополнительные сведения о концентраторах событий для Kafka см. в следующих статьях:
- Зеркальное отображение брокера Kafka в концентраторе событий
- Подключение Apache Spark к концентратору событий
- Подключение Apache Flink к концентратору событий
- Migrating to Azure Event Hubs for Apache Kafka Ecosystems (Переход в Центры событий Azure для экосистем Apache Kafka)
- Подключение Akka Streams к концентратору событий
- Руководство разработчика Apache Kafka для концентраторов событий Azure
Kafka Connect¶
Kafka Connect – компонент Apache Kafka с открытым исходным кодом, является основой для подключения Kafka к внешним системам, таким как базы данных, хранилища key-value, поисковые индексы и файловые системы. С Kafka Connect можно использовать существующие реализации коннекторов для перемещения данных в сервис Kafka и из него:
- Source Connector – принимает базы данных и обновляет таблицы потоков для топиков Kafka. Также собирает метрики со всех серверов приложений в топики Kafka, делая данные доступными для потоковой обработки с низкой задержкой;
- Sink Connector – доставляет данные из топиков Kafka во вторичные индексы, такие как Elasticsearch, или в пакетные системы для автономного анализа, такие как Hadoop.
Kafka Connect ориентирован на потоковую передачу данных из сервиса Kafka и в него, что упрощает написание высококачественных, надежных и высокопроизводительных плагинов. Это также позволяет фреймворку давать гарантии, которые трудно достичь с помощью других структур. Kafka Connect является неотъемлемым компонентом конвейера ETL в сочетании с сервисом Kafka и потоковой обработкой.
Kafka Connect может работать либо как автономный процесс для выполнения заданий на одной машине (например, сбор журналов), либо как распределенный, масштабируемый, отказоустойчивый сервис, поддерживающий всю структуру. Это позволяет сократить масштаб до разработки, тестирования и небольших продуктовых развертываний с низким барьером для входа и низкими эксплуатационными накладными расходами, а также увеличить масштаб поддержки конвейера данных большой организации.
Основные преимущества использования Kafka Connect:
- Data Centric Pipeline – использование значимых абстракций данных для извлечения или передачи данных в Kafka;
- Flexibility and Scalability (гибкость и масштабируемость) – работа с потоковыми и пакетно-ориентированными системами на одном узле или масштабирование до сервиса по всей ширине организации;
- Reusability and Extensibility (повторное использование и расширяемость) – использование существующих коннекторов и возможность расширения их для адаптации к конкретным потребностям и сокращения времени на разработку.
Connectors & Tasks¶
Копирование данных между сервисом Kafka и сторонней системой осуществляется посредством создаваемых пользователями инстансов Kafka Connectors. Коннекторы бывают двух видов: SourceConnectors – импортируют данные из другой системы, и SinkConnectors – экспортируют данные в другую систему. Например, JDBCSourceConnector импортирует реляционную базу данных в Kafka, а HDFSSinkConnector экспортирует содержимое топика Kafka в файлы HDFS.
Реализации класса Connector не выполняют копирование данных самостоятельно: их конфигурация описывает набор данных для копирования, и Connector отвечает за разбиение этого задания на набор задач – Tasks, которые могут быть распределены между объектами Kafka Connect. Tasks также бывают двух видов: SourceTask и SinkTask. При необходимости реализация класса Connector может отслеживать изменения данных внешних систем и запрашивать реконфигурацию задачи.
С назначением данных, которые должны быть скопированы, каждая задача Task должна скопировать свое подмножество данных в сервис Kafka или из него. Данные, которые копирует коннектор, должны быть представлены как партиционированный поток, аналогично модели топика Kafka, где каждая партиция представляет собой упорядоченную последовательность записей со смещениями. Каждой задаче назначается подмножество партиций для обработки. Порой это сопоставление очевидно: каждый файл в наборе файлов журнала можно считать партицией, каждую строку в файле – записью, а смещения – просто позициии в файле. В иных случаях сопоставление с моделью требует больше усилий: коннектор JDBC может сопоставить каждую таблицу с партицией, но смещение менее ясно. Один из возможных вариантов сопоставления это использовать в качестве смещения последнюю запрашиваемую отметку времени при генерации запросов.
Source Connector, создавший две задачи, которые копируют данные из входных партиций и записывают в сервис Kafka, приведен на Рис.102. .

Рис. 102. Пример реализации Source Connector
Partitions & Records¶
Каждая партиция представляет собой упорядоченную последовательность записей ключ-значение, где и ключи, и значения могут иметь сложные структуры. Поддерживаются многие примитивные типы, а также массивы, структуры и вложенные структуры данных. Для большинства типов можно напрямую использовать стандартные типы Java, такие как java.lang.Integer, java.lang.Map и java.lang.Collection. Для структурированных записей следует использовать класс Struct.
На Рис.103. представлен партиционированный поток: модель данных, в которой коннекторы сопоставляют все системы source и sink. Каждая запись содержит ключи и значения (со схемами), идентификатор партиции и смещения в ней.

Рис. 103. Пример партиционированного потока
Для отслеживания структуры и совместимости записей в партициях схемы (Schemas) могут быть включены в каждую запись. Поскольку схемы обычно генерируются “на лету” на основе источника данных, класс SchemaBuilder включен, что делает их построение очень простым.
| Schemas: | Определение абстрактного типа данных. Типы данных могут быть примитивными типами (целочисленные типы, типы с плавающей запятой, логические, строки и байты) или сложными типами (типизированные массивы, карты с одной схемой ключей и схемами значений, а также структурами, которые имеют фиксированный набор имен полей, каждый из которых имеет схема связанных значений). Любой тип может быть указан как необязательный, что позволяет его опускать (в результате чего значения отсутствуют) и может указывать значение по умолчанию. |
|---|
Такой формат данных среды выполнения не предполагает какого-либо конкретного формата сериализации; это преобразование осуществляется с помощью Converter, которые обрабатывают формат времени выполнения org.apache.kafka.connect.data и сериализованные данные byte[].
| Converter: | Интерфейс конвертера обеспечивает поддержку перевода между форматом данных выполнения Kafka Connect и byte[]. Внутренне это включает промежуточный шаг к формату, используемому слоем сериализации (например, JsonNode, GenericRecord, Message). |
|---|
В дополнение к ключу и значению записи имеют идентификаторы партиций и смещения, которые используются фреймворком для периодической фиксации смещений обработанных данных. В случае сбоя обработка может возобновиться с последнего зафиксированного смещения, что позволяет избежать повторной обработки и дублирования событий.
© Copyright 2022, Arenadata.io.
Зачем вам Kafka Connect: разбираем на примере интеграции Elasticsearch с Кафка
Сегодня поговорим, как связать Elasticsearch с Apache Kafka: рассмотрим, зачем нужны коннекторы, когда их следует использовать и какие особенности популярных в Big Data форматов JSON и AVRO стоит при этом учитывать. Также читайте в нашей статье, что такое Logstash Shipper, чем он отличается от FileBeat и при чем тут Kafka Connect.
Когда и зачем нужна интеграция Elasticsearch с Apache Kafka: 3 практических примера
Напомним, в ELK Stack компонент Logstash отвечает за сбор, преобразование и сохранение в общем хранилище данных из разных файлов, СУБД, логов и прочих мест в режиме реального времени. Это похоже на основное назначение Apache Kafka — распределенной стриминговой платформы, которая собирает и агрегирует большие данные разных форматов из множества источников. Возникает вопрос: зачем добавлять Kafka в ELK-стек, используя дополнительное средство сбора потоковых данных? Здесь можно выделить несколько сценариев [1]:
· временная остановка кластера Elasticsearch (ES) с целью обновления версии или внесения других изменений. Чтобы не потерять данные, приходящие из разных систем, Kafka можно использовать в качестве временного буфера для сохранения информации. Когда ELK-кластер возобновит работу, Logstash продолжит собирать, преобразовывать и отправлять в ES пропущенные данные, считывая их из топиков Apache Kafka с того места, где случился останов.
· Выравнивание пропускной способности компонентов ELK, чтобы, например, при внезапном увеличении объема данных ES-кластер не «захлебнулся» от высокой скорости поступления новой информации. На практике такая ситуация может возникнуть в случае ошибки в обновленном Big Data приложении или аномальном непредвиденном росте пользовательской активности.
Таким образом, добавление Кафка в Эластик-стек позволяет не зависеть от мониторинга событий. Совместное использование FileBeat с Kafka дает возможность создавать разные топики для каждого сервиса, улучшая «реактивность» всей Big Data системы. Напомним, FileBeat — это легковесный серверный агент для отправки определенных типов рабочих данных в Elasticsearch. Он занимает использует гораздо меньше системных ресурсов, чем Logstash. Хотя функциональные возможности Logstash по вводу, фильтрации и выводу для сбора, обогащения и преобразования данных из различных источников гораздо больше, чем у FileBeat. Можно сказать, что Logstash «дороже», чем FileBeat.
Возвращаясь к преимуществам включения Kafka в ELK Stack, отметим, что любая команда разработчиков или администраторов Big Data систем может подписаться на топики Kafka для сбора метрик или выдачи сигналов тревоги, уведомляющих о случившихся или потенциальных авариях. Это весьма востребована, поскольку на практике, в основном, Elasticsearch с Kibana используются для информирования, а не для оповещения или мониторинга [2].
Например, в этом случае можно использовать 2 экземпляра Logstash — для отправки и индексации данных соответственно. Отправитель (Logstash Shipper) будет немедленно сохранять данные в топиках Kafka. А индексатор (Logstash Indexer) считывает из Кафка данные со своей собственной скоростью, выполняя при этом дорогостоящие преобразования, включая поиск и индексацию в Elasticsearch. Также FileBeat может отслеживать файлы и отправлять их в Kafka через приемник Logstash [1].
Что такое Kafka Connect и как это работает
Разумеется, есть еще множество других кейсов по совместному использованию Kafka с компонентами ELK Stack. Причем Apache Kafka также может выступать приемником данных из Elasticsearch. Для всех вариантов отлично подходит Kafka Connect — компонент Кафка, который обеспечивает потоковую интеграцию с внешними хранилищами данных, включая JDBC, Elasticsearch, IBM MQ, S3, BigQuery и другие. Наличие расширенного API позволяет дополнить Kafka Connect собственными коннекторами. А REST API облегчает их настройку и управления. Модульная природа Kafka Connect делает возможным гибко удовлетворить все интеграционные потребности [3]:
· коннекторы (connectors) — это файлы JAR, которые определяют, как интегрироваться с внешним хранилищем данных;
· конвертеры (converters) используются для сериализации и десериализации данных;
· преобразования (transforms) отвечают за дополнительную обработку сообщений «на лету».
Обычно для каждой внешней системы используются свой коннектор. В частности, за интеграцию Apache Kafka с ELK отвечает коннектор Kafka Connect Elasticsearch. Он позволяет перемещать данные, записывая их из топика Кафка в индекс Elasticsearch с приведением к одному типу. Например, в кейсах по аналитике больших данных каждое сообщение в Kafka рассматривается как событие, которое коннектор идентифицирует по топику (topic), разделу (partition) и смещению (offset), чтобы преобразовать в уникальные ES-документы. При использовании Elasticsearch в качестве key-value хранилища ключи из сообщений Kafka будут идентификаторами ES-документов, гарантируя упорядоченное обновления. Оба рассмотренные варианта использования поддерживают идемпотентную семантику записи Elasticsearch, т.е. точно однократную доставку (exactly once). Подробнее о гарантиях доставки сообщений в Apache Kafka мы рассказывали здесь.
Также стоит упомянуть про маппирование или отображение данных, которое определяет, как документ и содержащиеся в нем поля хранятся и индексируются в ES. Пользователи могут явно определять сопоставления типов в индексах. Если отображение не задано явно, Elasticsearch может определять имена и типы полей из данных. Однако такие типы, как метка времени (timestamp) и десятичная дробь, могут быть выведены некорректно. Kafka Connect Elasticsearch позволяет выводить сопоставления из схем сообщений Кафка. Таким образом, благодаря эволюционной поддержке схем данных, коннектор может обрабатывать изменения схемы в обратной, прямой и полностью совместимой конфигурации. В ряде случаев доступны некоторые несовместимые изменения схемы, например, конвертация поля из целого числа в строку [4].
5 особенностей интеграции ES с Кафка, о которых нужно знать
Следует помнить несколько важных моментов, при использовании Kafka Connect Elasticsearch [5]:
· ·· данные сериализуются на основе значений по умолчанию, указанных в ваших worker’ах Kafka Connect, например, Avro. Если нужно что-то другое, следует вручную добавить переопределения.
· · При передаче данных в Elasticsearch из KSQL (Kafka SQL), необходимо установить для преобразователя ключей значение STRING. Пока, все что относится к поддержке ключей выражается так: “Key.converter”: “org.apache.kafka.connect.storage.StringConverter”
· · Коннектор автоматически изменяет имена топиков в верхнем регистре на имена индексов в нижнем регистре в Elasticsearch, вручную сопоставлять не нужно.
· · Можно использовать регулярные выражения для сопоставления нескольких топиков, определив themes.regex в конфигурации топика.
· · отдельно стоит сказать про параметр schema.ignore. Если он равен True, можно просто передать JSON-документ в Elasticsearch — сопоставление типов полей выполнится автоматически. Это актуально, если в данных отсутствует явная схема, например, формат JSON, CSV и пр. При использовании формата AVRO или JSON со встроенной схемой следует установить schema.ignore = false. Это позволит Kafka Connect явно создать сопоставление типов в Elasticsearch при предаче данных. На практике в большинстве случаев используется schema.ignore = true, что позволяет передать данные, не вдаваясь в технические подробности.
В следующей статье мы продолжим разговор про коннекторы Apache Kafka Connect и рассмотрим наиболее распространенные ошибки интеграции с Elasticsearch. А практические детали по связыванию Apache Kafka с другими внешними источниками для потоковой обработки больших данных вы узнаете на практических курсах по Кафка в нашем лицензированном учебном центре повышения квалификации и обучения руководителей и ИТ-специалистов (разработчиков, архитекторов, инженеров и аналитиков Big Data) в Москве:
Источники
Введение в коннекторы Kafka
Apache Kafka® — это распределенная платформа потоковой передачи. В предыдущем уроке мы обсуждали, как реализовать потребителей и производителей Kafka с помощью Spring .
В этом руководстве мы узнаем, как использовать коннекторы Kafka.
- Различные типы коннекторов Kafka
- Функции и режимы Kafka Connect
- Конфигурация соединителей с использованием файлов свойств, а также REST API
2. Основы Kafka Connect и Kafka Connectors
Kafka Connect — это фреймворк для подключения Kafka к внешним системам , таким как базы данных, хранилища ключей и значений, поисковые индексы и файловые системы, с помощью так называемых коннекторов .
Kafka Connectors — это готовые к использованию компоненты, которые могут помочь нам импортировать данные из внешних систем в темы Kafka и экспортировать данные из тем Kafka во внешние системы . Мы можем использовать существующие реализации соединителей для общих источников данных и приемников или реализовать собственные соединители.
Исходный коннектор собирает данные из системы. Исходные системы могут быть целыми базами данных, таблицами потоков или брокерами сообщений. Исходный коннектор также может собирать метрики с серверов приложений в темы Kafka, делая данные доступными для потоковой обработки с малой задержкой.
Соединитель приемника доставляет данные из разделов Kafka в другие системы, которые могут быть индексами, такими как Elasticsearch, пакетными системами, такими как Hadoop, или любой другой базой данных.
Некоторые соединители поддерживаются сообществом, а другие поддерживаются Confluent или его партнерами. Действительно, мы можем найти разъемы для большинства популярных систем, таких как S3, JDBC и Cassandra, и это лишь некоторые из них.
3. Особенности
Возможности Kafka Connect включают в себя:
- Фреймворк для подключения внешних систем к Kafka — упрощает разработку, развертывание и управление коннекторами.
- Распределенный и автономный режимы — это помогает нам развертывать большие кластеры, используя распределенный характер Kafka, а также настройки для разработки, тестирования и небольших производственных развертываний.
- Интерфейс REST — мы можем управлять соединителями с помощью REST API.
- Автоматическое управление смещением — Kafka Connect помогает нам обрабатыватьпроцесс фиксации смещения, что избавляет нас от необходимости вручную реализовывать эту подверженную ошибкам часть разработки коннектора.
- Распределенная и масштабируемая по умолчанию — Kafka Connect использует существующий протокол управления группами; мы можем добавить больше рабочих для масштабирования кластера Kafka Connect.
- Потоковая и пакетная интеграция — Kafka Connect — идеальное решение для объединения систем потоковой и пакетной передачи данных в сочетании с существующими возможностями Kafka.
- Преобразования — они позволяют нам вносить простые и легкие изменения в отдельные сообщения.
4. Настройка
Вместо использования простого дистрибутива Kafka мы загрузим Confluent Platform, дистрибутив Kafka, предоставленный Confluent, Inc., компанией, стоящей за Kafka. Confluent Platform поставляется с некоторыми дополнительными инструментами и клиентами по сравнению с простой Kafka, а также некоторыми дополнительными готовыми соединителями.
Для нашего случая достаточно версии с открытым исходным кодом, которую можно найти на сайте Confluent .
5. Быстрый старт Kafka Connect
Для начала мы обсудим принцип Kafka Connect, используя его самые основные соединители, которые являются соединителем источника файла и соединителем приемника файла .
Удобно, что Confluent Platform поставляется с обоими этими коннекторами, а также с эталонными конфигурациями.
5.1. Конфигурация исходного коннектора
Для исходного соединителя эталонная конфигурация доступна по адресу $CONFLUENT_HOME/etc/kafka/connect-file-source.properties :
name=local-file-source connector.class=FileStreamSource tasks.max=1 topic=connect-test file=test.txt
Эта конфигурация имеет некоторые свойства, общие для всех исходных соединителей:
- имя — указанное пользователем имя экземпляра соединителя.
- Connector.class указывает класс реализации, в основном вид соединителя
- tasks.max указывает, сколько экземпляров нашего исходного коннектора должно работать параллельно, и
- тема определяет тему, в которую коннектор должен отправлять выходные данные
В этом случае у нас также есть атрибут, специфичный для коннектора:
- file определяет файл, из которого коннектор должен считывать ввод
Чтобы это работало, давайте создадим базовый файл с некоторым содержимым:
echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt
Обратите внимание, что рабочий каталог — $CONFLUENT_HOME.
5.2. Конфигурация разъема приемника
Для нашего соединителя приемника мы будем использовать эталонную конфигурацию в $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties :
name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test
Логически он содержит точно такие же параметры, хотя на этот раз коннектор.класс указывает реализацию коннектора приемника, а файл — это место, куда коннектор должен записывать содержимое.
5.3. Конфигурация рабочего
Наконец, нам нужно настроить обработчик Connect, который объединит два наших коннектора и выполнит работу по чтению из коннектора-источника и записи в коннектор-приемник.
Для этого мы можем использовать $CONFLUENT_HOME/etc/kafka/connect-standalone.properties :
bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 plugin.path=/share/java
Обратите внимание, что plugin.path может содержать список путей, по которым доступны реализации коннектора.
Поскольку мы будем использовать коннекторы в комплекте с Kafka, мы можем установить для plugin.path значение $CONFLUENT_HOME/share/java . При работе с Windows может потребоваться указать здесь абсолютный путь.
Для остальных параметров мы можем оставить значения по умолчанию:
- bootstrap.servers содержит адреса брокеров Kafka
- key.converter и value.converter определяют классы-конвертеры, которые сериализуют и десериализуют данные по мере их поступления из источника в Kafka, а затем из Kafka в приемник.
- key.converter.schemas.enable и value.converter.schemas.enable — это настройки, специфичные для конвертера .
- offset.storage.file.filename — самый важный параметр при работе Connect в автономном режиме: он определяет, где Connect должен хранить данные смещения.
- offset.flush.interval.ms определяет интервал, с которым воркер пытается зафиксировать смещения для задач.
И список параметров довольно зрелый, поэтому ознакомьтесь с официальной документацией для получения полного списка.
5.4. Kafka Connect в автономном режиме
И с этим мы можем начать нашу первую настройку коннектора:
$CONFLUENT_HOME/bin/connect-standalone \ $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \ $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \ $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties
Во-первых, мы можем проверить содержимое темы с помощью командной строки:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Как мы видим, исходный коннектор взял данные из файла test.txt , преобразовал их в JSON и отправил в Kafka:
"schema":"type":"string","optional":false>,"payload":"foo"> "schema":"type":"string","optional":false>,"payload":"bar">
И, если мы посмотрим на папку $CONFLUENT_HOME , мы увидим, что здесь был создан файл test.sink.txt :
cat $CONFLUENT_HOME/test.sink.txt foo bar
Поскольку коннектор приемника извлекает значение из атрибута полезной нагрузки и записывает его в файл назначения, данные в test.sink.txt содержат содержимое исходного файла test.txt .
Теперь добавим больше строк в test.txt.
Когда мы это делаем, мы видим, что коннектор источника автоматически обнаруживает эти изменения.
Нам нужно только убедиться, что в конце вставлена новая строка, иначе исходный коннектор не будет учитывать последнюю строку.
На этом этапе давайте остановим процесс Connect, так как мы запустим Connect в распределенном режиме несколькими строками.
6. REST API Connect
До сих пор мы выполняли все настройки, передавая файлы свойств через командную строку. Однако, поскольку Connect предназначен для работы в качестве службы, также доступен REST API.
По умолчанию он доступен по адресу http://localhost:8083 . Несколько конечных точек:
- GET /connectors — возвращает список всех используемых коннекторов.
- GET /connectors/ — возвращает сведения о конкретном соединителе.
- POST /connectors — создает новый коннектор; тело запроса должно быть объектом JSON, содержащим поле строкового имени и поле конфигурации объекта с параметрами конфигурации коннектора.
- GET /connectors//status — возвращает текущий статус коннектора, в том числе если он запущен, сбой или приостановлен, к какому рабочему процессу он назначен, информацию об ошибке, если он вышел из строя, и состояние всех его задач.
- DELETE /connectors/ — удаляет коннектор, изящно останавливая все задачи и удаляя его конфигурацию.
- GET /connector-plugins — возвращает список плагинов коннекторов, установленных в кластере Kafka Connect.
Официальная документация предоставляет список со всеми конечными точками.
В следующем разделе мы будем использовать REST API для создания новых соединителей.
7. Kafka Connect в распределенном режиме
Автономный режим идеально подходит для разработки и тестирования, а также для небольших установок. Однако, если мы хотим в полной мере использовать распределенную природу Kafka, мы должны запустить Connect в распределенном режиме.
При этом настройки коннектора и метаданные сохраняются в темах Kafka, а не в файловой системе. В результате рабочие узлы действительно не имеют состояния.
7.1. Запуск подключения
Эталонную конфигурацию для распределенного режима можно найти в $CONFLUENT_HOME /etc/kafka/connect-distributed.properties.
Параметры в основном такие же, как и для автономного режима. Отличий всего несколько:
- group.id определяет имя кластерной группы Connect. Значение должно отличаться от любого идентификатора группы потребителей.
- offset.storage.topic , config.storage.topic и status.storage.topic определяют темы для этих настроек. Для каждой темы мы также можем определить коэффициент репликации
Опять же, официальная документация предоставляет список со всеми параметрами.
Мы можем запустить Connect в распределенном режиме следующим образом:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
7.2. Добавление соединителей с помощью REST API
Теперь, по сравнению с автономной командой запуска, мы не передавали никаких конфигураций соединителя в качестве аргументов. Вместо этого мы должны создать коннекторы с помощью REST API.
Чтобы настроить наш предыдущий пример, мы должны отправить два запроса POST на адрес http://localhost:8083/connectors, содержащие следующие структуры JSON.
Во-первых, нам нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь мы назовем его connect-file-source.json :
"name": "local-file-source", "config": "connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-distributed.txt", "topic": "connect-distributed" > >
Обратите внимание, как это выглядит очень похоже на эталонный файл конфигурации, который мы использовали в первый раз.
И затем мы POST это:
curl -d @"$CONFLUENT_HOME/connect-file-source.json" \ -H "Content-Type: application/json" \ -X POST http://localhost:8083/connectors
Затем мы сделаем то же самое для соединителя приемника, вызвав файл connect-file-sink.json :
"name": "local-file-sink", "config": "connector.class": "FileStreamSink", "tasks.max": 1, "file": "test-distributed.sink.txt", "topics": "connect-distributed" > >
И выполните POST, как раньше:
curl -d @$CONFLUENT_HOME/connect-file-sink.json \ -H "Content-Type: application/json" \ -X POST http://localhost:8083/connectors
При необходимости мы можем убедиться, что эта настройка работает правильно:
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning "schema":"type":"string","optional":false>,"payload":"foo"> "schema":"type":"string","optional":false>,"payload":"bar">
И, если мы посмотрим на папку $CONFLUENT_HOME , мы увидим, что здесь был создан файл test-distributed.sink.txt :
cat $CONFLUENT_HOME/test-distributed.sink.txt foo bar
После того, как мы протестировали распределенную установку, давайте очистим ее, удалив два соединителя:
curl -X DELETE http://localhost:8083/connectors/local-file-source curl -X DELETE http://localhost:8083/connectors/local-file-sink
8. Преобразование данных
8.1. Поддерживаемые преобразования
Преобразования позволяют нам вносить простые и легкие изменения в отдельные сообщения.
Kafka Connect поддерживает следующие встроенные преобразования:
- InsertField — добавьте поле, используя либо статические данные, либо метаданные записи.
- ReplaceField — фильтровать или переименовывать поля
- MaskField — замените поле допустимым нулевым значением для типа (например, нулем или пустой строкой).
- HoistField — Оберните все событие как одно поле внутри структуры или карты.
- ExtractField — извлечение определенного поля из структуры и карты и включение в результаты только этого поля.
- SetSchemaMetadata — изменение имени или версии схемы.
- TimestampRouter — измените тему записи на основе исходной темы и временной метки.
- RegexRouter — изменение темы записи на основе исходной темы, строки замены и регулярного выражения.
Преобразование настраивается с использованием следующих параметров:
- transforms — разделенный запятыми список псевдонимов для преобразований.
- transforms.$alias.type — Имя класса для преобразования
- transforms.$alias.$transformationSpecificConfig — Конфигурация для соответствующего преобразования
8.2. Применение трансформатора
Чтобы протестировать некоторые функции преобразования, давайте настроим следующие два преобразования:
- Во-первых, давайте обернем все сообщение как структуру JSON.
- После этого добавим поле в эту структуру
Прежде чем применять наши преобразования, мы должны настроить Connect для использования JSON без схемы, изменив connect-distributed.properties :
key.converter.schemas.enable=false value.converter.schemas.enable=false
После этого нам нужно перезапустить Connect, опять же в распределенном режиме:
$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
Опять же, нам нужно создать тело для исходного коннектора POST в виде файла JSON. Здесь мы назовем его connect-file-source-transform.json.
Помимо уже известных параметров, добавим несколько строк для двух необходимых преобразований:
"name": "local-file-source", "config": "connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-transformation.txt", "topic": "connect-transformation", "transforms": "MakeMap,InsertSource", "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value", "transforms.MakeMap.field": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value": "test-file-source" > >
После этого давайте выполним POST:
curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \ -H "Content-Type: application/json" \ -X POST http://localhost:8083/connectors
Давайте напишем несколько строк в наш test-transformation.txt :
Foo Bar
Если мы теперь проверим тему подключения-преобразования , мы должны получить следующие строки:
"line":"Foo","data_source":"test-file-source"> "line":"Bar","data_source":"test-file-source">
9. Использование готовых коннекторов
После использования этих простых соединителей давайте посмотрим на более продвинутые, готовые к использованию соединители и способы их установки.
9.1. Где найти соединители
Готовые соединители доступны из разных источников:
- Несколько соединителей связаны с простым Apache Kafka (источник и приемник для файлов и консоли).
- Еще несколько коннекторов входят в состав Confluent Platform (ElasticSearch, HDFS, JDBC и AWS S3).
- Также проверьте Confluent Hub , который является своего рода магазином приложений для коннекторов Kafka. Количество предлагаемых коннекторов постоянно растет:
- Коннекторы Confluent (разработаны, протестированы, задокументированы и полностью поддерживаются Confluent)
- Сертифицированные коннекторы (реализованы сторонней организацией и сертифицированы Confluent)
- Коннекторы, разработанные и поддерживаемые сообществом
- Кроме того, Confluent также предоставляет страницу соединителей с некоторыми соединителями, которые также доступны в концентраторе Confluent, а также с некоторыми другими соединителями сообщества.
- И, наконец, есть производители, которые предоставляют коннекторы как часть своего продукта. Например, Landoop предоставляет потоковую библиотеку под названием Lenses , которая также содержит набор из примерно 25 коннекторов с открытым исходным кодом (многие из них также перечислены в других местах).
9.2. Установка коннекторов из Confluent Hub
Корпоративная версия Confluent предоставляет сценарий для установки соединителей и других компонентов из Confluent Hub (сценарий не включен в версию с открытым исходным кодом). Если мы используем корпоративную версию, мы можем установить коннектор с помощью следующей команды:
$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview
9.3. Установка коннекторов вручную
Если нам нужен коннектор, которого нет в Confluent Hub, или если у нас есть версия Confluent с открытым исходным кодом, мы можем установить необходимые коннекторы вручную. Для этого нам нужно скачать и разархивировать коннектор, а также переместить включенные библиотеки в папку, указанную как plugin.path.
Для каждого коннектора в архиве должны быть две интересующие нас папки:
- Папка lib содержит jar коннектора, например, kafka-connect-mqtt-1.0.0-preview.jar , а также еще несколько jar, необходимых коннектору
- Папка etc содержит один или несколько эталонных файлов конфигурации .
Мы должны переместить папку lib в $CONFLUENT_HOME/share/java или любой другой путь, который мы указали как plugin.path в connect-standalone.properties и connect-distributed.properties . При этом также может иметь смысл переименовать папку во что-то осмысленное.
Мы можем использовать файлы конфигурации из etc , либо ссылаясь на них при запуске в автономном режиме, либо мы можем просто взять свойства и создать из них файл JSON.
10. Заключение
В этом руководстве мы рассмотрели, как установить и использовать Kafka Connect.
Мы рассмотрели типы разъемов, как исток, так и сток. Мы также рассмотрели некоторые функции и режимы, в которых может работать Connect. Затем мы рассмотрели трансформеры. И, наконец, мы узнали, где взять и как установить пользовательские коннекторы.
Как всегда, файлы конфигурации можно найти на GitHub .
- 1. Обзор
- 2. Основы Kafka Connect и Kafka Connectors
- 3. Особенности
- 4. Настройка
- 5. Быстрый старт Kafka Connect
- 5.1. Конфигурация исходного коннектора
- 5.2. Конфигурация разъема приемника
- 5.3. Конфигурация рабочего
- 5.4. Kafka Connect в автономном режиме
- 7.1. Запуск подключения
- 7.2. Добавление соединителей с помощью REST API
- 8.1. Поддерживаемые преобразования
- 8.2. Применение трансформатора
- 9.1. Где найти соединители
- 9.2. Установка коннекторов из Confluent Hub
- 9.3. Установка коннекторов вручную