Streaming Inserts
Note: Streaming inserts rely on PostgreSQL's COPY FROM STDIN protocol and are not available for other databases.
Because they go through COPY, streaming inserts load data significantly faster than individual INSERT statements. Rows are text-encoded in batches and streamed directly to the server, bypassing the overhead of prepared statements.
streamingInsert.of() returns an Operation<Long> that can be transacted like any other operation. The COPY participates in the current transaction, so it can be composed with other operations atomically.
Single Column
For simple cases, use the type's own PgText encoder directly:
Kotlin:

    // Insert a list of strings using COPY
    fun insertNames(names: Iterator<String>, tx: Transactor): Long {
        return streamingInsert
            .of("COPY users(name) FROM STDIN", 1000, names, PgTypes.text.pgText())
            .transact(tx)
    }

Java:

    // Insert a list of strings using COPY
    long insertNames(Iterator<String> names, Transactor tx) {
        return streamingInsert
            .of("COPY users(name) FROM STDIN", 1000, names, PgTypes.text.pgText())
            .transact(tx);
    }

Scala:

    // Insert a list of strings using COPY
    def insertNames(names: Iterator[String], tx: Transactor): Long =
      streamingInsert
        .of("COPY users(name) FROM STDIN", 1000, names, PgTypes.text.pgText())
        .transact(tx)

| Parameter | Description |
|---|---|
| copyCommand | A PostgreSQL COPY ... FROM STDIN command |
| batchSize | Number of rows to buffer before flushing to the server |
| rows | An Iterator over your data |
| text | A PgText<T> encoder for your row type |
Multi-Column Rows
For rows with multiple columns, derive a PgText encoder from a RowCodec:
Kotlin:

    // Define a RowCodec for your row type
    val productCodec: RowCodec<ProductRow> = RowCodec.builder<ProductRow>()
        .field(PgTypes.text, ProductRow::name)
        .field(PgTypes.numeric, ProductRow::price)
        .field(PgTypes.int4, ProductRow::quantity)
        .build(::ProductRow)

    // PgText.from() derives a text encoder from the RowCodec
    val productText: PgText<ProductRow> = PgText.from(productCodec.underlying)

    fun insertProducts(products: Iterator<ProductRow>, tx: Transactor): Long {
        return streamingInsert
            .of("COPY products(name, price, quantity) FROM STDIN", 1000, products, productText)
            .transact(tx)
    }

Java:

    // Define a RowCodec for your row type
    static RowCodec<ProductRow> productCodec = RowCodec.<ProductRow>builder()
        .field(PgTypes.text, ProductRow::name)
        .field(PgTypes.numeric, ProductRow::price)
        .field(PgTypes.int4, ProductRow::quantity)
        .build(ProductRow::new);

    // PgText.from() derives a text encoder from the RowCodec
    static PgText<ProductRow> productText = PgText.from(productCodec);

    long insertProducts(Iterator<ProductRow> products, Transactor tx) {
        return streamingInsert
            .of("COPY products(name, price, quantity) FROM STDIN", 1000, products, productText)
            .transact(tx);
    }

Scala:

    // Define a RowCodec for your row type
    val productCodec: RowCodec[ProductRow] =
      RowCodec.builder[ProductRow]()
        .field(PgTypes.text)(_.name)
        .field(PgTypes.numeric)(_.price)
        .field(PgTypes.int4)(_.quantity)
        .build(ProductRow.apply)

    // PgText.from() derives a text encoder from the RowCodec
    val productText: PgText[ProductRow] = PgText.from(productCodec)

    def insertProducts(products: Iterator[ProductRow], tx: Transactor): Long =
      streamingInsert
        .of("COPY products(name, price, quantity) FROM STDIN", 1000, products, productText)
        .transact(tx)

PgText.from(rowCodec) uses each column's text encoder to produce tab-delimited COPY format. The same RowCodec you use for reading rows can drive bulk loading.
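To make "tab-delimited COPY format" concrete, here is a minimal Java sketch of how one row could be rendered in PostgreSQL's COPY text format: fields separated by tabs, the row ended by a newline, NULL written as \N, and backslash, tab, newline, and carriage return escaped. It follows PostgreSQL's documented text format, not the library's internals; CopyTextSketch, encodeField, and encodeRow are hypothetical names used only for illustration, and the actual PgText encoders may differ in detail.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: these names are hypothetical and are not part of
// the library's API. It shows the shape of PostgreSQL's COPY text format.
final class CopyTextSketch {

    // Escape a single field value for COPY text format.
    static String encodeField(String value) {
        if (value == null) {
            return "\\N"; // COPY text format marks NULL as backslash-N
        }
        StringBuilder sb = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break; // literal backslash
                case '\t': sb.append("\\t"); break;  // would clash with the column delimiter
                case '\n': sb.append("\\n"); break;  // would clash with the row terminator
                case '\r': sb.append("\\r"); break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    // Join already-stringified column values into one COPY text line.
    static String encodeRow(List<String> fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append('\t');
            }
            sb.append(encodeField(fields.get(i)));
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // One product row: name, price, quantity
        System.out.print(encodeRow(Arrays.asList("Widget", "9.99", "3")));
    }
}
```

The escaping step is why a value containing a tab or newline cannot be confused with a column or row boundary once it is on the wire.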
Supported Types
Most PostgreSQL types support text encoding for COPY. Types that don't (such as jsonb) will throw UnsupportedOperationException at encode time.
Batch Size
The batchSize parameter controls how many rows are buffered in memory before being flushed to PostgreSQL. A larger batch size reduces network round-trips but uses more memory. A value between 1,000 and 10,000 is a reasonable starting point.
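The buffering behaviour described above can be sketched in plain Java as follows. This is an assumption about the general technique, not the library's implementation: BatchingSketch and forEachBatch are hypothetical names, and the flush callback stands in for whatever actually writes a batch to the server.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only: hypothetical names, not the library's API.
final class BatchingSketch {

    // Drain the iterator, handing rows to `flush` in groups of at most
    // batchSize, and return the total number of rows seen.
    static <T> long forEachBatch(Iterator<T> rows, int batchSize, Consumer<List<T>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long total = 0;
        List<T> buffer = new ArrayList<>(batchSize);
        while (rows.hasNext()) {
            buffer.add(rows.next());
            if (buffer.size() == batchSize) {
                flush.accept(new ArrayList<>(buffer)); // hand over a copy of the full batch
                total += buffer.size();
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            flush.accept(new ArrayList<>(buffer)); // final, possibly short, batch
            total += buffer.size();
        }
        return total;
    }

    public static void main(String[] args) {
        Iterator<Integer> rows = List.of(1, 2, 3, 4, 5).iterator();
        long count = forEachBatch(rows, 2, batch -> System.out.println("flush " + batch));
        System.out.println(count + " rows");
    }
}
```

At most batchSize rows are held in memory at once, which is the trade-off the parameter exposes: fewer, larger flushes versus a smaller buffer.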