#iceberg #data-fusion #catalog #query-engine #unofficial #object-store

iceberg-s3tables-catalog

S3Tables catalog for the unofficial Iceberg table format implementation

3 unstable releases

0.7.0 Feb 19, 2025
0.6.1 Dec 10, 2024
0.6.0 Dec 9, 2024

#2134 in Database interfaces

Download history 199/week @ 2024-12-04 115/week @ 2024-12-11 5/week @ 2024-12-18 2/week @ 2025-01-08 146/week @ 2025-02-19 15/week @ 2025-02-26 4/week @ 2025-03-05 6/week @ 2025-03-12

171 downloads per month
Used in 3 crates (via frostbow)

Apache-2.0

300KB
5.5K SLoC

Rust implementation of Apache Iceberg

Apache Iceberg is an open table format that brings ACID guarantees to large analytic datasets. This repository contains a Rust implementation of Apache Iceberg that focuses on interoperability with the Arrow ecosystem. It provides an Iceberg integration for the DataFusion query engine.

Crates.io Apache V2.0 licensed Build Status

Features

Iceberg tables

Feature Status
Read
Read partitioned
Insert
Insert partitioned
Equality deletes
Positional deletes

Iceberg Views

Feature Status
Read

Iceberg Materialized Views

Feature Status
Read
Full refresh
Incremental refresh

Catalogs

  • REST
  • S3Tables
  • Filesystem
  • Glue
  • RDBMS (Postgres, MySQL)

File formats

  • parquet

Integrations

Example

Check out the datafusion examples.

use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
use datafusion_iceberg::DataFusionTable;
use iceberg_rust::{
    catalog::Catalog,
    spec::{
        partition::{PartitionField, PartitionSpec, Transform},
        schema::Schema,
        types::{PrimitiveType, StructField, StructType, Type},
    },
    table::Table,
};
use iceberg_sql_catalog::SqlCatalog;
use object_store::memory::InMemory;
use object_store::ObjectStore;

use std::sync::Arc;

/// End-to-end example: creates a day-partitioned `orders` Iceberg table in a
/// SQL catalog backed by an in-memory object store, registers it with a
/// DataFusion session, inserts rows twice and checks the per-product sum of
/// `amount` after each insert.
///
/// # Panics
///
/// Panics if any setup step, statement or query fails, or if an aggregated
/// sum differs from the expected value.
#[tokio::main]
pub(crate) async fn main() {
    // Everything is stored in memory, so the example leaves nothing behind.
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

    let sql_catalog = SqlCatalog::new("sqlite://", "test", object_store.clone())
        .await
        .unwrap();
    let catalog: Arc<dyn Catalog> = Arc::new(sql_catalog);

    // Columns of the `orders` table; all of them are required.
    let fields = StructType::builder()
        .with_struct_field(StructField {
            id: 1,
            name: "id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 2,
            name: "customer_id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 3,
            name: "product_id".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Long),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 4,
            name: "date".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Date),
            doc: None,
        })
        .with_struct_field(StructField {
            id: 5,
            name: "amount".to_string(),
            required: true,
            field_type: Type::Primitive(PrimitiveType::Int),
            doc: None,
        })
        .build()
        .unwrap();

    let schema = Schema::builder().with_fields(fields).build().unwrap();

    // Partition by day, sourced from field id 4 (the `date` column).
    let day_partition = PartitionField::new(4, 1000, "day", Transform::Day);
    let partition_spec = PartitionSpec::builder()
        .with_partition_field(day_partition)
        .build()
        .expect("Failed to create partition spec");

    let namespace = ["test".to_owned()];
    let iceberg_table = Table::builder()
        .with_name("orders")
        .with_location("/test/orders")
        .with_schema(schema)
        .with_partition_spec(partition_spec)
        .build(&namespace, catalog)
        .await
        .expect("Failed to create table");

    // Wrap the Iceberg table so DataFusion can query it.
    let provider = Arc::new(DataFusionTable::from(iceberg_table));

    let ctx = SessionContext::new();
    ctx.register_table("orders", provider).unwrap();

    // First batch of orders.
    let first_insert = ctx
        .sql(
            "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES 
        (1, 1, 1, '2020-01-01', 1),
        (2, 2, 1, '2020-01-01', 1),
        (3, 3, 1, '2020-01-01', 3),
        (4, 1, 2, '2020-02-02', 1),
        (5, 1, 1, '2020-02-02', 2),
        (6, 3, 3, '2020-02-02', 3);",
        )
        .await
        .expect("Failed to create query plan for insert");
    first_insert
        .collect()
        .await
        .expect("Failed to insert values into table");

    let first_totals = ctx
        .sql("select product_id, sum(amount) from orders group by product_id;")
        .await
        .expect("Failed to create plan for select");
    let first_batches = first_totals
        .collect()
        .await
        .expect("Failed to execute select query");

    // Expected sums after the first insert: product 1 -> 7, 2 -> 1, 3 -> 3.
    for record_batch in first_batches {
        if record_batch.num_rows() == 0 {
            continue;
        }
        let ids = record_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let sums = record_batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        for (id, sum) in ids.iter().zip(sums) {
            match id.unwrap() {
                1 => assert_eq!(sum.unwrap(), 7),
                2 => assert_eq!(sum.unwrap(), 1),
                3 => assert_eq!(sum.unwrap(), 3),
                _ => panic!("Unexpected product id"),
            }
        }
    }

    // Second batch of orders, added on top of the first.
    let second_insert = ctx
        .sql(
            "INSERT INTO orders (id, customer_id, product_id, date, amount) VALUES 
        (7, 1, 3, '2020-01-03', 1),
        (8, 2, 1, '2020-01-03', 2),
        (9, 2, 2, '2020-01-03', 1);",
        )
        .await
        .expect("Failed to create query plan for insert");
    second_insert
        .collect()
        .await
        .expect("Failed to insert values into table");

    let second_totals = ctx
        .sql("select product_id, sum(amount) from orders group by product_id;")
        .await
        .expect("Failed to create plan for select");
    let second_batches = second_totals
        .collect()
        .await
        .expect("Failed to execute select query");

    // Expected sums after both inserts: product 1 -> 9, 2 -> 2, 3 -> 4.
    for record_batch in second_batches {
        if record_batch.num_rows() == 0 {
            continue;
        }
        let ids = record_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let sums = record_batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        for (id, sum) in ids.iter().zip(sums) {
            match id.unwrap() {
                1 => assert_eq!(sum.unwrap(), 9),
                2 => assert_eq!(sum.unwrap(), 2),
                3 => assert_eq!(sum.unwrap(), 4),
                _ => panic!("Unexpected product id"),
            }
        }
    }
}

Dependencies

~60MB
~1M SLoC