Streaming Reads
For large result sets, materializing all rows into a List can cause out-of-memory errors. Streaming reads return a Cursor — a lazy iterator that fetches rows from the database in batches while the connection stays open.
streamingQuery returns an Operation<Cursor<Row>>. The cursor is live during the map callback, where you process rows incrementally. All the usual combinators (map, combine, transact) work as expected.
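To make the batching behavior concrete, here is a hypothetical, stdlib-only sketch of how a cursor-like lazy iterator can pull rows in batches. `BatchCursor` and `fetchBatch` are invented names for illustration only; the real `Cursor` is provided by the library and is backed by a JDBC result set rather than an offset function.

```java
import java.util.*;
import java.util.function.Function;

// Toy model of a batch-fetching cursor (illustrative, not the library's
// implementation). It pulls up to `fetchSize` rows at a time from a
// supplier that stands in for the underlying result set.
final class BatchCursor<T> implements Iterator<T> {
    private final Function<Integer, List<T>> fetchBatch; // offset -> rows
    private final int fetchSize;
    private Iterator<T> current = Collections.emptyIterator();
    private int offset = 0;
    private boolean exhausted = false;

    BatchCursor(Function<Integer, List<T>> fetchBatch, int fetchSize) {
        this.fetchBatch = fetchBatch;
        this.fetchSize = fetchSize;
    }

    @Override public boolean hasNext() {
        // Refill from the next batch until we have a row or run dry.
        while (!current.hasNext() && !exhausted) {
            List<T> batch = fetchBatch.apply(offset);
            offset += batch.size();
            exhausted = batch.size() < fetchSize; // short batch = last batch
            current = batch.iterator();
        }
        return current.hasNext();
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return current.next();
    }
}
```

Only one batch is resident in memory at any point, which is why a streaming read can traverse a result set far larger than the heap.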
Basic Usage
The simplest pattern: stream rows and materialize them with toList().
- Kotlin
- Java
- Scala
// Stream rows lazily and materialize into a list
fun allNames(): List<String> {
    val streaming = Fragment.of("SELECT name FROM users ORDER BY id")
        .streamingQuery(PgTypes.text, 512)
    return streaming
        .map { it.toList() }
        .transact(tx)
}
// Stream rows lazily and materialize into a list
List<String> allNames() {
    Operation<Cursor<String>> streaming =
        Fragment.of("SELECT name FROM users ORDER BY id")
            .streamingQuery(PgTypes.text, 512);
    return streaming
        .map(Cursor::toList)
        .transact(tx);
}
// Stream rows lazily and materialize into a list
def allNames(): List[String] =
  val streaming = sql"SELECT name FROM users ORDER BY id"
    .streamingQuery(PgTypes.text, 512)
  streaming
    .map(_.toList)
    .transact(tx)
| Parameter | Description |
|---|---|
| codec / type | A RowCodec or DbType describing the row shape |
| fetchSize | Number of rows the JDBC driver fetches per network round-trip |
Processing Rows Lazily
The real benefit is processing rows one at a time inside map, without ever holding the full result set in memory:
- Kotlin
- Java
- Scala
// Process rows lazily without loading all into memory
fun countExpensiveProducts(): Long {
    val streaming = Fragment.of("SELECT price FROM products")
        .streamingQuery(PgTypes.int4, 512)
    return streaming.map { cursor ->
        cursor.asSequence().count { it > 100 }.toLong()
    }.transact(tx)
}
// Process rows lazily without loading all into memory
long countExpensiveProducts() {
    Operation<Cursor<Integer>> streaming =
        Fragment.of("SELECT price FROM products")
            .streamingQuery(PgTypes.int4, 512);
    return streaming.map(cursor -> {
        long count = 0;
        for (var price : cursor) {
            if (price > 100) count++;
        }
        return count;
    }).transact(tx);
}
// Process rows lazily without loading all into memory
def countExpensiveProducts(): Long =
  val streaming = Fragment.of("SELECT price FROM products")
    .streamingQuery(PgTypes.int4, 512)
  streaming.map { cursor =>
    cursor.count(_ > 100).toLong
  }.transact(tx)
Combining Cursors
Multiple streaming operations compose with combine. Both cursors are open simultaneously on the same connection:
- Kotlin
- Java
- Scala
// Open two cursors simultaneously on the same connection
fun mergedNames(): List<String> {
    val activeUsers = Fragment.of("SELECT name FROM users WHERE active")
        .streamingQuery(PgTypes.text, 512)
    val archivedUsers = Fragment.of("SELECT name FROM archived_users")
        .streamingQuery(PgTypes.text, 512)
    return activeUsers.combine(archivedUsers).map { (active, archived) ->
        active.toList() + archived.toList()
    }.transact(tx)
}
// Open two cursors simultaneously on the same connection
List<String> mergedNames() {
    var activeUsers = Fragment.of("SELECT name FROM users WHERE active")
        .streamingQuery(PgTypes.text, 512);
    var archivedUsers = Fragment.of("SELECT name FROM archived_users")
        .streamingQuery(PgTypes.text, 512);
    return activeUsers.combine(archivedUsers).map(cursors -> {
        List<String> all = new ArrayList<>();
        all.addAll(cursors._1().toList());
        all.addAll(cursors._2().toList());
        return all;
    }).transact(tx);
}
// Open two cursors simultaneously on the same connection
def mergedNames(): List[String] =
  val activeUsers = Fragment.of("SELECT name FROM users WHERE active")
    .streamingQuery(PgTypes.text, 512)
  val archivedUsers = Fragment.of("SELECT name FROM archived_users")
    .streamingQuery(PgTypes.text, 512)
  activeUsers.combine(archivedUsers).map { (active, archived) =>
    active.toList ++ archived.toList
  }.transact(tx)
You can also combine streaming operations with regular (non-streaming) operations — just map the cursor first:
streaming.map(Cursor::toList).combine(countOp).transact(tx);
Cursor Lifecycle
The cursor borrows the connection. When transact returns, the connection is closed and the cursor becomes unusable. Always process the cursor inside map:
- Kotlin
- Java
- Scala
// WRONG: the cursor escapes the transaction — connection is already closed!
fun broken(): Cursor<String> {
    return Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, 512)
        .transact(tx) // connection closes here, cursor is dead
}

// CORRECT: process the cursor inside map, before the connection closes
fun correct(): Long {
    return Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, 512)
        .map { cursor ->
            var count = 0L
            for (name in cursor) count++
            count
        }
        .transact(tx)
}
// WRONG: the cursor escapes the transaction — connection is already closed!
Cursor<String> broken() {
    return Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, 512)
        .transact(tx); // connection closes here, cursor is dead
}

// CORRECT: process the cursor inside map, before the connection closes
long correct() {
    return Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, 512)
        .map(cursor -> {
            long count = 0;
            for (var name : cursor) count++;
            return count;
        })
        .transact(tx);
}
// WRONG: the cursor escapes the transaction — connection is already closed!
def broken(): Cursor[String] =
  Fragment.of("SELECT name FROM users")
    .streamingQuery(PgTypes.text, 512)
    .transact(tx) // connection closes here, cursor is dead

// CORRECT: process the cursor inside map, before the connection closes
def correct(): Long =
  Fragment.of("SELECT name FROM users")
    .streamingQuery(PgTypes.text, 512)
    .map { cursor =>
      var count = 0L
      cursor.foreach(_ => count += 1)
      count
    }
    .transact(tx)
Fetch Size
The fetchSize parameter controls how many rows the JDBC driver buffers per network round-trip. A larger fetch size reduces round-trips but uses more memory per batch.
- PostgreSQL: requires `autoCommit=false` (the default `Transactor` strategy already sets this). Rows are fetched from the server in batches of `fetchSize`.
- MySQL/MariaDB: use `Integer.MIN_VALUE` as the fetch size for true row-by-row streaming. Note: only one open cursor per connection.
- Oracle / SQL Server / DB2: standard `setFetchSize()` works as expected.
- DuckDB: `setFetchSize()` is ignored — all rows are loaded regardless. The API still works, but there is no memory benefit.
A fetch size between 256 and 2048 is a reasonable starting point for most databases.
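As a rough sanity check on that range, a full scan costs about ceil(totalRows / fetchSize) network round-trips. The sketch below illustrates the arithmetic; `FetchMath` and `roundTrips` are invented names, not part of the library's API.

```java
// Illustrative arithmetic: how many batches (network round-trips) a full
// scan of `totalRows` rows takes at a given fetch size.
final class FetchMath {
    static long roundTrips(long totalRows, int fetchSize) {
        return (totalRows + fetchSize - 1) / fetchSize; // ceiling division
    }
}
```

At 512 rows per batch, a million-row scan takes `roundTrips(1_000_000, 512)` = 1954 round-trips while holding only 512 rows in the driver's buffer at a time; shrinking the batch to 64 rows pushes that to 15625 round-trips for little memory savings.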