Перейти к основному содержанию
Перейти к основному содержанию

Коннектор Spark

Этот коннектор использует оптимизации, специфичные для ClickHouse, такие как продвинутое разбиение на партиции и проталкивание предикатов (predicate pushdown), чтобы улучшить производительность запросов и обработку данных. Коннектор основан на официальном JDBC-коннекторе ClickHouse и управляет собственным каталогом.

До версии Spark 3.0 в Spark не было встроенной концепции каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. При использовании этих внешних решений пользователям приходилось вручную регистрировать таблицы источников данных, прежде чем получать к ним доступ в Spark. Однако с тех пор, как в Spark 3.0 была введена концепция каталога, Spark теперь может автоматически обнаруживать таблицы посредством регистрации плагинов каталогов.

Каталог по умолчанию в Spark — spark_catalog, а таблицы идентифицируются как {catalog name}.{database}.{table}. С новой возможностью работы с каталогами теперь можно добавлять несколько каталогов и работать с ними в одном приложении Spark.

Требования

  • Java 8 или 17 (для Spark 4.0 требуется Java 17+)
  • Scala 2.12 или 2.13 (Spark 4.0 поддерживает только Scala 2.13)
  • Apache Spark 3.3, 3.4, 3.5 или 4.0

Матрица совместимости

ВерсияСовместимые версии SparkВерсия JDBC-драйвера ClickHouse
mainSpark 3.3, 3.4, 3.5, 4.00.9.4
0.9.0Spark 3.3, 3.4, 3.5, 4.00.9.4
0.8.1Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3Нет зависимости
0.3.0Spark 3.2, 3.3Нет зависимости
0.2.1Spark 3.2Нет зависимости
0.1.2Spark 3.2Нет зависимости

Установка и настройка

Для интеграции ClickHouse со Spark доступно несколько вариантов установки, подходящих для разных конфигураций проектов. Вы можете добавить коннектор ClickHouse для Spark как зависимость непосредственно в файл сборки вашего проекта (например, в pom.xml для Maven или build.sbt для SBT). Либо вы можете поместить необходимые JAR-файлы в каталог $SPARK_HOME/jars/ или передать их напрямую как параметр Spark с помощью флага --jars в команде spark-submit. Оба подхода обеспечивают доступность коннектора ClickHouse в вашей среде Spark.

Импорт как зависимость

<dependency>
  <groupId>com.clickhouse.spark</groupId>
  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
  <version>{{ stable_version }}</version>
</dependency>
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <classifier>all</classifier>
  <version>{{ clickhouse_jdbc_version }}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT.

<repositories>
  <repository>
    <id>sonatype-oss-snapshots</id>
    <name>Sonatype OSS Snapshots Repository</name>
    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
  </repository>
</repositories>

Скачайте библиотеку

Шаблон имени бинарного JAR-файла:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

Вы можете найти все доступные релизные JAR‑файлы в Maven Central Repository и все ежедневные SNAPSHOT‑сборки JAR‑файлов в Sonatype OSS Snapshots Repository.

Справочные материалы

Крайне важно включить clickhouse-jdbc JAR с классификатором «all», так как коннектор зависит от clickhouse-http и clickhouse-client, — оба они входят в clickhouse-jdbc:all. В качестве альтернативы вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если предпочитаете не использовать полный JDBC‑пакет.

В любом случае убедитесь, что версии пакетов совместимы в соответствии с матрицей совместимости.

Зарегистрируйте каталог (обязательно)

Чтобы получить доступ к своим таблицам ClickHouse, необходимо настроить новый каталог Spark со следующими параметрами:

СвойствоЗначениеЗначение по умолчаниюОбязательно
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/AYes
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhostNo
spark.sql.catalog.<catalog_name>.protocolhttphttpNo
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123No
spark.sql.catalog.<catalog_name>.user<clickhouse_username>defaultNo
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(пустая строка)No
spark.sql.catalog.<catalog_name>.database<database>defaultNo
spark.<catalog_name>.write.formatjsonarrowNo

Эти параметры можно задать одним из следующих способов:

  • Отредактировать или создать spark-defaults.conf.
  • Передать конфигурацию в команду spark-submit (или в команды CLI spark-shell/spark-sql).
  • Добавить конфигурацию при инициализации контекста.
Справочные материалы

При работе с кластером ClickHouse необходимо задать уникальное имя каталога для каждого экземпляра. Например:

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host           10.0.0.1
spark.sql.catalog.clickhouse1.protocol       https
spark.sql.catalog.clickhouse1.http_port      8443
spark.sql.catalog.clickhouse1.user           default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database       default
spark.sql.catalog.clickhouse1.option.ssl     true

spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host           10.0.0.2
spark.sql.catalog.clickhouse2.protocol       https
spark.sql.catalog.clickhouse2.http_port      8443
spark.sql.catalog.clickhouse2.user           default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database       default
spark.sql.catalog.clickhouse2.option.ssl     true

Таким образом, вы сможете обращаться к таблице <ck_db>.<ck_table> в clickhouse1 из Spark SQL как clickhouse1.<ck_db>.<ck_table>, а к таблице <ck_db>.<ck_table> в clickhouse2 — как clickhouse2.<ck_db>.<ck_table>.

Настройки ClickHouse Cloud

При подключении к ClickHouse Cloud обязательно включите SSL и задайте необходимый режим SSL. Например:

spark.sql.catalog.clickhouse.option.ssl        true
spark.sql.catalog.clickhouse.option.ssl_mode   NONE

Чтение данных

public static void main(String[] args) {
        // Создайте сеанс Spark
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

        df.show();

        spark.stop();
    }

Запись данных

 public static void main(String[] args) throws AnalysisException {

        // Создание сессии Spark
        SparkSession spark = SparkSession.builder()
                .appName("example")
                .master("local[*]")
                .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
                .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
                .config("spark.sql.catalog.clickhouse.protocol", "http")
                .config("spark.sql.catalog.clickhouse.http_port", "8123")
                .config("spark.sql.catalog.clickhouse.user", "default")
                .config("spark.sql.catalog.clickhouse.password", "123456")
                .config("spark.sql.catalog.clickhouse.database", "default")
                .config("spark.clickhouse.write.format", "json")
                .getOrCreate();

        // Определение схемы для DataFrame
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
        });

        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob")
        );

        // Создание DataFrame
        Dataset<Row> df = spark.createDataFrame(data, schema);

        df.writeTo("clickhouse.default.example_table").append();

        spark.stop();
    }

Операции DDL

Вы можете выполнять операции DDL в экземпляре ClickHouse с помощью Spark SQL, при этом все изменения немедленно сохраняются в ClickHouse. Spark SQL позволяет писать запросы так же, как и в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие, без каких-либо изменений, например:

Примечание

При использовании Spark SQL за один раз может быть выполнен только один оператор.

USE clickhouse; 

CREATE TABLE test_db.tbl_sql (
  create_time TIMESTAMP NOT NULL,
  m           INT       NOT NULL COMMENT 'ключ партиции',
  id          BIGINT    NOT NULL COMMENT 'ключ сортировки',
  value       STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
  engine = 'MergeTree()',
  order_by = 'id',
  settings.index_granularity = 8192
);

Приведённые выше примеры демонстрируют запросы Spark SQL, которые вы можете выполнять в своём приложении с использованием любого из API — Java, Scala, PySpark или оболочки.

Конфигурации

Ниже приведены настраиваемые параметры, доступные в коннекторе:


КлючПо умолчаниюОписаниеНачиная с
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиционирования, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, неподдерживаемые выражения игнорируются, в противном случае выполнение немедленно завершается с исключением. Обратите внимание, что при включённой настройке spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может привести к повреждению данных.0.4.0
spark.clickhouse.read.compression.codeclz4Кодек, используемый для распаковки данных при чтении. Поддерживаемые кодеки: none, lz4.0.5.0
spark.clickhouse.read.distributed.convertLocaltrueПри чтении таблицы Distributed использовать локальную таблицу вместо самой распределённой. Если true, игнорировать spark.clickhouse.read.distributed.useClusterNodes.0.1.0
spark.clickhouse.read.fixedStringAsдвоичныйСчитывать тип ClickHouse FixedString как заданный тип данных Spark. Поддерживаемые типы: binary, string0.8.0
spark.clickhouse.read.formatjsonФормат сериализации для чтения. Поддерживаемые форматы: JSON, Binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalseВключить динамический фильтр при чтении.0.8.0
spark.clickhouse.read.splitByPartitionIdtrueЕсли установлено значение true, формировать входной фильтр партиций по виртуальному столбцу _partition_id, а не по значению партиции. Известны проблемы при построении SQL-предикатов по значению партиции. Для использования этой возможности требуется ClickHouse Server v21.6+.0.4.0
spark.clickhouse.useNullableQuerySchemafalseЕсли true, помечать все поля схемы запроса как допускающие значение NULL при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, что эта настройка требует SPARK-43390 (доступно в Spark 3.5); без этого патча данная опция фактически всегда равна true.0.8.0
spark.clickhouse.write.batchSize10000Количество записей в одном пакете при записи в ClickHouse.0.1.0
spark.clickhouse.write.compression.codeclz4Кодек, используемый для сжатия данных при их записи. Поддерживаемые кодеки: none, lz4.0.3.0
spark.clickhouse.write.distributed.convertLocalfalseПри записи в таблицу Distributed данные записываются в локальную таблицу, а не в саму распределённую таблицу. Если установлено значение true, параметр spark.clickhouse.write.distributed.useClusterNodes игнорируется.0.1.0
spark.clickhouse.write.distributed.useClusterNodestrueПри записи в распределённую таблицу записывать данные на все узлы кластера.0.1.0
spark.clickhouse.write.formatстрелкаФормат сериализации при записи. Поддерживаемые форматы: JSON, Arrow0.4.0
spark.clickhouse.write.localSortByKeytrueЕсли установлено значение true, выполнять локальную сортировку по ключам сортировки перед записью.0.3.0
spark.clickhouse.write.localSortByPartitionзначение параметра spark.clickhouse.write.repartitionByPartitionЕсли имеет значение true, выполняется локальная сортировка по разделу перед записью. Если не задано, используется значение spark.clickhouse.write.repartitionByPartition.0.3.0
spark.clickhouse.write.maxRetry3Максимальное количество повторных попыток записи для одной пакетной операции, завершившейся сбоем с кодами ошибок, допускающими повторную попытку.0.1.0
spark.clickhouse.write.repartitionByPartitiontrueОпределяет, нужно ли переразбивать данные по ключам партиционирования ClickHouse, чтобы они соответствовали распределению данных в таблице ClickHouse перед записью.0.3.0
spark.clickhouse.write.repartitionNum0Перед записью требуется перераспределить данные в соответствии с распределением таблицы ClickHouse. Используйте этот параметр конфигурации для указания количества переразбиений; значение меньше 1 означает, что перераспределение не требуется.0.1.0
spark.clickhouse.write.repartitionStrictlyfalseЕсли true, Spark будет строго распределять входящие записи по разделам, чтобы обеспечить требуемое распределение перед записью данных в таблицу источника. В противном случае Spark может применять определённые оптимизации для ускорения запроса, но при этом нарушить требуемое распределение. Обратите внимание, что для этой конфигурации необходим патч SPARK-37523 (доступен в Spark 3.4); без этого патча она всегда ведёт себя как true.0.3.0
spark.clickhouse.write.retryInterval10sИнтервал в секундах между повторными попытками записи.0.1.0
spark.clickhouse.write.retryableErrorCodes241Коды ошибок, допускающих повторную попытку, возвращаемые сервером ClickHouse при сбое записи.0.1.0

Поддерживаемые типы данных

В этом разделе описывается соответствие типов данных между Spark и ClickHouse. Таблицы ниже служат быстрым справочником по преобразованию типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.

Чтение данных из ClickHouse в Spark

Тип данных ClickHouseТип данных SparkПоддерживаетсяПримитивный типПримечания
NothingNullTypeДа
BoolBooleanTypeДа
UInt8, Int16ShortTypeДа
Int8ByteTypeДа
UInt16,Int32IntegerTypeДа
UInt32,Int64, UInt64LongTypeДа
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Да
Float32FloatTypeДа
Float64DoubleTypeДа
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeДа
FixedStringBinaryType, StringTypeДаОпределяется настройкой READ_FIXED_STRING_AS
DecimalDecimalTypeДаТочность и масштаб до Decimal128
Decimal32DecimalType(9, scale)Да
Decimal64DecimalType(18, scale)Да
Decimal128DecimalType(38, scale)Да
Date, Date32DateTypeДа
DateTime, DateTime32, DateTime64TimestampTypeДа
ArrayArrayTypeНетТип элементов массива также преобразуется
MapMapTypeНетКлючи ограничены типом StringType
IntervalYearYearMonthIntervalType(Year)Да
IntervalMonthYearMonthIntervalType(Month)Да
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeНетИспользуется соответствующий тип интервала
Object
Nested
TupleStructTypeНетПоддерживает как именованные, так и неименованные кортежи. Именованные кортежи сопоставляются с полями структуры по имени, неименованные используют _1, _2 и т. д. Поддерживаются вложенные структуры и Nullable-поля
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

Вставка данных из Spark в ClickHouse

Тип данных SparkТип данных ClickHouseПоддерживаетсяПримитивныйПримечания
BooleanTypeBoolДаОтображается в тип Bool (а не UInt8) начиная с версии 0.9.0
ByteTypeInt8Да
ShortTypeInt16Да
IntegerTypeInt32Да
LongTypeInt64Да
FloatTypeFloat32Да
DoubleTypeFloat64Да
StringTypeStringДа
VarcharTypeStringДа
CharTypeStringДа
DecimalTypeDecimal(p, s)ДаТочность и масштаб до Decimal128
DateTypeDateДа
TimestampTypeDateTimeДа
ArrayType (list, tuple, or array)ArrayНетТип элементов массива также преобразуется
MapTypeMapНетКлючи ограничены типом StringType
StructTypeTupleНетПреобразуется в именованный Tuple с именами полей.
VariantTypeVariantTypeНет
Object
Nested

Участие и поддержка

Если вы хотите внести вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашей обратной связи! Посетите наш репозиторий на GitHub, чтобы создать issue, предложить улучшения или отправить pull request. Мы рады любому вкладу! Прежде чем начать, ознакомьтесь с руководством по внесению изменений в репозитории. Спасибо, что помогаете делать наш коннектор ClickHouse для Spark лучше!