mumu-flow — Stream transforms for the MuMu/Lava runtime
Version: 0.2.0-rc.9
Repository: https://gitlab.com/tofo/flow-mumu
License: Dual MIT / Apache-2.0
mumu-flow is a dynamic plugin for the MuMu/Lava interpreter that provides a rich set of streaming combinators and pipeline utilities. It is designed to compose cleanly with MuMu’s iterators and the interpreter’s zero-arg “transform” functions, enabling non-blocking, partial-application-friendly dataflows.
This crate powers flow pipelines such as throttled I/O drains, functional mapping/filtering, compositional chaining, early termination, applicative application, and more — all written in ergonomic MuMu code.
Highlights at a glance
- Non-blocking throttling: `flow:throttle(ms)` and the dynamic `flow:throttle_with(fn)` return AGAIN (temporary no-data) instead of sleeping, so other pipelines continue to run.
- Async draining (micro-bursts): `flow:drain_async` and `flow:drain_then(callback)` drain in background poller ticks with item-burst and time-budget caps; AGAIN is respected as a keep-alive.
- Functional stages: mapping (`flow:trans` / `flow:map`), filtering (`flow:filter`), slicing/windowing (`flow:slice`), collection (`flow:to_array`), reduction (`flow:reduce`).
- Flow control: `flow:stop_when(pred)` terminates the stream without emitting the triggering item.
- Combinators & utilities: `flow:chain` (flatMap), `flow:concat`, `flow:of` (one-shot), `flow:empty` (immediate EOF), `flow:ap` / `flow:ap_zip` (applicatives), `flow:read` (chunked file reads).
- First-class partials: all stages support placeholders (`_`) and curried usage. Partials are re-entrant and combine cleanly via `flow:compose`.
- Interoperability: stages accept Iterator handles or zero-arg transform functions as sources and return transforms suitable for further composition.
Mental model
Most primitives follow one of these shapes:
- Stage constructor → `(…params…) => (source) => transform`: e.g. `flow:filter(pred)` returns a function that accepts an iterator/transform and yields a zero-arg transform that emits a filtered stream.
- Eager stage (when fully applied) → `Value`: `flow:trans(func, source)` runs eagerly and returns a concrete array (typed where possible). With a single argument (`flow:trans(func)`) it returns a stage that can be applied to a source later.
- Non-blocking gate → returns `"AGAIN"`: throttling stages never sleep; if the period hasn't elapsed they yield a temporary no-data error (`"AGAIN"`). Async drains and the host event loop understand this and keep the interpreter alive until the next window.
- EOF → returns `"NO_MORE_DATA"`: signifies that the upstream has been fully consumed for this run.
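The shapes above can be modelled outside MuMu. The following is a minimal Python sketch, not the plugin's implementation: it assumes a transform is a zero-arg callable and represents EOF as a plain return value instead of a string error. `from_iterable`, `filter_stage`, and `collect` are hypothetical helper names introduced only for illustration.

```python
from typing import Any, Callable, Iterable

# Assumption: EOF is modelled as a sentinel return value, not a raised error.
NO_MORE_DATA = "NO_MORE_DATA"

Transform = Callable[[], Any]


def from_iterable(items: Iterable[Any]) -> Transform:
    """Wrap any iterable as a zero-arg transform (hypothetical helper)."""
    it = iter(items)

    def pull() -> Any:
        return next(it, NO_MORE_DATA)

    return pull


def filter_stage(pred: Callable[[Any], Any]) -> Callable[[Transform], Transform]:
    """Stage constructor shape: (pred) => (source) => transform."""

    def attach(source: Transform) -> Transform:
        def pull() -> Any:
            # Keep pulling until an item passes the predicate or EOF arrives.
            while True:
                item = source()
                if item == NO_MORE_DATA or pred(item):
                    return item

        return pull

    return attach


def collect(source: Transform) -> list:
    """Pull until EOF and return everything that was emitted."""
    out = []
    while (item := source()) != NO_MORE_DATA:
        out.append(item)
    return out


evens = filter_stage(lambda x: x % 2 == 0)(from_iterable(range(1, 10)))
print(collect(evens))  # [2, 4, 6, 8]
```

Nothing runs until the final transform is pulled, which is what lets stages be built first and attached to a source later.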
Placeholders (`_`): many bridges treat `Value::Placeholder`, `"_"`, or `["_"]` identically. Mixing positional values and `_` lets you assemble pipelines incrementally.
Function catalog
Below is a practical overview of the exported functions (as named inside MuMu). Each function also exists as a variable of the same name to allow direct invocation.
Composition & control
| Name | Signature (informal) | Notes |
|---|---|---|
| `flow:compose` | `(...stagesOrPartials) → Function` | Chains stages right-to-left; supports `_` as a slot. The final argument can be “data” to apply immediately. |
| `flow:concat` | `(a, b) → transform` | Emits `a` to completion, then `b`. `a`/`b` are Iterator or transform. |
| `flow:chain` | `(mapper, source) → transform` | Flat-map: `mapper(item)` may return a value, iterator, or transform; the result is flattened. |
| `flow:stop_when` | `(pred[, source]) → transform` | Stops the pipeline when `pred(item)` is truthy; does not emit the triggering item. |
| `flow:empty` | `() → transform` | Emits no data (EOF). |
| `flow:of` | `(value?) → transform` | Emits `value` exactly once, then EOF. |
| `flow:ap_zip` | `(fs, xs) → transform` | Zipped applicative: pull one function from `fs` and one value from `xs`, apply. |
| `flow:ap` | `(fs, xs) → transform` | Cartesian applicative: buffer all `xs`, then apply each f ∈ `fs` to each x ∈ `xs`. |
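To make the right-to-left ordering of `flow:compose` and the flattening rule of `flow:chain` concrete, here is a small Python model. It is a sketch under stated assumptions: transforms are zero-arg callables, EOF is a sentinel return value, Python lists stand in for iterators, and `compose`, `chain`, `from_iterable`, and `collect` are illustrative names, not the plugin's code.

```python
NO_MORE_DATA = "NO_MORE_DATA"  # assumption: EOF modelled as a return value


def from_iterable(items):
    it = iter(items)
    return lambda: next(it, NO_MORE_DATA)


def compose(*stages):
    """Right-to-left: the last stage listed touches the source first."""

    def run(source):
        for stage in reversed(stages):
            source = stage(source)
        return source

    return run


def chain(mapper):
    """Flat-map stage: flatten whatever mapper(item) returns."""

    def attach(source):
        inner = None  # transform currently being flattened, if any

        def pull():
            nonlocal inner
            while True:
                if inner is not None:
                    value = inner()
                    if value != NO_MORE_DATA:
                        return value
                    inner = None  # inner stream finished, go back upstream
                item = source()
                if item == NO_MORE_DATA:
                    return NO_MORE_DATA
                result = mapper(item)
                if callable(result):  # a transform
                    inner = result
                elif isinstance(result, (list, tuple, range)):  # an "iterator"
                    inner = from_iterable(result)
                else:  # a plain value
                    return result

        return pull

    return attach


def collect(source):
    out = []
    while (item := source()) != NO_MORE_DATA:
        out.append(item)
    return out


pipeline = compose(
    chain(lambda x: x * 10),  # runs second: plain values pass straight through
    chain(lambda x: [x, x]),  # runs first: each item is emitted twice
)
print(collect(pipeline(from_iterable([1, 2]))))  # [10, 10, 20, 20]
```

Reading a `compose(...)` call bottom-up gives the order in which data actually flows.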
Functional transforms
| Name | Signature (informal) | Notes |
|---|---|---|
| `flow:trans` / `flow:map` | `(fn[, source]) → transform or array` | With two args it is eager and returns an array; with one arg it returns a stage. |
| `flow:filter` | `(pred[, source]) → transform` | Predicate may return bool or number truthiness (`!= 0`). |
| `flow:slice` | `(skip, count[, source]) → transform` | Skips `skip` items, then emits up to `count`. Accepts iterator/transform sources. |
| `flow:reduce` | `(fn, init, source) → Value` | Eager reduction: `acc = fn(acc, item)`. Source may be iterator or transform. |
| `flow:to_array` | `(source) → Array` | Collects a homogeneous typed array (int/float/str) or errors on mixed types. |
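As an illustration of the `skip`/`count` window and the eager `acc = fn(acc, item)` fold described in the table, here is a Python sketch. It models behaviour only: the sentinel-based transform protocol and the names `slice_stage` and `reduce_flow` are assumptions made for this example.

```python
NO_MORE_DATA = "NO_MORE_DATA"  # assumption: EOF modelled as a return value


def from_iterable(items):
    it = iter(items)
    return lambda: next(it, NO_MORE_DATA)


def slice_stage(skip, count):
    """Skip `skip` items, then emit at most `count` items."""

    def attach(source):
        skipped = 0
        emitted = 0

        def pull():
            nonlocal skipped, emitted
            # Discard the leading items (only happens on the first pulls).
            while skipped < skip:
                if source() == NO_MORE_DATA:
                    return NO_MORE_DATA
                skipped += 1
            if emitted >= count:
                return NO_MORE_DATA  # window is full: report EOF downstream
            item = source()
            if item != NO_MORE_DATA:
                emitted += 1
            return item

        return pull

    return attach


def reduce_flow(fn, init, source):
    """Eager fold: runs immediately and returns the final accumulator."""
    acc = init
    while (item := source()) != NO_MORE_DATA:
        acc = fn(acc, item)
    return acc


window = slice_stage(2, 3)(from_iterable(range(1, 11)))  # skips 1, 2; emits 3, 4, 5
print(reduce_flow(lambda acc, x: acc + x, 0, window))  # 12
```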
Throttling & draining
| Name | Signature (informal) | Notes |
|---|---|---|
| `flow:throttle` | `(ms[, source]) → transform` | Non-blocking gate per instance; first item passes immediately. |
| `flow:throttle_with` | `(period_fn[, source]) → transform` | Period is computed after each successful emit by calling `period_fn()`. |
| `flow:drain` | `(source) → int` | Blocking drain (in MuMu terms): pulls until EOF; returns count. |
| `flow:drain_async` | `(source[, callback]) → 0` | Spawns a background poller; micro-burst drain per tick. Respects AGAIN and avoids busy-wait. |
| `flow:drain_then` | `(source, callback) → 0` | Like `drain_async`, but `callback` is required and is invoked with the total count exactly once on EOF. |
Tuning `drain_async` / `drain_then`
Set environment variables to adjust poller behaviour:
- `LAVA_FLOW_BURST` (default `64`): max items per tick
- `LAVA_FLOW_BUDGET_US` (default `500`): time budget per tick (µs)
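The interaction of the two caps can be pictured with a small Python model of one poller tick. This is a sketch of the idea only, not the plugin's scheduler: the `tick`, `drain_in_ticks`, and `caps_from_env` names, the status strings, and the sentinel-based transform protocol are all assumptions made for illustration.

```python
import os
import time

NO_MORE_DATA = "NO_MORE_DATA"  # assumption: sentinels modelled as return values
AGAIN = "AGAIN"


def caps_from_env(env=os.environ):
    """Read the two tuning knobs, falling back to the documented defaults."""
    return {
        "burst": int(env.get("LAVA_FLOW_BURST", "64")),
        "budget_us": int(env.get("LAVA_FLOW_BUDGET_US", "500")),
    }


def tick(source, burst=64, budget_us=500, clock=time.perf_counter):
    """One poller tick: pull until a cap is hit, AGAIN is seen, or EOF.

    Returns (items_drained, status); status is "more", "again", or "eof".
    """
    deadline = clock() + budget_us / 1_000_000
    drained = 0
    while drained < burst:
        item = source()
        if item == NO_MORE_DATA:
            return drained, "eof"
        if item == AGAIN:
            return drained, "again"  # nothing ready yet: yield to the host
        drained += 1
        if clock() >= deadline:
            break  # time budget spent for this tick
    return drained, "more"


def drain_in_ticks(source, **caps):
    """Repeat ticks until EOF; returns (total_items, ticks_used)."""
    total = ticks = 0
    while True:
        drained, status = tick(source, **caps)
        total += drained
        ticks += 1
        if status == "eof":
            return total, ticks


items = iter(range(150))
source = lambda: next(items, NO_MORE_DATA)
# A frozen clock keeps the example deterministic, so only the burst cap applies.
print(drain_in_ticks(source, burst=64, budget_us=500, clock=lambda: 0.0))  # (150, 3)
```

In this model a larger burst means fewer ticks per stream, at the cost of holding the loop longer per tick.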
File I/O
| Name | Signature (informal) | Notes |
|---|---|---|
| `flow:read` | `(chunkSizeBytes, path) → transform` | Reads fixed-size byte chunks from a file; each chunk is decoded via UTF-8 lossy and emitted as a string. EOF ⇒ `"NO_MORE_DATA"`. |
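One consequence of decoding each fixed-size chunk independently is that a multi-byte character straddling a chunk boundary cannot be decoded intact. The Python sketch below illustrates this. It is a model, not the plugin's reader: Python's `errors="replace"` stands in for UTF-8 lossy decoding, and `read_chunks` is a hypothetical name.

```python
import os
import tempfile

NO_MORE_DATA = "NO_MORE_DATA"  # assumption: EOF modelled as a return value


def read_chunks(chunk_size, path):
    """Return a zero-arg transform that emits decoded fixed-size chunks."""
    handle = open(path, "rb")

    def pull():
        if handle.closed:
            return NO_MORE_DATA
        chunk = handle.read(chunk_size)
        if not chunk:
            handle.close()
            return NO_MORE_DATA
        # Undecodable bytes become U+FFFD instead of raising an error.
        return chunk.decode("utf-8", errors="replace")

    return pull


with tempfile.NamedTemporaryFile("wb", suffix=".txt", delete=False) as tmp:
    tmp.write("héllo".encode("utf-8"))  # 6 bytes: "é" occupies two of them
    path = tmp.name

pull = read_chunks(2, path)
chunks = []
while (chunk := pull()) != NO_MORE_DATA:
    chunks.append(chunk)
os.remove(path)

# The two bytes of "é" land in different chunks, so each half is replaced.
print(chunks)  # ['h\ufffd', '\ufffdl', 'lo']
```

If intact characters matter, choosing a chunk size well above the longest expected line or token reduces how often a boundary falls inside a character, though it does not eliminate it.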
Behavioural details & contracts
Source & sink interoperability
Most stages accept either:
- an Iterator handle (e.g., from `step(start, end)`), or
- a zero-arg transform `() => value | "NO_MORE_DATA"`.
Internally, the plugin normalizes everything to a transform via to_transform(…).
Non-blocking gates
- Throttle stages (`flow:throttle`, `flow:throttle_with`) never sleep. If the period hasn’t elapsed they immediately return the string error `"AGAIN"`.
- Async drains treat an only-AGAIN tick as a keep-alive and return `1` so the scheduler doesn’t idle out just before the next window.
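A gate of this kind can be sketched in a few lines of Python. The model below assumes a sentinel-based transform protocol and an injected millisecond clock (both are illustration choices, not the plugin's API), and `throttle` here is a stand-in name for the behaviour described above.

```python
AGAIN = "AGAIN"  # assumption: sentinels modelled as return values
NO_MORE_DATA = "NO_MORE_DATA"


def from_iterable(items):
    it = iter(items)
    return lambda: next(it, NO_MORE_DATA)


def throttle(ms, clock):
    """Fixed-period gate; `clock` is any callable returning milliseconds."""
    if ms < 0:
        raise ValueError("period must be non-negative")

    def attach(source):
        next_allowed = None  # None until the first item has passed

        def pull():
            nonlocal next_allowed
            now = clock()
            if next_allowed is not None and now < next_allowed:
                return AGAIN  # gate closed: return at once, never sleep
            item = source()
            if item not in (AGAIN, NO_MORE_DATA):
                next_allowed = now + ms  # a real item opens a new window
            return item

        return pull

    return attach


now_ms = 0  # hand-driven clock so the example is deterministic
gate = throttle(10, lambda: now_ms)(from_iterable(["a", "b"]))

results = [gate(), gate()]  # "a" passes immediately, then the gate is closed
now_ms = 10
results.append(gate())  # window elapsed: "b"
now_ms = 20
results.append(gate())  # upstream exhausted
print(results)  # ['a', 'AGAIN', 'b', 'NO_MORE_DATA']
```

Because a closed gate returns without touching the source, no item is lost while the caller waits.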
EOF & errors
- EOF is signalled via the string error `"NO_MORE_DATA"`.
- Any error other than `"AGAIN"` and `"NO_MORE_DATA"` is considered terminal by drainers and most stages.
Partials & placeholders
- All stage constructors support `_` as a placeholder so you can incrementally build up a chain:

      p = flow:filter(_, _)      # both slots open
      p = p(x => x % 2 == 0)     # fill predicate
      tf = p(ink(1,10))          # fill source

- Partials are re-entrant: each call returns a fresh partial with the updated defaults.
Eager vs. lazy mapping
- With two arguments, `flow:trans(fn, source)` runs now and returns a typed array.
- With one argument, `flow:trans(fn)` returns a stage to be used inside `flow:compose` or called later.
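The difference between the two call shapes can be shown with a single Python function that switches on whether a source was supplied. This is a behavioural sketch only: it returns a plain list where the plugin returns a typed array, and the sentinel-based transform protocol is an assumption.

```python
NO_MORE_DATA = "NO_MORE_DATA"  # assumption: EOF modelled as a return value


def from_iterable(items):
    it = iter(items)
    return lambda: next(it, NO_MORE_DATA)


def trans(fn, source=None):
    """One argument: return a lazy stage. Two arguments: run now."""

    def attach(src):
        def pull():
            item = src()
            return item if item == NO_MORE_DATA else fn(item)

        return pull

    if source is None:
        return attach  # nothing is pulled until someone calls the transform

    pull = attach(source)
    out = []
    while (item := pull()) != NO_MORE_DATA:
        out.append(item)
    return out  # the whole source has been consumed at this point


print(trans(lambda x: x * x, from_iterable([1, 2, 3])))  # [1, 4, 9]

stage = trans(lambda x: x + 1)     # lazy: just a stage
lazy = stage(from_iterable([1, 2]))
print(lazy(), lazy(), lazy())      # 2 3 NO_MORE_DATA
```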
flow:throttle_with(period_fn, source)
- `period_fn` is invoked only after a successful emission; the next allowed instant becomes `now + ms`, where `ms` is the value `period_fn` returned.
- The first item is always allowed immediately.
- `period_fn` must return a non-negative number (int/long/float).
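The three rules above can be checked against a small Python model with a hand-driven clock. As with any sketch here, the sentinel-based protocol, the injected millisecond clock, and the recording `period_fn` are assumptions for illustration, not the plugin's code.

```python
AGAIN = "AGAIN"  # assumption: sentinels modelled as return values
NO_MORE_DATA = "NO_MORE_DATA"


def from_iterable(items):
    it = iter(items)
    return lambda: next(it, NO_MORE_DATA)


def throttle_with(period_fn, clock):
    """Dynamic gate: the period is asked for after each emitted item."""

    def attach(source):
        next_allowed = None  # first item is never delayed

        def pull():
            nonlocal next_allowed
            now = clock()
            if next_allowed is not None and now < next_allowed:
                return AGAIN  # period_fn is NOT consulted while waiting
            item = source()
            if item not in (AGAIN, NO_MORE_DATA):
                ms = period_fn()
                if ms < 0:
                    raise ValueError("period_fn must return a non-negative number")
                next_allowed = now + ms
            return item

        return pull

    return attach


now_ms = 0
calls = []  # clock readings at which period_fn was invoked


def period_fn():
    calls.append(now_ms)
    return 10 * len(calls)  # 10 ms, then 20 ms, then 30 ms


gate = throttle_with(period_fn, lambda: now_ms)(from_iterable([1, 2, 3]))
results = []
for t in (0, 0, 10, 20, 30):
    now_ms = t
    results.append(gate())

print(results)  # [1, 'AGAIN', 2, 'AGAIN', 3]
print(calls)    # [0, 10, 30]
```

The `calls` list shows that the period function ran exactly once per emitted item and never on an `AGAIN` pull.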
flow:stop_when(pred)
- When `pred(item)` is truthy, the stage terminates by returning `"NO_MORE_DATA"` and does not emit the triggering item.
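A Python model makes the "triggering item is swallowed" contract easy to see. This sketch assumes a sentinel-based transform protocol and says nothing about how the real stage treats `AGAIN`; `stop_when` and `collect` are stand-in names.

```python
NO_MORE_DATA = "NO_MORE_DATA"  # assumption: EOF modelled as a return value


def stop_when(pred):
    """Terminate the stream at the first item for which pred is truthy."""

    def attach(source):
        stopped = False

        def pull():
            nonlocal stopped
            if stopped:
                return NO_MORE_DATA  # stay finished on later pulls
            item = source()
            if item == NO_MORE_DATA:
                return item
            if pred(item):
                stopped = True
                return NO_MORE_DATA  # the triggering item is not emitted
            return item

        return pull

    return attach


def collect(source):
    out = []
    while (item := source()) != NO_MORE_DATA:
        out.append(item)
    return out


numbers = iter(range(1, 100))
source = lambda: next(numbers, NO_MORE_DATA)

print(collect(stop_when(lambda x: x > 3)(source)))  # [1, 2, 3]
print(next(numbers))  # 5: the item 4 was consumed as the trigger, nothing beyond it
```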
Examples (from examples/)
The repo contains many runnable `.mu` scripts demonstrating the stages end-to-end. Here are a few tiny, illustrative snippets. These assume the MuMu/Lava runtime has the appropriate plugins loaded (e.g. `extend("flow")`, `extend("net")`).
Transform + Filter + Throttle + Drain
extend("flow")
run = flow:compose(
  flow:drain_async,          # drain in the background
  flow:chain(slog),          # print items
  flow:throttle(10),         # non-blocking 10ms gate
  flow:filter(x => x % 2)    # keep odd numbers
)
run(step(1, 100))            # -> 1,3,5,...
Dynamic period throttling
extend("flow")
^delay = 1
pipe = flow:compose(
  flow:drain_async,
  flow:chain(_ => delay += 50),
  flow:throttle_with(() => delay),   # period grows after each emit
  flow:chain(slog)
)
pipe(step(1, 20))
Stop on condition
extend("flow")
^count = 0
flow:compose(
  flow:drain_async,
  flow:chain(_ => slog(++count)),
  flow:stop_when(_ => count > 10)
)(step(1, 99999))
Chunked file reading
extend("flow")
pipeline = flow:compose(
  flow:drain_async,
  flow:chain(sput),
  flow:throttle(10),
  flow:read(64)        # 64-byte chunks, UTF-8 lossy
)
pipeline("README.md")
Zipped applicative
extend("flow")
fs = flow:of(x => x * 10) # emits a single function
xs = step(1, 5) # 1..4
flow:to_array(flow:ap_zip(fs, xs)) # => [10, 20, 30, 40]
See `examples/` for more: async drains with callbacks, net ping/LLDP demos, partial composition patterns, reduction, etc.
Design notes
- Cooperative scheduling: every stage avoids blocking the interpreter loop. Throttles use AGAIN; async drains use tiny micro-bursts governed by `LAVA_FLOW_BURST` and `LAVA_FLOW_BUDGET_US` (defaults: `64` items, `500` µs).
- Typed collections: collectors/transformers strive to produce the tightest possible array type (`IntArray`, `FloatArray`, `StrArray`). Heterogeneous results fall back to `MixedArray` (or error in strict collectors).
- File I/O: `flow:read` reads raw bytes via a `BufReader` and converts each chunk to a string using UTF-8 lossy decoding for maximal robustness.
- Correctness guards: for example, `flow:throttle` rejects negative periods; `flow:filter` enforces that predicates are functions; `flow:to_array` errors on mixed element types; `flow:slice` validates both `skip` and `count`.
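The "tightest possible array type" rule from the typed-collections note can be sketched as a type check over the collected items. The Python below is an illustrative model: it returns a `(tag, items)` pair instead of a real typed array, `tightest_array` is a hypothetical name, and its handling of empty input is a placeholder because the README does not say what the plugin does in that case.

```python
# Tags borrowed from the README; the mapping to Python types is an assumption.
TAGS = {int: "IntArray", float: "FloatArray", str: "StrArray"}


def tightest_array(items, strict=True):
    """Pick the narrowest array tag that fits every element."""
    items = list(items)
    if not items:
        # Not specified in the README; this sketch refuses to guess.
        raise ValueError("empty input is not modelled")
    kinds = {type(x) for x in items}
    if len(kinds) == 1:
        (kind,) = kinds
        if kind in TAGS:
            return TAGS[kind], items
    if strict:
        # Mirrors a strict collector that errors on mixed types.
        raise TypeError("unsupported or mixed element types")
    return "MixedArray", items  # lenient fallback


print(tightest_array([1, 2, 3]))                 # ('IntArray', [1, 2, 3])
print(tightest_array(["a", "b"]))                # ('StrArray', ['a', 'b'])
print(tightest_array([1, "a"], strict=False))    # ('MixedArray', [1, 'a'])
```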
Project structure
src/
lib.rs # plugin entry point, function registrations
trans.rs # map (eager on 2-arg), and alias registration
filter.rs # predicate-based filter
slice.rs # windowing (skip/take)
to_array.rs # collectors
reduce.rs # eager reduce
throttle.rs # fixed-period non-blocking throttle
throttle_with.rs # dynamic-period non-blocking throttle
chain.rs # flatMap (value/iterator/transform to stream)
concat.rs # sequential concatenation
of.rs # one-shot iterator
empty.rs # empty iterator (EOF)
ap.rs # applicative: zip/cartesian
drain.rs # blocking drain to count
drain_async.rs # async micro-burst draining
drain_then.rs # async draining with required callback on EOF
read.rs # chunked file reader (UTF-8 lossy)
stop_when.rs # early termination stage
tests/ # MuMu specs covering compose, trans, slice, filter, throttle, etc.
examples/ # many runnable MuMu scripts (pipelines, net demos, etc.)
Compatibility & features
- MuMu core: targets `core-mumu = 0.9.0-rc.3` (aliased as `mumu` in code).
- Features:
  - Default: no features (lightweight; avoids host-only deps).
  - `host`: enable for native `.so`/`.dll` builds; mirrors the MuMu host ABI.
  - `web`: marker for WASM/web builds (no host-only deps).
- Error constants: uses string errors `"AGAIN"` and `"NO_MORE_DATA"` consistently across stages.
Contributing
Contributions are welcome! Please open issues or MRs on GitLab.
- Repository: https://gitlab.com/tofo/mumu-flow
- Primary author: Tom Fotheringham
- Contributors: the many folks filing issues, writing tests, and improving the MuMu ecosystem — thank you ❤️
Before submitting, please:
- keep functions non-blocking and compatible with the interpreter’s poll loop,
- prefer partials and underscore placeholders where it improves ergonomics,
- include examples and tests for new stages.
License
This project is dual-licensed under MIT and Apache-2.0. You may choose either license. See LICENSE for details.