Заметки по архитектуре аналитических сервисов. Часть 1: из Kafka на S3 при помощи Secor

Мы начинаем цикл статей, посвященных архитектуре аналитических сервисов, в том числе работающих с BigData. Данный цикл будет интересен, в первую очередь, не столько специалистам в области digital marketing, хотя и они могут почерпнуть для себя интересные подробности и составить общее представление о технических особенностях архитектур подобного рода, сколько IT-специалистам, занимающимся построением таких систем.

kafka_s3_rainbow

Одним из самых простых, надежных, удобных и расширяемых архитектурных решений для построения системы является сбор логов от всех компонентов и вечное их хранение для последующей независимой обработки. Такое решение, используемое во многих компаниях, в первую очередь в LinkedIn (где изобрели Apache Kafka), и соответствующее популярной ныне на клиентской стороне архитектуре unidirectional data flow (Flux, Redux), позволяет быстро начать работу с огромным количеством поступающих данных, не волнуясь по поводу их хранения, не раздумывая над выбором БД, схемы и индексов, и даже не требуя особенно интенсивной кооперации между программистами, работающими над приемом данных и теми, кто работает над их обработкой. Данные, собираемые и вечно хранимые просто в виде логов, доступны для батчевой обработки, в том числе ручной; более того, их очень легко использовать в дальнейшем и для наполнения других видов баз данных, подходящих для других применений, например, в реальном времени.

Подобную архитектуру часто строят вокруг Apache Kafka, выполняющей почти всю необходимую работу: собирающей сообщения, раздающей их обработчикам, требующим данных в реальном времени, и гарантирующей опеределенный уровень доступности. Однако, для того, чтобы данные действительно хранились и были доступны вечно, их стоит сохранять вне ее – в месте, в котором можно было бы хранить большое число больших файлов, доступных для параллельной обработки. Одним из самых популярных хранилищ такого рода является Amazon S3.

В этой серии статей мы расскажем, как с помощью Secor сохранять данные из Apache Kafka на S3, и как удобно и, при желании, интерактивно обрабатывать эти данные при помощи Python на платформе Apache Spark (Spark умеет работать с Kafka и напрямую, но это никак не годится для вечного хранения и батчевой распределенной обработки данных). Примерный план будет таков:

Часть 1

  • Подготовка и настройка Kafka
  • Запуск Secor

Часть 2

  • Обработка при помощи Spark

Часть 3

  • Сбор статистики и мониторинг
  • Распределенный Secor для надежности
  • Распределенный Spark для скорости

Часть 4

  • Возможности для оптимизации, в том числе финансовой
  • Обзор альтернативных решений

Подготовка и настройка Kafka

Эта статья предполагает, что вы уже установили Kafka и работаете с ней: как минимум, шлете в нее какие-то сообщения. Желательно, конечно, чтобы Kafka была установлена распределенно, на нескольких серверах, но это дело наживное.

В общем-то для подготовки нас интересуют три вещи: организация данных в топики, настройка хранения топиков и формат данных. Данные в Кафка организуются по двум уровням: топики (примерно как таблички в SQL или, скорее, коллекции в Mongo), и ключи сообщений, что-то вроде неуникального индекса, позволяющего шардинг сообщений как по разным серверам, так и по разным кафкианским консумерам – однако, их Secor игнорирует (к сожалению), а значит, мы тоже будем. Поэтому просто создавайте топики, которые вам нужны, и используйте ключи, которые вам удобны для несвязанных с этой статьей занятий.

Данные мы будем слать в формате JSON: Secor еще поддерживает MessagePack, Thrift или что угодно другое (в таком случае придется либо писать свой парсер на Scala, либо остаться без разбивки данных по дням) – с этим всем вы тоже легко сами разбиретесь. Обязательно надо сделать одно поле вроде timestamp, с обычным юниксовым таймштампом (можно назвать как угодно, это настраивается) – тогда данные будут храниться на S3, разбитые по дням. В итоге, у нас будут один или несколько топиков с данными в формате JSON, одним из полей которых является timestamp:

Теперь нужно настроить хранение этих топиков так, чтобы они, с одной стороны, не забили нам все место на диске, а, с другой, не чистились быстрее, чем Secor успевает их сохранять. Для этого используются настройки топика retention.ms и retention.bytes: первая задает минимальный срок жизни, после которого сообщение будет удалено, а вторая – максимальный размер топика. В общем-то, думаю, стоит использовать только вторую, поставив туда сразу максимум, который вам не жалко – это сделает использование диска предсказуемым, а потому уменьшит вероятность того, что всё грохнется. У нас вот, например, сейчас там 107374182400 – это что-то вроде 100 гигабайт. Риск здесь, однако, в том, что если сообщений будет слишком много, гигабайты в минуту, то Secor, возможно, перестанет успевать их все загрузить на S3 (что будет возможно решить, запустив его на нескольких серверах – нагрузка будет распределена). То, как мониторить эти вещи, чтобы не тревожиться зря, мы объясним чуть попозже, в другой статье. А пока – команды для настройки Kafka:

Теперь всё готово для Secor.

Запуск Secor

Secor – это программа на языке Scala, которая собирает данные из Kafka, как обычный консумер и сохраняет их в файлы на диске, которые время от времени отправляет на S3. Ее нужно настроить и запустить (что я делаю при помощи Docker), а дальше она всё будет делать сама. Если вы почему-то не используете Docker, думаю, у вас все получится и без него (в конце концов, это просто программа…), но, может быть, стоит воспользоваться поводом и начать.

Недавно в репозитории Secor появились какие-то Dockerfile’ы, но о них ничего нигде не написано, и к использованию они тоже в общем-то не приспособлены. Ничего страшного, у нас уже есть готовый. Мы пойдем самым простым путем – будем собирать Secor вместе с докеровским образом (image), и использовать его прямо на месте. Это значит, что образ будет больше, чем нужно, на величину всего необходимого для сборки – ну ничего страшного, потом когда-нибудь оптимизируем.

Итак, нам понадобятся Dockerfile и Secor-овские конфиги. Они все есть в нашем gist-е: https://gist.github.com/va1en0k/ff07621ebe815ed449cae0fa1fe959b2 – там все необходимые файлы для сборки докером версии v0.21. Собственно, создать докеровский образ с помощью этого гиста достаточно просто:

Пока Secor собирается, его можно настроить – теми самыми конфигами из нашего гиста. Особенно интересуют следующие настройки:

В файле secor.common.properties:

  • Ключи для S3 в блоке “MUST SET” наверху. Рекомендую создать новый бакет для S3, и для него сделать отдельные ключи.
  • В том же блоке там же можно ограничить набор загружаемых кафкианских топиков регуляркой.
  • Настройки вида message.timestamp.*: это настройки поля timestamp, которое будет использоваться для разделения логов по датам. Не забудьте про разделитель названий полей в пути (message.timestamp.separator), если хотите использовать поле из вложенного словаря.

В файле secor.prod.properties:

  • Он короткий, посмотрите его весь: в нем – обязательные настройки для соединения с Kafka, а так же название бакета на S3, в который будут загружаться логи. В конце – две настройки про “upload policies”, т.е., о том, как часто файлы с логами будут сгружаться на s3, и насколько они будут большими.

В файле secor.prod.partition.properties:

  • Папки и пути на S3, в которых будут храниться архивы логов.

Когда все эти конфиги заполнены и секор собран, можно запускать:

Если вы видите, что что-то идет не так, исправьте конфиги и перезапустите контейнер:

Чтобы проверить, что всё работает, можно, не дожидаясь первой загрузки на S3, заглянуть в папку, в которую Secor собирает логи перед отправкой:

Если размеры файлов, выданные этой командой, меняются – значит, всё идет хорошо, и сообщения копятся. Через некоторое время – не более, чем указанное в “Upload policies” файла secor.prod.properties, они появятся и на S3. Как их распределенно и параллельно обрабатывать при помощи Apache Spark, как мониторить процесс архивирования логов, и какие могут быть способы оптимизации этих процессов мы расскажем в следующих сериях.

Заметки по архитектуре аналитических сервисов. Часть 1: из Kafka на S3 при помощи Secor: 2 комментария

    1. Спасибо за проявленный интерес! Передали нашему автору, он
      обещал написать продолжение.
      Подписывайтесь на блог и следите за обновлениями!

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *