Движок распределенной таблицы
Чтобы создать движок распределенной таблицы в ClickHouse Cloud, вы можете использовать remote
и remoteSecure
функции таблиц.
Синтаксис Distributed(...)
не может быть использован в ClickHouse Cloud.
Таблицы с движком Distributed не хранят собственные данные, но позволяют выполнять распределенную обработку запросов на нескольких серверах. Чтение автоматически параллелизуется. Во время чтения используются индексы таблицы на удаленных серверах, если они существуют.
Создание таблицы
Из таблицы
Когда таблица Distributed
указывает на таблицу на текущем сервере, вы можете адаптировать схему этой таблицы:
Параметры распределенности
Параметр | Описание |
---|---|
cluster | Имя кластера в файле конфигурации сервера |
database | Имя удаленной базы данных |
table | Имя удаленной таблицы |
sharding_key (Опционально) | Ключ шардирования. Указание sharding_key необходимо для следующих случаев:
|
policy_name (Опционально) | Имя политики, которое будет использовано для хранения временных файлов для фона отправки |
Смотрите также
- distributed_foreground_insert настройка
- MergeTree для примеров
Настройки распределенности
Настройка | Описание | Значение по умолчанию |
---|---|---|
fsync_after_insert | Выполнять fsync для данных файла после фоновой вставки в Distributed. Гарантирует, что ОС сбросила все вставленные данные на диск на узле инициатора. | false |
fsync_directories | Выполнять fsync для директорий. Гарантирует, что ОС обновила метаданные директорий после операций, связанных с фоновой вставкой в распределенную таблицу (после вставки, после отправки данных в шард и т.д.). | false |
skip_unavailable_shards | Если true, ClickHouse бесшумно пропускает недоступные шарды. Шард помечается как недоступный, когда: 1) шард недоступен из-за сбоя соединения. 2) Шард неразрешим через DNS. 3) Таблица не существует на шарде. | false |
bytes_to_throw_insert | Если более чем это количество сжатых байт будет ожидать фоновой INSERT , будет выброшено исключение. 0 - не выбрасывать. | 0 |
bytes_to_delay_insert | Если более чем это количество сжатых байт будет ожидать фоновой вставки, запрос будет задержан. 0 - не задерживать. | 0 |
max_delay_to_insert | Максимальная задержка вставки данных в распределенную таблицу в секундах, если имеется много ожидающих байт для фоновой отправки. | 60 |
background_insert_batch | То же самое, что и distributed_background_insert_batch | 0 |
background_insert_split_batch_on_failure | То же самое, что и distributed_background_insert_split_batch_on_failure | 0 |
background_insert_sleep_time_ms | То же самое, что и distributed_background_insert_sleep_time_ms | 0 |
background_insert_max_sleep_time_ms | То же самое, что и distributed_background_insert_max_sleep_time_ms | 0 |
flush_on_detach | Сбрасывать данные на удаленные узлы при DETACH /DROP /выключении сервера. | true |
Настройки долговечности (fsync_...
):
- Влияют только на фоновые
INSERT
(т.е.distributed_foreground_insert=false
), когда данные впервые хранятся на диске узла инициатора, а затем, в фоновом режиме, отправляются на шарды. - Могут значительно снизить производительность
INSERT
- Влияют на запись данных, хранящихся внутри папки распределенной таблицы, в узел, который принял ваш вставлянный запрос. Если вам нужны гарантии записи данных в базовые таблицы MergeTree, смотрите настройки долговечности (
...fsync...
) вsystem.merge_tree_settings
Для Настроек пределов вставки (..._insert
) смотрите также:
distributed_foreground_insert
настройкаprefer_localhost_replica
настройкаbytes_to_throw_insert
обрабатывается доbytes_to_delay_insert
, поэтому вы не должны устанавливать его на значение меньше, чемbytes_to_delay_insert
Пример
Данные будут считываться со всех серверов в кластере logs
, из таблицы default.hits
, расположенной на каждом сервере в кластере. Данные не только считываются, но и частично обрабатываются на удаленных серверах (в той мере, в какой это возможно). Например, для запроса с GROUP BY
, данные будут агрегироваться на удаленных серверах, а промежуточные состояния агрегатных функций будут отправлены на сервер запроса. Затем данные будут агрегироваться дальше.
Вместо имени базы данных вы можете использовать константное выражение, которое возвращает строку. Например: currentDatabase()
.
Кластеры
Кластеры настраиваются в файле конфигурации сервера:
Здесь определен кластер с именем logs
, который состоит из двух шардов, каждый из которых содержит две реплики. Шарды относятся к серверам, которые содержат различные части данных (для чтения всех данных необходимо получить доступ ко всем шардом). Реплики дублируют сервера (для чтения всех данных можно получить доступ к данным на любой из реплик).
Именам кластеров не должен содержать точек.
Для каждого сервера указаны параметры host
, port
, а опционально user
, password
, secure
, compression
, bind_host
:
Параметр | Описание | Значение по умолчанию |
---|---|---|
host | Адрес удаленного сервера. Вы можете использовать как доменное имя, так и IPv4 или IPv6 адрес. Если вы указываете доменное имя, сервер выполняет DNS-запрос при старте, и результат сохраняется на время работы сервера. Если DNS-запрос потерпел неудачу, сервер не запускается. Если вы измените DNS-запись, перезапустите сервер. | - |
port | TCP-порт для деятельности обмена сообщениями (tcp_port в конфигурации, обычно установлен на 9000). Не путать с http_port . | - |
user | Имя пользователя для подключения к удаленному серверу. Этот пользователь должен иметь доступ для подключения к указанному серверу. Доступ настраивается в файле users.xml . Для получения дополнительной информации смотрите раздел Права доступа. | default |
password | Пароль для подключения к удаленному серверу (не скрыт). | '' |
secure | Использовать ли защищенное SSL/TLS соединение. Обычно также требует указания порта (стандартный защищенный порт - 9440 ). Сервер должен слушать на <tcp_port_secure>9440</tcp_port_secure> и быть настроен с правильными сертификатами. | false |
compression | Использовать сжатие данных. | true |
bind_host | Исходный адрес, который должен использоваться при подключении к удаленному серверу с этого узла. Поддерживается только IPv4 адрес. Предназначен для сложных сценариев развертывания, где требуется установить исходный IP-адрес, используемый запросами ClickHouse. | - |
При указании реплик одна из доступных реплик будет выбрана для каждого из шардов при чтении. Вы можете настроить алгоритм балансировки нагрузки (предпочтение к какой реплике обращаться) – смотрите настройку load_balancing. Если соединение с сервером не установлено, будет предпринята попытка подключения с коротким тайм-аутом. Если соединение не удалось, будет выбрана следующая реплика и так далее для всех реплик. Если попытка соединения не удалась для всех реплик, попытка будет повторена тем же способом несколько раз. Это работает в пользу устойчивости, но не обеспечивает полной отказоустойчивости: удаленный сервер может принять соединение, но не работать или работать плохо.
Вы можете указать только один из шардов (в этом случае обработка запроса должна называться удаленной, а не распределенной) или любое количество шардов. В каждом шарде вы можете указать от одной до любого количества реплик. Вы можете указать разное количество реплик для каждого шард.
В конфигурации можно указать столько кластеров, сколько вам нужно.
Чтобы просмотреть ваши кластеры, используйте таблицу system.clusters
.
Движок Distributed
позволяет работать с кластером так же, как с локальным сервером. Однако конфигурацию кластера нельзя указывать динамически, её необходимо настраивать в файле конфигурации сервера. Обычно все серверы в кластере будут иметь одинаковую конфигурацию кластера (хотя это не обязательно). Кластеры из файла конфигурации обновляются на лету, без перезапуска сервера.
Если вам нужно отправлять запрос в неизвестный набор шардов и реплик каждый раз, вам не нужно создавать таблицу Distributed
– используйте вместо этого функцию таблицы remote
. См. раздел Функции таблиц.
Запись данных
Существует два метода записи данных в кластер:
Во-первых, вы можете определить, на какие серверы записывать какие данные, и выполнять запись непосредственно на каждом шарде. Другими словами, выполнять прямые операторы INSERT
на удаленных таблицах в кластере, на которые указывает таблица Distributed
. Это наиболее гибкое решение, так как вы можете использовать любую схему шардирования, даже ту, которая является нетривиальной из-за требований предметной области. Это также наиболее оптимальное решение, так как данные могут быть записаны на разные шарды совершенно независимо.
Во-вторых, вы можете выполнять операторы INSERT
в таблицу Distributed
. В этом случае таблица распределит вставленные данные между серверами самостоятельно. Чтобы записать данные в таблицу Distributed
, необходимо настроить параметр sharding_key
(кроме случая, если имеется только один шард).
Каждый шард может иметь определенный <weight>
в файле конфигурации. По умолчанию вес равен 1
. Данные распределяются по шардом в количестве, пропорциональном весу шарда. Все веса шардов суммируются, после чего вес каждого шарда делится на итог для определения пропорции каждого шарда. Например, если имеется два шарда, и первый имеет вес 1, тогда как второй имеет вес 2, первый будет получать одну треть (1 / 3) вставленных строк, а второй будет получать две трети (2 / 3).
Каждый шард может иметь параметр internal_replication
, определенный в файле конфигурации. Если этот параметр установлен в true
, операция записи выбирает первую здоровую реплику и записывает данные в неё. Используйте это, если таблицы, лежащие в основании таблицы Distributed
, являются реплицированными таблицами (например, любая из Replicated*MergeTree
). Одна из реплик таблицы получит запись, и она будет автоматически реплицирована на другие реплики.
Если internal_replication
установлен в false
(по умолчанию), данные записываются во все реплики. В этом случае таблица Distributed
самостоятельно реплицирует данные. Это хуже, чем использование реплицированных таблиц, потому что согласованность реплик не проверяется и со временем они будут содержать несколько разных данных.
Чтобы выбрать, в какой шард отправляется строка данных, анализируется выражение шардирования, и берется остаток от деления на общий вес шардов. Строка отправляется в шард, который соответствует полуинтервалу остатков от prev_weights
до prev_weights + weight
, где prev_weights
- это общий вес шардов с наименьшим номером, а weight
- это вес этого шард. Например, если имеется два шарда, и первый имеет вес 9, тогда как второй имеет вес 10, строка будет отправлена в первый шард для остатков из диапазона [0, 9), и во второй для остатков из диапазона [9, 19).
Выражение шардирования может быть любым выражением из констант и столбцов таблицы, возвращающим целое число. Например, вы можете использовать выражение rand()
для случайного распределения данных или UserID
для распределения по остатку от деления ID пользователя (в этом случае данные одного пользователя будут находиться на одном шарде, что упрощает выполнение IN
и JOIN
по пользователям). Если один из столбцов не распределен достаточно равномерно, вы можете обернуть его в хеш-функцию, например, intHash64(UserID)
.
Простое взятие остатка от деления является ограниченным решением для шардирования и не всегда подходит. Оно работает для средних и больших объемов данных (десятки серверов), но не подходит для очень больших объемов данных (сотни серверов и более). В последнем случае лучше использовать схему шардирования, требуемую предметной областью, вместо того чтобы использовать записи в таблицах Distributed
.
Вам следует уделить внимание схеме шардирования в следующих случаях:
- Используются запросы, требующие объединения данных (
IN
илиJOIN
) по определенному ключу. Если данные шардированы по этому ключу, вы можете использовать локальныеIN
илиJOIN
, вместоGLOBAL IN
илиGLOBAL JOIN
, что намного более эффективно. - Используется большое количество серверов (сотни или более) с большим количеством небольших запросов, например, запросов на данные отдельных клиентов (например, веб-сайтов, рекламодателей или партнеров). Чтобы небольшие запросы не влияли на весь кластер, имеет смысл расположить данные для одного клиента на одном шарде. В качестве альтернативы, вы можете настроить двуровневое шардирование: разделите весь кластер на "слои", где слой может состоять из нескольких шардов. Данные для одного клиента размещаются на одном слое, но шард может добавляться в слой по мере необходимости, и данные распределяются случайным образом внутри них. Таблицы
Distributed
создаются для каждого слоя, а одна общая распределенная таблица создается для глобальных запросов.
Данные записываются в фоновом режиме. Когда данные вставляются в таблицу, блок данных просто записывается в локальную файловую систему. Данные отправляются на удаленные сервера в фоновом режиме как можно скорее. Периодичность отправки данных управляется настройками distributed_background_insert_sleep_time_ms и distributed_background_insert_max_sleep_time_ms. Движок Distributed
отправляет каждый файл с вставленными данными отдельно, но вы можете включить пакетную отправку файлов с настройкой distributed_background_insert_batch. Эта настройка улучшает производительность кластера за счет более эффективного использования ресурсов локального сервера и сети. Вы должны проверить, успешно ли отправлены данные, проверив список файлов (данные, ожидающие отправки) в директории таблицы: /var/lib/clickhouse/data/database/table/
. Количество потоков, выполняющих фоновые задачи, можно задать настройкой background_distributed_schedule_pool_size.
Если сервер перестал существовать или имел резкий перезапуск (например, из-за сбоя аппаратного обеспечения) после INSERT
в таблицу Distributed
, вставленные данные могут быть потеряны. Если в директории таблицы обнаружена испорченная часть данных, она перемещается в подкаталог broken
и больше не используется.
Чтение данных
При запросе таблицы Distributed
запросы SELECT
отправляются на все шарды и работают независимо от того, как данные распределяются по шартам (они могут распределяться совершенно произвольно). Когда вы добавляете новый шард, вам не нужно переносить старые данные в него. Вместо этого вы можете записывать новые данные в него, используя более тяжелый вес – данные будут распределяться немного неравномерно, но запросы будут работать корректно и эффективно.
Когда опция max_parallel_replicas
включена, обработка запросов параллелизуется по всем репликам внутри одного шарда. Для получения дополнительной информации смотрите раздел max_parallel_replicas.
Чтобы узнать больше о том, как обрабатываются распределенные запросы in
и global in
, обратитесь к этой документации.
Виртуальные колонны
_Shard_num
_shard_num
— Содержит значение shard_num
из таблицы system.clusters
. Тип: UInt32.
Смотрите также
- Описание Виртуальных колонн
- Настройка
background_distributed_schedule_pool_size
- Функции
shardNum()
иshardCount()