Перейти к основному содержимому
Перейти к основному содержимому

Движок распределенной таблицы

Движок в облаке

Чтобы создать движок распределенной таблицы в ClickHouse Cloud, вы можете использовать remote и remoteSecure функции таблиц. Синтаксис Distributed(...) не может быть использован в ClickHouse Cloud.

Таблицы с движком Distributed не хранят собственные данные, но позволяют выполнять распределенную обработку запросов на нескольких серверах. Чтение автоматически параллелизуется. Во время чтения используются индексы таблицы на удаленных серверах, если они существуют.

Создание таблицы

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
[SETTINGS name=value, ...]

Из таблицы

Когда таблица Distributed указывает на таблицу на текущем сервере, вы можете адаптировать схему этой таблицы:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] AS [db2.]name2 ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]]) [SETTINGS name=value, ...]

Параметры распределенности

ПараметрОписание
clusterИмя кластера в файле конфигурации сервера
databaseИмя удаленной базы данных
tableИмя удаленной таблицы
sharding_key (Опционально)Ключ шардирования.
Указание sharding_key необходимо для следующих случаев:
  • Для INSERT в распределенную таблицу (так как движок таблицы нуждается в sharding_key, чтобы определить, как разделить данные). Однако, если настройка insert_distributed_one_random_shard включена, тогда INSERT не нуждается в ключе шардирования.
  • Для использования с optimize_skip_unused_shards, так как sharding_key необходим для определения, какие шардированные таблицы следует запрашивать.
policy_name (Опционально)Имя политики, которое будет использовано для хранения временных файлов для фона отправки

Смотрите также

Настройки распределенности

НастройкаОписаниеЗначение по умолчанию
fsync_after_insertВыполнять fsync для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС сбросила все вставленные данные на диск на узле инициатора.false
fsync_directoriesВыполнять fsync для директорий. Гарантирует, что ОС обновила метаданные директорий после операций, связанных с фоновой вставкой в распределенную таблицу (после вставки, после отправки данных в шард и т.д.).false
skip_unavailable_shardsЕсли true, ClickHouse бесшумно пропускает недоступные шарды. Шард помечается как недоступный, когда: 1) шард недоступен из-за сбоя соединения. 2) Шард неразрешим через DNS. 3) Таблица не существует на шарде.false
bytes_to_throw_insertЕсли более чем это количество сжатых байт будет ожидать фоновой INSERT, будет выброшено исключение. 0 - не выбрасывать.0
bytes_to_delay_insertЕсли более чем это количество сжатых байт будет ожидать фоновой вставки, запрос будет задержан. 0 - не задерживать.0
max_delay_to_insertМаксимальная задержка вставки данных в распределенную таблицу в секундах, если имеется много ожидающих байт для фоновой отправки.60
background_insert_batchТо же самое, что и distributed_background_insert_batch0
background_insert_split_batch_on_failureТо же самое, что и distributed_background_insert_split_batch_on_failure0
background_insert_sleep_time_msТо же самое, что и distributed_background_insert_sleep_time_ms0
background_insert_max_sleep_time_msТо же самое, что и distributed_background_insert_max_sleep_time_ms0
flush_on_detachСбрасывать данные на удаленные узлы при DETACH/DROP/выключении сервера.true
примечание

Настройки долговечности (fsync_...):

  • Влияют только на фоновые INSERT (т.е. distributed_foreground_insert=false), когда данные впервые хранятся на диске узла инициатора, а затем, в фоновом режиме, отправляются на шарды.
  • Могут значительно снизить производительность INSERT
  • Влияют на запись данных, хранящихся внутри папки распределенной таблицы, в узел, который принял ваш вставлянный запрос. Если вам нужны гарантии записи данных в базовые таблицы MergeTree, смотрите настройки долговечности (...fsync...) в system.merge_tree_settings

Для Настроек пределов вставки (..._insert) смотрите также:

  • distributed_foreground_insert настройка
  • prefer_localhost_replica настройка
  • bytes_to_throw_insert обрабатывается до bytes_to_delay_insert, поэтому вы не должны устанавливать его на значение меньше, чем bytes_to_delay_insert

Пример

CREATE TABLE hits_all AS hits
ENGINE = Distributed(logs, default, hits[, sharding_key[, policy_name]])
SETTINGS
    fsync_after_insert=0,
    fsync_directories=0;

Данные будут считываться со всех серверов в кластере logs, из таблицы default.hits, расположенной на каждом сервере в кластере. Данные не только считываются, но и частично обрабатываются на удаленных серверах (в той мере, в какой это возможно). Например, для запроса с GROUP BY, данные будут агрегироваться на удаленных серверах, а промежуточные состояния агрегатных функций будут отправлены на сервер запроса. Затем данные будут агрегироваться дальше.

Вместо имени базы данных вы можете использовать константное выражение, которое возвращает строку. Например: currentDatabase().

Кластеры

Кластеры настраиваются в файле конфигурации сервера:

<remote_servers>
    <logs>
        <!-- Inter-server per-cluster secret for Distributed queries
             default: no secret (no authentication will be performed)

             If set, then Distributed queries will be validated on shards, so at least:
             - such cluster should exist on the shard,
             - such cluster should have the same secret.

             And also (and which is more important), the initial_user will
             be used as current user for the query.
        -->
        <!-- <secret></secret> -->

        <!-- Optional. Whether distributed DDL queries (ON CLUSTER clause) are allowed for this cluster. Default: true (allowed). -->        
        <!-- <allow_distributed_ddl_queries>true</allow_distributed_ddl_queries> -->

        <shard>
            <!-- Optional. Shard weight when writing data. Default: 1. -->
            <weight>1</weight>
            <!-- Optional. The shard name.  Must be non-empty and unique among shards in the cluster. If not specified, will be empty. -->
            <name>shard_01</name>
            <!-- Optional. Whether to write data to just one of the replicas. Default: false (write data to all replicas). -->
            <internal_replication>false</internal_replication>
            <replica>
                <!-- Optional. Priority of the replica for load balancing (see also load_balancing setting). Default: 1 (less value has more priority). -->
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <name>shard_02</name>
            <internal_replication>false</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <secure>1</secure>
                <port>9440</port>
            </replica>
        </shard>
    </logs>
</remote_servers>

Здесь определен кластер с именем logs, который состоит из двух шардов, каждый из которых содержит две реплики. Шарды относятся к серверам, которые содержат различные части данных (для чтения всех данных необходимо получить доступ ко всем шардом). Реплики дублируют сервера (для чтения всех данных можно получить доступ к данным на любой из реплик).

Именам кластеров не должен содержать точек.

Для каждого сервера указаны параметры host, port, а опционально user, password, secure, compression, bind_host:

ПараметрОписаниеЗначение по умолчанию
hostАдрес удаленного сервера. Вы можете использовать как доменное имя, так и IPv4 или IPv6 адрес. Если вы указываете доменное имя, сервер выполняет DNS-запрос при старте, и результат сохраняется на время работы сервера. Если DNS-запрос потерпел неудачу, сервер не запускается. Если вы измените DNS-запись, перезапустите сервер.-
portTCP-порт для деятельности обмена сообщениями (tcp_port в конфигурации, обычно установлен на 9000). Не путать с http_port.-
userИмя пользователя для подключения к удаленному серверу. Этот пользователь должен иметь доступ для подключения к указанному серверу. Доступ настраивается в файле users.xml. Для получения дополнительной информации смотрите раздел Права доступа.default
passwordПароль для подключения к удаленному серверу (не скрыт).''
secureИспользовать ли защищенное SSL/TLS соединение. Обычно также требует указания порта (стандартный защищенный порт - 9440). Сервер должен слушать на <tcp_port_secure>9440</tcp_port_secure> и быть настроен с правильными сертификатами.false
compressionИспользовать сжатие данных.true
bind_hostИсходный адрес, который должен использоваться при подключении к удаленному серверу с этого узла. Поддерживается только IPv4 адрес. Предназначен для сложных сценариев развертывания, где требуется установить исходный IP-адрес, используемый запросами ClickHouse.-

При указании реплик одна из доступных реплик будет выбрана для каждого из шардов при чтении. Вы можете настроить алгоритм балансировки нагрузки (предпочтение к какой реплике обращаться) – смотрите настройку load_balancing. Если соединение с сервером не установлено, будет предпринята попытка подключения с коротким тайм-аутом. Если соединение не удалось, будет выбрана следующая реплика и так далее для всех реплик. Если попытка соединения не удалась для всех реплик, попытка будет повторена тем же способом несколько раз. Это работает в пользу устойчивости, но не обеспечивает полной отказоустойчивости: удаленный сервер может принять соединение, но не работать или работать плохо.

Вы можете указать только один из шардов (в этом случае обработка запроса должна называться удаленной, а не распределенной) или любое количество шардов. В каждом шарде вы можете указать от одной до любого количества реплик. Вы можете указать разное количество реплик для каждого шард.

В конфигурации можно указать столько кластеров, сколько вам нужно.

Чтобы просмотреть ваши кластеры, используйте таблицу system.clusters.

Движок Distributed позволяет работать с кластером так же, как с локальным сервером. Однако конфигурацию кластера нельзя указывать динамически, её необходимо настраивать в файле конфигурации сервера. Обычно все серверы в кластере будут иметь одинаковую конфигурацию кластера (хотя это не обязательно). Кластеры из файла конфигурации обновляются на лету, без перезапуска сервера.

Если вам нужно отправлять запрос в неизвестный набор шардов и реплик каждый раз, вам не нужно создавать таблицу Distributed – используйте вместо этого функцию таблицы remote. См. раздел Функции таблиц.

Запись данных

Существует два метода записи данных в кластер:

Во-первых, вы можете определить, на какие серверы записывать какие данные, и выполнять запись непосредственно на каждом шарде. Другими словами, выполнять прямые операторы INSERT на удаленных таблицах в кластере, на которые указывает таблица Distributed. Это наиболее гибкое решение, так как вы можете использовать любую схему шардирования, даже ту, которая является нетривиальной из-за требований предметной области. Это также наиболее оптимальное решение, так как данные могут быть записаны на разные шарды совершенно независимо.

Во-вторых, вы можете выполнять операторы INSERT в таблицу Distributed. В этом случае таблица распределит вставленные данные между серверами самостоятельно. Чтобы записать данные в таблицу Distributed, необходимо настроить параметр sharding_key (кроме случая, если имеется только один шард).

Каждый шард может иметь определенный <weight> в файле конфигурации. По умолчанию вес равен 1. Данные распределяются по шардом в количестве, пропорциональном весу шарда. Все веса шардов суммируются, после чего вес каждого шарда делится на итог для определения пропорции каждого шарда. Например, если имеется два шарда, и первый имеет вес 1, тогда как второй имеет вес 2, первый будет получать одну треть (1 / 3) вставленных строк, а второй будет получать две трети (2 / 3).

Каждый шард может иметь параметр internal_replication, определенный в файле конфигурации. Если этот параметр установлен в true, операция записи выбирает первую здоровую реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основании таблицы Distributed, являются реплицированными таблицами (например, любая из Replicated*MergeTree). Одна из реплик таблицы получит запись, и она будет автоматически реплицирована на другие реплики.

Если internal_replication установлен в false (по умолчанию), данные записываются во все реплики. В этом случае таблица Distributed самостоятельно реплицирует данные. Это хуже, чем использование реплицированных таблиц, потому что согласованность реплик не проверяется и со временем они будут содержать несколько разных данных.

Чтобы выбрать, в какой шард отправляется строка данных, анализируется выражение шардирования, и берется остаток от деления на общий вес шардов. Строка отправляется в шард, который соответствует полуинтервалу остатков от prev_weights до prev_weights + weight, где prev_weights - это общий вес шардов с наименьшим номером, а weight - это вес этого шард. Например, если имеется два шарда, и первый имеет вес 9, тогда как второй имеет вес 10, строка будет отправлена в первый шард для остатков из диапазона [0, 9), и во второй для остатков из диапазона [9, 19).

Выражение шардирования может быть любым выражением из констант и столбцов таблицы, возвращающим целое число. Например, вы можете использовать выражение rand() для случайного распределения данных или UserID для распределения по остатку от деления ID пользователя (в этом случае данные одного пользователя будут находиться на одном шарде, что упрощает выполнение IN и JOIN по пользователям). Если один из столбцов не распределен достаточно равномерно, вы можете обернуть его в хеш-функцию, например, intHash64(UserID).

Простое взятие остатка от деления является ограниченным решением для шардирования и не всегда подходит. Оно работает для средних и больших объемов данных (десятки серверов), но не подходит для очень больших объемов данных (сотни серверов и более). В последнем случае лучше использовать схему шардирования, требуемую предметной областью, вместо того чтобы использовать записи в таблицах Distributed.

Вам следует уделить внимание схеме шардирования в следующих случаях:

  • Используются запросы, требующие объединения данных (IN или JOIN) по определенному ключу. Если данные шардированы по этому ключу, вы можете использовать локальные IN или JOIN, вместо GLOBAL IN или GLOBAL JOIN, что намного более эффективно.
  • Используется большое количество серверов (сотни или более) с большим количеством небольших запросов, например, запросов на данные отдельных клиентов (например, веб-сайтов, рекламодателей или партнеров). Чтобы небольшие запросы не влияли на весь кластер, имеет смысл расположить данные для одного клиента на одном шарде. В качестве альтернативы, вы можете настроить двуровневое шардирование: разделите весь кластер на "слои", где слой может состоять из нескольких шардов. Данные для одного клиента размещаются на одном слое, но шард может добавляться в слой по мере необходимости, и данные распределяются случайным образом внутри них. Таблицы Distributed создаются для каждого слоя, а одна общая распределенная таблица создается для глобальных запросов.

Данные записываются в фоновом режиме. Когда данные вставляются в таблицу, блок данных просто записывается в локальную файловую систему. Данные отправляются на удаленные сервера в фоновом режиме как можно скорее. Периодичность отправки данных управляется настройками distributed_background_insert_sleep_time_ms и distributed_background_insert_max_sleep_time_ms. Движок Distributed отправляет каждый файл с вставленными данными отдельно, но вы можете включить пакетную отправку файлов с настройкой distributed_background_insert_batch. Эта настройка улучшает производительность кластера за счет более эффективного использования ресурсов локального сервера и сети. Вы должны проверить, успешно ли отправлены данные, проверив список файлов (данные, ожидающие отправки) в директории таблицы: /var/lib/clickhouse/data/database/table/. Количество потоков, выполняющих фоновые задачи, можно задать настройкой background_distributed_schedule_pool_size.

Если сервер перестал существовать или имел резкий перезапуск (например, из-за сбоя аппаратного обеспечения) после INSERT в таблицу Distributed, вставленные данные могут быть потеряны. Если в директории таблицы обнаружена испорченная часть данных, она перемещается в подкаталог broken и больше не используется.

Чтение данных

При запросе таблицы Distributed запросы SELECT отправляются на все шарды и работают независимо от того, как данные распределяются по шартам (они могут распределяться совершенно произвольно). Когда вы добавляете новый шард, вам не нужно переносить старые данные в него. Вместо этого вы можете записывать новые данные в него, используя более тяжелый вес – данные будут распределяться немного неравномерно, но запросы будут работать корректно и эффективно.

Когда опция max_parallel_replicas включена, обработка запросов параллелизуется по всем репликам внутри одного шарда. Для получения дополнительной информации смотрите раздел max_parallel_replicas.

Чтобы узнать больше о том, как обрабатываются распределенные запросы in и global in, обратитесь к этой документации.

Виртуальные колонны

_Shard_num

_shard_num — Содержит значение shard_num из таблицы system.clusters. Тип: UInt32.

примечание

Поскольку функции remote и cluster внутренне создают временную распределенную таблицу, _shard_num доступен также и там.

Смотрите также