Streaming Reads

For large result sets, materializing all rows into a List can cause out-of-memory errors. Streaming reads return a Cursor — a lazy iterator that fetches rows from the database in batches while the connection stays open.

streamingQuery returns an Operation<Cursor<Row>>. The cursor is live during the map callback, where you process rows incrementally. All the usual combinators (map, combine, transact) work as expected.

Basic Usage

The simplest pattern: stream rows and materialize them with toList().

// Stream rows lazily and materialize into a list
fun allNames(): List<String> {
    val streaming = Fragment.of("SELECT name FROM users ORDER BY id")
        .streamingQuery(PgTypes.text, 512)

    return streaming
        .map { it.toList() }
        .transact(tx)
}
Parameters:

  • codec / type: a RowCodec or DbType describing the row shape
  • fetchSize: the number of rows the JDBC driver fetches per network round-trip

Processing Rows Lazily

The real benefit is processing rows one at a time inside map, without ever holding the full result set in memory:

// Process rows lazily without loading all into memory
fun countExpensiveProducts(): Long {
    val streaming = Fragment.of("SELECT price FROM products")
        .streamingQuery(PgTypes.int4, 512)

    return streaming.map { cursor ->
        cursor.asSequence().count { it > 100 }.toLong()
    }.transact(tx)
}
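
Laziness also means you can stop early: if you only consume part of the sequence, later batches are never requested from the server. A sketch using the same API as above (the function name and row limit are illustrative):

// Pull only the first 10 names; with fetchSize 512, only the first
// batch is ever fetched from the server
fun firstTenNames(): List<String> =
    Fragment.of("SELECT name FROM users ORDER BY id")
        .streamingQuery(PgTypes.text, 512)
        .map { cursor -> cursor.asSequence().take(10).toList() }
        .transact(tx)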

Combining Cursors

Multiple streaming operations compose with combine. Both cursors are open simultaneously on the same connection:

// Open two cursors simultaneously on the same connection
fun mergedNames(): List<String> {
    val activeUsers = Fragment.of("SELECT name FROM users WHERE active")
        .streamingQuery(PgTypes.text, 512)
    val archivedUsers = Fragment.of("SELECT name FROM archived_users")
        .streamingQuery(PgTypes.text, 512)

    return activeUsers.combine(archivedUsers).map { (active, archived) ->
        active.toList() + archived.toList()
    }.transact(tx)
}

You can also combine streaming operations with regular (non-streaming) operations — just map the cursor first:

streaming.map { it.toList() }.combine(countOp).transact(tx)
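
Spelled out as a full function, this might look like the sketch below. The names (`namesAndCount`, the `countOp` parameter) are illustrative, and it assumes `combine` pairs the two results as shown in the previous example:

// countOp is any ordinary (non-streaming) Operation<Long>,
// e.g. built from a SELECT COUNT(*) fragment
fun namesAndCount(countOp: Operation<Long>): Pair<List<String>, Long> =
    Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, 512)
        .map { it.toList() } // materialize while the cursor is still live
        .combine(countOp)
        .transact(tx)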

Cursor Lifecycle

Don't return a Cursor from transact

The cursor borrows the connection. When transact returns, the connection is closed and the cursor becomes unusable. Always process the cursor inside map:

// WRONG: the cursor escapes the transaction — connection is already closed!
fun broken(): Cursor<String> {
    return Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, 512)
        .transact(tx) // connection closes here, cursor is dead
}

// CORRECT: process the cursor inside map, before the connection closes
fun correct(): Long {
    return Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, 512)
        .map { cursor ->
            var count = 0L
            for (name in cursor) count++
            count
        }
        .transact(tx)
}

Fetch Size

The fetchSize parameter controls how many rows the JDBC driver buffers per network round-trip. A larger fetch size reduces round-trips but uses more memory per batch.

  • PostgreSQL: requires autoCommit=false (the default Transactor strategy already sets this). Rows are fetched from the server in batches of fetchSize.
  • MySQL/MariaDB: use Integer.MIN_VALUE for true row-by-row streaming. Note: only one streaming result set can be open per connection, so two cursors cannot be combined there.
  • Oracle / SQL Server / DB2: standard setFetchSize() works as expected.
  • DuckDB: setFetchSize() is ignored — all rows are loaded regardless. The API still works, but there is no memory benefit.

A fetch size between 256 and 2048 is a reasonable starting point for most databases.
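
On MySQL/MariaDB, for example, the row-by-row mode is requested by passing Integer.MIN_VALUE (Int.MIN_VALUE in Kotlin) as the fetch size. A sketch under that assumption; the codec name follows this page's examples and may differ for your dialect:

// MySQL/MariaDB: Int.MIN_VALUE as fetchSize requests row-by-row streaming.
// Remember: only one such cursor may be open per connection.
fun countAllNames(): Long =
    Fragment.of("SELECT name FROM users")
        .streamingQuery(PgTypes.text, Int.MIN_VALUE)
        .map { cursor -> cursor.asSequence().count().toLong() }
        .transact(tx)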