# Data Pipelines
Filter, transform, and aggregate collections with map/reduce-style operations. Every result is typed against an OpenAPI schema.
## Pipeline Philosophy
Chain operations naturally: fetch → filter → map → reduce. No SQL complexity, no JOINs, no subqueries. Just elegant data transformation.
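For readers used to conventional code, the fetch → filter → map → reduce chain is roughly analogous to this Python sketch. The data and field names here are illustrative only, not part of the runtime:

```python
# Illustrative analogue of a fetch -> filter -> map -> reduce chain
# over plain dictionaries standing in for schema-typed records.
users = [
    {"name": "Ada", "status": "active", "tier": "premium"},
    {"name": "Bob", "status": "inactive", "tier": "free"},
    {"name": "Cy", "status": "active", "tier": "free"},
]

# filter: keep only the active users
active = [u for u in users if u["status"] == "active"]

# map: project each record to a summary shape
summaries = [{"name": u["name"]} for u in active]

# reduce: collapse the collection to a single value
active_count = len(active)
```

Each step consumes the previous step's output, which is the same shape a pipeline of named `<Fetch>`/`<Filter>`/`<Map>`/`<Reduce>` statements takes.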
## Fetch: Retrieve and Filter
Retrieve data with optional filtering, ordering, and limiting. Results are always typed:
```
(* Fetch with filtering and ordering *)
<Fetch> the <active-users: List<User>> from the <users>
    where <status> is "active"
    order by <name> asc
    limit 100.
```
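Semantically, a fetch like this filters, orders, and then truncates to the limit. A rough Python equivalent, with a list of dicts standing in for `User` records:

```python
users = [
    {"name": "Zoe", "status": "active"},
    {"name": "Ada", "status": "active"},
    {"name": "Mia", "status": "inactive"},
]

# where <status> is "active", order by <name> asc, limit 100
active_users = sorted(
    (u for u in users if u["status"] == "active"),
    key=lambda u: u["name"],
)[:100]
```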
## Filter: Subset Existing Collections
Filter an existing collection with a predicate:
```
(* Filter for premium users *)
<Filter> the <premium-users: List<User>> from the <users>
    where <tier> is "premium".
```
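Filter differs from fetch only in its input: it narrows a collection already in scope rather than retrieving from a store. In plain Python terms (illustrative data):

```python
users = [
    {"name": "Ada", "tier": "premium"},
    {"name": "Bob", "tier": "free"},
]

# where <tier> is "premium"
premium_users = [u for u in users if u["tier"] == "premium"]
```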
## Map: Transform to Different Types
Transform a collection to a different OpenAPI-defined type. The runtime automatically maps matching field names:
```
(* Map List<User> to List<UserSummary> *)
<Map> the <summaries: List<UserSummary>> from the <users>.
```
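Field-name matching copies fields whose names appear on both the source and target schemas and drops the rest. A hypothetical Python sketch of that rule, with a field set standing in for the target OpenAPI schema:

```python
# Fields declared on a hypothetical UserSummary schema.
USER_SUMMARY_FIELDS = {"id", "name"}

users = [
    {"id": 1, "name": "Ada", "email": "ada@example.com", "tier": "premium"},
    {"id": 2, "name": "Bob", "email": "bob@example.com", "tier": "free"},
]

# Keep only the fields that exist on the target type.
summaries = [
    {k: v for k, v in u.items() if k in USER_SUMMARY_FIELDS}
    for u in users
]
```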
## Reduce: Aggregate to a Single Value
Aggregate a collection using sum, count, avg, min, max, and more:
```
(* Sum all order amounts *)
<Reduce> the <total: Float> from the <orders>
    with sum(<amount>).

(* Count orders *)
<Reduce> the <order-count: Integer> from the <orders>
    with count().

(* Average price of electronics *)
<Reduce> the <avg-price: Float> from the <products>
    where <category> is "electronics"
    with avg(<price>).
```
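The three reductions above correspond to familiar aggregates; in Python terms, with illustrative order and product data:

```python
orders = [{"amount": 25.0}, {"amount": 75.0}]
products = [
    {"category": "electronics", "price": 100.0},
    {"category": "electronics", "price": 300.0},
    {"category": "books", "price": 15.0},
]

total = sum(o["amount"] for o in orders)         # with sum(<amount>)
order_count = len(orders)                        # with count()

# A where clause narrows the collection before the aggregate runs.
prices = [p["price"] for p in products if p["category"] == "electronics"]
avg_price = sum(prices) / len(prices)            # with avg(<price>)
```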
## Comparison Operators
Use these operators in `where` clauses:
| Operator | Description | Example |
|---|---|---|
| `is`, `==` | Equality | `<status> is "active"` |
| `is not`, `!=` | Inequality | `<role> is not "guest"` |
| `<`, `<=`, `>`, `>=` | Comparison | `<age> >= 18` |
| `in` | Set membership | `<status> in ["a", "b"]` |
| `between` | Range | `<price> between 10 and 100` |
| `contains` | Substring | `<name> contains "test"` |
| `starts with` | Prefix | `<email> starts with "admin"` |
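The less common operators map onto ordinary predicates. A Python sketch of each, assuming `between` uses inclusive bounds (the record values are illustrative):

```python
record = {
    "status": "a",
    "price": 42,
    "name": "integration test",
    "email": "admin@example.com",
}

checks = {
    "in": record["status"] in ["a", "b"],                # set membership
    "between": 10 <= record["price"] <= 100,             # range, assumed inclusive
    "contains": "test" in record["name"],                # substring match
    "starts with": record["email"].startswith("admin"),  # prefix match
}
```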
## Aggregation Functions
| Function | Description | Example |
|---|---|---|
| `count()` | Number of items | `with count()` |
| `sum(field)` | Sum of numeric field | `with sum(<amount>)` |
| `avg(field)` | Average value | `with avg(<price>)` |
| `min(field)` | Minimum value | `with min(<date>)` |
| `max(field)` | Maximum value | `with max(<score>)` |
| `first()` | First element | `with first()` |
| `last()` | Last element | `with last()` |
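The extremum and positional aggregates also have direct Python counterparts (illustrative data):

```python
scores = [{"score": 7}, {"score": 3}, {"score": 9}]

min_score = min(s["score"] for s in scores)   # min(<score>)
max_score = max(s["score"] for s in scores)   # max(<score>)
first_item = scores[0]                        # first()
last_item = scores[-1]                        # last()
```

`first()` and `last()` depend on the collection's order, so they are most useful after an ordered fetch.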
## Complete Example
Here's a sales analytics feature set using all pipeline operations:
```
(Sales Report: Analytics) {
    (* Fetch recent orders *)
    <Fetch> the <recent-orders: List<Order>> from the <orders>
        where <created-at> > now().minus(30.days)
        order by <created-at> desc.

    (* Get total revenue *)
    <Reduce> the <total-revenue: Float> from the <recent-orders>
        with sum(<amount>).

    (* Count pending orders *)
    <Reduce> the <pending-count: Integer> from the <recent-orders>
        where <status> is "pending"
        with count().

    (* Map to summaries *)
    <Map> the <summaries: List<OrderSummary>> from the <recent-orders>.

    (* Filter high-value orders *)
    <Filter> the <high-value: List<Order>> from the <recent-orders>
        where <amount> > 1000.

    <Return> an <OK: status> with {
        orders: <summaries>,
        total: <total-revenue>,
        pending: <pending-count>,
        high-value-count: <high-value>.count()
    }.
}
```
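Taken together, the report derives several values from one fetched collection. A compact Python analogue of the aggregations, with dates replaced by illustrative amounts and statuses:

```python
recent_orders = [
    {"amount": 1200.0, "status": "pending"},
    {"amount": 80.0, "status": "shipped"},
    {"amount": 500.0, "status": "pending"},
]

report = {
    "total": sum(o["amount"] for o in recent_orders),
    "pending": sum(1 for o in recent_orders if o["status"] == "pending"),
    "high_value_count": len([o for o in recent_orders if o["amount"] > 1000]),
}
```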
## Why This Approach?

### Type Safety

All results are typed via OpenAPI schemas. `<users: List<User>>` is guaranteed to contain `User` objects as defined in your schema.

### Readability

Operations read naturally. "Fetch the active users from users where status is active" is close to how you'd describe the operation in English.

### Simplicity

No complex SQL. No JOINs, subqueries, or CTEs. Just straightforward pipeline operations that are easy to understand and maintain.
## Learn More
See the full data pipelines specification in ARO-0018: Data Pipelines.