Composing Operations
Operations can be composed as values, so that multiple database actions run in a single transaction without manual connection handling.
Combining Independent Operations
.combineWith() combines two operations that don't depend on each other. Both run in the same transaction, and a function combines their results:
- Kotlin
- Java
- Scala
// Value types used by the examples.
data class User(val id: Int, val name: String)

data class Order(val id: Int, val userId: Int, val product: String)

/** Result of [dashboard]: a user count together with a list of orders. */
data class Dashboard(val userCount: Long, val recentOrders: List<Order>)

/** Result of [stats]: three aggregate values. */
data class Stats(val userCount: Long, val orderCount: Long, val revenue: Long)

// Codec for Order rows: int4 id, int4 userId, text product (in this field order).
val orderCodec: RowCodec<Order> =
    RowCodec.builder<Order>()
        .field(PgTypes.int4, Order::id)
        .field(PgTypes.int4, Order::userId)
        .field(PgTypes.text, Order::product)
        .build(::Order)

lateinit var tx: Transactor

// Two independent reads; dashboard() combines them so both run in one transaction
val countUsers: OperationRead<Long> =
    sql { "SELECT count(*) FROM users" }
        .query(RowCodec.of(PgTypes.int8).exactlyOne())

val recentOrders: OperationRead<List<Order>> =
    sql { "SELECT * FROM orders ORDER BY id DESC LIMIT 10" }
        .query(orderCodec.all())

/** Combines [countUsers] and [recentOrders] into a [Dashboard] and runs the result with `transact(tx)`. */
fun dashboard(): Dashboard {
    val combined = countUsers.combineWith(recentOrders, ::Dashboard)
    return combined.transact(tx)
}

// Three-way: all run in one transaction, results combined
val countOrders: OperationRead<Long> =
    sql { "SELECT count(*) FROM orders" }
        .query(RowCodec.of(PgTypes.int8).exactlyOne())

val totalRevenue: OperationRead<Long> =
    sql { "SELECT coalesce(sum(amount), 0) FROM orders" }
        .query(RowCodec.of(PgTypes.int8).exactlyOne())

/** Combines [countUsers], [countOrders] and [totalRevenue] into a [Stats] and runs the result with `transact(tx)`. */
fun stats(): Stats {
    val combined = countUsers.combineWith(countOrders, totalRevenue, ::Stats)
    return combined.transact(tx)
}
// Value records used by the examples.
record User(int id, String name) {}

record Order(int id, int userId, String product) {}

record Dashboard(long userCount, List<Order> recentOrders) {}

record Stats(long userCount, long orderCount, long revenue) {}

// Codec for Order rows: int4 id, int4 userId, text product (in this field order).
static RowCodec<Order> orderCodec =
    RowCodec.<Order>builder()
        .field(PgTypes.int4, Order::id)
        .field(PgTypes.int4, Order::userId)
        .field(PgTypes.text, Order::product)
        .build(Order::new);

Transactor tx = null; // placeholder

// Two independent reads; dashboard() combines them so both run in one transaction
OperationRead<Long> countUsers =
    Fragment.of("SELECT count(*) FROM users").query(RowCodec.of(PgTypes.int8).exactlyOne());

OperationRead<List<Order>> recentOrders =
    Fragment.of(
            """
            SELECT * FROM orders
            ORDER BY id DESC LIMIT 10\
            """)
        .query(orderCodec.all());

/** Combines countUsers and recentOrders into a Dashboard and runs the result with transactRead(tx). */
Dashboard dashboard() {
  var combined = countUsers.combineWith(recentOrders, Dashboard::new);
  return combined.transactRead(tx);
}

// Three-way: all run in one transaction, results combined
OperationRead<Long> countOrders =
    Fragment.of("SELECT count(*) FROM orders").query(RowCodec.of(PgTypes.int8).exactlyOne());

OperationRead<Long> totalRevenue =
    Fragment.of(
            """
            SELECT coalesce(sum(amount), 0)
            FROM orders\
            """)
        .query(RowCodec.of(PgTypes.int8).exactlyOne());

/** Combines countUsers, countOrders and totalRevenue into a Stats and runs the result with transactRead(tx). */
Stats stats() {
  var combined = countUsers.combineWith(countOrders, totalRevenue, Stats::new);
  return combined.transactRead(tx);
}
// Value types used by the examples.
case class User(id: Int, name: String)
case class Order(id: Int, userId: Int, product: String)
case class Dashboard(userCount: Long, recentOrders: List[Order])
case class Stats(userCount: Long, orderCount: Long, revenue: Long)

// Codec for Order rows: int4 id, int4 userId, text product (in this field order).
val orderCodec: RowCodec[Order] = RowCodec
  .builder[Order]()
  .field(PgTypes.int4)(_.id)
  .field(PgTypes.int4)(_.userId)
  .field(PgTypes.text)(_.product)
  .build(Order.apply)

var tx: Transactor = null // placeholder

// Two independent reads; dashboard() combines them in one transaction
val countUsers: OperationRead[Long] =
  sql"SELECT count(*) FROM users"
    .query(RowCodec.of(PgTypes.int8).exactlyOne())

val recentOrders: OperationRead[List[Order]] =
  sql"SELECT * FROM orders ORDER BY id DESC LIMIT 10"
    .query(orderCodec.all())

/** Combines countUsers and recentOrders into a Dashboard and runs the result with transact(tx). */
def dashboard(): Dashboard = {
  val combined = countUsers.combineWith(recentOrders)(Dashboard.apply)
  combined.transact(tx)
}

// Three-way: all run in one transaction
val countOrders: OperationRead[Long] =
  sql"SELECT count(*) FROM orders"
    .query(RowCodec.of(PgTypes.int8).exactlyOne())

val totalRevenue: OperationRead[Long] =
  sql"SELECT coalesce(sum(amount), 0) FROM orders"
    .query(RowCodec.of(PgTypes.int8).exactlyOne())

/** Combines countUsers, countOrders and totalRevenue into a Stats and runs the result with transact(tx). */
def stats(): Stats = {
  val combined = countUsers.combineWith(countOrders, totalRevenue)(Stats.apply)
  combined.transact(tx)
}
Running Multiple Writes
When you have several write operations and only care about completion (not individual results), use Operation.allOf():
- Kotlin
- Java
- Scala
// Three write operations that are run together; their individual results are discarded
val insertUser: Operation<Int> =
    sql { "INSERT INTO users(name) VALUES(${PgTypes.text("Alice")})" }.update()

val insertAudit: Operation<Int> =
    sql { "INSERT INTO audit_log(action) VALUES(${PgTypes.text("user_created")})" }.update()

val updateStats: Operation<Int> =
    sql { "UPDATE stats SET user_count = user_count + 1" }.update()

/** Groups the three writes with `Operation.allOf` and runs them with `transact(tx)`. */
fun createUserWithAudit() {
    val writes = Operation.allOf(insertUser, insertAudit, updateStats)
    writes.transact(tx)
}
// Run multiple writes in one transaction, discard individual results
Operation<Integer> insertUser =
    Fragment.of("INSERT INTO users(name) VALUES(")
        .value(PgTypes.text, "Alice")
        .append(")")
        .update();

Operation<Integer> insertAudit =
    Fragment.of(
            """
            INSERT INTO audit_log(action)
            VALUES(\
            """)
        .value(PgTypes.text, "user_created")
        .append(")")
        .update();

Operation<Integer> updateStats =
    Fragment.of(
            """
            UPDATE stats
            SET user_count = user_count + 1\
            """)
        .update();

/**
 * Runs the three writes as one composed operation.
 *
 * Uses Operation.allOf so the writes are composed as a value and handed to the transactor,
 * instead of being executed one by one on a manually handled connection.
 */
void createUserWithAudit() {
  Operation.allOf(insertUser, insertAudit, updateStats).transact(tx);
}
// Three write operations that are run together in one transaction
val insertUser: Operation[Int] =
  sql"INSERT INTO users(name) VALUES(${PgTypes.text("Alice")})"
    .update()

val insertAudit: Operation[Int] =
  sql"INSERT INTO audit_log(action) VALUES(${PgTypes.text("user_created")})"
    .update()

val updateStats: Operation[Int] =
  sql"UPDATE stats SET user_count = user_count + 1"
    .update()

/** Chains the three writes with combine, drops the combined result via voided(), and runs it with transact(tx). */
def createUserWithAudit(): Unit = {
  val writes = insertUser.combine(insertAudit).combine(updateStats)
  writes.voided().transact(tx)
}
Sequencing a List
When you have a dynamic list of operations, OperationRead.sequence() runs them all and collects the results:
- Kotlin
- Java
- Scala
// Names to insert; one operation is built per entry
val names = listOf("Alice", "Bob", "Charlie")

/** Builds one `INSERT ... RETURNING id` operation per entry of [names] and runs them all via `OperationRead.sequence`. */
fun insertAll(): List<Int> {
    // Single insert for one name, decoded as exactly one int4 value.
    fun insertReturningId(name: String): OperationRead<Int> =
        sql { "INSERT INTO users(name) VALUES(${PgTypes.text(name)}) RETURNING id" }
            .query(RowCodec.of(PgTypes.int4).exactlyOne())

    val inserts: List<OperationRead<Int>> = names.map { insertReturningId(it) }
    return OperationRead.sequence(inserts).transact(tx)
}
// Names to insert; one operation is built per entry
List<String> names = List.of("Alice", "Bob", "Charlie");

/** Builds one INSERT ... RETURNING id operation per name and runs them all via OperationRead.sequence. */
List<Integer> insertAll() {
  List<OperationRead<Integer>> inserts = names.stream().map(this::insertReturningId).toList();
  return OperationRead.sequence(inserts).transactRead(tx);
}

// Single insert for one name, decoded as exactly one int4 value.
private OperationRead<Integer> insertReturningId(String name) {
  return Fragment.of(
          """
          INSERT INTO users(name)
          VALUES(\
          """)
      .value(PgTypes.text, name)
      .append(") RETURNING id")
      .query(RowCodec.of(PgTypes.int4).exactlyOne());
}
// Names to insert; one operation is built per entry.
val names: List[String] =
List("Alice", "Bob", "Charlie")
// Builds one INSERT ... RETURNING id operation per name (each decoded as exactly one int4),
// then runs the whole list through OperationRead.sequence and transact(tx).
def insertAll(): List[Int] =
val inserts: List[OperationRead[Int]] =
names.map { name =>
sql"""INSERT INTO users(name)
VALUES(${PgTypes.text(name)})
RETURNING id"""
.query(RowCodec.of(PgTypes.int4).exactlyOne())
}
OperationRead.sequence(inserts).transact(tx)
Data Flow Between Operations
Use .then() (.andThen() in the Scala API) to feed one operation's result into a continuation function that returns the next operation. The first operation runs, and its result becomes the input to the function:
- Kotlin
- Java
- Scala
// Reusable queries as functions

/** Inserts a user with the given [name]; the statement ends in `RETURNING id` and is decoded as one int4. */
fun insertUser(name: String): OperationRead<Int> {
    val insert = Fragment.of("INSERT INTO users(name) VALUES(")
        .value(PgTypes.text, name)
        .append(") RETURNING id")
    return insert.query(RowCodec.of(PgTypes.int4).exactlyOne())
}

/** Selects `id, user_id, product` from `orders` for [userId], decoded with [orderCodec]. */
fun ordersByUser(userId: Int): OperationRead<List<Order>> {
    val select = Fragment.of("SELECT id, user_id, product FROM orders WHERE user_id = ")
        .value(PgTypes.int4, userId)
    return select.query(orderCodec.all())
}

// Chain: insert user, then use returned id to fetch their orders.
fun insertAndFetchOrders(): List<Order> {
    val chained = insertUser("Alice").then { ordersByUser(it) }
    return chained.transact(tx)
}
// Reusable queries as methods

/** Inserts a user with the given name; the statement ends in RETURNING id and is decoded as one int4. */
OperationRead<Integer> insertUser(String name) {
  var insert =
      Fragment.of("INSERT INTO users(name) VALUES(")
          .value(PgTypes.text, name)
          .append(") RETURNING id");
  return insert.query(RowCodec.of(PgTypes.int4).exactlyOne());
}

/** Selects id, user_id, product from orders for the given userId, decoded with orderCodec. */
OperationRead<List<Order>> ordersByUser(int userId) {
  var select =
      Fragment.of(
              """
              SELECT id, user_id, product
              FROM orders WHERE user_id =
              """)
          .value(PgTypes.int4, userId);
  return select.query(orderCodec.all());
}

// Chain: insert user, then use returned id to fetch their orders.
// .then(fn) flat-maps the result of the first operation into the second.
List<Order> insertAndFetchOrders() {
  return tx.execute(insertUser("Alice").then(id -> ordersByUser(id)));
}
// Reusable queries as methods

/** Inserts a user with the given name; the statement ends in RETURNING id and is decoded as one int4. */
def insertUser(name: String): OperationRead[Int] = {
  val insert = Fragment
    .of("INSERT INTO users(name) VALUES(")
    .value(PgTypes.text, name)
    .append(") RETURNING id")
  insert.query(RowCodec.of(PgTypes.int4).exactlyOne())
}

/** Selects id, user_id, product from orders for the given userId, decoded with orderCodec. */
def ordersByUser(userId: Int): OperationRead[List[Order]] = {
  val select = Fragment
    .of("SELECT id, user_id, product FROM orders WHERE user_id = ")
    .value(PgTypes.int4, userId)
  select.query(orderCodec.all())
}

// Chain: insert user, then use returned id to fetch their orders.
def insertAndFetchOrders(): List[Order] = {
  val chained = insertUser("Alice").andThen(id => ordersByUser(id))
  chained.transact(tx)
}
When the first operation returns a record, the continuation receives the whole record and can read its fields:
- Kotlin
- Java
- Scala
// Insert and return the new user (id + name).
fun insertUser(name: String): OperationRead<NewUser> {
    val insert = Fragment.of("INSERT INTO users(name) VALUES(")
        .value(PgTypes.text, name)
        .append(") RETURNING id, name")
    return insert.query(newUserCodec.exactlyOne())
}

// Log the creation, taking the new user as input (writes user.id and user.name to audit_log).
fun logCreation(user: NewUser): Operation<Int> {
    val insert = Fragment.of("INSERT INTO audit_log(user_id, username) VALUES(")
        .value(PgTypes.int4, user.id)
        .append(", ")
        .value(PgTypes.text, user.name)
        .append(")")
    return insert.update()
}

// Chain: insertUser → returned NewUser → logCreation.
fun insertAndLog(): Int {
    val chained = insertUser("Alice").then { logCreation(it) }
    return chained.transact(tx)
}
// Insert and return the new user (id + name).
OperationRead<NewUser> insertUser(String name) {
  var insert =
      Fragment.of("INSERT INTO users(name) VALUES(")
          .value(PgTypes.text, name)
          .append(") RETURNING id, name");
  return insert.query(newUserCodec.exactlyOne());
}

// Log the creation, taking the new user record as input (writes user.id() and user.name()).
Operation<Integer> logCreation(NewUser user) {
  var insert =
      Fragment.of("INSERT INTO audit_log(user_id, username) VALUES(")
          .value(PgTypes.int4, user.id())
          .append(", ")
          .value(PgTypes.text, user.name())
          .append(")");
  return insert.update();
}

// Chain: insertUser → returned NewUser → logCreation.
// .then(fn) feeds the first op's result into the function that builds the second op.
int insertAndLog() {
  return tx.execute(insertUser("Alice").then(user -> logCreation(user)));
}
// Insert and return the new user (id + name).
def insertUser(name: String): OperationRead[NewUser] = {
  val insert = Fragment
    .of("INSERT INTO users(name) VALUES(")
    .value(PgTypes.text, name)
    .append(") RETURNING id, name")
  insert.query(newUserCodec.exactlyOne())
}

// Log the creation, taking the new user as input (writes user.id and user.name to audit_log).
def logCreation(user: NewUser): Operation[Int] = {
  val insert = Fragment
    .of("INSERT INTO audit_log(user_id, username) VALUES(")
    .value(PgTypes.int4, user.id)
    .append(", ")
    .value(PgTypes.text, user.name)
    .append(")")
  insert.update()
}

// Chain: insertUser → returned NewUser → logCreation.
def insertAndLog(): Int = {
  val chained = insertUser("Alice").andThen(user => logCreation(user))
  chained.transact(tx)
}
Conditional Execution
OperationRead.ifEmpty() implements the find-or-create pattern: run the first operation, and if it returns empty (empty Optional, null, or None), run the fallback instead:
- Kotlin
- Java
- Scala
// Find-or-create pattern

/** Looks up a user by [email]; the result is decoded with `maxOne()`, so it is null when no row matches. */
fun findUser(email: String): OperationRead<User?> =
    Fragment.of("SELECT id, name, email FROM users WHERE email = ")
        .value(PgTypes.text, email)
        .query(userCodec.maxOne())

/** Inserts a user with [name] and [email] and returns the inserted row (`RETURNING *`). */
fun createUser(name: String, email: String): OperationRead<User> =
    Fragment.of("INSERT INTO users(name, email) VALUES(")
        .value(PgTypes.text, name)
        .append(", ")
        .value(PgTypes.text, email)
        .append(") RETURNING *")
        .query(userCodec.exactlyOne())

/**
 * Runs [findUser] and falls back to [createUser] when the lookup is empty.
 *
 * Uses `OperationRead.ifEmpty`, the entry point documented for this pattern, so the composition
 * of two reads stays an `OperationRead`.
 * NOTE(review): `email` and `name` are taken from the enclosing scope, which is not shown here.
 */
fun findOrCreate(): User =
    OperationRead.ifEmpty(findUser(email), createUser(name, email)).transact(tx)
// Find-or-create pattern

/** Looks up a user by email; decoded with maxOne(), so the Optional is empty when no row matches. */
OperationRead<Optional<User>> findUser(String email) {
  var select =
      Fragment.of(
              """
              SELECT id, name, email
              FROM users WHERE email =
              """)
          .value(PgTypes.text, email);
  return select.query(userCodec.maxOne());
}

/** Inserts a user with name and email and returns the inserted row (RETURNING *). */
OperationRead<User> createUser(String name, String email) {
  var insert =
      Fragment.of(
              """
              INSERT INTO users(name, email)
              VALUES(\
              """)
          .value(PgTypes.text, name)
          .append(", ")
          .value(PgTypes.text, email)
          .append(") RETURNING *");
  return insert.query(userCodec.exactlyOne());
}

/** Runs findUser and falls back to createUser when the lookup is empty. */
User findOrCreate() {
  var findOrCreateOp = OperationRead.ifEmpty(findUser(email), createUser(name, email));
  return findOrCreateOp.transact(tx);
}
// Find-or-create pattern

/** Looks up a user by email; decoded with maxOne(), so the Option is empty when no row matches. */
def findUser(email: String): OperationRead[Option[User]] = {
  val select = Fragment
    .of("SELECT id, name, email FROM users WHERE email = ")
    .value(PgTypes.text, email)
  select.query(userCodec.maxOne())
}

/** Inserts a user with name and email and returns the inserted row (RETURNING *). */
def createUser(name: String, email: String): OperationRead[User] = {
  val insert = Fragment
    .of("INSERT INTO users(name, email) VALUES(")
    .value(PgTypes.text, name)
    .append(", ")
    .value(PgTypes.text, email)
    .append(") RETURNING *")
  insert.query(userCodec.exactlyOne())
}

/** Runs findUser and falls back to createUser when the lookup is empty. */
def findOrCreate(): User = {
  val findOrCreateOp = OperationRead.ifEmpty(findUser(email), createUser(name, email))
  findOrCreateOp.transact(tx)
}
Read-Only Composition
When you compose OperationRead values, the result is always OperationRead. Mix in a single write Operation, and the result becomes Operation. The type system tracks this automatically:
- Kotlin
- Java
- Scala
// Combining two read-only operations yields OperationRead; combine() pairs the two results
val bothReads: OperationRead<Pair<List<Int>, Long>> =
findIds.combine(countUsers)
// Mixing in a write operation (insertUser) yields Operation (not OperationRead)
val writeAndRead: Operation<Pair<Int, List<Int>>> =
insertUser.combine(findIds)
// transactRead works for read-only compositions
val readResult: Pair<List<Int>, Long> = bothReads.transactRead(tx)
// transact required when writes are involved
val writeResult: Pair<Int, List<Int>> = writeAndRead.transact(tx)
// Combining two read-only operations yields OperationRead; combine() pairs the two results in a Tuple2
OperationRead<Tuple.Tuple2<List<Integer>, Long>> bothReads =
findIds.combine(countUsers);
// Mixing in a write operation (insertUser) yields Operation (not OperationRead)
Operation<Tuple.Tuple2<Integer, List<Integer>>> writeAndRead =
insertUser.combine(findIds);
// transactRead works for read-only compositions
Tuple.Tuple2<List<Integer>, Long> readResult = bothReads.transactRead(tx);
// transact required when writes are involved
Tuple.Tuple2<Integer, List<Integer>> writeResult = writeAndRead.transact(tx);
// Combining two read-only operations yields OperationRead; combine() pairs the two results in a tuple
val bothReads: OperationRead[(List[Int], Long)] =
findIds.combine(countUsers)
// Mixing in a write operation (insertUser) yields Operation (not OperationRead)
val writeAndRead: Operation[(Int, List[Int])] =
insertUser.combine(findIds)
// transactRead works for read-only compositions
val readResult: (List[Int], Long) = bothReads.transactRead(tx)
// transact required when writes are involved
val writeResult: (Int, List[Int]) = writeAndRead.transact(tx)
This means transactRead works for any tree of pure reads — the compiler enforces it. See Read-Only Transactions for more.
Performance: Why Composition Matters
How you compose operations determines whether the execution engine can optimize them.
combine() is parallel, then() is sequential
When you write a.combine(b), you're telling the execution engine that a and b are independent — neither needs the other's result. When you write a.then(continuation), you're saying the continuation depends on a's result.
The distinction affects execution:
| Combinator | Dependency | Execution |
|---|---|---|
| combine() | Independent | Parallelizable — backend decides strategy |
| combineWith() | Independent | Same as combine |
| sequence() | Independent | Decomposes into combine tree |
| allOf() | Independent | Decomposes into combine tree |
| then() | Dependent | Sequential (must be) |
| ifEmpty() | Conditional | Sequential (check, then maybe fallback) |
| map() | Transform | No I/O (pure function) |
The OperationRunner delegates Combine nodes to a pluggable CombineStrategy. JDBC uses SEQUENTIAL (one query at a time on the same connection). Other backends can implement PARALLEL to execute both halves concurrently.
Since combine() nests, a.combine(b).combine(c).combine(d) creates a tree of Combine nodes, and the parallelization is recursive. All leaf operations are submitted concurrently when the strategy supports it.
When to use which
Use combine() / combineWith() when queries are independent:
// Dashboard: load user + orders + preferences in ~1 RTT
// The three lookups are independent, so they are combined rather than chained.
var dashboardOp =
    findUser.on(userId)
        .combineWith(getOrders.on(userId), getPrefs.on(userId),
            (user, orders, prefs) -> new DashboardPage(user, orders, prefs));
var page = tx.execute(dashboardOp);
Use sequence() for dynamic lists of independent operations:
// Fan-out: load N items in ~1 RTT
// One lookup per id, sequenced into a single operation.
var lookups = ids.stream()
    .map(id -> findItem.on(id))
    .toList();
var items = tx.execute(OperationRead.sequence(lookups));
Use then() when the next query depends on the previous result:
// Chain: insert, then read back with generated ID
// then() passes the insert's result to findUserById.
var insertThenFind = insertUser.on(newUser).then(findUserById);
var created = tx.execute(insertThenFind);
Use ifEmpty() for find-or-create patterns:
// Conditional: find existing, or create if missing
var findOrCreate = OperationRead.ifEmpty(findUser.on(email), createUser.on(newUser));
var user = tx.execute(findOrCreate);
This design is an instance of the applicative functor pattern from functional programming. combine() is the applicative product — it declares that two computations are independent, so the runtime can execute them in parallel. then() is the monadic bind — it declares a dependency, forcing sequential execution.
Many database libraries only offer monadic composition (each query depends on the previous connection state). By also offering applicative composition, foundations-jdbc lets the pipeline optimizer batch independent queries into a single network round-trip.
Analyzing Composed Operations
QueryAnalyzer can walk the entire operation tree and analyze every SQL statement in one call. See Query Analysis for details.
- Kotlin
- Java
- Scala
/** Analyzes every SQL statement of a composed operation and prints the report of each failed analysis to stderr. */
fun analyzeComposedOperation() {
    val transaction: Operation<*> =
        insertUser("Alice").productL(allUsers)

    // Analyze every SQL statement in the tree — one call
    val results: List<QueryAnalysis> =
        QueryAnalyzer.analyze(transaction, conn)

    // Only failed analyses are reported.
    results
        .filterNot { it.succeeded() }
        .forEach { System.err.println(it.report()) }
}
/** Analyzes every SQL statement of a composed operation and prints the report of each failed analysis to stderr. */
void analyzeComposedOperation() {
  Operation<?> transaction = insertUser("Alice").productL(allUsers);

  // Analyze every SQL statement in the tree
  List<QueryAnalysis> results = QueryAnalyzer.analyze(transaction, conn);

  // Only failed analyses are reported.
  results.forEach(
      analysis -> {
        if (!analysis.succeeded()) {
          System.err.println(analysis.report());
        }
      });
}
/** Analyzes every statement of a composed operation on conn and prints the report of each failed analysis to stderr. */
def analyzeComposedOperation(conn: Connection): Unit = {
  val transaction: Operation[?] =
    insertUser("Alice").productL(allUsers)

  // Analyze every statement in the tree
  val results: List[QueryAnalysis] =
    QueryAnalyzer.analyze(transaction, conn)

  // Only failed analyses are reported.
  results
    .filterNot(_.succeeded())
    .foreach(analysis => System.err.println(analysis.report()))
}