#hdfs #sql #catalog #iceberg

iceberg-catalog-hadoop-unofficial

Apache Iceberg Rust Hadoop Catalog

6 releases

Uses the Rust 2024 edition

0.6.0-alpha.6 Aug 22, 2025
0.6.0-alpha.5 Aug 20, 2025
0.6.0-alpha.4 Aug 19, 2025
0.6.0-alpha.3 Aug 15, 2025

#2025 in Database interfaces

Apache-2.0

3MB
64K SLoC

Apache Iceberg Hadoop Catalog Unofficial Native Rust Implementation


This crate contains an unofficial native Rust implementation of the Apache Iceberg Hadoop Catalog.

See the API documentation for examples and the full API.

Usage

Single NameNode

use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain a Kerberos ticket before talking to a secured HDFS cluster.
    // This example shells out via `cmd /c` on Windows; on Linux/macOS run `kinit` directly.
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()
        .map_err(|e| e.to_string())?;
    println!("kinit output: {:?}", &o);
    // Minimal HDFS client configuration: point the client at a single NameNode.
    let properties = HashMap::from([(
        "fs.defaultFS".to_string(),
        "hdfs://nn1:8020".to_string(),
    )]);

    // Build the Hadoop catalog; `warehouse` is the HDFS directory that holds the Iceberg tables.
    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nn1:8020/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // List the namespaces found under the warehouse root.
    catalog
        .list_namespaces(None)
        .await
        .unwrap()
        .iter()
        .for_each(|namespace| {
            println!("namespace: {:?}", namespace);
        });

    // Load an existing table, here ods.test1.
    let table_ident = TableIdent::new(
        NamespaceIdent::new("ods".to_owned()),
        "test1".to_owned(),
    );

    let table = catalog.load_table(&table_ident).await?;

    println!("table: {:?}", table.metadata());

    // Expose the Iceberg table to DataFusion and query it with SQL.
    let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
        .await
        .unwrap();

    let ctx = SessionContext::new();

    ctx.register_table("test1", Arc::new(table_provider))
        .unwrap();
    let df = ctx
        .sql("SELECT * FROM test1 limit 100")
        .await
        .unwrap();
    df.show().await.unwrap();
    Ok(())
}
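
Both examples only read an existing table. The spec types exported by iceberg_unofficial can also be used to create a namespace and a table through the catalog. The sketch below assumes this crate keeps the upstream Apache iceberg-rust API (Catalog::create_namespace, Catalog::create_table, and the Schema / TableCreation builders); treat those calls as assumptions and check docs.rs for the authoritative signatures.

use std::collections::HashMap;
use std::sync::Arc;

use iceberg_unofficial::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg_unofficial::{Catalog, NamespaceIdent, TableCreation};

// Hedged sketch: the builder methods and Catalog calls below are assumed to
// match upstream Apache iceberg-rust and are not verified against this crate.
async fn create_demo_table(catalog: &impl Catalog) -> Result<(), Box<dyn std::error::Error>> {
    // Create the target namespace (skip if it already exists).
    let namespace = NamespaceIdent::new("ods".to_owned());
    catalog.create_namespace(&namespace, HashMap::new()).await?;

    // A two-column schema: id (required long) and name (optional string).
    let schema = Schema::builder()
        .with_fields(vec![
            Arc::new(NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long))),
            Arc::new(NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String))),
        ])
        .build()?;

    let creation = TableCreation::builder()
        .name("test1".to_owned())
        .schema(schema)
        .build();

    let table = catalog.create_table(&namespace, creation).await?;
    println!("created table: {:?}", table.identifier());
    Ok(())
}

Called with the catalog built above (create_demo_table(&catalog).await?), this would create ods.test1 under the configured warehouse path.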

NameNode HA

use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Kerberos login, as in the single-NameNode example.
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()
        .map_err(|e| e.to_string())?;
    println!("kinit output: {:?}", &o);
    // HDFS NameNode HA client configuration, mirroring the cluster's
    // core-site.xml / hdfs-site.xml settings: a logical nameservice plus the
    // RPC address of each NameNode.
    let properties = HashMap::from([
        ("fs.defaultFS".to_string(), "hdfs://nameservice1".to_string()),
        ("dfs.nameservices".to_string(), "nameservice1".to_string()),
        ("dfs.ha.namenodes.nameservice1".to_string(), "nn1,nn2".to_string()),
        (
            "dfs.namenode.rpc-address.nameservice1.nn1".to_string(),
            "nn1:8020".to_string(),
        ),
        (
            "dfs.namenode.rpc-address.nameservice1.nn2".to_string(),
            "nn2:8020".to_string(),
        ),
    ]);

    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nameservice1/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    catalog
        .list_namespaces(None)
        .await
        .unwrap()
        .iter()
        .for_each(|namespace| {
            println!("namespace: {:?}", namespace);
        });

    let table_ident = TableIdent::new(
        NamespaceIdent::new("ods".to_owned()),
        "test1".to_owned(),
    );

    let table = catalog.load_table(&table_ident).await?;

    println!("table:{:?}", table.metadata());

    let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
        .await
        .unwrap();

    let ctx = SessionContext::new();

    ctx.register_table("test1", Arc::new(table_provider))
        .unwrap();
    let df = ctx
        .sql("SELECT * FROM test1 limit 100")
        .await
        .unwrap();
    df.show().await.unwrap();
    Ok(())
}

Dependencies

~86MB
~1.5M SLoC