
Writers

Writers define where the processed data is stored after each pipeline batch. Each writer adapts the Arrow RecordBatch output to a specific storage format.

Available Writers

| Writer | Format | Best for |
|---|---|---|
| DUCKDB | DuckDB database | Local analytics, prototyping |
| CLICKHOUSE | ClickHouse | Production analytics at scale |
| ICEBERG | Apache Iceberg | Data lake with ACID transactions |
| DELTA_LAKE | Delta Lake | Data lake with versioning |
| PYARROW_DATASET | Parquet files | Simple file-based storage |
| POSTGRESQL | PostgreSQL | Relational storage, existing PostgreSQL instances |
| CSV | CSV files | Simple text export, interoperability |

Each table in the pipeline data is written as a separate table or directory named after its key (e.g. the "transfers" key becomes a transfers table or a transfers/ directory).

All writers support automatic table creation. tiders infers the output schema from the Arrow data and creates tables accordingly. No manual schema definition is needed.

A pipeline can write to more than one backend simultaneously by passing a list of writers. All writers receive the same processed data in parallel on each batch.


DuckDB

Inserts Arrow tables into a DuckDB database. Tables are auto-created on the first push using the Arrow schema. Decimal256 columns are automatically downcast to Decimal128(38, scale) since DuckDB does not support 256-bit decimals — use a cast_by_type step beforehand if you need control over overflow behavior.

Requires: pip install "tiders[duckdb]"

Python

import tiders as cc

# Option 1: plain path
writer = cc.Writer(
    kind=cc.WriterKind.DUCKDB,
    config=cc.DuckdbWriterConfig(
        path="./data/output.duckdb",   # path to create or connect to a DuckDB database
    ),
)

# Option 2: pre-built connection
import duckdb

duckdb_client = duckdb.connect("./data/output.duckdb")

writer = cc.Writer(
    kind=cc.WriterKind.DUCKDB,
    config=cc.DuckdbWriterConfig(
        connection=duckdb_client,   # optional — pre-built DuckDB connection
    ),
)

yaml

writer:
  kind: duckdb
  config:
    path: data/output.duckdb   # required

ClickHouse

Inserts Arrow tables into ClickHouse using the clickhouse-connect async client. Tables are auto-created on the first insert using the inferred Arrow schema. All tables except the anchor_table are inserted in parallel.

Requires: pip install "tiders[clickhouse]"

Python

import tiders as cc
from tiders.config import ClickHouseSkipIndex

# Option 1: plain connection parameters
writer = cc.Writer(
    kind=cc.WriterKind.CLICKHOUSE,
    config=cc.ClickHouseWriterConfig(
        host="localhost",               # optional, default: "localhost"
        port=8123,                      # optional, default: 8123
        username="default",             # optional, default: "default"
        password="",                    # optional, default: ""
        database="default",             # optional, default: "default"
        secure=False,                   # optional, default: False
        engine="MergeTree()",           # optional, default: "MergeTree()"
        order_by={"transfers": ["block_number", "log_index"]},   # optional, per-table ordering key columns.
        codec={"transfers": {"data": "ZSTD(3)"}},   # optional, per-table, per-column compression codecs.
        skip_index={"transfers": [ClickHouseSkipIndex(name="idx_value", val="value", type_="minmax", granularity=1)]},   # optional, per-table list of data-skipping indexes added after table creation.
        create_tables=True,             # optional, default: True.
        anchor_table="transfers",       # optional, default: None.
    ),
)

# Option 2: pre-built async client
import clickhouse_connect

clickhouse_client = await clickhouse_connect.get_async_client(
    host="localhost", port=8123, username="default", password="", database="default",
)

writer = cc.Writer(
    kind=cc.WriterKind.CLICKHOUSE,
    config=cc.ClickHouseWriterConfig(
        client=clickhouse_client,       # optional — pre-built async ClickHouse client
    ),
)

yaml

writer:
  kind: clickhouse
  config:                         # connection parameters used to create the ClickHouse client
    host: localhost               # required
    port: 8123                    # optional, default: 8123
    username: default             # optional, default: default
    password: ${CH_PASSWORD}      # optional, default: ""
    database: default             # optional, default: default
    secure: false                 # optional, default: false
    engine: MergeTree()           # optional, default: MergeTree()
    order_by:                     # optional — per-table list of ORDER BY columns
      transfers: [block_number, log_index]
    codec:                        # optional — per-table, per-column compression codec
      transfers:
        data: ZSTD(3)
    create_tables: true           # optional, default: true
    anchor_table: transfers       # optional — written last after all other tables

skip_index (Python only): ClickHouseSkipIndex takes name, val (index expression), type_ (e.g. "minmax", "bloom_filter"), and granularity. Indexes are added after table creation via ALTER TABLE ... ADD INDEX.


Iceberg

Writes Arrow tables into an Apache Iceberg catalog. Each table is created in the specified namespace if it does not already exist.

Requires: pip install "tiders[iceberg]"

Python

import tiders as cc

# Option 1: plain catalog parameters
writer = cc.Writer(
    kind=cc.WriterKind.ICEBERG,
    config=cc.IcebergWriterConfig(
        namespace="my_namespace",                          # required — Iceberg namespace (database) to write tables into
        catalog_uri="sqlite:///catalog.db",                # required — URI for the Iceberg catalog
        warehouse="s3://my-bucket/iceberg/",               # required — warehouse root URI for the catalog
        catalog_type="sql",                                # optional, default: "sql"
        write_location="s3://my-bucket/iceberg/",          # optional — storage URI for data files, default: warehouse
    ),
)

# Option 2: pre-built pyiceberg catalog
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "my_catalog",
    type="sql",
    uri="sqlite:///catalog.db",
    warehouse="s3://my-bucket/iceberg/",
)

writer = cc.Writer(
    kind=cc.WriterKind.ICEBERG,
    config=cc.IcebergWriterConfig(
        namespace="my_namespace",                          # required
        catalog=catalog,                                   # optional — pre-built pyiceberg Catalog instance
        write_location="s3://my-bucket/iceberg/",          # optional — default: warehouse
    ),
)

yaml

writer:
  kind: iceberg
  config:
    namespace: my_namespace                          # required
    catalog_uri: sqlite:///catalog.db                # required
    warehouse: s3://my-bucket/iceberg/               # required
    catalog_type: sql                                # optional, default: sql
    write_location: s3://my-bucket/iceberg/          # optional, default: warehouse

Delta Lake

Appends Arrow tables to Delta tables using deltalake.write_deltalake with schema merging enabled. Each table is stored at <data_uri>/<table_name>/. All tables except the anchor_table are written in parallel.

Requires: pip install "tiders[delta_lake]"

Python

import tiders as cc

writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri="s3://my-bucket/delta/",               # required — base URI; each table is written to /<table_name>/
        partition_by={"transfers": ["block_number"]},   # optional — per-table list of partition columns
        storage_options={"AWS_REGION": "us-east-1"},    # optional — cloud storage credentials passed to deltalake
        anchor_table="transfers",                       # optional — written last after all other tables
    ),
)

yaml

writer:
  kind: delta_lake
  config:
    data_uri: s3://my-bucket/delta/    # required
    partition_by:                       # optional — per-table list of partition columns
      transfers: [block_number]
    storage_options:                    # optional — cloud storage credentials
      AWS_REGION: us-east-1
      AWS_ACCESS_KEY_ID: ${AWS_KEY}
    anchor_table: transfers             # optional — written last after all other tables

PyArrow Dataset (Parquet)

Writes Arrow tables as Parquet files using pyarrow.dataset.write_dataset. Each table is stored under <base_dir>/<table_name>/. A monotonic counter is appended to the file name to avoid collisions across successive pushes. All tables except the anchor_table are written in parallel.

Python

import tiders as cc

writer = cc.Writer(
    kind=cc.WriterKind.PYARROW_DATASET,
    config=cc.PyArrowDatasetWriterConfig(
        base_dir="./data/output",                       # required — root directory; each table is written to <base_dir>/<table_name>/
        partitioning={"transfers": ["block_number"]},   # optional — per-table list of partition columns or pyarrow.dataset.Partitioning
        partitioning_flavor={"transfers": "hive"},      # optional — per-table partitioning flavor
        basename_template="part-{i}.parquet",           # optional — output file name template, default: "part-{i}.parquet"
        max_rows_per_file=1_000_000,                    # optional — max rows per output file, default: 0 (unlimited)
        min_rows_per_group=0,                           # optional — min rows per Parquet row group, default: 0
        max_rows_per_group=1024 * 1024,                 # optional — max rows per Parquet row group, default: 1048576
        max_partitions=1024,                            # optional — max number of partitions, default: 1024
        max_open_files=1024,                            # optional — max files open simultaneously, default: 1024
        use_threads=True,                               # optional — use threads for writing, default: True
        create_dir=True,                                # optional — create output directory if missing, default: True
        anchor_table="transfers",                       # optional — written last after all other tables
    ),
)

yaml

writer:
  kind: pyarrow_dataset
  config:
    base_dir: data/output                # required
    partitioning:                        # optional — per-table list of partition columns
      transfers: [block_number]
    partitioning_flavor:                 # optional — per-table flavor (e.g. "hive")
      transfers: hive
    basename_template: part-{i}.parquet  # optional — output file name template
    max_rows_per_file: 1000000           # optional, default: 0 (unlimited)
    min_rows_per_group: 0                # optional, default: 0
    max_rows_per_group: 1048576          # optional, default: 1048576
    max_partitions: 1024                 # optional, default: 1024
    max_open_files: 1024                 # optional, default: 1024
    use_threads: true                    # optional, default: true
    create_dir: true                     # optional, default: true
    anchor_table: transfers              # optional — written last after all other tables

CSV

Writes Arrow tables as CSV files using pyarrow.csv.write_csv. Each table is written to <base_dir>/<table_name>.csv. On successive pushes the file is appended to. All tables except the anchor_table are written in parallel.

Python

import tiders as cc

writer = cc.Writer(
    kind=cc.WriterKind.CSV,
    config=cc.CsvWriterConfig(
        base_dir="./data/output",       # required — root directory; each table is written to <base_dir>/<table_name>.csv
        delimiter=",",                  # optional — field delimiter character, default: ","
        include_header=True,            # optional — write a header row, default: True
        create_dir=True,                # optional — create output directory if missing, default: True
        anchor_table="transfers",       # optional — written last after all other tables
    ),
)

yaml

writer:
  kind: csv
  config:
    base_dir: data/output        # required
    delimiter: ","               # optional, default: ","
    include_header: true         # optional, default: true
    create_dir: true             # optional, default: true
    anchor_table: transfers      # optional — written last after all other tables

PostgreSQL

Inserts Arrow tables into PostgreSQL using the COPY protocol via psycopg v3. Tables are auto-created on the first push using CREATE TABLE IF NOT EXISTS derived from the Arrow schema. All tables except the anchor_table are inserted in parallel.

Requires: pip install "tiders[postgresql]"

Unsupported raw blockchain fields

The PostgreSQL writer does not support List, Struct, or Map Arrow columns. Writing raw EVM or SVM data directly will fail unless you use a step to flatten or drop the affected columns first.

EVM fields that require preprocessing:

| Table | Field | Arrow type |
|---|---|---|
| blocks | uncles | List(Binary) |
| blocks | withdrawals | List(Struct(index, validator_index, address, amount)) |
| transactions | access_list | List(Struct(address, storage_keys)) |
| transactions | blob_versioned_hashes | List(Binary) |
| traces | trace_address | List(UInt64) |

SVM fields that require preprocessing:

| Table | Field | Arrow type |
|---|---|---|
| transactions | account_keys | List(Binary) |
| transactions | signatures | List(Binary) |
| transactions | loaded_readonly_addresses | List(Binary) |
| transactions | loaded_writable_addresses | List(Binary) |
| transactions | address_table_lookups | List(Struct(account_key, writable_indexes, readonly_indexes)) |
| logs | instruction_address | List(UInt32) |
| instructions | instruction_address | List(UInt32) |
| instructions | rest_of_accounts | List(Binary) |

Python

import tiders as cc

# Option 1: plain connection parameters (recommended)
writer = cc.Writer(
    kind=cc.WriterKind.POSTGRESQL,
    config=cc.PostgresqlWriterConfig(
        host="localhost",              # optional, default: "localhost"
        port=5432,                     # optional, default: 5432
        user="postgres",               # optional, default: "postgres"
        password="secret",             # optional, default: "postgres"
        dbname="mydb",                 # optional, default: "postgres"
        schema="public",               # optional — PostgreSQL schema (namespace), default: "public"
        create_tables=True,            # optional — auto-create tables on first push, default: True
        anchor_table="transfers",      # optional — written last after all other tables, default: None
    ),
)

# Option 2: pre-built async connection
import psycopg

connection = await psycopg.AsyncConnection.connect(
    "host=localhost port=5432 dbname=mydb user=postgres password=secret",
    autocommit=False,
)

writer = cc.Writer(
    kind=cc.WriterKind.POSTGRESQL,
    config=cc.PostgresqlWriterConfig(
        connection=connection,         # optional — pre-built psycopg.AsyncConnection
        schema="public",               # optional, default: "public"
        create_tables=True,            # optional, default: True
        anchor_table="transfers",      # optional, default: None
    ),
)

yaml

writer:
  kind: postgresql
  config:
    host: localhost               # required
    dbname: postgres              # optional, default: postgres
    port: 5432                    # optional, default: 5432
    user: postgres                # optional, default: postgres
    password: ${PG_PASSWORD}      # optional, default: postgres
    schema: public                # optional, default: public
    create_tables: true           # optional, default: true
    anchor_table: transfers       # optional — written last after all other tables