Как упоминалось во вступлении, ClickHouse прекрасно масштабируется горизонтально, что дает нам возможность шардировать данные. Это позволяет нам распараллеливать большие запросы на нескольких серверах, используя всю их мощность. Итак, как же работает данная конструкция?
Все очень и очень просто. Представьте себе что у вас есть таблица с данными о переходе на ваш сайт (допустим access_log). В нее мы просто залили все логи из nginx'а. Все запросы по этой таблице работали очень хорошо и быстро, пока она не разрослать до нескольких миллиардов строк за сутки. И теперь мы хотим произвести шардирование - то есть разделить наши данные пополам - первая половина будет лежать на первом сервере, а вторая на втором. При этом мы естественно не хотим вручную делать запрос на каждом сервере, а потом агрегировать эти данные. Все должен делать кликхаус.
Итак, кликхаус позволяет нам это сделать достаточно простым способом. Мы создаем идентичные таблицы на каждой ноде и записываем половину данных на первый сервер, а вторую половину на второй. После этого на каждом сервере создаем distributed таблицу, которая будет опрашивать все наши шарды, забирать данные, агрегировать их и отдавать нам конечный результат. Естественно distributed таблице нужно знать какие ноды входят в наш кластер и какие шарды у нас есть - поэтому нам придется описать наш кластер в конфигурации. Но все остальное за нас сделает кликхаус.
Итак, в данном задании мы создадим кластер из 2-х нод ClickHouse, а для этого нам нужно:
- Установить ClickHouse на все машины кластера.
- Описать структуру кластера в конфигурационном файле.
- Создать локальные таблицы на каждой из нод кластера.
- Создать
distributed
таблицы (view) для таблиц кластера.
На самом деле здесь нет ничего хитрого - всего лишь надо так же добавить официальные репозитории и запустить команду apt для установки сервера. Единственный момент, который мы хотим посоветовать - это соблюдать одинаковую версию на всех нодах кластера, чтобы не возникало никаких проблем.
При обновлении, так же стоит выделять отдельную ноду для тестирования апгрейда и смотреть как кликхаус общается с ней. Вообще все большие изменения описываются в release notes для каждой конкретной версии кликхауса, поэтому читайте их перед обновлением и проводите тесты на своем внутреннем стенде.
А теперь переходим к конфигурации кластера. Основные параметры в конфигурационном файле config.xml
, которые нас интересуют - это:
- remote_servers - — основное место, где описывается структура кластера.
- include_from - — путь к конфигурационному файлу с подстановками.
<remote_servers incl="clickhouse_remote_servers" />
Убедитесь, что параметр выше в конфиге пустой. Если нет, то удалите содержимое этих параметров и оставьте так, как указано в примере, поскольку мы вынесем конфигурацию кластера в отдельный файл с подстановками.
В нашем примере есть атрибут incl="some_name"
. Он сообщает, что если в файле с подстановками есть параметр, имя которого соответствует значению этого атрибута, то он будут подставлен в соответствующее место. Сконфигурировать путь файла с подстановками можно через параметр <include_from>
.
Если этот параметр еще не указан в config.xml
, то нам нужно добавить его. Название файла с подстановками может быть произвольное, но я назову его cluster.xml
.
<include_from>/etc/clickhouse-server/cluster.xml</include_from>
Теперь перейдем к конфигурации самого кластера. Наш кластер состоит из 2-х шардов по 1-й реплике в каждом. Каждая нода ClickHouse является репликой. Для начала мы обойдемся без репликации данных. Полная конфигурация cluster.xml
выглядит следующим образом:
<?xml version="1.0"?>
<yandex>
<clickhouse_remote_servers>
<mycluster> // Название кластера
<shard>
<replica>
<host>167.99.142.32</host> // первая нода в нашем кластере
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>159.65.123.161</host> // вторая нода в нашем кластере
<port>9000</port>
</replica>
</shard>
</mycluster>
</clickhouse_remote_servers>
</yandex>
В данном параметре remote_servers
(clickhouse_remote_servers
) мы можем описать любое кол-во кластеров. В нем мы описываем название нашего кластера (mycluster
), кол-во шардов и реплик в нём. Поэтому для каждой реплики указываем адрес и порт (TCP) ch1
и ch2
соответственно.
Эта схема добавляет огромную гибкость - вы можете использовать кликхаус сервер в разных кластерах одновременно. То есть для некоторых таблиц он может являться 3им шардом в тестовом кластере, но при этом для другой таблице он может быть 1-ым шардом в стейджовом кластере.
Теперь, когда мы закончили с конфигурацией, нужно проверить, все ли корректно работает. Для начала убедимся, что ClickHouse обнаружил и успешно прочитал наш cluster.xml
конфиг-файл.
Перезапустим сервис для применения конфига и посмотрим логи, нас интересует строка Including configuration file '/etc/clickhouse-server/cluster.xml'.
. Если в логах есть это сообщение, а в логах ошибок ClickHouse нет замечаний — идем дальше.
Чтобы протестировать работоспособность кластера, создадим на нем таблицы. Они должны находиться на всех нодах и называться одинаково.
CREATE TABLE posts
(
id Int64,
title String,
description String,
content String,
date Date
)
ENGINE = MergeTree()
PARTITION BY date
ORDER BY id;
Здесь стоит отметить формат создания таблицы. Хоть мы и не углубляемся в данные и их хранение, но здесь стоит обратить внимание на ключевое слово ENGINE, которое указывает какой движок для таблицы необходимо использовать. Условно все движки можно разделить на два типа - виртуальные и для хранения данных. Виртуальные - это те, которое фактически не хранят никаких данных, а забирают их из других источников. А вот движки для хранения данных физически хранят данные в файлах на диске и читают их при селект запросах.
Основной движок для хранения данных - mergetree. Он хранит данные и в фоновом режиме производит их сортировку и слияние. При записи данных в таблицу типа merge tree, движок создает файл на диске - part и сохраняет туда полученные данные. Потом в фоновом режиме он берет несколько частей таблицы, сортирует значения по первичному ключу (чаще всего по дате / времени) и объединяет несколько файлов в один.
В процессе слияния и сортировки некоторые движки могут, например, агрегировать ваши данные по какому-то ключу или условию. Один из таких движков называется SummingMergeTree, который суммирует ваши данные.
Для начала проясним, что такое distributed таблица в ClickHouse?
Distributed (распределенная) таблица не хранит никаких данных и по сути является виртуальной, как мы выяснили выше. Её основная задача — распределение запросов на все локальные таблицы в узлах кластера. К примеру при запросе SELECT
Distributed таблица переписывает запрос, выбирает нужные удаленные узлы и отправляет им запрос. При этом стоить заметить, что сам запрос максимально обрабатывается на стороне узлов. Как только Distributed таблица получает данные она их объединяет.
Теперь к созданию Distributed таблиц. Они должны иметь такую же структуру, как и таблицы, которые мы создали ранее, но другой движок для запросов:
CREATE TABLE posts_distributed
(
id Int64,
title String,
description String,
content String,
date Date
)
ENGINE = Distributed('mycluster', 'default', 'posts', rand());
В параметрах движка таблицы мы указываем:
- имя кластера,
- база данных с таблицей,
- имя таблицы,
- ключ шардирования.
Теперь пришло время вставки данных в ClickHouse. Мы можем вставлять данные либо в конкретный шард, либо используя distributed таблицу. Для начала вставим данные только в один шард (ноду ch1
).
root@ch1:~# clickhouse-client --query "INSERT INTO posts FORMAT CSV" < posts.csv
root@ch1:~# clickhouse-client --query "SELECT count() FROM posts"
500000
Как мы видим, данные вставились. Теперь сделаем выборку с ch2
ноды:
root@ch2:~# clickhouse-client --query "SELECT count() FROM posts"
0
root@ch2:~# clickhouse-client --query "SELECT count() FROM posts_distributed"
500000
Видно, что на другой ноде данных в табличке posts
нету, поскольку мы вставили их прямо в шард. Но они есть в posts_distributed
, поскольку эта таблица смотрит на оба шарда.
Отлично, мы успешно реализовали ClickHouse кластер!