1 unstable release
0.1.0 | Sep 7, 2020 |
---|
#2248 in Database interfaces
61KB
1.5K
SLoC
Async Rust client for Apache Druid
Fully asynchronous, future-enabled Apache Druid client library for the Rust programming language.
The library provides a statically typed API for Native Queries.
Installation
The library is hosted on crates.io.
[dependencies]
druid-io = "*"
Supported Native Queries
- Timeseries
- TopN
- GroupBy
- Scan
- Search
- TimeBoundary
- SegmentMetadata
- DataSourceMetadata
Usage
Client
Connect to a Druid cluster through a statically provided list of brokers:
let druid_client = DruidClient::new(vec!["localhost:8082".to_string()]);
Connect to a Druid cluster through ZooKeeper (supports autodiscovery of new brokers and load balancing):
TODO:
Querying
Timeseries
See Timeseries query documentation
#[derive(Serialize, Deserialize, Debug)]
pub struct TimeAggr {
count: usize,
count_fraction: f32,
user: String,
}
let timeseries = Timeseries {
data_source: DataSource::table("wikipedia"),
limit: Some(10),
descending: false,
granularity: Granularity::All,
filter: Some(Filter::selector("user", "Taffe316")),
aggregations: vec![
Aggregation::count("count"),
Aggregation::StringFirst {
name: "user".into(),
field_name: "user".into(),
max_string_bytes: 1024,
},
],
post_aggregations: vec![PostAggregation::Arithmetic {
name: "count_fraction".into(),
function: "/".into(),
fields: vec![
PostAggregator::field_access("count_percent", "count"),
PostAggregator::constant("hundred", 100.into()),
],
ordering: None,
}],
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
context: context,
};
let result = tokio_test::block_on(druid_client.timeseries::<TimeAggr>(&timeseries));
TopN
See Apache Druid TopN query documentation
#[derive(Serialize, Deserialize, Debug)]
struct WikiPage {
page: String,
user: Option<String>,
count: usize,
}
let top_n = TopN {
data_source: DataSource::table("wikipedia"),
dimension: Dimension::default("page"),
threshold: 10,
metric: "count".into(),
aggregations: vec![
Aggregation::count("count"),
Aggregation::StringFirst {
name: "user".into(),
field_name: "user".into(),
max_string_bytes: 1024,
},
],
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
granularity: Granularity::All,
context: Default::default(),
};
let druid_client = DruidClient::new(vec!["localhost:8082".to_string()]);
let result = tokio_test::block_on(druid_client.top_n::<WikiPage>(&top_n));
GroupBy
See Apache Druid GroupBy query documentation
let group_by = GroupBy {
data_source: DataSource::table("wikipedia"),
dimensions: vec![Dimension::Default {
dimension: "page".into(),
output_name: "page".into(),
output_type: OutputType::STRING,
}],
limit_spec: Some(LimitSpec {
limit: 10,
columns: vec![OrderByColumnSpec::new(
"page",
Ordering::Descending,
SortingOrder::Alphanumeric,
)],
}),
granularity: Granularity::All,
filter: Some(Filter::selector("user", "Taffe316")),
aggregations: vec![
Aggregation::count("count"),
Aggregation::StringFirst {
name: "user".into(),
field_name: "user".into(),
max_string_bytes: 1024,
},
],
post_aggregations: vec![PostAggregation::Arithmetic {
name: "count_fraction".into(),
function: "/".into(),
fields: vec![
PostAggregator::field_access("count_percent", "count"),
PostAggregator::constant("hundred", 100.into()),
],
ordering: None,
}],
having: Some(HavingSpec::greater_than("count_fraction", 0.01.into())),
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
subtotal_spec: Default::default(),
context: Default::default(),
};
let result = tokio_test::block_on(druid_client.group_by::<WikiPage>(&group_by));
Scan (with inner join)
See Apache Druid Scan query documentation
Let's try something more complex: inner join
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct ScanEvent {
#[serde(rename(deserialize = "__time"))]
time: usize,
city_name: Option<String>,
comment: Option<String>,
namespace: Option<String>,
page: Option<String>,
region_iso_code: Option<String>,
user: String,
#[serde(rename(deserialize = "c.languages"))]
languages: Option<String>,
}
let scan = Scan {
data_source: DataSource::join(JoinType::Inner)
.left(DataSource::table("wikipedia"))
.right(
DataSource::query(
Scan {
data_source: DataSource::table("countries"),
batch_size: 10,
intervals: vec![
"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z"
.into(),
],
result_format: ResultFormat::List,
columns: vec!["Name".into(), "languages".into()],
limit: None,
filter: None,
ordering: Some(Ordering::None),
context: std::collections::HashMap::new(),
}
.into(),
),
"c.",
)
.condition("countryName == \"c.Name\"")
.build()
.unwrap(),
batch_size: 10,
intervals: vec!["-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".into()],
result_format: ResultFormat::List,
columns: vec![],
limit: Some(10),
filter: None,
ordering: Some(Ordering::None),
context: Default::default(),
};
let result = tokio_test::block_on(druid_client.scan::<ScanEvent>(&scan));
Dependencies
~7–11MB
~202K SLoC