Apache Iceberg Hadoop Catalog Unofficial Native Rust Implementation
This crate provides an unofficial native Rust implementation of the Apache Iceberg Hadoop catalog: a file-system-based catalog that keeps table metadata directly under the warehouse path (for example on HDFS), so no separate catalog service is required.
See the API documentation for examples and the full API.
Usage
Single NameNode
use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain a Kerberos ticket before talking to a secured HDFS cluster
    // (Windows shown here; on Linux run `kinit` directly).
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()?;
    println!("kinit output: {:?}", &o);

    // Point the catalog at a single NameNode.
    let properties = HashMap::from([(
        "fs.defaultFS".to_string(),
        "hdfs://nn1:8020".to_string(),
    )]);
    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nn1:8020/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // List the namespaces found under the warehouse path.
    catalog
        .list_namespaces(None)
        .await?
        .iter()
        .for_each(|namespace| println!("namespace: {:?}", namespace));

    // Load an existing table and query it through DataFusion.
    let table_ident = TableIdent::new(NamespaceIdent::new("ods".to_owned()), "test1".to_owned());
    let table = catalog.load_table(&table_ident).await?;
    println!("table: {:?}", table.metadata());

    let table_provider = IcebergTableProvider::try_new_from_table(table.clone()).await?;
    let ctx = SessionContext::new();
    ctx.register_table("test1", Arc::new(table_provider))?;
    let df = ctx.sql("SELECT * FROM test1 LIMIT 100").await?;
    df.show().await?;
    Ok(())
}
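Creating a table
Tables can also be created through the same Catalog trait. The snippet below is a minimal sketch, assuming this crate mirrors the Schema, NestedField, and TableCreation builders of upstream iceberg-rust; the create_example_table helper, the field layout, and the "test2" table name are illustrative only.

use iceberg_catalog_hadoop_unofficial::HadoopCatalog;
use iceberg_unofficial::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg_unofficial::{Catalog, NamespaceIdent, TableCreation};

// Hypothetical helper: creates a two-column table under an existing namespace.
// Assumes the fork keeps upstream iceberg-rust's builder APIs.
async fn create_example_table(catalog: &HadoopCatalog) -> Result<(), Box<dyn std::error::Error>> {
    // Schema with a required id and an optional name column.
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;
    let creation = TableCreation::builder()
        .name("test2".to_owned())
        .schema(schema)
        .build();
    let namespace = NamespaceIdent::new("ods".to_owned());
    let table = catalog.create_table(&namespace, creation).await?;
    println!("created: {:?}", table.identifier());
    Ok(())
}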
NameNode HA
use std::collections::HashMap;
use std::process::Command;
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg_catalog_hadoop_unofficial::{HadoopCatalog, HadoopCatalogConfig};
use iceberg_datafusion_unofficial::IcebergTableProvider;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Obtain a Kerberos ticket before talking to a secured HDFS cluster.
    let o = Command::new("cmd")
        .arg("/c")
        .arg("kinit -kt C:/tmp/hive@DOMAIN.COM.keytab hive@DOMAIN.COM")
        .output()?;
    println!("kinit output: {:?}", &o);

    // HA configuration: address the cluster through its nameservice and
    // list the RPC address of each NameNode so the client can fail over.
    let properties = HashMap::from([
        (
            "fs.defaultFS".to_string(),
            "hdfs://nameservice1".to_string(),
        ),
        (
            "dfs.nameservices".to_string(),
            "nameservice1".to_string(),
        ),
        (
            "dfs.ha.namenodes.nameservice1".to_string(),
            "nn1,nn2".to_string(),
        ),
        (
            "dfs.namenode.rpc-address.nameservice1.nn1".to_string(),
            "nn1:8020".to_string(),
        ),
        (
            "dfs.namenode.rpc-address.nameservice1.nn2".to_string(),
            "nn2:8020".to_string(),
        ),
    ]);
    let config = HadoopCatalogConfig::builder()
        .warehouse("hdfs://nameservice1/user/hive/iceberg".to_owned())
        .properties(properties)
        .build();
    let catalog = HadoopCatalog::new(config).await?;

    // The rest is identical to the single-NameNode example.
    catalog
        .list_namespaces(None)
        .await?
        .iter()
        .for_each(|namespace| println!("namespace: {:?}", namespace));

    let table_ident = TableIdent::new(NamespaceIdent::new("ods".to_owned()), "test1".to_owned());
    let table = catalog.load_table(&table_ident).await?;
    println!("table: {:?}", table.metadata());

    let table_provider = IcebergTableProvider::try_new_from_table(table.clone()).await?;
    let ctx = SessionContext::new();
    ctx.register_table("test1", Arc::new(table_provider))?;
    let df = ctx.sql("SELECT * FROM test1 LIMIT 100").await?;
    df.show().await?;
    Ok(())
}
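Other catalog operations
Beyond listing namespaces and loading tables, the upstream iceberg-rust Catalog trait also exposes namespace and table management. The following is a hedged sketch, assuming this fork keeps the same method signatures (create_namespace, list_tables, table_exists, drop_table); the maintenance_example helper name is illustrative.

use std::collections::HashMap;

use iceberg_catalog_hadoop_unofficial::HadoopCatalog;
use iceberg_unofficial::{Catalog, NamespaceIdent, TableIdent};

// Hypothetical helper demonstrating namespace and table management calls,
// assuming the upstream iceberg-rust Catalog trait signatures.
async fn maintenance_example(catalog: &HadoopCatalog) -> Result<(), Box<dyn std::error::Error>> {
    let ns = NamespaceIdent::new("ods".to_owned());
    // Create a namespace (with a Hadoop catalog, a directory under the warehouse path).
    catalog.create_namespace(&ns, HashMap::new()).await?;
    // Enumerate the tables it contains.
    for ident in catalog.list_tables(&ns).await? {
        println!("table: {:?}", ident);
    }
    // Check for a table and drop it if present.
    let ident = TableIdent::new(ns.clone(), "test1".to_owned());
    if catalog.table_exists(&ident).await? {
        catalog.drop_table(&ident).await?;
    }
    Ok(())
}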