Data Pipelines

Filter, transform, and aggregate collections with map/reduce-style operations. All results are typed via OpenAPI schemas for complete type safety.

Pipeline Philosophy

Chain operations naturally: fetch → filter → map → reduce. No SQL complexity, no JOINs, no subqueries. Just elegant data transformation.

Fetch: Retrieve and Filter

Retrieve data with optional filtering, ordering, and limiting. Results are always typed:

(* Fetch with filtering and ordering *)
<Fetch> the <active-users: List<User>> from the <users>
    where <status> is "active"
    order by <name> asc
    limit 100.

Filter: Subset Existing Collections

Filter an existing collection with a predicate:

(* Filter for premium users *)
<Filter> the <premium-users: List<User>> from the <users>
    where <tier> is "premium".

Map: Transform to Different Types

Transform a collection to a different OpenAPI-defined type. The runtime automatically maps matching field names:

(* Map List<User> to List<UserSummary> *)
<Map> the <summaries: List<UserSummary>> from the <users>.
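
To make "matching field names" concrete, here is a minimal sketch of what the two OpenAPI schemas might look like (the field names are illustrative, not part of the specification). The runtime would copy id and name into each UserSummary, since those are the fields the two schemas share:

components:
  schemas:
    User:
      type: object
      properties:
        id: { type: integer }
        name: { type: string }
        email: { type: string }
        tier: { type: string }
    UserSummary:
      type: object
      properties:
        id: { type: integer }
        name: { type: string }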

Reduce: Aggregate to Single Value

Aggregate a collection using sum, count, avg, min, max, and more:

(* Sum all order amounts *)
<Reduce> the <total: Float> from the <orders>
    with sum(<amount>).

(* Count orders *)
<Reduce> the <order-count: Integer> from the <orders>
    with count().

(* Average price of electronics *)
<Reduce> the <avg-price: Float> from the <products>
    where <category> is "electronics"
    with avg(<price>).

Comparison Operators

Use these operators in where clauses:

Operator          Description       Example
is, ==            Equality          <status> is "active"
is not, !=        Inequality        <role> is not "guest"
<, <=, >, >=      Comparison        <age> >= 18
in                Set membership    <status> in ["a", "b"]
between           Range             <price> between 10 and 100
contains          Substring         <name> contains "test"
starts with       Prefix            <email> starts with "admin"
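
As a sketch, the range and set-membership operators slot into the same where-clause position shown earlier (the <products>, <orders>, <price>, and <status> names here are illustrative):

(* Range filter with between *)
<Filter> the <mid-range: List<Product>> from the <products>
    where <price> between 10 and 100.

(* Set membership with in *)
<Fetch> the <open-orders: List<Order>> from the <orders>
    where <status> in ["pending", "processing"].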

Aggregation Functions

Function      Description            Example
count()       Number of items        with count()
sum(field)    Sum of numeric field   with sum(<amount>)
avg(field)    Average value          with avg(<price>)
min(field)    Minimum value          with min(<date>)
max(field)    Maximum value          with max(<score>)
first()       First element          with first()
last()        Last element           with last()
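
For example, min/max and first/last follow the same Reduce shape as sum and count (the <scores> and <orders> collection names here are assumptions for illustration):

(* Highest score in the collection *)
<Reduce> the <top-score: Float> from the <scores>
    with max(<score>).

(* Most recent element after ordering *)
<Reduce> the <latest-order: Order> from the <orders>
    with last().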

Complete Example

Here's a sales analytics feature that uses all four pipeline operations:

(Sales Report: Analytics) {
    (* Fetch recent orders *)
    <Fetch> the <recent-orders: List<Order>> from the <orders>
        where <created-at> > now().minus(30.days)
        order by <created-at> desc.

    (* Get total revenue *)
    <Reduce> the <total-revenue: Float> from the <recent-orders>
        with sum(<amount>).

    (* Count pending orders *)
    <Reduce> the <pending-count: Integer> from the <recent-orders>
        where <status> is "pending"
        with count().

    (* Map to summaries *)
    <Map> the <summaries: List<OrderSummary>> from the <recent-orders>.

    (* Filter high-value orders *)
    <Filter> the <high-value: List<Order>> from the <recent-orders>
        where <amount> > 1000.

    <Return> an <OK: status> with {
        orders: <summaries>,
        total: <total-revenue>,
        pending: <pending-count>,
        high-value-count: <high-value>.count()
    }.
}

Why This Approach?

Type Safety

All results are typed via OpenAPI schemas. <users: List<User>> is guaranteed to contain User objects as defined in your schema.

Readability

Operations read naturally. "Fetch the active users from users where status is active" is close to how you'd describe the operation in English.

Simplicity

No complex SQL. No JOINs, subqueries, or CTEs. Just straightforward pipeline operations that are easy to understand and maintain.

Learn More

See the full data pipelines specification in ARO-0018: Data Pipelines.