Version 0.1.1
HETNETDB
Hetnetdb is a database that is intended to be extremely flexible and easy to use. In hetnetdb (het, short for heterogeneous, and net, short for networked), the goal is to provide easy access via SQL queries to data in heterogeneous storage platforms and networks. Goals of the database include supporting CSV and JSON stored on multiple storage platforms, including local, server, edge, and streaming. To achieve this goal, there will be agent support for browser, iOS, Android, Linux, and macOS. (Not Windows; we specifically don't want to support Windows users!) Computational operations will be executed by opaque HTTP endpoints for extreme ease of use.
Priorities, IO, Crunch, Network
By supporting edge peripherals as units in the execution graph, we open the door to extremely large networks of devices. This can make crunching numbers generated by puny devices very easy and traceable, but it can also increase overall query execution time and create a long tail in the execution time of individual stages.
With such limitations out in the open, the execution graph must account for drastic differences in disk, network, and processing capabilities in order to provide low-latency query performance across various topologies. Improving this performance is a secondary goal; design decisions will be made to emphasize usability first!
General Architecture and Nomenclature
Before defining terms, let's define a tier list: Good (TG), Bad (TB), and Ugly (TU). This tier list will be used to grade the capabilities of an attribute such as disk, network, or processing. Adjacent tiers differ by one or more orders of magnitude. For example, an edge device communicating over BLE would have networking capabilities in the Ugly Tier, or TU for short. Your smartphone might get TB for networking, and a server in the datacenter will get TG.
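A minimal Rust sketch of how the tier list could be modeled, assuming one tier per attribute and ordering Ugly < Bad < Good. The names Tier, Capabilities, and weakest are hypothetical, and the smartphone's disk and processing tiers are assumptions (the text above only grades its networking).

```rust
// Hypothetical sketch: these types are illustrative, not hetnetdb's API.
// Variants declared first compare as smaller, so Ugly < Bad < Good.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Tier {
    Ugly, // TU: e.g. an edge device talking over BLE
    Bad,  // TB: e.g. a smartphone's network
    Good, // TG: e.g. a datacenter server
}

#[derive(Debug, Clone, Copy)]
struct Capabilities {
    disk: Tier,
    network: Tier,
    processing: Tier,
}

impl Capabilities {
    /// The weakest attribute bounds what a host can contribute to a stage.
    fn weakest(&self) -> Tier {
        self.disk.min(self.network).min(self.processing)
    }
}

fn main() {
    let hosts = [
        ("BLE sensor", Capabilities { disk: Tier::Ugly, network: Tier::Ugly, processing: Tier::Ugly }),
        // Assumed: only the smartphone's TB network grade comes from the README.
        ("smartphone", Capabilities { disk: Tier::Good, network: Tier::Bad, processing: Tier::Good }),
        ("datacenter server", Capabilities { disk: Tier::Good, network: Tier::Good, processing: Tier::Good }),
    ];
    for (name, caps) in &hosts {
        println!("{name}: weakest attribute is {:?}", caps.weakest());
    }
}
```

Deriving Ord on the enum keeps tier comparisons cheap and explicit, which is the kind of signal a scheduler could use when placing stages on very different hosts.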
- Query Server: This is a host in the cloud. This runs the HTTP Server with endpoints for submitting query requests, requesting execution, etc. Generally, a Query Server should be TG.
- Agent: This is a host of executors. Agents are heterogeneous in hetnetdb, with sets of capabilities all over the tier list. Agents should not share resources with other agents unless they have TU rankings. Agents may manage executors to retry or balance workloads, but when they fail to find an acceptable executor configuration, the query fails.
- Executor: The actual data manipulation happens inside of an executor. Executors respect their resource limitations and do their best to complete a job. They are fully independent of each other. They produce either results or, possibly, recoverable errors.
- Parser: The interface for accessing data is SQL. The parser turns your query into either an error message or an execution graph.
- Execution Nodes: These are the high-level to-do list for the query. The Query Server keeps track of agents and delegates nodes to agents as it traverses the execution graph.
- Execution Graph: The execution graph is the parsed plan for executing Execution Nodes. Every query has one graph pending finalization as error or results.
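The Agent/Executor contract above (executors return results or recoverable errors; an agent retries across executors and fails the query when none fits) can be sketched as follows. Executor, RecoverableError, QueryError, run_on_agent, and the single memory limit are all hypothetical stand-ins, not hetnetdb types.

```rust
/// Hypothetical error type: executors produce results or, possibly,
/// recoverable errors.
#[derive(Debug)]
struct RecoverableError(String);

/// An executor with a single illustrative resource limit.
struct Executor {
    id: u32,
    memory_mb: u32,
}

impl Executor {
    fn run(&self, job_mb: u32) -> Result<u64, RecoverableError> {
        if job_mb > self.memory_mb {
            // Over the limit: the job might still fit on another executor.
            Err(RecoverableError(format!("executor {} has only {} MB", self.id, self.memory_mb)))
        } else {
            Ok(u64::from(job_mb) * 1_000) // stand-in for a real result
        }
    }
}

#[derive(Debug, PartialEq)]
enum QueryError {
    NoAcceptableExecutor,
}

/// The Agent tries its executors in turn, moving on after a recoverable
/// error; if none can take the job, the query fails.
fn run_on_agent(executors: &[Executor], job_mb: u32) -> Result<u64, QueryError> {
    for executor in executors {
        match executor.run(job_mb) {
            Ok(result) => return Ok(result),
            Err(e) => eprintln!("retrying elsewhere: {}", e.0),
        }
    }
    Err(QueryError::NoAcceptableExecutor)
}

fn main() {
    let pool = [
        Executor { id: 0, memory_mb: 64 },
        Executor { id: 1, memory_mb: 512 },
    ];
    println!("{:?}", run_on_agent(&pool, 256)); // Ok(256000): executor 1 takes it
    println!("{:?}", run_on_agent(&pool, 4096)); // Err(NoAcceptableExecutor)
}
```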
Execution Graph Traits and Structs
The GraphBuilder struct exists to expose the builder pattern for creating, validating, and optimizing an execution graph with concise syntax. GraphBuilder::new(query_id) begins constructing an execution graph for a persisted query by query_id, so that distributed requests during execution can render a new execution graph by searching for that query_id. On GraphBuilder::build(&mut self) -> RootNode, the graph is optimized and ready to query (or continue querying).
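A rough sketch of that builder flow. Only GraphBuilder, new(query_id), build(&mut self) -> RootNode, and RootNode come from the text above; the Stage enum, the stage and validate methods, and the "optimization" (collapsing repeated adjacent stages) are hypothetical placeholders.

```rust
// Hypothetical sketch: stages, validation, and optimization are illustrative.
#[derive(Debug, Clone, PartialEq)]
enum Stage {
    Select(String), // read a table
    Project,
    Reorder,
}

#[derive(Debug)]
struct RootNode {
    query_id: i64,
    stages: Vec<Stage>, // ordered from the leaf read up to the root
}

struct GraphBuilder {
    query_id: i64,
    stages: Vec<Stage>,
}

impl GraphBuilder {
    /// Start a graph for a persisted query, keyed by `query_id` so another
    /// host can rebuild the same graph from the same id.
    fn new(query_id: i64) -> Self {
        GraphBuilder { query_id, stages: Vec::new() }
    }

    /// Append a stage; returning `&mut Self` keeps the syntax concise.
    fn stage(&mut self, stage: Stage) -> &mut Self {
        self.stages.push(stage);
        self
    }

    /// Validation: every graph must bottom out in a read.
    fn validate(&self) -> Result<(), String> {
        match self.stages.first() {
            Some(Stage::Select(_)) => Ok(()),
            _ => Err(format!("query {}: graph must start with a read", self.query_id)),
        }
    }

    /// Toy optimization: collapse repeated adjacent stages, then hand off.
    fn build(&mut self) -> RootNode {
        self.stages.dedup();
        RootNode { query_id: self.query_id, stages: std::mem::take(&mut self.stages) }
    }
}

fn main() {
    let mut builder = GraphBuilder::new(42);
    builder
        .stage(Stage::Select("hndefault".to_string()))
        .stage(Stage::Project)
        .stage(Stage::Project)
        .stage(Stage::Reorder);
    builder.validate().expect("graph should start with a read");
    let root = builder.build();
    println!("{root:?}");
}
```

Taking &mut self in build (as the README's signature does) means the builder is drained rather than consumed; here that is done with std::mem::take.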
The execution graph itself has a RootNode intended to act as a metadata, pass-through node that supports the same async Node interface as the rest of the HyperNodes in the graph. Each HyperNode is an Execution Node in the execution graph with direct ties to relational algebra. Like RootNode, HyperNode exposes an async fn curse() -> Arc<WorkNodeCursor> interface for traversing result sets yielded by the asynchronous processing interface of a WorkNodeCursor. A HyperNode is a meta node that does not actually do compute or read; it initializes and yields to the compute and read done at the WorkNode instances. WorkNode instances are intended to be flexibly deployed and partitioned; they implement the logic to support the various OpTypes or IoTypes.
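A std-only sketch of the HyperNode/WorkNode split, assuming a cursor that hands out result partitions one at a time. The names HyperNode, WorkNode, WorkNodeCursor, and curse() come from the text; the fields, next_partition, run, and the tiny block_on executor are hypothetical. A real implementation would run WorkNodes concurrently (possibly on other agents) under a proper async runtime rather than awaiting them one by one.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// A cursor over result partitions produced by WorkNodes.
struct WorkNodeCursor {
    partitions: Mutex<VecDeque<Vec<i64>>>,
}

impl WorkNodeCursor {
    fn next_partition(&self) -> Option<Vec<i64>> {
        self.partitions.lock().unwrap().pop_front()
    }
}

/// Does the actual read/compute for one partition (here: a stand-in read).
struct WorkNode {
    partition: Vec<i64>,
}

impl WorkNode {
    async fn run(&self) -> Vec<i64> {
        self.partition.clone()
    }
}

/// Meta node: it does no compute or reads itself; it starts its WorkNodes
/// and exposes their output through a shared cursor.
struct HyperNode {
    work: Vec<WorkNode>,
}

impl HyperNode {
    async fn curse(&self) -> Arc<WorkNodeCursor> {
        let mut partitions = VecDeque::new();
        for node in &self.work {
            partitions.push_back(node.run().await);
        }
        Arc::new(WorkNodeCursor { partitions: Mutex::new(partitions) })
    }
}

// Minimal single-future executor so the sketch runs without external crates.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let node = HyperNode {
        work: vec![
            WorkNode { partition: vec![1, 2, 3] },
            WorkNode { partition: vec![4, 5] },
        ],
    };
    let cursor = block_on(node.curse());
    let mut records = 0;
    while let Some(partition) = cursor.next_partition() {
        records += partition.len();
    }
    println!("{records} records"); // 5 records
}
```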
As discussed and motivated in the following Topologies section, a HyperNode may run in an Executor on one Agent and expose an asynchronous cursor to WorkNodes running on Executors on a different Agent.
Topologies
Obviously, there are some ambitious goals that aren't going to be completed any time soon.
v0 target topologies
- Query Server + (Enter Generic Client) Agent: A user has CSV/JSON they uploaded to the (website, app). They run the SQL query in the (browser, phone).
- Query Server + Local Agent: A user has a directory of CSV/JSON. They run a SQL query against that data without upload.
- Query Server + Local Agent + Remote Streaming Agent: A user has a directory of CSV/JSON. A Remote Streaming Agent is measuring data from a sensor and uploading it to the Query Server. The Remote Streaming Agent's data is processed on the Query Server + Local Agent.
v1 target topologies
- Query Server Service + Multi-tenant Agent Pools + Elastic Storage: A user submits a query to Query Service. Data flows from agent pools and elastic storage. Compute happens in Query Service and Agent Pools.
- Query Server Service + Elastic Storage + Local Agent + Remote Streaming Agent: A user submits a query to Query Service. Data flows from elastic storage and remote streaming agents. New data from streaming agents is persisted in elastic storage and/or the local agent.
- Query Server Service + Edge Agents: Botnets can compute and store too
- Query Server Service + Data redistribution: A user can move data to desired agents or have the service decide the best place
v3 target topologies
- Query Server Service + Multi-tenant Agent Pools + Edge Agents: Run a social media platform. Pay agents with ad revenue.
Performance Goals
Target datasets will include various attributes, which makes a few workloads especially important:
- Time filter
- Geo filter
- Id filter
- Capture then filter
- Shuffle data distribution
Realistically, with the target architectures for v0, the best we can hope for is sub-second latency. With on-the-fly data parsing and HTTP servers at every hop of the execution graph, a 500ms query execution time would be amazing. As agents enter the pool and indexes get more complicated, new goals will be established for new workloads.
Development
- Install: cargo, libpq, diesel_cli (with postgres), systemfd, cargo-watch
- Build/Test:
cargo build or cargo test
- Run dev server:
systemfd --no-pid -s http::6969 -- cargo watch -x run
- Run prod server:
cargo run --release
Example Usage
- Install httpie
- Submit the query to the endpoint piecewise or wholesale:
jwtrueb@jbmp hetnetdb % echo '{ "text": "SELECT count(*) from agents" }' | http post :6969/query/submit
HTTP/1.1 200 OK
content-length: 87
content-type: application/json
date: Sat, 14 Nov 2020 21:23:58 GMT
{
  "records": [
    {
      "columns": [
        {
          "i64": 42
        }
      ],
      "ready": {
        "dt_utc": "2020-11-14T21:23:58.730786Z"
      }
    }
  ]
}
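For clients without httpie, here is a std-only Rust sketch that builds the same kind of HTTP/1.1 request, assuming (as in the example above) that /query/submit accepts a JSON body with a single text field. The helper names json_escape, submit_request, and submit are hypothetical; a real client would more likely use an HTTP crate and a JSON library.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

/// Escape a string for a JSON string literal (quotes, backslashes, controls).
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build a raw HTTP/1.1 POST to /query/submit with a {"text": ...} body.
fn submit_request(host: &str, sql: &str) -> String {
    let body = format!("{{\"text\":\"{}\"}}", json_escape(sql));
    format!(
        "POST /query/submit HTTP/1.1\r\nHost: {host}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
        body.len()
    )
}

/// Send the request and return the raw response (needs a running server).
#[allow(dead_code)]
fn submit(addr: &str, sql: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    stream.write_all(submit_request(addr, sql).as_bytes())?;
    let mut response = String::new();
    stream.read_to_string(&mut response)?;
    Ok(response)
}

fn main() {
    print!("{}", submit_request("localhost:6969", "SELECT count(*) from agents"));
    // With the dev server up: submit("localhost:6969", "SELECT count(*) from agents")
}
```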
Bookkeeping
Releases are to be created and tagged off of master with semantic versioning. The README should be kept up to date. The table of contents can be updated automatically with a markdown TOC generator: cargo install markdown-toc and md-toc README.md. The licenses were inspected using cargo install cargo-license, but running the tool was odd: rustup run nightly cargo-license.
Feature Roadmap and Wish List
While the project is in its early stages, this TODO list will hold the temporary feature roadmap.
- Create an HTTP Server to route all actions through. All requests will perform message passing via HTTP requests and manipulate shared state by transacting with postgres.
- Create routes for submitting select queries:
  - To try to parse a SQL query
  - To optimize a SQL query
  - To execute an optimized SQL query
  - To submit an unchecked SQL query and wait for results
- Inflate an optimized query into an execution graph
  - Define base graph types and relationships
  - Create execution nodes for data _
    - Data filtering: WHERE
    - Data grouping: GROUP BY
    - Data ordering: ORDER BY
    - Data functions:
      - COUNT(*)
      - COUNT()
      - COUNT(DISTINCT)
      - min
      - max
      - avg
      - ?stddev
  - Create execution nodes for data load
    - Data limiting: LIMIT
    - Data offset: OFFSET
- Create routes for data load with schema enforcement
  - To upload CSV to be cached
  - To parse CSV that is cached
  - To stream CSV into a cached table
  - To register S3 configs to download the data (via HTTP request)
  - To register agent configs to process data locally (requires agency CLI/daemon services)
- Create agency CLI
  - Register capabilities
  - Heartbeat system load
  - Mark registration in computational group
  - Mark registration in storage group
  - Process compute events from Query Server Service
  - Process storage events from Query Server Service
- Configure a monitoring dashboard
  - TICK stack
  - LogDNA event tracing for individual queries
- Create an execution graph visualization tool
Improvement Wish List
- Create execution cost models and benchmarks
- Improve query optimization
- Improve execution graph inflation
- Add JSON support wherever CSV is referenced
- Pick a faster serde format too
- Re-route workloads on heartbeat system load events
- Switch to a different parser that supports:
  - Common Table Expressions
  - Window Functions
  - Reasonably Arbitrary Syntax Expansions
- Run a benchmark on about 1B rows and/or 100GB uncompressed CSV
- Run an agency CLI service on something that produces rows by streaming from an edge device
- Provide reliability mechanisms for tracking ingestion of partially ingested data (after endpoint before rest).
Milestone Interactions
First Count Star
Data was
- Not persisted
- Generated by seq 1 100000
- Uploaded 100 times in parallel
- Only allowed to come from 1 table
- Expected to be traversed line by line to count
jwtrueb@jbmp hetnetdb % for i in `seq 1 100`; do http --multipart POST :6969/tables/upload/1 'Authorization: Bearer zKpze8PrHL0RfEoZwTeFKCrzL56RprSwJRm1hFp6KwTOfInwAzW8btLHuiMtfD12' csv@./sequence.csv & ; done
jwtrueb@jbmp hetnetdb % echo '{ "text": "select count(*) from simple" }' | http POST :6969/query/submit 'Authorization: Bearer zKpze8PrHL0RfEoZwTeFKCrzL56RprSwJRm1hFp6KwTOfInwAzW8btLHuiMtfD12'
HTTP/1.1 200 OK
content-length: 92
content-type: application/json
date: Mon, 16 Nov 2020 04:36:29 GMT
{
  "records": [
    {
      "columns": [
        {
          "i64": 1000000
        }
      ],
      "ready": {
        "dt_utc": "2020-11-16T04:36:29.126917Z"
      }
    }
  ]
}
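Counting by traversing the uploaded rows line by line, as described for this milestone, can be sketched like this. Representing each upload as one in-memory CSV string and the count_star name are assumptions made for illustration.

```rust
/// Hypothetical sketch: count(*) over cached uploads by walking each CSV
/// line by line. Each String stands in for one uploaded partition.
fn count_star(partitions: &[String]) -> i64 {
    partitions
        .iter()
        .map(|csv| csv.lines().filter(|line| !line.is_empty()).count() as i64)
        .sum()
}

fn main() {
    // Equivalent of `seq 1 5`, uploaded 3 times.
    let upload: String = (1..=5).map(|i| format!("{i}\n")).collect();
    let partitions = vec![upload; 3];
    println!("{}", count_star(&partitions)); // 15
}
```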
First 100 Million Row Query
Data was
- Not persisted
- Generated by for i in $(seq 0 999999); do echo $i,$i,$i >> data/i64_i64_i64.csv; done (21MB csv)
- Uploaded 100 times in parallel in 🔥 29.44s 🔥
- Validated and cached during upload at 🔥 600Mbps 🔥 (measured to request completion and therefore including upload time; 1.34Gbps peak)
- Calculated by summing rows in cached shared app data
- Query ran in ⚡ 9.4ms ⚡
Client
jwtrueb@jbmp hetnetdb % ./query.sh $(cat target/auth.txt)
HTTP/1.1 200 OK
content-length: 94
content-type: application/json
date: Sun, 22 Nov 2020 20:46:46 GMT
{
  "records": [
    {
      "columns": [
        {
          "i64": 100000000
        }
      ],
      "ready": {
        "dt_utc": "2020-11-22T20:46:46.697957Z"
      }
    }
  ]
}
Server
[2020-11-22T20:46:46Z INFO hetnetdb::query::routes] /query/execute Query { text: "select count(*) from hndefault", parse: Some(Select(SelectStatement { tables: [Table { name: "hndefault", alias: None }], distinct: false, fields: [Col(Column { name: "count(*)", alias: None, table: None, function: Some(CountStar) })], join: [], where_clause: None, group_by: None, order: None, limit: None })), optimal_parse: Some(Select(SelectStatement { tables: [Table { name: "hndefault", alias: None }], distinct: false, fields: [Col(Column { name: "count(*)", alias: None, table: None, function: Some(CountStar) })], join: [], where_clause: None, group_by: None, order: None, limit: None })) }
[2020-11-22T20:46:46Z INFO actix_web::middleware::logger] 127.0.0.1:52091 "POST /query/submit HTTP/1.1" 200 94 "-" "HTTPie/2.3.0" 0.009403
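The server log shows a Query record that carries its text alongside parse and optimal_parse. A toy sketch of that lifecycle follows. Statement, the one-pattern parser, and optimize are hypothetical stand-ins; the real parse is the SelectStatement shown in the log.

```rust
/// Toy stand-in for the parser's AST.
#[derive(Debug, Clone, PartialEq)]
enum Statement {
    CountStar { table: String },
}

/// Mirrors the fields visible in the log: text first, parses filled in later.
#[derive(Debug, Default)]
struct Query {
    text: String,
    parse: Option<Statement>,
    optimal_parse: Option<Statement>,
}

/// Toy parser that only understands `select count(*) from <table>`.
fn parse(text: &str) -> Result<Statement, String> {
    let lower = text.trim().to_lowercase();
    lower
        .strip_prefix("select count(*) from ")
        .map(|table| Statement::CountStar { table: table.trim().to_string() })
        .ok_or_else(|| format!("unsupported query: {text}"))
}

/// Nothing to optimize for a bare count(*); the log shows identical parses.
fn optimize(stmt: &Statement) -> Statement {
    stmt.clone()
}

fn main() -> Result<(), String> {
    let mut q = Query { text: "select count(*) from hndefault".into(), ..Default::default() };
    let stmt = parse(&q.text)?;
    q.optimal_parse = Some(optimize(&stmt));
    q.parse = Some(stmt);
    println!("{q:?}");
    Ok(())
}
```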
Drill Benchmarking
The beginnings of a benchmark suite use drill (cargo install drill) to hit the HTTP server directly. The configs are in drill.yml. The first case runs count star 1000 times with a concurrency of 4.
Fun fact: this snippet comes from querying against the 100 million rows loaded in the previous milestone.
jwtrueb@jbmp hetnetdb % drill --benchmark drill.yml --stats
Concurrency 4
Iterations 1000
Rampup 2
Base URL http://localhost:6969
Run queries http://localhost:6969/query/submit 200 OK 13ms
Run queries http://localhost:6969/query/submit 200 OK 17ms
Run queries http://localhost:6969/query/submit 200 OK 14ms
Run queries http://localhost:6969/query/submit 200 OK 16ms
Run queries http://localhost:6969/query/submit 200 OK 7ms
...
Run queries http://localhost:6969/query/submit 200 OK 7ms
Run queries http://localhost:6969/query/submit 200 OK 6ms
Run queries Total requests 1000
Run queries Successful requests 1000
Run queries Failed requests 0
Run queries Median time per request 6ms
Run queries Average time per request 7ms
Run queries Sample standard deviation 1ms
Time taken for tests 1.8 seconds
Total requests 1000
Successful requests 1000
Failed requests 0
Requests per second 547.94 [#/sec]
Median time per request 6ms
Average time per request 7ms
Sample standard deviation 1ms
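The drill numbers hang together: by Little's law, throughput is about concurrency divided by average latency, and it should also match total requests over wall-clock time. A small worked check, where littles_law_rps is a hypothetical helper:

```rust
/// Little's law: requests per second ≈ concurrency / average latency.
fn littles_law_rps(concurrency: f64, avg_latency_ms: f64) -> f64 {
    concurrency / (avg_latency_ms / 1000.0)
}

fn main() {
    let predicted = littles_law_rps(4.0, 7.0); // ≈ 571 req/s
    let wall_clock = 1000.0 / 1.8; // ≈ 556 req/s
    let reported = 547.94;
    println!("predicted {predicted:.0}, wall clock {wall_clock:.0}, reported {reported}");
}
```

The small gap to the reported 547.94 is plausibly due to the rounded 7ms average and the rampup period. The same check for the executed-graph run (4 concurrent, 23ms average) predicts about 174 req/s against 165.13 reported.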
First Executed Graph
A select star with a reorder, project, and select from cache was run via drill.
jwtrueb@jbmp hetnetdb % drill --benchmark drill.yml --stats
Concurrency 4
Iterations 1000
Rampup 2
Base URL http://localhost:6969
...
Run queries http://localhost:6969/query/submit 200 OK 24ms
Run queries http://localhost:6969/query/submit 200 OK 24ms
Run queries http://localhost:6969/query/submit 200 OK 25ms
Run queries Total requests 1000
Run queries Successful requests 1000
Run queries Failed requests 0
Run queries Median time per request 17ms
Run queries Average time per request 23ms
Run queries Sample standard deviation 107ms
Time taken for tests 6.1 seconds
Total requests 1000
Successful requests 1000
Failed requests 0
Requests per second 165.13 [#/sec]
Median time per request 17ms
Average time per request 23ms
Sample standard deviation 107ms
Here are some log snippets showing how the statement compiled and executed.
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Found SelectStatement: SelectStatement { tables: [Table { name: "hndefault", alias: None }], distinct: false, fields: [All], join: [], where_clause: None, group_by: None, order: None, limit: None }
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Beginning collect for Server(
Whole,
)
NodeInfo {
input: Single(
HyperNode {
name: "project",
columns: None,
info: NodeInfo {
input: Single(
HyperNode {
name: "select_hndefault",
columns: None,
info: NodeInfo {
input: Leaf,
personality: Leaf(
Ram(
"hndefault",
),
),
},
execution_info: Mutex {
is_locked: false,
has_waiters: false,
},
},
),
personality: Op(
Project,
),
},
execution_info: Mutex {
is_locked: false,
has_waiters: false,
},
},
),
personality: Op(
Reorder,
),
}
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Collecting Op Reorder
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Beginning collect for Server(
Whole,
)
NodeInfo {
input: Single(
HyperNode {
name: "select_hndefault",
columns: None,
info: NodeInfo {
input: Leaf,
personality: Leaf(
Ram(
"hndefault",
),
),
},
execution_info: Mutex {
is_locked: false,
has_waiters: false,
},
},
),
personality: Op(
Project,
),
}
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Collecting Op Project
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Beginning collect for Server(
Whole,
)
NodeInfo {
input: Leaf,
personality: Leaf(
Ram(
"hndefault",
),
),
}
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Collecting Leaf Ram("hndefault")
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Loading table_data from ram cache
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Found table_data with 1 partitions
[2020-11-29T04:38:35Z TRACE hetnetdb::graph::node] Processing 50 records for partition 0
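The trace shows a depth-first collect: Reorder asks Project for input, Project asks the Ram("hndefault") leaf, and the leaf loads its partitions from the RAM cache. The sketch below mirrors that shape in a simplified way. The type names echo the trace, but the Cache layout, the Source enum name, and the operator semantics (Project keeps all columns when columns is None; Reorder without ORDER BY keeps input order) are assumptions.

```rust
use std::collections::HashMap;

#[derive(Debug)]
enum Op { Project, Reorder }

#[derive(Debug)]
enum Source { Ram(String) }

#[derive(Debug)]
enum Personality { Leaf(Source), Op(Op) }

#[derive(Debug)]
enum Input { Leaf, Single(Box<HyperNode>) }

#[derive(Debug)]
struct HyperNode {
    name: String,
    columns: Option<Vec<usize>>,
    input: Input,
    personality: Personality,
}

type Record = Vec<i64>;
type Cache = HashMap<String, Vec<Vec<Record>>>; // table -> partitions -> records

/// Depth-first collect: a node first collects its input, then applies its op.
fn collect(node: &HyperNode, cache: &Cache) -> Result<Vec<Record>, String> {
    match (&node.personality, &node.input) {
        (Personality::Leaf(Source::Ram(table)), Input::Leaf) => {
            let partitions = cache.get(table).ok_or_else(|| format!("no table {table}"))?;
            Ok(partitions.iter().flatten().cloned().collect())
        }
        (Personality::Op(op), Input::Single(child)) => {
            let records = collect(child, cache)?;
            Ok(match op {
                Op::Project => match &node.columns {
                    None => records, // `columns: None`, as for select * in the trace
                    Some(cols) => records
                        .into_iter()
                        .map(|r| cols.iter().map(|&c| r[c]).collect())
                        .collect(),
                },
                Op::Reorder => records, // no ORDER BY, so input order is kept
            })
        }
        _ => Err(format!("malformed node {}", node.name)),
    }
}

fn main() {
    let mut cache = Cache::new();
    // One partition of 50 records, as in the trace.
    cache.insert("hndefault".into(), vec![(0..50).map(|i| vec![i, i, i]).collect()]);
    let leaf = HyperNode { name: "select_hndefault".into(), columns: None, input: Input::Leaf, personality: Personality::Leaf(Source::Ram("hndefault".into())) };
    let project = HyperNode { name: "project".into(), columns: None, input: Input::Single(Box::new(leaf)), personality: Personality::Op(Op::Project) };
    let root = HyperNode { name: "reorder".into(), columns: None, input: Input::Single(Box::new(project)), personality: Personality::Op(Op::Reorder) };
    let rows = collect(&root, &cache).expect("collect failed");
    println!("{} records", rows.len()); // 50 records
}
```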