#differential #join #dataflow #operator #input #collection #delta

differential-dogs3

Advanced join patterns in differential dataflow

4 releases

new 0.1.3 Jan 9, 2025
0.1.2 Dec 19, 2024
0.1.1 Nov 11, 2024
0.1.0 Oct 29, 2024

#172 in Data structures

Download history 1477/week @ 2024-10-28 2230/week @ 2024-11-04 2930/week @ 2024-11-11 2222/week @ 2024-11-18 1616/week @ 2024-11-25 2126/week @ 2024-12-02 1490/week @ 2024-12-09 1645/week @ 2024-12-16 794/week @ 2024-12-23 872/week @ 2024-12-30 1602/week @ 2025-01-06

5,052 downloads per month

MIT license

1MB
694 lines

Worst-case optimal joins and delta queries in differential dataflow

This project collects differential dataflow operators sufficient to implement memory-efficient delta queries, and worst-case optimal delta queries in the framework of differential dataflow. Many of the ideas are a transportation of work done in the dataflow-join project.


Imagine we have a collection of (src, dst) pairs describing a graph, and we want to determine the set of triangles:

triangles(a,b,c) := edge(a,b), edge(b,c), edge(a,c)

Differential dataflow provides a binary join operator that matches records in its two inputs with the same key, and produces the pairs of values. While this operator serves its role well, its naive implementation needs to maintain each of its inputs in indexed form to respond quickly to changes on either input.

In the example above, we might write

edges.join(&edges)
     .map(|(a,(b,c))| ((b,c),a))
     .semijoin(&edges)
     .map(|((b,c),a)| (a,b,c));

This produces all triples (a,b,c) satisfying the triangle definition above, and it will update incrementally for arbitrary changes to the edges collection. Unfortunately, the first input to semijoin has size proportional to the sum of the squares of the degrees in the graph, which can be quite large.

The database community has a few remedies for this problem, and we will be borrowing two of them.

Delta queries

Starting from the triangles query above, let's treat each of the three uses of edges as distinct relations.

triangles(a,b,c) := edge1(a,b), edge2(b,c), edge3(a,c)

We can write relational "delta queries" for how the triangles collection should change as a function of changes to any of the input relations.

d_tris1(a,b,c) := d_edge1(a,b), edge2(b,c), edge3(a,c)
d_tris2(a,b,c) := d_edge2(b,c), edge1(a,b), edge3(a,c)
d_tris3(a,b,c) := d_edge3(a,c), edge1(a,b), edge2(b,c)

All I've done here is written three rules by taking each relation in turn, replacing it by a version with a d_ prefix, and then listing the remaining relations. Each of these queries tell us how to respond to a change to the associated relation.

Delta queries are appealing because they can be implemented in a stateless manner, maintaining only the indexed representations of edges (probably indexed both by source and by destination). They do not need to maintain intermediate collections, as differential dataflow does by default.

There is an important subtle detail, which the above example demonstrates: if there are simultaneous updates to many relations, we can't simply apply all delta queries at the same time. One change to edges could cause the three rules above to derive the same change multiple times. Instead, we need to execute the delta queries "in order", perhaps the order written above. Of course, we don't plan on actually doing that, but we can use timely dataflow's timestamps to impose a logical if not physical order on the execution.

In examples/delta_query.rs we can write these three rules as so:

// d_tris1(a,b,c) := d_edge1(a,b), edge2(b,c), edge3(a,c)
let d_tris1 = forward
    .propose_using(&mut neu_forward.extend_using(|(_a,b)| *b))
    .validate_using(&mut neu_forward.extend_using(|(a,_b)| *a))
    .map(|((a,b),c)| (a,b,c));

// d_tris2(a,b,c) := d_edge2(b,c), edge1(a,b), edge3(a,c)
let d_tris2 = forward
    .propose_using(&mut alt_reverse.extend_using(|(b,_c)| *b))
    .validate_using(&mut neu_reverse.extend_using(|(_b,c)| *c))
    .map(|((b,c),a)| (a,b,c));

// d_tris3(a,b,c) := d_edge3(a,c), edge1(a,b), edge2(b,c)
let d_tris3 = forward
    .propose_using(&mut alt_forward.extend_using(|(a,_c)| *a))
    .validate_using(&mut alt_reverse.extend_using(|(_a,c)| *c))
    .map(|((a,c),b)| (a,b,c));

where the alt and neu prefixes refer to whether the data are old or new (old data can't "see" new data until the next timestamp), and the forward and reverse suffixes are the direction the data are indexed.

There is clearly a lot of code jargon going on here, but this all works and maintains a fixed memory footprint, proportional to the number of edges rather than the number of intermediate triangle candidates.

Worst-case optimal joins

If you don't know what these are you are in for a treat.

Worst-case optimal joins are a new way of join processing that operate attribute-at-a-time, rather than relation-at-a-time, and which take time proportional to the worst-case possible result size set. For something like triangle computation, this is at most the number of edges raised to the power 1.5, which is better than the power 2, which is what a traditional relational engine would guarantee.

There are several worst-case optimal join algorithms, but my favorite comes from the dataflow-join repository. The gist here is that whenever you have a tuple like (a,b) to which you are thinking about adding a new attribute c, best to take stock of the possible ways that the new attribute c might be added. For example, in

d_tris1(a,b,c) := d_edge1(a,b), edge2(b,c), edge3(a,c)

the new c needs to be present in both edge2 and in edge3. However, for a specific (a,b) pair, either of these two relations might propose more or fewer candidate c values. What if you were smart and first asked each relation to report the number of distinct values they would propose? Then you could have that relation propose the values, and the other relations validate these choices in time proportional to the number of proposals.

You can perform this in native differential dataflow over in the examples/ngo.rs example. While cool, this version has the defect that the differential implementation again maintains intermediate state. This state can be much smaller than the naive joins, but it can also be much larger than the input relations.

Of course, we can combine the delta queries above with the worst-case optimal join processing, using all of the sweet loot in this project here. In examples/delta_query_wcoj.rs we write the same delta queries, but implemented using worst-case optimal joins.

// d_tris1(a,b,c) := d_edge1(a,b), edge2(b,c), edge3(a,c)
let d_tris1 = forward
    .extend(&mut [
        &mut neu_forward.extend_using(|(_a,b)| *b),
        &mut neu_forward.extend_using(|(a,_b)| *a),
    ])
    .map(|((a,b),c)| (a,b,c));

// d_tris2(a,b,c) := d_edge2(b,c), edge1(a,b), edge3(a,c)
let d_tris2 = forward
    .extend(&mut [
        &mut alt_reverse.extend_using(|(b,_c)| *b),
        &mut neu_reverse.extend_using(|(_b,c)| *c),
    ])
    .map(|((b,c),a)| (a,b,c));

// d_tris3(a,b,c) := d_edge3(a,c), edge1(a,b), edge2(b,c)
let d_tris3 = forward
    .extend(&mut [
        &mut alt_forward.extend_using(|(a,_c)| *a),
        &mut alt_reverse.extend_using(|(_a,c)| *c),
    ])
    .map(|((a,c),b)| (a,b,c));

Some numbers

Let's take these two implementations for a performance spin. None of these numbers are going to be nearly as exciting as for hand-rolled triangle counting implementations, sorry! However, they are fully differential, which means you could use them inside of an iterative Datalog computation, for example. Or with high refresh frequency, or distributed on multiple workers, etc. It's up to you to decide if these things are worth doing.

We have a few examples, including a naive differential WCOJ (examples/ngo.rs), and the delta queries and worst-case optimal delta queries. For the moment, let's just look at the second two.

If we spin up examples/delta_query.rs with the livejournal graph, asking it to introduce nodes (and all their edges) one at a time, we see

Echidnatron% cargo run --release --example delta_query -- ~/Projects/Datasets/livejournal 1
    Finished release [optimized] target(s) in 0.07s
     Running `target/release/examples/delta_query /Users/mcsherry/Projects/Datasets/livejournal 1`
3.429001ms  Round 1 complete
32.624059ms Round 2 complete
38.686335ms Round 3 complete
41.348212ms Round 4 complete
44.279022ms Round 5 complete
...

which looks like a great start. However, it goes really fast and quickly gets "stuck". If we wait a moment, we see

425.859742ms    Round 85 complete
426.276961ms    Round 86 complete
426.550825ms    Round 87 complete
26.317746169s   Round 88 complete
26.318698709s   Round 89 complete
26.319160175s   Round 90 complete
...

which is pretty bad news.

What happened here is a great example of why you want worst-case optimal joins. It turns out that node 87 has 13,127 out-going edges. That means that the third update rule in the delta query

// d_tris3(a,b,c) := d_edge3(a,c), edge1(a,b), edge2(b,c)
let d_tris3 = forward
    .propose_using(&mut alt_forward.extend_using(|(a,_c)| *a))
    .validate_using(&mut alt_reverse.extend_using(|(_a,c)| *c))
    .map(|((a,c),b)| (a,b,c));

takes in 13,127 changes and produces 172,318,129 proposals, being that number of changes squared. In this case, it was a horrible idea to ask edge1 to propose changes to extend d_edge3. It would have been much better to ask edge2 to do the proposals, especially because we are loading the graph in node order and there aren't so many reverse edges yet. Of course we don't want to bake that in as a rule, but the worst-case optimal implementation can figure this out automatically.

Let's see how examples/delta_query_wcoj.rs does:

Echidnatron% cargo run --release --example delta_query_wcoj -- ~/Projects/Datasets/livejournal 1
    Finished release [optimized] target(s) in 0.14s
     Running `target/release/examples/delta_query_wcoj /Users/mcsherry/Projects/Datasets/livejournal 1`
3.649981ms  Round 1 complete
5.279384ms  Round 2 complete
6.196575ms  Round 3 complete
6.990896ms  Round 4 complete
7.662358ms  Round 5 complete
...

Lots faster, you might have noticed. Even without the horrible behavior seen above, most of these nodes have degree in the hundreds, and the implementation is smart enough to use the reverse directions when appropriate.

84.9538ms   Round 85 complete
87.320066ms Round 86 complete
88.923623ms Round 87 complete
144.530066ms    Round 88 complete
145.450224ms    Round 89 complete
146.103021ms    Round 90 complete
...

Node 87 still causes some mischief, but this has more to do with our computation ingesting those 13,127 edges and checking each of them to see what needs to be done. We still perform linear work, just not quadratic in this case.

You can take the examples out for spins with bigger batching if you want higher throughput. I wouldn't recommend this for the vanilla delta query (it has problems enough) but you can do e.g.

Echidnatron% cargo run --release --example delta_query_wcoj -- ~/Projects/Datasets/livejournal 1000
    Finished release [optimized] target(s) in 0.07s
     Running `target/release/examples/delta_query_wcoj /Users/mcsherry/Projects/Datasets/livejournal 1000`
675.443522ms    Round 1000 complete
1.534094533s    Round 2000 complete
2.341095966s    Round 3000 complete
3.192567920s    Round 4000 complete
3.952057645s    Round 5000 complete
4.619403029s    Round 6000 complete
5.617067697s    Round 7000 complete
6.260885764s    Round 8000 complete
7.649967738s    Round 9000 complete
8.279152976s    Round 10000 complete
...

This is showing us that we can perform over one thousand distinct updates each second (note: not a batch of one thousand updates), though this number will probably drop off as we let it run (the graph fills out). There is still plenty to improve and work on, and at the moment this is more about unique functionality rather than raw performance.

Dependencies

~13MB
~106K SLoC