Skip to content

Examples

Detailed example

This example demonstrates how to use PgUpsert to upsert data from staging tables to base tables.

1 - Initialize a PostgreSQL database

Initialize a PostgreSQL database called dev with the following schema and data (also available here).

See the full SQL schema: tests/data/schema_passing.sql

Example data ERD

2 - Run PgUpsert

from pg_upsert import PgUpsert

result = PgUpsert(
    uri="postgresql://user@localhost:5432/dev",
    tables=("genres", "publishers", "books", "authors", "book_authors"),
    staging_schema="staging",
    base_schema="public",
    do_commit=True,
    upsert_method="upsert",
    exclude_cols=("rev_user", "rev_time"),
    exclude_null_check_cols=("book_alias",),
).run()

# Inspect results programmatically
print(result.qa_passed)       # True
print(result.committed)       # True
print(result.total_inserted)  # 90
print(result.to_json())       # JSON for CI/CD

3 - Example output (passing data)

With all QA checks passing, pg-upsert shows pass indicators for each check and table, then performs the upsert:

  PostgreSQL → user@localhost:5432/dev
  Password:

  Started at 2026-04-02 14:30:15
  Tables selected for upsert (staging → public)
    • genres
    • publishers
    • books
    • authors
    • book_authors

──────────── Column Existence checks ────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✓ staging.authors
  ✓ staging.book_authors

────────────── Column Type checks ───────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✓ staging.authors
  ✓ staging.book_authors

──────────── Non-NULL checks ────────────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✓ staging.authors
  ✓ staging.book_authors

──────────── Primary Key checks ─────────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✓ staging.authors
  ✓ staging.book_authors

──────────── Unique checks ──────────────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✓ staging.authors
  ✓ staging.book_authors

──────────── Foreign Key checks ─────────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✓ staging.authors
  ✓ staging.book_authors

──────────── Check Constraint checks ────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✓ staging.authors
  ✓ staging.book_authors

╭─ QA Results ────────────────────────╮
│                                     │
│  ✓ genres                           │
│  ✓ publishers                       │
│  ✓ books                            │
│  ✓ authors                          │
│  ✓ book_authors                     │
│                                     │
│  All 5 tables passed QA checks      │
╰─────────────────────────────────────╯

──────────────── Upsert ─────────────────────────
  method=upsert  commit=ON

  public.genres
    + 19 rows inserted

  public.publishers
    + 21 rows inserted

  public.authors
    + 13 rows inserted

  public.books
    + 18 rows inserted

  public.book_authors
    + 19 rows inserted

       Summary of Changes
┌──────────────┬──────────────┬───────────────┐
│ table_name   │ rows_updated │ rows_inserted │
├──────────────┼──────────────┼───────────────┤
│ genres       │ 0            │ 19            │
│ publishers   │ 0            │ 21            │
│ authors      │ 0            │ 13            │
│ books        │ 0            │ 18            │
│ book_authors │ 0            │ 19            │
├──────────────┼──────────────┼───────────────┤
│ Total        │ 0            │ 90            │
└──────────────┴──────────────┴───────────────┘
  Changes committed

  Finished at 2026-04-02 14:30:23 (8.2 seconds)

4 - Example output (failing data)

When QA checks fail, pg-upsert shows which checks failed with details:

──────────── Non-NULL checks ────────────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✗ staging.books — book_title (1)
  ✓ staging.authors
  ✓ staging.book_authors

──────────── Primary Key checks ─────────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✓ staging.books
  ✗ staging.authors — 1 duplicate keys (2 rows) in table staging.authors
      ┌───────────┬───────┐
      │ author_id │ nrows │
      ├───────────┼───────┤
      │ JDoe      │     2 │
      └───────────┴───────┘
  ✓ staging.book_authors

──────────── Foreign Key checks ─────────────────
  ✓ staging.genres
  ✓ staging.publishers
  ✗ staging.books — books_genre_fkey (1)
      ┌─────────┬───────┐
      │ genre   │ nrows │
      ├─────────┼───────┤
      │ Mystery │     1 │
      └─────────┴───────┘
  ✓ staging.authors
  ✓ staging.book_authors

╭─ QA Results ───────────────────────────────────────╮
│                                                    │
│  ✓ genres                                          │
│  ✓ publishers                                      │
│  ✗ books                                           │
│      null: book_title (1)                          │
│      fk: books_genre_fkey (1)                      │
│  ✗ authors                                         │
│      pk: 1 duplicate keys (2 rows) in staging..    │
│      ck: chk_authors_first_name (1)                │
│  ✓ book_authors                                    │
│                                                    │
│  2 of 5 tables failed QA checks                    │
╰────────────────────────────────────────────────────╯

No upsert is performed when QA checks fail.

5 - Using the UpsertResult

The run() method returns an UpsertResult object:

result = PgUpsert(...).run()

# Overall status
result.qa_passed       # bool — did all QA checks pass?
result.committed       # bool — were changes committed?

# Aggregate counts
result.total_updated   # int — total rows updated
result.total_inserted  # int — total rows inserted

# Per-table details
for table in result.tables:
    print(table.table_name, table.rows_updated, table.rows_inserted)
    if not table.qa_passed:
        for error in table.qa_errors:
            print(f"  {error.check_type.value}: {error.details}")

# JSON serialization for CI/CD
print(result.to_json())

Example JSON output (--output json):

{
  "staging_schema": "staging",
  "base_schema": "public",
  "upsert_method": "upsert",
  "qa_passed": true,
  "committed": true,
  "total_updated": 0,
  "total_inserted": 90,
  "started_at": "2026-04-02 18:30:15",
  "finished_at": "2026-04-02 18:30:23",
  "duration_seconds": 8.234,
  "tables": [
    {
      "table_name": "genres",
      "rows_updated": 0,
      "rows_inserted": 19,
      "qa_passed": true,
      "qa_errors": []
    }
  ]
}

6 - Pipeline callbacks

Use the callback parameter to get per-table progress during a run. The callback receives a PipelineEvent at two points:

  • QA_TABLE_COMPLETE — after all QA checks finish for a table
  • UPSERT_TABLE_COMPLETE — after each table's upsert completes
from pg_upsert import PgUpsert, CallbackEvent

def on_event(event):
    if event.event == CallbackEvent.QA_TABLE_COMPLETE:
        status = "passed" if event.qa_passed else "FAILED"
        print(f"  QA {status}: {event.table}")
        if not event.qa_passed:
            for err in event.qa_errors:
                print(f"    {err.check_type.value}: {err.details}")
    elif event.event == CallbackEvent.UPSERT_TABLE_COMPLETE:
        print(f"  Upserted {event.table}: "
              f"{event.rows_inserted} inserted, {event.rows_updated} updated")

result = PgUpsert(
    uri="postgresql://user@localhost:5432/dev",
    tables=("genres", "publishers", "books"),
    staging_schema="staging",
    base_schema="public",
    do_commit=True,
    callback=on_event,
).run()

Returning False from the callback aborts the pipeline and triggers a rollback:

def abort_on_failure(event):
    if event.event == CallbackEvent.QA_TABLE_COMPLETE and not event.qa_passed:
        print(f"Aborting — QA failed for {event.table}")
        return False  # triggers rollback

result = PgUpsert(..., callback=abort_on_failure).run()

7 - Cleanup

pg-upsert creates temporary tables and views (all prefixed with ups_) during its pipeline. These are session-scoped and normally dropped when the connection closes. For long-lived connections, use cleanup() to drop them explicitly:

import psycopg2
from pg_upsert import PgUpsert

conn = psycopg2.connect(...)

ups = PgUpsert(conn=conn, tables=("books",), staging_schema="staging", base_schema="public")
result = ups.run()
ups.cleanup()  # drops all ups_* temp objects; connection stays open

# conn is still usable for other work

CLI examples

# Basic upsert with commit
pg-upsert -h localhost -d dev -u docker \
  -s staging -b public \
  -t genres -t publishers -t books \
  -x rev_user -x rev_time \
  --commit

# Schema-only validation
pg-upsert --check-schema -h localhost -d dev -u docker \
  -s staging -b public -t books

# JSON output for CI/CD
PGPASSWORD=secret pg-upsert -h localhost -d dev -u docker \
  -s staging -b public -t genres \
  --output json --commit

# Interactive mode with Textual TUI
pg-upsert -h localhost -d dev -u docker \
  -s staging -b public -t genres \
  --interactive --ui textual

# Write to logfile
pg-upsert -h localhost -d dev -u docker \
  -s staging -b public -t genres \
  -l pg-upsert.log --commit

# Export a fix sheet when QA fails (CSV — one file per table)
pg-upsert -h localhost -d dev -u docker \
  -s staging -b public -t genres -t books -t authors \
  --export-failures ./failures/ --export-format csv
#   -> ./failures/pg_upsert_failures_books.csv
#   -> ./failures/pg_upsert_failures_authors.csv
#   -> ./failures/pg_upsert_failures_schema.csv  (if schema issues exist)

# Same fix sheet as an XLSX workbook (one sheet per table)
pg-upsert -h localhost -d dev -u docker \
  -s staging -b public -t genres -t books -t authors \
  --export-failures ./failures/ --export-format xlsx
#   -> ./failures/pg_upsert_failures.xlsx

# Same fix sheet as a nested JSON file
pg-upsert -h localhost -d dev -u docker \
  -s staging -b public -t books \
  --export-failures ./failures/ --export-format json
#   -> ./failures/pg_upsert_failures.json