4 releases

0.1.3 Jan 5, 2024
0.1.2 Nov 21, 2023
0.1.1 Jul 23, 2023
0.1.0 Jul 7, 2023

#433 in Concurrency

GPL-3.0-or-later

105KB
2K SLoC

ParGraph - Parallel Graph Processing library

ParGraph is a simple and humble library for parallel processing of petgraph data structures.

Testing

Run basic tests with cargo test.

Properly testing concurrent data structures is difficult. This crate uses the loom crate to test its internal synchronization primitives. These tests must be run separately with:

RUSTFLAGS="--cfg loom" cargo test

lib.rs:

ParGraph: Parallel graph processing.

This crate implements data structures and algorithms for concurrent traversal of graphs. Graphs are processed using 'operators'. An operator sees only a small part of the graph, namely the 'active' node and its direct neighbors. Labelling operators can edit the data associated with the active node, and they can generate a set of new nodes to be processed later. The order in which nodes are processed is largely determined by the 'worklists'.
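The operator and worklist model described above can be sketched sequentially with the standard library alone. This is only an illustration and not the crate's API: `Graph`, `run_fifo` and `visit_order` are hypothetical names, and a real executor would run the operator on several threads.

```rust
use std::collections::VecDeque;

/// Hypothetical adjacency-list graph: `succ[n]` lists the successors of node `n`.
struct Graph {
    succ: Vec<Vec<usize>>,
}

/// Runs `op` on every work item until the worklist is empty.
/// The operator receives the active node, its direct successors,
/// and a handle for scheduling further nodes.
fn run_fifo<F>(graph: &Graph, start: usize, mut op: F)
where
    F: FnMut(usize, &[usize], &mut VecDeque<usize>),
{
    let mut worklist = VecDeque::from([start]);
    while let Some(active) = worklist.pop_front() {
        op(active, &graph.succ[active], &mut worklist);
    }
}

/// Returns the order in which the nodes reachable from `start` are visited.
fn visit_order(graph: &Graph, start: usize) -> Vec<usize> {
    let mut seen = vec![false; graph.succ.len()];
    seen[start] = true;
    let mut order = Vec::new();
    run_fifo(graph, start, |active, neighbors, worklist| {
        order.push(active);
        for &n in neighbors {
            // Schedule each neighbor at most once.
            if !seen[n] {
                seen[n] = true;
                worklist.push_back(n);
            }
        }
    });
    order
}
```

Because the queue is first-in first-out, the sketch visits nodes in breadth-first order. Swapping the queue for a stack would visit the same nodes in a different order, which is the sense in which the worklist defines the processing order.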

Operators

There are two types of operators:

  • ReadonlyOperator - Can only access graph elements by immutable reference. Needs to use interior mutability if modification is necessary.
  • LabellingOperator - Can modify the local node data. The executor must provide a mutable reference to local node data.
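The difference between the two styles comes down to the kind of reference the operator receives. The sketch below uses only the standard library and hypothetical names (`LockedNode`, `bump_readonly`, `bump_labelling`); the `Mutex` is an assumed stand-in for interior mutability and is not the crate's own locking type.

```rust
use std::sync::Mutex;

/// Hypothetical node data for the read-only style: the label sits behind a lock.
struct LockedNode {
    label: Mutex<u32>,
}

/// Read-only style: only `&LockedNode` is available, so the update
/// goes through interior mutability (a `Mutex` in this sketch).
fn bump_readonly(node: &LockedNode) -> u32 {
    let mut label = node.label.lock().unwrap();
    *label += 1;
    *label
}

/// Labelling style: the caller hands out `&mut u32`, so no lock is needed.
fn bump_labelling(label: &mut u32) -> u32 {
    *label += 1;
    *label
}
```

In the labelling style, the responsibility for exclusive access moves from the node data to whoever hands out the mutable reference, which matches the statement that the executor must provide it.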

Executors

There are the following executors:

Example: compute cone of influence using atomics

The following example visits the output cone of the src node. The output cone consists of all nodes that can be reached by starting at src and following outgoing edges. Additionally, for each node in the cone, the operator counts the incoming edges that originate from nodes in the cone.
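As a point of comparison, the same quantity can be computed sequentially with the standard library. This is a minimal reference sketch, not part of the crate: `cone_dependency_counts` is a hypothetical helper, nodes are plain indices, and the graph is assumed to have no cycle leading back to `src`.

```rust
use std::collections::VecDeque;

/// Sequential reference: for each node, count the incoming edges whose
/// source lies in the output cone of `src`. `edges` holds (from, to) pairs.
fn cone_dependency_counts(num_nodes: usize, edges: &[(usize, usize)], src: usize) -> Vec<u32> {
    // Build successor lists from the edge list.
    let mut succ = vec![Vec::new(); num_nodes];
    for &(from, to) in edges {
        succ[from].push(to);
    }

    let mut counts = vec![0u32; num_nodes];
    let mut worklist = VecDeque::from([src]);
    while let Some(active) = worklist.pop_front() {
        for &n in &succ[active] {
            counts[n] += 1;
            // Schedule `n` only the first time it is reached.
            if counts[n] == 1 {
                worklist.push_back(n);
            }
        }
    }
    counts
}
```

Nodes outside the cone keep a count of zero, and so does `src` itself as long as no cycle leads back to it.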

Similar algorithms can be used, for example, to mark the regions of interest for incremental updates of shortest-path searches.

This algorithm is implemented as a ReadonlyOperator, which operates on immutable references to the node data. Safe mutability of the node data is still achieved using atomics. This avoids wrapping the node data in a DataCell for locking.
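The atomic pattern this relies on can be shown in isolation with the standard library. In this sketch, `touch_concurrently` is a hypothetical helper: several threads increment one counter through a shared reference, and only the thread that observes the previous value 0 counts as the first visitor.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

/// Lets `num_threads` threads touch the same counter through a shared reference.
/// Returns (final count, number of threads that saw the previous value 0).
fn touch_concurrently(num_threads: u32) -> (u32, u32) {
    let counter = AtomicU32::new(0);
    let first_touches = AtomicU32::new(0);

    // Scoped threads may borrow the local atomics; all of them are
    // joined before `scope` returns.
    thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                // `fetch_add` returns the value before the increment.
                if counter.fetch_add(1, Ordering::Relaxed) == 0 {
                    first_touches.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });

    (counter.load(Ordering::Relaxed), first_touches.load(Ordering::Relaxed))
}
```

Since each `fetch_add` is a single atomic read-modify-write, exactly one thread sees 0 regardless of scheduling. That is what lets an operator push a node to the worklist once without taking a lock.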

use pargraph::prelude::*;
use petgraph::data::DataMap;
use petgraph::graph::DiGraph;
use petgraph::visit::*;
use std::sync::atomic::AtomicU32;

struct NodeData {
    /// Count the number of input edges to the node
    /// which are part of the cone.
    num_dependencies: AtomicU32,
}

impl NodeData {
    fn new() -> Self {
        Self {
            num_dependencies: AtomicU32::new(0),
        }
    }
}

// Create a graph like:
//     x---
//     |   \
//    src   y
//     |
//     a
//   /  \
//  b   c
//  \  /
//   d
let mut g = DiGraph::new();

// Helper function for creating new nodes with default node data.
// The dependency counter of each node starts at zero.
let mut new_node = || g.add_node(NodeData::new());

// Create some new nodes.
let [x, y, src, a, b, c, d] = [(); 7].map(|_| new_node());

// Add some edges (without any weights).
g.add_edge(x, src, ());
g.add_edge(x, y, ());
g.add_edge(src, a, ());
g.add_edge(a, b, ());
g.add_edge(a, c, ());
g.add_edge(c, d, ());
g.add_edge(b, d, ());

let operator = ConeOfInfluenceOp {};

let executor = MultiThreadExecutor::new();

// Create a worklist and add the source node.
let wl = FifoWorklist::new_with_local_queues(vec![src].into());
executor.run_readonly(wl, &operator, &g);

let get_num_dependencies = |n: petgraph::graph::NodeIndex| -> u32 {
    g.node_weight(n)
        .unwrap()
        .num_dependencies
        .load(std::sync::atomic::Ordering::Relaxed)
};

// Check the dependency counts.
assert_eq!(get_num_dependencies(x), 0, "x is not in the cone of influence of src");
assert_eq!(get_num_dependencies(y), 0, "y is not in the cone of influence of src");
assert_eq!(get_num_dependencies(src), 0);
assert_eq!(get_num_dependencies(a), 1);
assert_eq!(get_num_dependencies(b), 1);
assert_eq!(get_num_dependencies(c), 1);
assert_eq!(get_num_dependencies(d), 2);

// This is our operator.
struct ConeOfInfluenceOp {}

// We can implement this operator as a `ReadonlyOperator` because it does not require
// a mutable reference to the node data. Safe mutability is achieved using atomics.
// Note that we implement the operator for the reference type. Operators are required to implement `Clone`.
// A reference implements `Clone` automatically. Alternatively we could also derive `Clone` for `ConeOfInfluenceOp`
// and pass ownership of the operator to the executor. The executor might create clones of the operators for the worker
// threads.
impl<G> ReadonlyOperator<G> for &ConeOfInfluenceOp
where
    G: GraphBase + IntoEdgesDirected,
    G: DataMap<NodeWeight = NodeData>,
{
    type WorkItem = G::NodeId;

    fn op(
        &self,
        active_node: Self::WorkItem,
        local_view: LocalGraphView<&G>,
        mut worklist: impl WorklistPush<Self::WorkItem>,
    ) {
        let output_nodes =
            local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing);

        for n in output_nodes {
            // Access the node weight.
            let n_data = local_view
                .node_weight(n)
                .expect("all nodes should have a weight");

            // Atomically increment the number of dependencies of the node `n`.
            // `fetch_add` returns the previous value. If the previous value is `0` then
            // we know that this is the first time we look at node `n` (unless there is a cycle leading to the source node).
            let previous_num_dependencies = n_data
                .num_dependencies
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

            if previous_num_dependencies == 0 {
                // This is the first time n is touched.
                worklist.push(n);
            }
        }
    }
}
