Перейти к содержимому

Zookeeper kafka что это

  • автор:

Что такое Zookeeper

Обновлено

Обновлено: 14.09.2022 Опубликовано: 30.11.2021

cервис-координатор, который позволяет обеспечить контроль синхронизации данных. Разработан на Java компанией Apache Software Foundation.

Как правило, Zookeeper используют для:

  • Создания распределенного сервера имен.
  • Определения лидера в других кластерных системах.
  • Создания распределенной конфигурации других приложений (например, Kafka, ClickHouse).

Приложение является открытым и может быть загружено с официального сайта.

Архитектурно, Zookeeper представляет клиент-серверное приложение и включает в себя следующие компоненты:

  1. Клиент — узел в кластере, который находится на связи с сервером, постоянно отправляя ему сигналы о своей работоспособности (heartbeat), который не должен превышать определенное значение (определяется параметром tickTime).
  2. Сервер — одна из нод кластера Zookeeper, которая проверяет состояние клиентов и перенаправляет запросы на один из рабочих серверов (клиентов).
  3. Лидер — главный сервер среди серверов кластера (ансамбля). Он отвечает за все операции записи.
  4. Последователь — все серверы в ансамбле, за исключением лидера. Они принимают от последнего информацию на изменение и записывают ее у себя.
  5. Ансамбль — кластер из серверов Zookeeper. Лидер определяется при старте служб, а если происходит разрыв соединения — на основе кворума. Поэтому минимальное количество нод в ансамбле является 3-м.

Приложение по умолчанию работает на следующих портах:

Порт Описание
2181 Порт для клиентских подключений.
2888 Репликация данных между нодами кластера.
3888 Выбор лидера между нодами кластера.

В качестве альтернатив выделяют:

Apache Kafka для чайников

Данная статья будет полезной тем, кто только начал знакомиться с микросервисной архитектурой и с сервисом Apache Kafka. Материал не претендует на подробный туториал, но поможет быстро начать работу с данной технологией. Я расскажу о том, как установить и настроить Kafka на Windows 10. Также мы создадим проект, используя Intellij IDEA и Spring Boot.

Зачем?

Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться. С Kafka всё обстоит точно также. Опишем ситуацию, в которой данная технология будет полезной. Если у вас монолитная архитектура приложения, то разумеется, никакая Kafka вам не нужна. Всё меняется с переходом на микросервисы. По сути, каждый микросервис – это отдельная программа, выполняющая ту или иную функцию, и которая может быть запущена независимо от других микросервисов. Микросервисы можно сравнить с сотрудниками в офисе, которые сидят за отдельными столами и независимо от коллег решают свою задачу. Работа такого распределённого коллектива немыслима без централизованной координации. Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы между собой. Именно эту проблему и призвана решить Apache Kafka для микросервисов.

Apache Kafka является брокером сообщений. С его помощью микросервисы могут взаимодействовать друг с другом, посылая и получая важную информацию. Возникает вопрос, почему не использовать для этих целей обычный POST – reqest, в теле которого можно передать нужные данные и таким же образом получить ответ? У такого подхода есть ряд очевидных минусов. Например, продюсер (сервис, отправляющий сообщение) может отправить данные только в виде response’а в ответ на запрос консьюмера (сервиса, получающего данные). Допустим, консьюмер отправляет POST – запрос, и продюсер отвечает на него. В это время консьюмер по каким-то причинам не может принять полученный ответ. Что будет с данными? Они будут потеряны. Консьюмеру снова придётся отправлять запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, и продюсер всё ещё готов принять request.

Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами. Не лишним будет напомнить, что бесперебойный и удобный обмен данными – одна из ключевых проблем, которую необходимо решить для обеспечения устойчивой работы микросервисной архитектуры.

Установка и настройка ZooKeeper и Apache Kafka на Windows 10

Первое, что надо знать для начала работы — это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это распределенный сервис конфигурирования и синхронизации, и это всё, что нам нужно знать о нём в данном контексте. Мы должны скачать, настроить и запустить его перед тем, как начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлен и настроен JRE.

Извлекаем из скаченного архива ZooKeeper`а файлы в какую-нибудь папку на диске.
В папке zookeeper с номером версии, находим папку conf и в ней файл “zoo_sample.cfg”.

Копируем его и меняем название копии на “zoo.cfg”. Открываем файл-копию и находим в нём строчку dataDir=/tmp/zookeeper. Прописываем в данной строчке полный путь к нашей папке zookeeper-х.х.х. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0

Теперь добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;

Запускаем командную строку и пишем команду:

zkserver

Если всё сделано правильно, вы увидите примерно следующее.

Это означает, что ZooKeeper стартанул нормально. Переходим непосредственно к установке и настройке сервера Apache Kafka. Скачиваем свежую версию с официального сайта и извлекаем содержимое архива: kafka.apache.org/downloads

В папке с Kafka находим папку config, в ней находим файл server.properties и открываем его.

Находим строку log.dirs= /tmp/kafka-logs и указываем в ней путь, куда Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.

В этой же папке редактируем файл zookeeper.properties. Строчку dataDir=/tmp/zookeeper меняем на dataDir=c:/kafka/zookeeper-data, не забывая при этом, после имени диска указывать путь к своей папке с Kafka. Если вы всё сделали правильно, можно запускать ZooKeeper и Kafka.

Для кого-то может оказаться неприятной неожиданностью, что никакого GUI для управления Kafka нет. Возможно, это потому, что сервис рассчитан на суровых нёрдов, работающих исключительно с консолью. Так или иначе, для запуска кафки нам потребуется командная строка.

Сначала надо запустить ZooKeeper. В папке с кафкой находим папку bin/windows, в ней находим файл для запуска сервиса zookeeper-server-start.bat, кликаем по нему. Ничего не происходит? Так и должно быть. Открываем в этой папке консоль и пишем:

 start zookeeper-server-start.bat

Опять не работает? Это норма. Всё потому что zookeeper-server-start.bat для своей работы требует параметры, прописанные в файле zookeeper.properties, который, как мы помним, лежит в папке config. Пишем в консоль:

start zookeeper-server-start.bat c:\kafka\config\zookeeper.properties 

Теперь всё должно стартануть нормально.

Ещё раз открываем консоль в этой папке (ZooKeeper не закрывать!) и запускаем kafka:

start kafka-server-start.bat c:\kafka\config\server.properties

Для того, чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник со следующим содержимым:

start C:\kafka\bin\windows\zookeeper-server-start.bat C:\kafka\config\zookeeper.properties timeout 10 start C:\kafka\bin\windows\kafka-server-start.bat C:\kafka\config\server.properties

Строка timeout 10 нужна для того, чтобы задать паузу между запуском zookeeper и kafka. Если вы всё сделали правильно, при клике на батник должны открыться две консоли с запущенным zookeeper и kafka.Теперь мы можем прямо из командной строки создать продюсера сообщений и консьюмера с нужными параметрами. Но, на практике это может понадобиться разве что для тестирования сервиса. Гораздо больше нас будет интересовать, как работать с kafka из IDEA.

Работа с kafka из IDEA

Мы напишем максимально простое приложение, которое одновременно будет и продюсером и консьюмером сообщения, а затем добавим в него полезные фичи. Создадим новый спринг-проект. Удобнее всего делать это с помощью спринг-инициалайзера. Добавляем зависимости org.springframework.kafka и spring-boot-starter-web

В итоге файл pom.xml должен выглядеть так:

Для того, чтобы отправлять сообщения, нам потребуется объект KafkaTemplate. Как мы видим объект является типизированным. Первый параметр – это тип ключа, второй – самого сообщения. Пока оба параметра мы укажем как String. Объект будем создавать в классе-рестконтроллере. Объявим KafkaTemplate и попросим Spring инициализировать его, поставив аннотацию Autowired.

@Autowired private KafkaTemplate kafkaTemplate;

В принципе, наш продюсер готов. Всё что осталось сделать – это вызвать у него метод send(). Имеется несколько перегруженных вариантов данного метода. Мы используем в нашем проекте вариант с 3 параметрами — send(String topic, K key, V data). Так как KafkaTemplate типизирован String-ом, то ключ и данные в методе send будут являться строкой. Первым параметром указывается топик, то есть тема, в которую будут отправляться сообщения, и на которую могут подписываться консьюмеры, чтобы их получать. Если топик, указанный в методе send не существует, он будет создан автоматически. Полный текст класса выглядит так.

@RestController @RequestMapping("msg") public class MsgController < @Autowired private KafkaTemplatekafkaTemplate; @PostMapping public void sendOrder(String msgId, String msg) < kafkaTemplate.send("msg", msgId, msg); >> 

Контроллер мапится на localhost:8080/msg, в теле запроса передаётся ключ и само сообщений.

Отправитель сообщений готов, теперь создадим слушателя. Spring так же позволяет cделать это без особых усилий. Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которой можно указать только топик, который будет слушаться. В нашем случае это выглядит так.

@KafkaListener(topics="msg")

У самого метода, помеченного аннотацией, можно указать один принимаемый параметр, имеющий тип сообщения, передаваемого продюсером.

Класс, в котором будет создаваться консьюмер необходимо пометить аннотацией @EnableKafka.

@EnableKafka @SpringBootApplication public class SimpleKafkaExampleApplication < @KafkaListener(topics="msg") public void msgListener(String msg)< System.out.println(msg); >public static void main(String[] args) < SpringApplication.run(SimpleKafkaExampleApplication.class, args); >>

Так же в файле настроек application.property необходимо указать параметр консьюмера groupe-id. Если этого не сделать, приложение не запустится. Параметр имеет тип String и может быть любым.

spring.kafka.consumer.group-id=app.1

Наш простейший кафка-проект готов. У нас есть отправитель и получатель сообщений. Осталось только запустить. Для начала запускаем ZooKeeper и Kafka с помощью батника, который мы написали ранее, затем запускаем наше приложение. Отправлять запрос удобнее всего с помощью Postman. В теле запроса не забываем указывать параметры msgId и msg.

Если мы видим в IDEA такую картину, значит всё работает: продюсер отправил сообщение, консьюмер получил его и вывел в консоль.

Усложняем проект

Реальные проекты с использованием Kafka конечно же сложнее, чем тот, который мы создали. Теперь, когда мы разобрались с базовыми функциями сервиса, рассмотрим, какие дополнительные возможности он предоставляет. Для начала усовершенствуем продюсера.

Если вы открывали метод send(), то могли заметить, что у всех его вариантов есть возвращаемое значение ListenableFuture>. Сейчас мы не будем подробно рассматривать возможности данного интерфейса. Здесь будет достаточно сказать, что он нужен для просмотра результата отправки сообщения.

@PostMapping public void sendMsg(String msgId, String msg)< ListenableFuture> future = kafkaTemplate.send("msg", msgId, msg); future.addCallback(System.out::println, System.err::println); kafkaTemplate.flush(); >

Метод addCallback() принимает два параметра – SuccessCallback и FailureCallback. Оба они являются функциональными интерфейсами. Из названия можно понять, что метод первого будет вызван в результате успешной отправки сообщения, второго – в результате ошибки.Теперь, если мы запустим проект, то увидим на консоли примерно следующее:

SendResult [producerRecord=ProducerRecord(topic=msg, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=1, value=Hello, world!, timestamp=null), recordMetadata=msg-0@6]

Посмотрим ещё раз внимательно на нашего продюсера. Интересно, что будет если в качестве ключа будет не String, а, допустим, Long, а в качестве передаваемого сообщения и того хуже – какая-нибудь сложная DTO? Попробуем для начала изменить ключ на числовое значение…

Если мы укажем в продюсере в качестве ключа Long, то приложение нормально запуститься, но при попытке отправить сообщение будет выброшен ClassCastException и будет сообщено, что класс Long не может быть приведён к классу String.

Если мы попробуем вручную создать объект KafkaTemplate, то увидим, что в конструктор в качестве параметра передаётся объект интерфейса ProducerFactory, например DefaultKafkaProducerFactory<>. Для того, чтобы создать DefaultKafkaProducerFactory, нам нужно в его конструктор передать Map, содержащий настройки продюсера. Весь код по конфигурации и созданию продюсера вынесем в отдельный класс. Для этого создадим пакет config и в нём класс KafkaProducerConfig.

@Configuration public class KafkaProducerConfig < private String kafkaServer="localhost:9092"; @Bean public MapproducerConfigs() < Mapprops = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; > @Bean public ProducerFactory producerFactory() < return new DefaultKafkaProducerFactory<>(producerConfigs()); > @Bean public KafkaTemplate kafkaTemplate() < return new KafkaTemplate<>(producerFactory()); > > 

В методе producerConfigs() создаём мапу с конфигурациями и в качестве сериализатора для ключа указываем LongSerializer.class. Запускаем, отправляем запрос из Postman и видим, что теперь всё работает, как надо: продюсер отправляет сообщение, а консьюмер принимает его.

Теперь изменим тип передаваемого значения. Что если у нас не стандартный класс из библиотеки Java, а какой-нибудь кастомный DTO. Допустим такой.

@Data public class UserDto < private Long age; private String name; private Address address; >@Data @AllArgsConstructor public class Address

Для отправки DTO в качестве сообщения, нужно внести некоторые изменения в конфигурацию продюсера. В качестве сериализатора значения сообщения укажем JsonSerializer.class и не забудем везде изменить тип String на UserDto.

@Configuration public class KafkaProducerConfig < private String kafkaServer="localhost:9092"; @Bean public MapproducerConfigs() < Mapprops = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; > @Bean public ProducerFactory producerFactory() < return new DefaultKafkaProducerFactory<>(producerConfigs()); > @Bean public KafkaTemplate kafkaTemplate() < return new KafkaTemplate<>(producerFactory()); > >

Отправим сообщение. В консоль будет выведена следующая строка:

Теперь займёмся усложнением консьюмера. До этого наш метод public void msgListener(String msg), помеченный аннотацией @KafkaListener(topics=«msg») в качестве параметра принимал String и выводил его на консоль. Как быть, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или партицию? В этом случае тип передаваемого значения необходимо изменить.

@KafkaListener(topics="msg") public void orderListener(ConsumerRecord record)

Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.

Мы видим, что вместо ключа на консоль выводятся какие-то кракозябры. Это потому, что для десериализации ключа по умолчанию используется StringDeserializer, и если мы хотим, чтобы ключ в целочисленном формате корректно отображался, мы должны изменить его на LongDeserializer. Для настройки консьюмера в пакете config создадим класс KafkaConsumerConfig.

@Configuration public class KafkaConsumerConfig < @Value("$") private String kafkaServer; @Value("$") private String kafkaGroupId; @Bean public Map consumerConfigs() < Mapprops = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); return props; > @Bean public KafkaListenerContainerFactory kafkaListenerContainerFactory() < ConcurrentKafkaListenerContainerFactoryfactory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; > @Bean public ConsumerFactory consumerFactory() < return new DefaultKafkaConsumerFactory<>(consumerConfigs()); > >

Класс KafkaConsumerConfig очень похож на KafkaProducerConfig, который мы создавали ранее. Здесь так же присутствует Map, содержащий необходимые конфигурации, например, такие как десериализатор для ключа и значения. Созданная мапа используется при создании ConsumerFactory<>, которая в свою очередь, нужна для создания KafkaListenerContainerFactory. Важная деталь: метод возвращающий KafkaListenerContainerFactory должен называться kafkaListenerContainerFactory(), иначе Spring не сможет найти нужного бина и проект не скомпилируется. Запускаем.

Видим, что теперь ключ отображается как надо, а это значит, что всё работает. Конечно, возможности Apache Kafka далеко выходят за пределы тех, что описаны в данной статье, однако, надеюсь, прочитав её, вы составите представление о данном сервисе и, самое главное, сможете начать работу с ним.

Мойте руки чаще, носите маски, не выходите без необходимости на улицу, и будьте здоровы.

Zookeeper

Apache Zookeeper — open source проект Apache Software Foundation, cервис-координатор, который обеспечивает распределенную синхронизацию небольших по объему данных (конфигурационная информация, пространство имен) для группы приложений. Zookeeper представляет из себя распределенное хранилище ключ-значение (key-value store), гарантирующий надежное консистентное (consistency) хранение информации за счет синхронной репликации между узлами, контроля версий, механизма очередей (queue) и блокировок (lock). За счет использования оперативной памяти и масштабируемости обладает высокой скоростью

Сценарии использования Зукипер:

  • Распределенный сервер имен (namespacetopics для Kafka)
  • Распределенная конфигурация (Hadoop, Kafka)
  • Распределенный членство в группах (распределенные сервисы Kafka, Hadoop)
  • Выбор главного в распределенных системах с арбитражом (Leader election).

Как устроен Apache Zookeeper

Архитектурно Зукипер организован по клиент-серверной технологии, когда клиентские приложения обращаются к одному из узлов, объединенных в ансамбль. Среди ансамбля серверов выделяется главный узел — лидер, который выполняет все операции записи и запускает автоматическое восстановление при отказе любого из подключенных серверов. Остальные узлы — подписчики или последователи, реплицируют данные с лидера и используются клиентскими приложениями для чтения.

ZooKeeper имитирует виртуальную древовидную файловую систему из взаимосвязанных узлов, которые представляют собой совмещенное понятие файла и директории. Каждый узел этой иерархии может одновременно хранить данные и иметь подчиненные узлы-потомки.

Достоинства и недостатки Зукипер

Ключевыми преимуществами Zookeeper в распределенных Big Data системах считаются следующие:

  • отказоустойчивость кластера;
  • синхронизация распределенных сервисов;
  • автоматическая синхронизация данных;
  • упорядоченность сообщений;
  • транзакционность передачи даннных.

Обратной стороной этих достоинств являются следующие недостатки:

  • зависимость от оперативной памяти узла;
  • избыточное количество серверов;
  • особенности ZAB-протокола синхронизации данных в ансамбле серверов;
  • ограниченность пространства имен и числа потомков каждого узла.

Подробнее, зачем Apache Zookeeper используется в кластерах Hadooop, Kafka и HBase, а также чем можно его заменить, мы писали здесь. А об архитектуре, основных принципах работы и главных проблемах Зукипер читайте в этой статье.

Zookeeper или как живётся работнику зоопарка

Java-университет

Приложения в Java часто имеют различные конфигурации. Например, адрес и порт подключения. Например, это могло бы выглядеть следующим образом, если бы использовали класс Properties:

 public static void main(String []args)

И этого вроде достаточно, т.к. мы можем Properties получить из файла. И вроде у нас всё на одной машине уживается хорошо. Но представте, что наша система начинает состоять из разных систем, которые разделены между собой? Такая система ещё называется распределённой (Distributed Systems). В википедии можно найти следующее определение: Распределённые системы — это системы, компоненты который расположены на разных сетевых компьютерах, которые общаются между собой и координируют свои действия обмениваясь друг с другом сообщениями. Можно взглянуть на следующую схему:

Zookeeper или как живётся работнику зоопарка - 2

При таком подходе единая система разделена на компоненты. Конфигурирование — отдельный общий компонент. Каждый из других компонентов выступает в роли клиента для компонента конфигурирования. Такой случай называется «Распределённая конфигурация«. Существует множество различных реализаций распределённой конфигурации. И в сегодняшем обзоре предлагаю познакомиться с одной из них, которая называется Zookeeper.

Zookeeper или как живётся работнику зоопарка - 3

Zookeeper

Путь к знакомству с Zookeeper начинается с их официального сайта: zookeeper.apache.org На официальном сайте необходимо перейти в раздел «Download». В этом разделе скачиваем архив в формате .tar.gz, например «zookeeper-3.4.13.tar.gz». tar — это формат архива, традиционный для Unit систем. gz — означает, что для сжатия архива используется gzip. Если мы работаем на Windows машине, то нас это не должно смущать. Большинство современных архиваторов (например, 7-zip), прекрасно умеют с ними работать и на Windows. Извлечём содержимое в какой-нибудь каталог. Заодно увидим разницу — на диске в извлечённом состоянии оно будет занимать примерно 60 мегабайт, а скачивали мы архив размером около 35 мегабайт. Как видно, сжатие действительно работает. Теперь нужно запустить Zookeeper. Вообще, Zookeeper является своего рода сервером. Zookeeper может быть запущен в одном из двух режимов: Standalone или Replicated. Рассмотрим самый простой вариант, он же первый вариант — Standalone mode. Чтобы Zookeper запустился, ему нужен конфигурационный файл. Поэтому, создадим его тут: [КаталогРаспаковкиZookeeper]/conf/zoo.cfg . Для Windows воспользуемся рекомендацией из Medium : «Installing Apache ZooKeeper on Windows». Содержание конфигурационного файла будет примерно следующим:

 tickTime=2000 dataDir=C:/zookeeper-3.4.13/data clientPort=2181 

Добавим переменную среды окружения ZOOKEEPER_HOME, содержащую путь к корневому каталогу zookeper (как в инструкции на medium), а так же добавим в переменную среды окружения PATH следующий фрагмент: ;%ZOOKEEPER_HOME%\bin; Так же каталог, указанный в dataDir, должен существовать, иначе Zookeeper не сможет запустить сервер. Теперь мы можем смело запускать сервер при помощи команды: zkServer. Благодаря тому, что каталог Zookeeper был добавлен в переменную среды окружения path мы можем вызывать команды Zookeper откуда угодно, а не только из каталога bin.

Zookeeper или как живётся работнику зоопарка - 4

ZNode

Как сказано в «Zookeeper Overview», данные в Zookeper представлены в виде ZNode (узлов), которые объединены в древовидную структуру. То есть каждый ZNode может содержать данные и иметь дочерние ZNode. Подробнее про организацию ZNode можно прочитать в документации Zookeeper: «Data model and the hierarchical namespace». Для работы с Zookeeper и ZNode воспользуемся Zookeeper CLI (Command Line interface — интерфейс командной строки). Ранее мы запустили сервер при помощи команды zkServer. Теперь, для подключения выполним zkCli.cmd -server 127.0.0.1:2181 При успешном выполнении будет создана сессия подключения к Zookeeper и мы увидим примерно следующий вывод:

Zookeeper или как живётся работнику зоопарка - 5

Интересно, что даже сразу после установки Zookeeper уже имеет ZNode. Имеет он следующий путь: /zookeeper/quota

Zookeeper или как живётся работнику зоопарка - 6

Это так называемые «квоты». Как сказано в «Apache ZooKeeper Essentials», каждый ZNode может иметь ассоциированную с ним квоту, ограничивающую хранимые данные. Может быть указано ограничение по количеству znode и на объём хранимых данных. При этом если это ограничение превышено, то операции с ZNode не отменяется, но будет получено предупреждение о превышении лимита. Про ZNode рекомендуется прочитать в «ZooKeeper Programmer’s Guide : ZNodes». Несколько примеров от туда, как можно работать с ZNode:

Zookeeper или как живётся работнику зоопарка - 7

Хочется отметить также, что ZNode бывают разные. Обычные ZNode (если не указать дополнительные флаги) являются тип «persistent«. Есть ZNode типа «Ephemeral Node». Такие ZNode существуют только на время существование сессии подключения к Zookeeper, в рамках которой они создавались. Есть ZNode типа «Sequence Node». К таким ZNode добавляется номер из последовательности, чтобы гарантировать уникальность. Sequence Node могут быть как persistent, так и ephemeral. Про ZNode так же рекомендуется небольшая справочная информация тут: «Zookeeper ZNodes – Characteristics & Example».

Zookeeper или как живётся работнику зоопарка - 8

ZNode Watcher

Хотелось бы ещё поговорить про наблюдателей (watchers). Подробно про них написано в документации Zookeeper’а: «ZooKeeper Watches». Если кратко, то вотчер — это такой одноразовый триггер, который срабатывает на некоторое событие. Получая данные, выполняя операции getData(), getChildren() или exists() мы можем создать триггер как дополнительное действие. Zookeeper обеспечивает порядок обработки event. Кроме того, в документации указано, что прежде чем мы сможем увидеть новое значение ZNode мы увидим event об изменении старого значения на новое. Подробнее про Watcher’ов можно прочитать здесь: «ZooKeeper Watches – Features & Guarantees». Для того, чтобы это попробовать, снова воспользуемся CLI: Предположим, у нас есть некоторый ZNode со значением, где мы храним статус некоторого сервиса:

 [zk: 127.0.0.1:2181(CONNECTED) 0] create /services/service1/status stopped Created /services/service1/status [zk: 127.0.0.1:2181(CONNECTED) 1] get /services/service1/status [watch] stopped 

Теперь, если данные в /services/service1/status изменятся, то отработает наш одноразовый триггер:

Zookeeper или как живётся работнику зоопарка - 9

Интересно, что при подключении к Zookeeper’у мы так же видим, как отрабатывает вотчер:

 WATCHER:: WatchedEvent state:SyncConnected type:None path:null 

SyncConnected является одним из возможных событий Zookeper. Подробнее про него можно посмотреть в описании API.

Zookeeper или как живётся работнику зоопарка - 10

Zookeeper и Java

Теперь у нас есть некоторое базовое представление о том, что может Zookeeper. Давайте теперь с ним поработаем через Java, а не через CLI. И для этого нам понадобится Java приложение, на котором мы увидим, как же с Zookeeper’ом работать. Для создания приложения воспользуемся системой сборки проектов Gradle. При помощи «Gradle Build Init plugin» выполним создание проекта. Для этого выполним команду: gradle init —type java-application В случае, если Gradle нас будет спрашивать уточняющие вопросы, то оставим значения по умолчанию (просто нажимаем Enter). Теперь откроем билд скрипт, т.е. файл build.gradle. В нём описание того, из чего устроен наш проект и от каких артефактов(библиотек, фрэймворков) зависит. Т.к. мы хотим использовать Zookeeper, то надо добавить его. Поэтому, добавим в блок dependencies зависимость от Zookeeper’а:

 dependencies < implementation 'org.apache.zookeeper:zookeeper:3.4.13' 

Подробнее про Gradle можно прочитать в обзоре: "Краткое знакомство с Gradle". Итак, у нас есть Java проект, к нему мы подключили библиотеку Zookeeper'а. Давайте теперь что-нибудь напишем. Как мы помним, при помощи CLI мы подключались примерно так: zkCli.cmd -server 127.0.0.1:2181 Давайте в классе App в main методе объявим атрибут "сервер":

 String server = "127.0.0.1:2181"; 

Подключение — действие не моментальное. Нам придётся как-то в главное потоке выполнения программы ждать, когда произойдёт подключение. Поэтому, нам понадобится лок. Объявим его ниже:

 Object lock = new Object(); 

Теперь нам нужен кто-то, кто скажет, что подключение установлено. Как мы помним, когда мы это делали через CLI, то у нас срабатывал вотчер. Так вот в Java коде всё точно так же. Наш вотчер будет выводить сообщение об успешном выполнении и уведомлять об этом всех ждущих через лок. Напишем вотчер:

 Watcher connectionWatcher = new Watcher() < public void process(WatchedEvent we) < if (we.getState() == Event.KeeperState.SyncConnected) < System.out.println("Connected to Zookeeper in " + Thread.currentThread().getName()); synchronized (lock) < lock.notifyAll(); >> > >; 

Теперь допишем подключение к серверу zooKeeper'а:

 int sessionTimeout = 2000; ZooKeeper zooKeeper = null; synchronized (lock)

Здесь всё просто. При выполнении main метода в главном потоке программы мы захватываем lock и запрашиваем подключение к zookeeper'у. При этом мы отпускаем лок, и ожидаем, пока кто-то другой не захватит лок и не уведомит нас, что можно продолжать. Когда подключение будет установлено, то сработает вотчер. Он проверит, что пришло событие - SyncConnected (как мы помним, именно его ловил вотчер через CLI), и тогда напишет сообщение. Далее мы захватываем lock (т.к. ранее главный поток его отпустил) и уведомляем все ждущие lock потоки, что можно продолжать. Поток обработки события выходит из synchronized блока, тем самым освобождая lock. Главный поток получил уведомление и дождавшись освобождение lock продолжает выполнение, т.к. пока не получит lock, то он не сможет выйти из synchronized блока и продолжить работу. Таким образом, используя многопоточность и Zookeeper API мы можем выполнять различные действия. Zookeeper API гораздо шире, чем позволяет использовать CLI. Например:

 // Создание нового узла String znodePath = "/zookeepernode2"; List acls = ZooDefs.Ids.OPEN_ACL_UNSAFE; if (zooKeeper.exists(znodePath, false) == null) < zooKeeper.create(znodePath, "data".getBytes(), acls, CreateMode.PERSISTENT); >// Получение данных из узла byte[] data = zooKeeper.getData(znodePath, null, null); System.out.println("Result: " + new String(data, "UTF-8")); 

Как видно, при создании узла мы можем настроить ACL. Это ещё одна важная особенность. ACL - это разрешения, которые распостраняются на действия с ZNode. Настроек много, поэтому за подробностями рекомендую обратиться к официальной документации: "Zookeeper ACL Permissions".

Zookeeper или как живётся работнику зоопарка - 11

Заключение

  • "ZooKeeper: Distributed Process Coordination", Flavio Junqueira, Benjamin Reed
  • "Apache ZooKeeper Essentials"
  • Семинар в Яндексе: "В чем польза ZooKeeper для разработчиков"
  • ZooKeeper intro
  • Centralized Application Configuration with Spring and Apache ZooKeeper
  • ZooKeeper или пишем сервис распределенных блокировок
  • Национальная библиотека им. Н. Э. Баумана : Apache ZooKeeper
  • Apache ZooKeeper Tutorial – ZooKeeper Guide for Beginners

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *