Перейти к основному содержимому
Перейти к основному содержимому

Интеграция ClickHouse с Kafka с использованием именованных коллекций

Введение

В этом руководстве мы рассмотрим, как подключить ClickHouse к Kafka с использованием именованных коллекций. Использование файла конфигурации для именованных коллекций предлагает несколько преимуществ:

  • Централизованное и более простое управление настройками конфигурации.
  • Изменения в настройках могут быть выполнены без изменения определений SQL таблиц.
  • Упрощенный обзор и устранение неполадок конфигураций за счет проверки одного файла конфигурации.

Это руководство было протестировано на Apache Kafka 3.4.1 и ClickHouse 24.5.1.

Предположения

В этом документе предполагается, что у вас есть:

  1. Рабочий кластер Kafka.
  2. Настроенный и работающий кластер ClickHouse.
  3. Базовые знания SQL и знакомство с конфигурациями ClickHouse и Kafka.

Предварительные требования

Убедитесь, что пользователь, создающий именованную коллекцию, имеет необходимые права доступа:

<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>

Для получения дополнительных сведений о включении контроля доступа обратитесь к Руководству по управлению пользователями.

Конфигурация

Добавьте следующий раздел в файл config.xml ClickHouse:

<!-- Named collections for Kafka integration -->
<named_collections>
    <cluster_1>
        <!-- ClickHouse Kafka engine parameters -->
        <kafka_broker_list>c1-kafka-1:9094,c1-kafka-2:9094,c1-kafka-3:9094</kafka_broker_list>
        <kafka_topic_list>cluster_1_clickhouse_topic</kafka_topic_list>
        <kafka_group_name>cluster_1_clickhouse_consumer</kafka_group_name>
        <kafka_format>JSONEachRow</kafka_format>
        <kafka_commit_every_batch>0</kafka_commit_every_batch>
        <kafka_num_consumers>1</kafka_num_consumers>
        <kafka_thread_per_consumer>1</kafka_thread_per_consumer>

        <!-- Kafka extended configuration -->
        <kafka>
            <security_protocol>SASL_SSL</security_protocol>
            <enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
            <sasl_mechanism>PLAIN</sasl_mechanism>
            <sasl_username>kafka-client</sasl_username>
            <sasl_password>kafkapassword1</sasl_password>
            <debug>all</debug>
            <auto_offset_reset>latest</auto_offset_reset>
        </kafka>
    </cluster_1>

    <cluster_2>
        <!-- ClickHouse Kafka engine parameters -->
        <kafka_broker_list>c2-kafka-1:29094,c2-kafka-2:29094,c2-kafka-3:29094</kafka_broker_list>
        <kafka_topic_list>cluster_2_clickhouse_topic</kafka_topic_list>
        <kafka_group_name>cluster_2_clickhouse_consumer</kafka_group_name>
        <kafka_format>JSONEachRow</kafka_format>
        <kafka_commit_every_batch>0</kafka_commit_every_batch>
        <kafka_num_consumers>1</kafka_num_consumers>
        <kafka_thread_per_consumer>1</kafka_thread_per_consumer>

        <!-- Kafka extended configuration -->
        <kafka>
            <security_protocol>SASL_SSL</security_protocol>
            <enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
            <sasl_mechanism>PLAIN</sasl_mechanism>
            <sasl_username>kafka-client</sasl_username>
            <sasl_password>kafkapassword2</sasl_password>
            <debug>all</debug>
            <auto_offset_reset>latest</auto_offset_reset>
        </kafka>
    </cluster_2>
</named_collections>

Примечания к конфигурации

  1. Настройте адреса Kafka и связанные параметры в соответствии с настройками вашего кластера Kafka.
  2. Раздел перед <kafka> содержит параметры движка Kafka ClickHouse. Для полного списка параметров обратитесь к параметрам движка Kafka.
  3. Раздел внутри <kafka> содержит расширенные параметры конфигурации Kafka. Для получения дополнительных параметров обратитесь к конфигурации librdkafka.
  4. В этом примере используется протокол безопасности SASL_SSL и механизм PLAIN. Настройте эти параметры в зависимости от конфигурации вашего кластера Kafka.

Создание таблиц и баз данных

Создайте необходимые базы данных и таблицы на вашем кластере ClickHouse. Если вы запускаете ClickHouse как единичный узел, опустите часть кластера SQL команды и используйте любой другой движок вместо ReplicatedMergeTree.

Создание базы данных

CREATE DATABASE kafka_testing ON CLUSTER LAB_CLICKHOUSE_CLUSTER;

Создание таблиц Kafka

Создайте первую таблицу Kafka для первого кластера Kafka:

CREATE TABLE kafka_testing.first_kafka_table ON CLUSTER LAB_CLICKHOUSE_CLUSTER
(
    `id` UInt32,
    `first_name` String,
    `last_name` String
)
ENGINE = Kafka(cluster_1);

Создайте вторую таблицу Kafka для второго кластера Kafka:

CREATE TABLE kafka_testing.second_kafka_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
    `id` UInt32,
    `first_name` String,
    `last_name` String
)
ENGINE = Kafka(cluster_2);

Создание реплицированных таблиц

Создайте таблицу для первой таблицы Kafka:

CREATE TABLE kafka_testing.first_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
    `id` UInt32,
    `first_name` String,
    `last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;

Создайте таблицу для второй таблицы Kafka:

CREATE TABLE kafka_testing.second_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
    `id` UInt32,
    `first_name` String,
    `last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;

Создание материализованных представлений

Создайте материализованное представление для вставки данных из первой таблицы Kafka в первую реплицированную таблицу:

CREATE MATERIALIZED VIEW kafka_testing.cluster_1_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO first_replicated_table AS
SELECT 
    id,
    first_name,
    last_name
FROM first_kafka_table;

Создайте материализованное представление для вставки данных из второй таблицы Kafka во вторую реплицированную таблицу:

CREATE MATERIALIZED VIEW kafka_testing.cluster_2_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO second_replicated_table AS
SELECT 
    id,
    first_name,
    last_name
FROM second_kafka_table;

Проверка настройки

Теперь вы должны увидеть соответствующие группы потребителей на ваших кластерах Kafka:

  • cluster_1_clickhouse_consumer на cluster_1
  • cluster_2_clickhouse_consumer на cluster_2

Запустите следующие запросы на любом из ваших узлов ClickHouse, чтобы увидеть данные в обеих таблицах:

SELECT * FROM first_replicated_table LIMIT 10;
SELECT * FROM second_replicated_table LIMIT 10;

Примечание

В этом руководстве данные, поступающие в обе темы Kafka, одинаковы. В вашем случае они могут отличаться. Вы можете добавить столько кластеров Kafka, сколько хотите.

Пример вывода:

┌─id─┬─first_name─┬─last_name─┐
│  0 │ FirstName0 │ LastName0 │
│  1 │ FirstName1 │ LastName1 │
│  2 │ FirstName2 │ LastName2 │
└────┴────────────┴───────────┘

Это завершает настройку интеграции ClickHouse с Kafka с использованием именованных коллекций. Централизуя настройки Kafka в файле config.xml ClickHouse, вы можете легче управлять и регулировать настройки, обеспечивая упрощенную и эффективную интеграцию.