Перейти к основному содержимому
Перейти к основному содержимому

Подключение Apache NiFi к ClickHouse

Community Maintained

Apache NiFi - это программное обеспечение для управления рабочими процессами с открытым исходным кодом, предназначенное для автоматизации потока данных между программными системами. Оно позволяет создавать конвейеры передачи данных ETL и поставляется с более чем 300 процессорами данных. Этот пошаговый учебник показывает, как подключить Apache NiFi к ClickHouse в качестве источника и получателя, а также загрузить пример набора данных.

1. Соберите данные для подключения

Чтобы подключиться к ClickHouse с помощью HTTP(S), вам необходима следующая информация:

  • ХОСТ и ПОРТ: как правило, порт 8443 при использовании TLS или 8123 при отсутствии TLS.

  • ИМЯ БАЗЫ ДАННЫХ: по умолчанию существует база данных с именем default, используйте имя базы данных, к которой вы хотите подключиться.

  • ИМЯ ПОЛЬЗОВАТЕЛЯ и ПАРОЛЬ: по умолчанию имя пользователя default. Используйте имя пользователя, подходящее для вашего случая.

Данные для вашего сервиса ClickHouse Cloud доступны в консоли ClickHouse Cloud. Выберите сервис, к которому вы хотите подключиться, и нажмите Подключиться:

Кнопка подключения к сервису ClickHouse Cloud

Выберите HTTPS, и данные будут доступны в примере команды curl.

Детали подключения ClickHouse Cloud по HTTPS

Если вы используете самоуправляемый ClickHouse, детали подключения устанавливаются вашим администратором ClickHouse.

2. Загрузите и запустите Apache NiFi

  1. Для новой установки загрузите двоичный файл с https://nifi.apache.org/download.html и начните с запуска ./bin/nifi.sh start

3. Загрузите драйвер JDBC для ClickHouse

  1. Перейдите на страницу релиза драйвера ClickHouse JDBC на GitHub и найдите последнюю версию релиза JDBC
  2. В релизной версии нажмите на "Показать все xx активы" и найдите JAR файл с ключевым словом "shaded" или "all", например, clickhouse-jdbc-0.5.0-all.jar
  3. Поместите JAR файл в папку, доступную Apache NiFi, и запомните абсолютный путь

4. Добавьте службу контроллера DBCPConnectionPool и настройте его свойства

  1. Чтобы настроить службу контроллера в Apache NiFi, посетите страницу конфигурации потока NiFi, нажав на кнопку "шестигранник"

    Страница конфигурации потока NiFi с выделенной кнопкой шестеренки
  2. Выберите вкладку Службы контроллера и добавьте новую службу контроллера, нажав на кнопку + в верхнем правом углу

    Вкладка Службы контроллера с выделенной кнопкой добавления
  3. Найдите DBCPConnectionPool и нажмите на кнопку "Добавить"

    Диалог выбора службы контроллера с выделенным DBCPConnectionPool
  4. Новая добавленная служба DBCPConnectionPool по умолчанию будет находиться в состоянии Неверно. Нажмите на кнопку "шестигранник", чтобы начать конфигурацию

    Список служб контроллера показывает недействительный DBCPConnectionPool с выделенной кнопкой шестеренки
  5. В разделе "Свойства" введите следующие значения

СвойствоЗначениеПримечание
URL подключения к базе данныхjdbc:ch:https://HOSTNAME:8443/default?ssl=trueУбедитесь, что HOSTNAME в URL подключения заменен соответственно
Имя класса драйвера базы данныхcom.clickhouse.jdbc.ClickHouseDriver
Местоположение драйвера базы данных/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarАбсолютный путь к JAR файлу драйвера JDBC для ClickHouse
Пользователь базы данныхdefaultИмя пользователя ClickHouse
ПарольpasswordПароль ClickHouse
  1. В разделе Настройки измените имя службы контроллера на "ClickHouse JDBC" для удобства

    Диалог конфигурации DBCPConnectionPool с заполненными свойствами
  2. Активируйте службу контроллера DBCPConnectionPool, нажав на кнопку "молния", а затем кнопку "Включить"

    Список служб контроллера с выделенной кнопкой молнии

    Диалог подтверждения включения службы контроллера
  3. Проверьте вкладку Службы контроллера и убедитесь, что служба контроллера включена

    Список служб контроллера показывает включенную службу ClickHouse JDBC

5. Чтение из таблицы с помощью процессора ExecuteSQL

  1. Добавьте процессор ExecuteSQL, вместе с соответствующими входящими и исходящими процессорами

    Полотно NiFi с процессором ExecuteSQL в рабочем процессе
  2. В разделе "Свойства" процессора ExecuteSQL введите следующие значения

    СвойствоЗначениеПримечание
    Служба пулов подключения к базе данныхClickHouse JDBCВыберите службу контроллера, настроенную для ClickHouse
    SQL запрос SELECTSELECT * FROM system.metricsВпишите ваш запрос сюда
  3. Запустите процессор ExecuteSQL

    Конфигурация процессора ExecuteSQL с заполненными свойствами
  4. Чтобы подтвердить, что запрос был успешно обработан, проверьте один из FlowFile в выходной очереди

    Диалог списка очереди, показывающий FlowFiles готовые к проверке
  5. Переключите вид на "форматированный", чтобы увидеть результат выходного FlowFile

    Просмотр содержимого FlowFile показывает результаты запроса в форматированном виде

6. Запись в таблицу с помощью процессоров MergeRecord и PutDatabaseRecord

  1. Чтобы записать несколько строк за одну вставку, сначала нужно объединить несколько записей в одну запись. Это можно сделать с помощью процессора MergeRecord.

  2. В разделе "Свойства" процессора MergeRecord введите следующие значения

    СвойствоЗначениеПримечание
    Читатель записейJSONTreeReaderВыберите подходящий читатель записей
    Писатель записейJSONReadSetWriterВыберите подходящий писатель записей
    Минимальное число записей1000Измените это на большее значение, чтобы минимальное количество строк объединялось в одну запись. По умолчанию - 1 строка
    Максимальное число записей10000Измените это на большее число, чем "Минимальное число записей". По умолчанию - 1 000 строк
  3. Чтобы подтвердить, что несколько записей объединены в одну, проверьте входные и выходные данные процессора MergeRecord. Обратите внимание, что выходные данные представляют собой массив нескольких входных записей

    Входные данные

    Входные данные процессора MergeRecord показывают отдельные записи

    Выходные данные

    Выходные данные процессора MergeRecord показывают объединенный массив записей
  4. В разделе "Свойства" процессора PutDatabaseRecord введите следующие значения

    СвойствоЗначениеПримечание
    Читатель записейJSONTreeReaderВыберите подходящий читатель записей
    Тип базы данныхGenericОставьте по умолчанию
    Тип оператораINSERT
    Служба пулов подключения к базе данныхClickHouse JDBCВыберите службу контроллера ClickHouse
    Имя таблицыtblВведите имя вашей таблицы здесь
    Перевод имен полейfalseУстановите в "false", чтобы имена полей, вставленных, должны соответствовать именам колонок
    Максимальный размер пакета1000Максимальное количество строк на вставку. Это значение не должно быть ниже значения "Минимальное число записей" в процессоре MergeRecord
  5. Чтобы подтвердить, что каждая вставка содержит несколько строк, проверьте, что количество строк в таблице увеличивается как минимум на значение "Минимальное число записей", определенное в MergeRecord.

    Результаты запроса показывают количество строк в целевой таблице
  6. Поздравляем - вы успешно загрузили ваши данные в ClickHouse с помощью Apache NiFi!