
API Reference

This page documents the public classes, dataclasses, and type aliases exported by pg_upsert. Each entry below is generated from the module docstrings; class names in the right-hand table of contents are symbol headings produced by mkdocstrings rather than plain Markdown headings.

Main Entry Point

pg_upsert.PgUpsert

PgUpsert(uri: None | str = None, conn: None | connection = None, encoding: str = 'utf-8', tables: list | tuple | None = (), staging_schema: str | None = None, base_schema: str | None = None, do_commit: bool = False, interactive: bool = False, upsert_method: str = 'upsert', exclude_cols: list | tuple | None = (), exclude_null_check_cols: list | tuple | None = (), control_table: str = 'ups_control', ui_mode: str = 'auto', compact: bool = False, callback: PipelineCallback | None = None, capture_detail_rows: bool = False, max_export_rows: int = 1000, strict_columns: bool = False)

Perform one or all of the following operations on a set of PostgreSQL tables:

  • Perform QA checks on data in a staging table or set of staging tables. QA checks include not-null, primary key, foreign key, and check constraint checks.
  • Perform updates and inserts (upserts) on a base table or set of base tables from the staging table(s) of the same name.

PgUpsert uses temporary tables and views inside the PostgreSQL database to dynamically generate SQL for QA checks and upserts. All temporary objects are named with the ups_ prefix.

The upsert process is transactional. If any part of the process fails, the transaction will be rolled back. Committing changes to the database is optional and can be controlled with the do_commit flag.

All SQL statements are generated using the psycopg2.sql module.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| uri | str or None | Connection URI for the PostgreSQL database. Defaults to None. Note: If a connection URI is not provided, an existing connection object must be provided. | None |
| conn | connection or None | An existing connection object to the PostgreSQL database. Defaults to None. Note: If a connection object is not provided, a connection URI must be provided. If both are provided, the connection object is used. | None |
| encoding | str | The encoding to use for the database connection. Defaults to "utf-8". | 'utf-8' |
| tables | list or tuple or None | List of table names to perform QA checks on and upsert. Defaults to (). | () |
| staging_schema | str or None | Name of the staging schema containing the tables used for QA checks and upserts. Each staging table must have the same name as the base table it will be upserted into. Defaults to None. | None |
| base_schema | str or None | Name of the base schema containing the tables that will be updated or inserted into. Defaults to None. | None |
| do_commit | bool | If True, changes are committed to the database once the upsert process completes successfully. If False, changes are rolled back. Defaults to False. | False |
| interactive | bool | If True, the user is prompted with dialogs to confirm various steps of the upsert process. If False, the process runs without user intervention. Defaults to False. | False |
| upsert_method | str | The method to use for upserting data. Must be one of "upsert", "update", or "insert". Defaults to "upsert". | 'upsert' |
| exclude_cols | list or tuple or None | List of column names to exclude from the upsert process. These columns are not updated or inserted into; however, they are still checked during the QA process. Defaults to (). | () |
| exclude_null_check_cols | list or tuple or None | List of column names to exclude from the not-null check during the QA process. You may wish to exclude columns such as auto-generated timestamps or serial columns, which may not be populated until after records are inserted or updated. Defaults to (). | () |
| control_table | str | Name of the temporary control table used to track changes during the upsert process. Defaults to "ups_control". | 'ups_control' |
| ui_mode | str | Interactive UI backend to use when interactive=True. One of "auto" (use tkinter if a display is available, otherwise textual), "tkinter" (force the desktop GUI), or "textual" (force the terminal TUI). When interactive=False, this is ignored and a non-interactive console backend is used internally. Defaults to "auto". | 'auto' |
| compact | bool | If True, render the QA summary as a compact grid (one row per table, one column per check type) instead of one panel per check. Useful for runs with many tables. Defaults to False. | False |
| callback | PipelineCallback or None | Optional callable invoked at key pipeline events (QA_TABLE_COMPLETE, UPSERT_TABLE_COMPLETE). It receives a PipelineEvent; returning False aborts the pipeline and triggers a rollback. Defaults to None. | None |
| capture_detail_rows | bool | If True, each QA check captures the actual violating staging rows onto QAError.violations / QAError.schema_issues so that they can be exported by UpsertResult.export_failures. This adds extra queries per check; it is enabled automatically only when --export-failures is passed on the CLI. Defaults to False. | False |
| max_export_rows | int | Maximum number of violating rows captured per check per table when capture_detail_rows=True. Applied as a SQL LIMIT on each detail query. Defaults to 1000. | 1000 |
| strict_columns | bool | If True, treat all missing staging columns as errors during column existence checks. By default, only primary key columns and NOT NULL columns without a default are errors; other missing columns produce warnings. Defaults to False. | False |
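The callback contract above (returning False aborts the pipeline and triggers a rollback) can be modeled in plain Python. This is a minimal sketch, not pg_upsert code: `Event` is a hypothetical stand-in for PipelineEvent, whose real attributes are not documented on this page, and `run_with_callback` is a hypothetical driver. The sketch also assumes that any return value other than False (including None) means "continue".

```python
from dataclasses import dataclass
from typing import Callable, Optional


# Hypothetical stand-in for pg_upsert's PipelineEvent; field names are assumptions.
@dataclass
class Event:
    kind: str   # e.g. "QA_TABLE_COMPLETE" or "UPSERT_TABLE_COMPLETE"
    table: str


Callback = Callable[[Event], Optional[bool]]


def run_with_callback(tables: list, callback: Optional[Callback]) -> tuple:
    """Process tables in order; report an abort (rollback) if the callback returns False."""
    processed = []
    for table in tables:
        processed.append(table)
        # Only an explicit False aborts in this sketch; None or True lets the run continue.
        if callback is not None and callback(Event("QA_TABLE_COMPLETE", table)) is False:
            return processed, True  # aborted: the caller would roll back
    return processed, False


def stop_after_books(event: Event) -> bool:
    # Hypothetical policy: abort once the "books" table has been processed.
    return event.table != "books"
```

Here `run_with_callback(["genres", "books", "authors"], stop_after_books)` stops after "books" and reports an abort.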

Example:

from pg_upsert import PgUpsert

ups = PgUpsert(
    uri="postgresql://user@localhost:5432/database",  # No password in the URI: pg_upsert will prompt for it.
    tables=("genres", "books", "publishers", "authors", "book_authors"),
    staging_schema="staging",
    base_schema="public",
    do_commit=False,
    upsert_method="upsert",
    interactive=False,
    exclude_cols=("rev_user", "rev_time", "created_at", "updated_at"),
    exclude_null_check_cols=("rev_user", "rev_time", "created_at", "updated_at", "alias"),
)
Source code in src/pg_upsert/upsert.py
def __init__(
    self,
    uri: None | str = None,
    conn: None | psycopg2.extensions.connection = None,
    encoding: str = "utf-8",
    tables: list | tuple | None = (),
    staging_schema: str | None = None,
    base_schema: str | None = None,
    do_commit: bool = False,
    interactive: bool = False,
    upsert_method: str = "upsert",
    exclude_cols: list | tuple | None = (),
    exclude_null_check_cols: list | tuple | None = (),
    control_table: str = "ups_control",
    ui_mode: str = "auto",
    compact: bool = False,
    callback: PipelineCallback | None = None,
    capture_detail_rows: bool = False,
    max_export_rows: int = 1000,
    strict_columns: bool = False,
):
    if upsert_method not in self._upsert_methods():
        raise ValueError(
            f"Invalid upsert method: {upsert_method}. Must be one of {self._upsert_methods()}",
        )
    if not base_schema or not staging_schema:
        if not base_schema and not staging_schema:
            raise ValueError("No base or staging schema specified")
        if not base_schema:
            raise ValueError("No base schema specified")
        if not staging_schema:
            raise ValueError("No staging schema specified")
    if not tables:
        raise ValueError("No tables specified")
    if staging_schema == base_schema:
        raise ValueError(
            f"Staging and base schemas must be different. Got {staging_schema} for both.",
        )
    self.db = PostgresDB(
        uri=uri,
        conn=conn,
        encoding=encoding,
    )
    logger.debug(f"Connected to {self.db!s}")
    self.tables = tables
    self.staging_schema = staging_schema
    self.base_schema = base_schema
    self.do_commit = do_commit
    self.interactive = interactive
    self.upsert_method = upsert_method
    self.exclude_cols = exclude_cols
    self.exclude_null_check_cols = exclude_null_check_cols
    self.control_table = control_table
    self.compact = compact
    self.qa_passed = False
    self._qa_findings: list[QAError] = []
    self._callback = callback

    # Validate schemas once (not twice — bug fix).
    self._validate_schemas()
    for table in self.tables:
        self._validate_table(table)

    # Initialise sub-components.
    _effective_ui_mode = ui_mode if interactive else "_console"
    self._ui = get_ui_backend(_effective_ui_mode)
    self._control = ControlTable(self.db, table_name=control_table, ui=self._ui)
    self._qa = QARunner(
        db=self.db,
        control=self._control,
        staging_schema=staging_schema,
        base_schema=base_schema,
        exclude_null_check_cols=exclude_null_check_cols or (),
        ui=self._ui,
        capture_detail_rows=capture_detail_rows,
        max_export_rows=max_export_rows,
        strict_columns=strict_columns,
    )
    self._executor = UpsertExecutor(
        db=self.db,
        control=self._control,
        staging_schema=staging_schema,
        base_schema=base_schema,
        upsert_method=upsert_method,
        ui=self._ui,
    )

    self._control.initialize(
        tables=list(tables),
        exclude_cols=list(exclude_cols) if exclude_cols else None,
        exclude_null_check_cols=list(exclude_null_check_cols) if exclude_null_check_cols else None,
        interactive=interactive,
    )
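The argument checks at the top of `__init__` run before any database connection is opened. The standalone function below is a hypothetical mirror of those checks, in the same order, so that the resulting error messages can be tried without a database. It assumes the valid methods are exactly "upsert", "update", and "insert", as documented for `upsert_method`.

```python
from typing import Optional

UPSERT_METHODS = ("upsert", "update", "insert")


def validate_args(tables, staging_schema, base_schema, upsert_method="upsert"):
    """Hypothetical mirror of the argument checks in PgUpsert.__init__."""
    if upsert_method not in UPSERT_METHODS:
        raise ValueError(f"Invalid upsert method: {upsert_method}. Must be one of {UPSERT_METHODS}")
    if not base_schema and not staging_schema:
        raise ValueError("No base or staging schema specified")
    if not base_schema:
        raise ValueError("No base schema specified")
    if not staging_schema:
        raise ValueError("No staging schema specified")
    if not tables:
        raise ValueError("No tables specified")
    if staging_schema == base_schema:
        raise ValueError(f"Staging and base schemas must be different. Got {staging_schema} for both.")


def error_message(**kwargs) -> Optional[str]:
    """Return the ValueError message raised for these arguments, or None if they are valid."""
    try:
        validate_args(**kwargs)
    except ValueError as exc:
        return str(exc)
    return None
```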

qa_errors property

qa_errors: list[QAError]

ERROR-level QA findings only: issues that blocked the upsert.

This is the backwards-compatible view: it contains the same items that qa_errors contained before severity levels were introduced. Use qa_findings for the full list (errors and warnings).

qa_warnings property

qa_warnings: list[QAError]

WARNING-level QA findings only — informational, did not block the upsert.

qa_findings property

qa_findings: list[QAError]

All QA findings (errors and warnings combined).
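The three properties above are filters over a single list of findings. The sketch below shows that relationship; `Finding`, `Severity`, and `Findings` are hypothetical stand-ins, and it assumes each finding carries a severity with ERROR and WARNING levels, which may not match the real QAError fields.

```python
from dataclasses import dataclass, field
from enum import Enum


class Severity(Enum):
    ERROR = "error"
    WARNING = "warning"


@dataclass
class Finding:  # hypothetical stand-in for QAError
    table: str
    check: str
    severity: Severity


@dataclass
class Findings:
    findings: list = field(default_factory=list)

    @property
    def qa_findings(self) -> list:
        # Everything: errors and warnings combined.
        return list(self.findings)

    @property
    def qa_errors(self) -> list:
        # ERROR-level only: these block the upsert.
        return [f for f in self.findings if f.severity is Severity.ERROR]

    @property
    def qa_warnings(self) -> list:
        # WARNING-level only: informational.
        return [f for f in self.findings if f.severity is Severity.WARNING]
```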

cleanup

cleanup() -> PgUpsert

Drop all temporary tables and views created by the upsert pipeline.

Issues DROP ... IF EXISTS ... CASCADE for every known ups_* temporary object, including the configurable control table. Safe to call multiple times or when no temporary objects exist.

This is useful for long-lived connections where you want to reclaim temporary object space without closing the connection.

Returns:

| Type | Description |
|------|-------------|
| PgUpsert | self, for method chaining. |
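cleanup() composes each DROP statement with psycopg2's `sql.Identifier`, which always renders the name as a double-quoted identifier. The sketch below builds equivalent statements as plain strings, using a hand-written `quote_ident` that follows PostgreSQL's quoting rule (wrap in double quotes and double any embedded quote). The contents of `_TEMP_VIEWS` and `_TEMP_TABLES` are not shown on this page, so the names passed in are illustrative; `ups_dependencies` and `ups_ordered_tables` come from the upsert_all documentation.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier: wrap in double quotes and double any embedded quote."""
    return '"' + name.replace('"', '""') + '"'


def cleanup_statements(temp_views, temp_tables, control_table="ups_control") -> list:
    """Build DROP statements in the same order as PgUpsert.cleanup(): views, tables, control table."""
    stmts = [f"DROP VIEW IF EXISTS {quote_ident(v)} CASCADE" for v in temp_views]
    stmts += [f"DROP TABLE IF EXISTS {quote_ident(t)} CASCADE" for t in temp_tables]
    # The control table name is configurable, so it is dropped separately.
    stmts.append(f"DROP TABLE IF EXISTS {quote_ident(control_table)} CASCADE")
    return stmts


# Hypothetical view name; the two table names appear in the upsert_all docs.
for stmt in cleanup_statements(["ups_example_view"], ["ups_dependencies", "ups_ordered_tables"]):
    print(stmt)
```

Because every statement uses IF EXISTS, running the same list twice is harmless, which is why cleanup() is safe to call repeatedly.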

Source code in src/pg_upsert/upsert.py
def cleanup(self: PgUpsert) -> PgUpsert:
    """Drop all temporary tables and views created by the upsert pipeline.

    Issues ``DROP ... IF EXISTS ... CASCADE`` for every known ``ups_*``
    temporary object, including the configurable control table.  Safe to
    call multiple times or when no temporary objects exist.

    This is useful for long-lived connections where you want to reclaim
    temporary object space without closing the connection.

    Returns:
        self, for method chaining.
    """
    for view in self._TEMP_VIEWS:
        self.db.execute(SQL("DROP VIEW IF EXISTS {} CASCADE").format(Identifier(view)))
    for table in self._TEMP_TABLES:
        self.db.execute(SQL("DROP TABLE IF EXISTS {} CASCADE").format(Identifier(table)))
    # The control table name is configurable.
    self.db.execute(SQL("DROP TABLE IF EXISTS {} CASCADE").format(Identifier(self.control_table)))
    # Reset pipeline state so the instance isn't left with stale results.
    self.qa_passed = False
    self._qa_findings = []
    # Invalidate the per-table PK column cache so a subsequent run
    # after a schema change doesn't return stale PKs.
    self._qa._pk_cols_cache.clear()
    return self

show_control

show_control() -> None

Display contents of the control table.

If the interactive flag is set to True, the control table is displayed in the interactive UI selected by ui_mode (a Tkinter window or a Textual terminal UI). Otherwise, the results are logged.

The control table definition is as follows:

| column name | data type | required | description |
|-------------|-----------|----------|-------------|
| table_name | text | yes | The name of the table to process. |
| exclude_cols | text | no | A comma-separated list of columns to exclude from the upsert process. |
| exclude_null_checks | text | no | A comma-separated list of columns to exclude from the not-null check during the QA process. |
| interactive | boolean | yes | A flag to indicate whether the QA and upsert processes should be interactive. |
| null_errors | text | no | A comma-separated list of columns with null values. |
| pk_errors | text | no | A comma-separated list of primary key errors. |
| fk_errors | text | no | A comma-separated list of foreign key errors. |
| ck_errors | text | no | A comma-separated list of check constraint errors. |
| rows_updated | integer | no | The number of rows updated during the upsert process. |
| rows_inserted | integer | no | The number of rows inserted during the upsert process. |
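Several control table columns hold comma-separated lists. The sketch below models one row of the table above in Python, with a helper that splits those list columns. `ControlRow` and `split_csv` are hypothetical, the exact text pg_upsert writes into the `*_errors` columns is not documented here (the sample values are invented), and treating any non-empty `*_errors` column as a failed check is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


def split_csv(value: Optional[str]) -> list:
    """Split a comma-separated control-table cell into trimmed, non-empty items."""
    if not value:
        return []
    return [item.strip() for item in value.split(",") if item.strip()]


@dataclass
class ControlRow:  # hypothetical model of one ups_control row
    table_name: str
    interactive: bool
    exclude_cols: Optional[str] = None
    exclude_null_checks: Optional[str] = None
    null_errors: Optional[str] = None
    pk_errors: Optional[str] = None
    fk_errors: Optional[str] = None
    ck_errors: Optional[str] = None
    rows_updated: Optional[int] = None
    rows_inserted: Optional[int] = None

    def has_errors(self) -> bool:
        # Assumption: any non-empty *_errors column means a QA check failed for this table.
        error_cols = (self.null_errors, self.pk_errors, self.fk_errors, self.ck_errors)
        return any(split_csv(v) for v in error_cols)
```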
Source code in src/pg_upsert/upsert.py
def show_control(self: PgUpsert) -> None:
    """Display contents of the control table.

    If the `interactive` flag is set to `True`, the control table will be displayed in a Tkinter window. Otherwise, the results will be logged.

    The control table definition is as follows:

    | column name          | data type | required | description |
    |----------------------|-----------|----------|-------------|
    | `table_name`         | text      | yes      | The name of the table to process. |
    | `exclude_cols`       | text      | no       | A comma-separated list of columns to exclude from the upsert process. |
    | `exclude_null_checks`| text      | no       | A comma-separated list of columns to exclude from the not-null check during the QA process. |
    | `interactive`        | boolean   | yes      | A flag to indicate whether the QA and upsert processes should be interactive. |
    | `null_errors`        | text      | no       | A comma-separated list of columns with null values. |
    | `pk_errors`          | text      | no       | A comma-separated list of primary key errors. |
    | `fk_errors`          | text      | no       | A comma-separated list of foreign key errors. |
    | `ck_errors`          | text      | no       | A comma-separated list of check constraint errors. |
    | `rows_updated`       | integer   | no       | The number of rows updated during the upsert process. |
    | `rows_inserted`      | integer   | no       | The number of rows inserted during the upsert process. |
    """  # noqa: E501
    self._validate_control()
    self._control.show(self.interactive)

qa_all

qa_all() -> PgUpsert

Performs QA checks for nulls in non-null columns, duplicated primary key values, invalid foreign key values, and check constraint violations in a set of staging tables to be loaded into base tables. If any QA check fails, loading is not attempted. If the loading step is carried out, it is done within a transaction.

The null_errors, pk_errors, fk_errors, ck_errors columns of the control table will be updated to identify any errors that occur, so that this information is available to the caller.

The rows_updated and rows_inserted columns of the control table will be updated with counts of the number of rows affected by the upsert operation for each table.

When the upsert operation updates the base table, all columns of the base table that are also in the staging table are updated. The update does not check whether column contents have changed, so every matching row is updated, even when its values are identical.
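The three upsert_method values and the "no change detection" rule above can be modeled in memory. This is a minimal sketch, not pg_upsert's SQL: `apply_upsert` is hypothetical, rows are dicts keyed by primary key, and `columns` stands for the base columns that also exist in staging, minus exclude_cols.

```python
def apply_upsert(base, staging, columns, method="upsert", exclude_cols=()):
    """Merge staging rows into base rows (both dicts keyed by primary key).

    Returns (rows_updated, rows_inserted). Matched rows are counted as updated
    even when no value actually changes, mirroring the documented behavior.
    """
    if method not in ("upsert", "update", "insert"):
        raise ValueError(f"Invalid upsert method: {method}")
    cols = [c for c in columns if c not in exclude_cols]
    updated = inserted = 0
    for pk, row in staging.items():
        if pk in base:
            # Existing key: only "upsert" and "update" touch it.
            if method in ("upsert", "update"):
                for c in cols:
                    base[pk][c] = row[c]
                updated += 1
        elif method in ("upsert", "insert"):
            # New key: only "upsert" and "insert" add it.
            base[pk] = {c: row[c] for c in cols}
            inserted += 1
    return updated, inserted
```

With an unchanged row and a new row in staging, "upsert" reports one update and one insert, "update" reports only the update, and "insert" leaves the existing row untouched.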

This method runs PgUpsert methods in the following order:

  1. PgUpsert.qa_all_null
  2. PgUpsert.qa_all_pk
  3. PgUpsert.qa_all_fk
  4. PgUpsert.qa_all_ck

Example:

PgUpsert(
    uri="postgresql://user@localhost:5432/database",
    tables=("genres", "books", "publishers", "authors", "book_authors"),
    staging_schema="staging",
    base_schema="public",
    do_commit=False,
    interactive=False,
    exclude_cols=("rev_user", "rev_time", "created_at", "updated_at"),
    exclude_null_check_cols=("rev_user", "rev_time", "created_at", "updated_at", "alias"),
).qa_all()
Source code in src/pg_upsert/upsert.py
def qa_all(self: PgUpsert) -> PgUpsert:
    """Performs QA checks for nulls in non-null columns, for duplicated
    primary key values, for invalid foreign keys, and invalid check constraints
    in a set of staging tables to be loaded into base tables.
    If there are failures in the QA checks, loading is not attempted.
    If the loading step is carried out, it is done within a transaction.

    The `null_errors`, `pk_errors`, `fk_errors`, `ck_errors` columns of the
    control table will be updated to identify any errors that occur,
    so that this information is available to the caller.

    The `rows_updated` and `rows_inserted` columns of the control table
    will be updated with counts of the number of rows affected by the
    upsert operation for each table.

    When the upsert operation updates the base table, all columns of the
    base table that are also in the staging table are updated.  The
    update operation does not test to see if column contents are different,
    and so does not update only those values that are different.

    This method runs [`PgUpsert`](pg_upsert.md) methods in the following order:

    1. [`PgUpsert.qa_all_null`](pg_upsert.md#pg_upsert.PgUpsert.qa_all_null)
    2. [`PgUpsert.qa_all_pk`](pg_upsert.md#pg_upsert.PgUpsert.qa_all_pk)
    3. [`PgUpsert.qa_all_fk`](pg_upsert.md#pg_upsert.PgUpsert.qa_all_fk)
    4. [`PgUpsert.qa_all_ck`](pg_upsert.md#pg_upsert.PgUpsert.qa_all_ck)

    **Example:**

    ```python
    PgUpsert(
        uri="postgresql://user@localhost:5432/database",
        tables=("genres", "books", "publishers", "authors", "book_authors"),
        staging_schema="staging",
        base_schema="public",
        do_commit=False,
        interactive=False,
        exclude_cols=("rev_user", "rev_time", "created_at", "updated_at"),
        exclude_null_check_cols=("rev_user", "rev_time", "created_at", "updated_at", "alias"),
    ).qa_all()
    ```
    """  # noqa: E501
    self._validate_control()
    self._control.clear_results()
    self._qa_findings = self._qa.run_all(
        list(self.tables),
        interactive=self.interactive,
        compact=self.compact,
        callback=self._callback,
    )
    self._update_qa_passed()
    return self

qa_all_null

qa_all_null() -> PgUpsert

Performs null checks for non-null columns in selected staging tables.
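Conceptually, the null check counts, for each NOT NULL column of the base table, how many staging rows hold NULL, skipping the columns listed in exclude_null_check_cols. In SQL, each column would be roughly `SELECT count(*) FROM staging.<table> WHERE <col> IS NULL`. The Python function below is a hypothetical in-memory version of that idea, not pg_upsert's implementation.

```python
def null_violations(rows, not_null_cols, exclude=()):
    """Count NULLs (None) per NOT NULL column, skipping excluded columns."""
    counts = {}
    for col in not_null_cols:
        if col in exclude:
            continue
        # Absent keys are treated as NULL in this sketch.
        n = sum(1 for row in rows if row.get(col) is None)
        if n:
            counts[col] = n
    return counts
```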

Source code in src/pg_upsert/upsert.py
def qa_all_null(self: PgUpsert) -> PgUpsert:
    """Performs null checks for non-null columns in selected staging tables."""
    total = len(self.tables)
    for i, table in enumerate(self.tables, 1):
        ctx = CheckContext(table_num=i, total_tables=total)
        self._qa_findings.extend(self._qa.check_nulls(table, ctx=ctx))
    self._update_qa_passed()
    return self

qa_one_null

qa_one_null(table: str) -> PgUpsert

Performs null checks for non-null columns in a single staging table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | The name of the staging table to check for null values. | required |
Source code in src/pg_upsert/upsert.py
def qa_one_null(self: PgUpsert, table: str) -> PgUpsert:
    """Performs null checks for non-null columns in a single staging table.

    Args:
        table (str): The name of the staging table to check for null values.
    """
    self._validate_table(table)
    self._qa_findings.extend(self._qa.check_nulls(table))
    self._update_qa_passed()
    return self

qa_all_pk

qa_all_pk() -> PgUpsert

Performs primary key checks for duplicated primary key values in selected staging tables.
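A duplicate primary key check is a group-and-count: in SQL, roughly `SELECT <pk cols>, count(*) FROM staging.<table> GROUP BY <pk cols> HAVING count(*) > 1`. The hypothetical function below does the same over a list of row dicts, supporting composite keys such as those of a link table like book_authors.

```python
from collections import Counter


def duplicate_keys(rows, pk_cols):
    """Return {pk_tuple: count} for primary key values that occur more than once."""
    counts = Counter(tuple(row[c] for c in pk_cols) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}
```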

Source code in src/pg_upsert/upsert.py
def qa_all_pk(self: PgUpsert) -> PgUpsert:
    """Performs primary key checks for duplicated primary key values in selected staging tables."""
    total = len(self.tables)
    for i, table in enumerate(self.tables, 1):
        ctx = CheckContext(table_num=i, total_tables=total)
        self._qa_findings.extend(self._qa.check_pks(table, interactive=self.interactive, ctx=ctx))
    self._update_qa_passed()
    return self

qa_one_pk

qa_one_pk(table: str) -> PgUpsert

Performs primary key checks for duplicated primary key values in a single staging table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | The name of the staging table to check for duplicate primary key values. | required |
Source code in src/pg_upsert/upsert.py
def qa_one_pk(self: PgUpsert, table: str) -> PgUpsert:
    """Performs primary key checks for duplicated primary key values in a single staging table.

    Args:
        table (str): The name of the staging table to check for duplicate primary key values.
    """
    self._validate_table(table)
    self._qa_findings.extend(self._qa.check_pks(table, interactive=self.interactive))
    self._update_qa_passed()
    return self

qa_all_fk

qa_all_fk() -> PgUpsert

Performs foreign key checks for invalid foreign key values in selected staging tables.
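A staging row violates a foreign key when its foreign key values have no matching row in the referenced table. Under PostgreSQL's default MATCH SIMPLE semantics, a row with NULL in any foreign key column is not checked at all. The sketch below applies that rule in Python; `fk_violations` is hypothetical, and how pg_upsert assembles the set of valid referenced keys is not described on this page, so `parent_keys` is simply passed in.

```python
def fk_violations(child_rows, fk_cols, parent_keys):
    """Return child rows whose foreign key has no match in parent_keys.

    Follows MATCH SIMPLE semantics: if any FK column is NULL (None), the row is not checked.
    """
    bad = []
    for row in child_rows:
        key = tuple(row[c] for c in fk_cols)
        if any(v is None for v in key):
            continue
        if key not in parent_keys:
            bad.append(row)
    return bad
```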

Source code in src/pg_upsert/upsert.py
def qa_all_fk(self: PgUpsert) -> PgUpsert:
    """Performs foreign key checks for invalid foreign key values in selected staging tables."""
    total = len(self.tables)
    for i, table in enumerate(self.tables, 1):
        ctx = CheckContext(table_num=i, total_tables=total)
        self._qa_findings.extend(self._qa.check_fks(table, interactive=self.interactive, ctx=ctx))
    self._update_qa_passed()
    return self

qa_one_fk

qa_one_fk(table: str) -> PgUpsert

Performs foreign key checks for invalid foreign key values in a single staging table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | The name of the staging table to check for invalid foreign key values. | required |
Source code in src/pg_upsert/upsert.py
def qa_one_fk(self: PgUpsert, table: str) -> PgUpsert:
    """Performs foreign key checks for invalid foreign key values in a single staging table.

    Args:
        table (str): The name of the staging table to check for invalid foreign key values.
    """
    self._validate_table(table)
    self._qa_findings.extend(self._qa.check_fks(table, interactive=self.interactive))
    self._update_qa_passed()
    return self

qa_all_ck

qa_all_ck() -> PgUpsert

Performs check constraint checks for invalid check constraint values in selected staging tables.
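In PostgreSQL, a CHECK constraint rejects a row only when its expression evaluates to false; a NULL (unknown) result passes. The sketch below models constraint expressions as Python callables returning True, False, or None, and keeps only the rows where the result is False. Both functions are hypothetical, and the `price >= 0` constraint is an invented example.

```python
from typing import Callable, Optional


def check_violations(rows, predicate: Callable[[dict], Optional[bool]]):
    """Return rows for which a CHECK-style predicate is False.

    None models SQL's unknown result, which PostgreSQL treats as passing.
    """
    return [row for row in rows if predicate(row) is False]


def price_non_negative(row):
    # Hypothetical constraint: CHECK (price >= 0); NULL price yields unknown.
    return None if row["price"] is None else row["price"] >= 0
```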

Source code in src/pg_upsert/upsert.py
def qa_all_ck(self: PgUpsert) -> PgUpsert:
    """Performs check constraint checks for invalid check constraint values in selected staging tables."""
    total = len(self.tables)
    for i, table in enumerate(self.tables, 1):
        ctx = CheckContext(table_num=i, total_tables=total)
        self._qa_findings.extend(self._qa.check_cks(table, ctx=ctx))
    self._update_qa_passed()
    return self

qa_one_ck

qa_one_ck(table: str) -> PgUpsert

Performs check constraint checks for invalid check constraint values in a single staging table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | The name of the staging table to check for invalid check constraint values. | required |
Source code in src/pg_upsert/upsert.py
def qa_one_ck(self: PgUpsert, table: str) -> PgUpsert:
    """Performs check constraint checks for invalid check constraint values in a single staging table.

    Args:
        table (str): The name of the staging table to check for invalid check constraint values.
    """
    self._validate_table(table)
    self._qa_findings.extend(self._qa.check_cks(table))
    self._update_qa_passed()
    return self

qa_all_unique

qa_all_unique() -> PgUpsert

Performs unique constraint checks on all selected staging tables.
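A unique constraint check differs from the primary key check in one respect: unique columns may be NULL, and by default PostgreSQL treats NULLs as distinct, so rows with NULL in a unique column never conflict. (PostgreSQL 15 added a NULLS NOT DISTINCT option; whether pg_upsert accounts for it is not stated here.) The hypothetical function below follows the default behavior.

```python
from collections import Counter


def unique_conflicts(rows, unique_cols):
    """Return {key: count} for duplicated values of a UNIQUE constraint.

    Uses PostgreSQL's default NULLS DISTINCT behavior: keys containing NULL never conflict.
    """
    keys = (tuple(row[c] for c in unique_cols) for row in rows)
    counts = Counter(k for k in keys if None not in k)
    return {k: n for k, n in counts.items() if n > 1}
```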

Source code in src/pg_upsert/upsert.py
def qa_all_unique(self: PgUpsert) -> PgUpsert:
    """Performs unique constraint checks on all selected staging tables."""
    total = len(self.tables)
    for i, table in enumerate(self.tables, 1):
        ctx = CheckContext(table_num=i, total_tables=total)
        self._qa_findings.extend(self._qa.check_unique(table, interactive=self.interactive, ctx=ctx))
    self._update_qa_passed()
    return self

qa_one_unique

qa_one_unique(table: str) -> PgUpsert

Performs unique constraint checks on a single staging table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | The name of the staging table to check. | required |
Source code in src/pg_upsert/upsert.py
def qa_one_unique(self: PgUpsert, table: str) -> PgUpsert:
    """Performs unique constraint checks on a single staging table.

    Args:
        table (str): The name of the staging table to check.
    """
    self._validate_table(table)
    self._qa_findings.extend(self._qa.check_unique(table, interactive=self.interactive))
    self._update_qa_passed()
    return self

qa_column_existence

qa_column_existence() -> PgUpsert

Checks that all base table columns exist in the staging tables.

Respects the exclude_cols setting — excluded columns are not flagged.
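The column existence rules are spread across two places on this page: excluded columns are ignored, and (per strict_columns) a missing column is an error only if it is a primary key column or a NOT NULL column without a default, unless strict mode makes every missing column an error. The sketch below combines those rules; `BaseColumn` and `classify_missing` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BaseColumn:  # hypothetical description of a base-table column
    name: str
    is_pk: bool = False
    not_null: bool = False
    has_default: bool = False


def classify_missing(base_cols, staging_col_names, exclude_cols=(), strict=False):
    """Split base columns missing from staging into (errors, warnings)."""
    errors, warnings = [], []
    for col in base_cols:
        if col.name in staging_col_names or col.name in exclude_cols:
            continue
        # PK columns and NOT NULL columns without a default cannot be left unpopulated.
        required = col.is_pk or (col.not_null and not col.has_default)
        (errors if strict or required else warnings).append(col.name)
    return errors, warnings
```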

Source code in src/pg_upsert/upsert.py
def qa_column_existence(self: PgUpsert) -> PgUpsert:
    """Checks that all base table columns exist in the staging tables.

    Respects the ``exclude_cols`` setting — excluded columns are not flagged.
    """
    total = len(self.tables)
    for i, table in enumerate(self.tables, 1):
        ctx = CheckContext(table_num=i, total_tables=total)
        self._qa_findings.extend(self._qa.check_column_existence(table, ctx=ctx))
    self._update_qa_passed()
    return self

qa_type_mismatch

qa_type_mismatch() -> PgUpsert

Checks for hard type incompatibilities between staging and base columns.

Only flags mismatches where PostgreSQL has no implicit or assignment cast.
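PostgreSQL records casts in the pg_cast catalog, where castcontext is 'i' (implicit), 'a' (assignment), or 'e' (explicit only); a live lookup looks like `SELECT castcontext FROM pg_cast WHERE castsource = 'numeric'::regtype AND casttarget = 'integer'::regtype`. PostgreSQL also treats automatic I/O conversion casts to string types as assignment casts. The sketch below applies those rules to a small, illustrative excerpt of pg_cast; it is an assumption about how such a check can work, not pg_upsert's query.

```python
from typing import Optional

# Illustrative excerpt of pg_cast.castcontext: (source, target) -> context.
# 'i' = implicit, 'a' = assignment, 'e' = explicit only.
PG_CAST_EXCERPT = {
    ("integer", "bigint"): "i",
    ("bigint", "integer"): "a",
    ("numeric", "integer"): "a",
    ("integer", "boolean"): "e",
}

# I/O conversion casts to string types behave as assignment casts.
STRING_TYPES = {"text", "character varying", "character"}


def is_hard_mismatch(source: str, target: str, casts=PG_CAST_EXCERPT) -> bool:
    """True if a staging column of type `source` cannot be assigned to a base column of `target`."""
    if source == target or target in STRING_TYPES:
        return False
    context: Optional[str] = casts.get((source, target))
    # No cast at all, or an explicit-only cast, cannot be applied during INSERT or UPDATE.
    return context is None or context == "e"
```

A common case this catches is an all-text staging table loaded into a base table with date columns: text to date has no implicit or assignment cast.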

Source code in src/pg_upsert/upsert.py
def qa_type_mismatch(self: PgUpsert) -> PgUpsert:
    """Checks for hard type incompatibilities between staging and base columns.

    Only flags mismatches where PostgreSQL has no implicit or assignment cast.
    """
    total = len(self.tables)
    for i, table in enumerate(self.tables, 1):
        ctx = CheckContext(table_num=i, total_tables=total)
        self._qa_findings.extend(self._qa.check_type_mismatch(table, ctx=ctx))
    self._update_qa_passed()
    return self

upsert_all

upsert_all() -> PgUpsert

Performs upsert operations on all selected tables in the base schema.

Objects created:

| table / view | description |
|--------------|-------------|
| ups_dependencies | Temporary table containing the dependencies of the base schema. |
| ups_ordered_tables | Temporary table containing the selected tables ordered by dependency. |
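Ordering tables by dependency means each table is processed after the tables its foreign keys reference, so parent rows exist before child rows arrive. The sketch below computes such an order with the standard library's `graphlib.TopologicalSorter` (Python 3.9+); `upsert_order` is hypothetical, and the foreign keys in `DEPS` are invented for the example tables used on this page.

```python
from graphlib import TopologicalSorter


def upsert_order(tables, depends_on):
    """Order tables so each one comes after the tables it references.

    depends_on maps a table to the tables its foreign keys point at. References to
    tables outside the selection, and self-references, are ignored.
    """
    selected = set(tables)
    graph = {t: {p for p in depends_on.get(t, ()) if p in selected and p != t} for t in tables}
    # static_order() yields predecessors first; it raises CycleError on circular references.
    return list(TopologicalSorter(graph).static_order())


# Hypothetical foreign keys for the example schema.
DEPS = {
    "books": {"genres", "publishers"},
    "book_authors": {"books", "authors"},
}
print(upsert_order(["genres", "books", "publishers", "authors", "book_authors"], DEPS))
```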
Source code in src/pg_upsert/upsert.py
def upsert_all(self: PgUpsert) -> PgUpsert:
    """Performs upsert operations on all selected tables in the base schema.

    **Objects created:**

    | table / view | description |
    |--------------|-------------|
    | `ups_dependencies` | Temporary table containing the dependencies of the base schema. |
    | `ups_ordered_tables` | Temporary table containing the selected tables ordered by dependency. |
    """  # noqa: E501
    self._validate_control()
    if not self.qa_passed:
        display.console.print(
            "  [bold red]QA checks have not passed — refusing to upsert.[/bold red]",
        )
        _file_logger.error("QA checks have not passed — refusing to upsert.")
        return self
    display.console.print()
    display.console.rule("[bold]Upsert[/bold]", style="cyan")
    commit_label = "[green]ON[/green]" if self.do_commit else "[dim]OFF[/dim]"
    display.console.print(f"  method={self.upsert_method}  commit={commit_label}")
    _file_logger.info(f"=== Upsert (method={self.upsert_method}, commit={self.do_commit}) ===")
    # Sync any runtime change to upsert_method before delegating.
    self._executor.upsert_method = self.upsert_method
    self._executor.upsert_all(list(self.tables), interactive=self.interactive)
    return self

upsert_one

upsert_one(table: str) -> PgUpsert

Performs an upsert operation on a single table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| table | str | The name of the table to upsert. | required |
Source code in src/pg_upsert/upsert.py
def upsert_one(self: PgUpsert, table: str) -> PgUpsert:
    """Performs an upsert operation on a single table.

    Args:
        table (str): The name of the table to upsert.
    """
    self._validate_table(table)
    # Sync any runtime change to upsert_method before delegating.
    self._executor.upsert_method = self.upsert_method
    self._executor.upsert_one(table, interactive=self.interactive)
    return self

run

run() -> UpsertResult

Run all QA checks and upsert operations.

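This method runs PgUpsert methods in the following order:

  1. PgUpsert.qa_all()
  2. PgUpsert.upsert_all()
  3. PgUpsert.commit()

Steps 2 and 3 run only if all QA checks pass. If the user cancels or a psycopg2 database error occurs, the changes are rolled back.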

Returns:

| Name | Type | Description |
|------|------|-------------|
| UpsertResult | UpsertResult | Structured result containing QA outcomes and row counts. |
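The control flow of run() can be reduced to a few branches: run QA, upsert and commit only if QA passed, and roll back on cancellation or a database error. The sketch below is a simplified model with a fake database that records commit and rollback calls. `DatabaseError` and `UserCancelled` are stand-ins for psycopg2.Error and UserCancelledError, and the real commit step (including its interactive prompt) is reduced to the do_commit flag.

```python
class DatabaseError(Exception):
    """Stand-in for psycopg2.Error."""


class UserCancelled(Exception):
    """Stand-in for pg_upsert's UserCancelledError."""


class FakeDB:
    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


def run_flow(db, run_qa, run_upsert, do_commit):
    """Simplified model of PgUpsert.run(): QA, then upsert and commit only if QA passed."""
    committed = False
    try:
        if run_qa():
            run_upsert()
            if do_commit:
                db.commit()
                committed = True
            else:
                db.rollback()  # do_commit=False: changes are rolled back
    except (UserCancelled, DatabaseError):
        db.rollback()
    return committed
```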

Source code in src/pg_upsert/upsert.py
def run(self: PgUpsert) -> UpsertResult:
    """Run all QA checks and upsert operations.

    This method runs `PgUpsert` methods in the following order:

    1. [`PgUpsert.qa_all()`](pg_upsert.md#pg_upsert.PgUpsert.qa_all)
    2. [`PgUpsert.upsert_all()`](pg_upsert.md#pg_upsert.PgUpsert.upsert_all)
    3. [`PgUpsert.commit()`](pg_upsert.md#pg_upsert.PgUpsert.commit)

    Returns:
        UpsertResult: Structured result containing QA outcomes and row counts.
    """
    start_time = datetime.now()
    start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
    num_tables = len(self.tables)

    # Logfile header — clear separator for appended runs.
    from importlib.metadata import PackageNotFoundError, version

    try:
        _ver = version("pg_upsert")
    except PackageNotFoundError:
        _ver = "unknown"
    try:
        _pg_ver = self.db.execute("SELECT version()").fetchone()[0]
    except (psycopg2.Error, IndexError, TypeError):
        _pg_ver = "unknown"
    _file_logger.info("")
    _file_logger.info("=" * 60)
    _file_logger.info(f"pg-upsert {_ver} — run started at {start_str}")
    _file_logger.info(f"  {self.staging_schema}{self.base_schema} ({num_tables} tables)")
    _file_logger.info(f"  PostgreSQL: {_pg_ver}")
    _file_logger.info("=" * 60)

    display.console.print()
    display.console.print(f"  [dim]Started at {start_str}[/dim]")
    display.console.print(
        f"  Tables selected for upsert "
        f"[dim]([/dim][bold]{self.staging_schema}[/bold] [dim]→[/dim] "
        f"[bold]{self.base_schema}[/bold][dim], {num_tables} tables)[/dim]",
    )
    if self.interactive:
        btn, _return_value = self._ui.show_table(
            "Upsert Tables",
            "Tables selected for upsert",
            [
                ("Continue", 0, "<Return>"),
                ("Cancel", 1, "<Escape>"),
            ],
            ["Table"],
            [[table] for table in self.tables],
        )
        if btn != 0:
            display.console.print("  [dim]Upsert cancelled[/dim]")
            _file_logger.info("Upsert cancelled")
            return UpsertResult(tables=[], committed=False)
    for table in self.tables:
        display.console.print(f"    [dim]•[/dim] {table}")
        _file_logger.info(f"  {table}")

    # Reset qa_passed and reinitialise the control table for a fresh run.
    self.qa_passed = False
    self._qa_findings = []
    self._init_ups_control()

    committed = False
    table_results: list[TableResult] = []

    try:
        self._control.clear_results()
        self._qa_findings = self._qa.run_all(
            list(self.tables),
            interactive=self.interactive,
            compact=self.compact,
            callback=self._callback,
        )
        self._update_qa_passed()
        if self.qa_passed:
            table_results = self._executor.upsert_all(
                list(self.tables),
                interactive=self.interactive,
                callback=self._callback,
            )
            committed = self._do_commit()
    except UserCancelledError:
        display.console.print("  [bold yellow]Rolling back changes due to user cancellation[/bold yellow]")
        _file_logger.info("Rolling back changes due to user cancellation")
        self.db.rollback()
    except psycopg2.Error as e:
        display.console.print(f"  [bold red]Database error — rolling back:[/bold red] {e}")
        _file_logger.error(f"Database error — rolling back: {e}")
        self.db.rollback()

    end_time = datetime.now()
    end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
    duration = elapsed_time(start_time)
    display.console.print(f"\n  [dim]Finished at {end_str} ({duration})[/dim]")

    # Logfile footer.
    _file_logger.info("-" * 60)
    _file_logger.info(f"pg-upsert run finished at {end_str} ({duration})")
    _file_logger.info("-" * 60)

    # Merge QA errors into table results.
    error_map: dict[str, list[QAError]] = {}
    for err in self._qa_findings:
        error_map.setdefault(err.table, []).append(err)

    # Build a TableResult for every table (including QA-only runs).
    result_map: dict[str, TableResult] = {r.table_name: r for r in table_results}
    for table in self.tables:
        if table not in result_map:
            result_map[table] = TableResult(table_name=table)
        result_map[table]._qa_findings = error_map.get(table, [])

    duration_seconds = (end_time - start_time).total_seconds()
    return UpsertResult(
        tables=[result_map[t] for t in self.tables if t in result_map],
        committed=committed,
        staging_schema=self.staging_schema,
        base_schema=self.base_schema,
        upsert_method=self.upsert_method,
        started_at=start_str,
        finished_at=end_str,
        duration_seconds=round(duration_seconds, 3),
    )
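The tail of run() groups QA findings by table and guarantees a TableResult for every selected table, even on QA-only runs that produced no upsert results. Below is a minimal standalone sketch of that merge step. Finding and Result are hypothetical stand-ins for QAError and TableResult, reduced to the fields the merge actually uses.

```python
from dataclasses import dataclass, field


@dataclass
class Finding:
    """Hypothetical stand-in for QAError: only the table name and details matter here."""
    table: str
    details: str


@dataclass
class Result:
    """Hypothetical stand-in for TableResult."""
    table_name: str
    rows_updated: int = 0
    rows_inserted: int = 0
    findings: list[Finding] = field(default_factory=list)


def merge_results(tables, upsert_results, findings):
    """Attach findings to per-table results, creating empty results where needed."""
    # Group findings by the table they were reported against.
    error_map: dict[str, list[Finding]] = {}
    for f in findings:
        error_map.setdefault(f.table, []).append(f)

    # Index upsert results by table name. Tables that were never upserted
    # (for example, because QA failed) get an empty Result.
    result_map = {r.table_name: r for r in upsert_results}
    for table in tables:
        result_map.setdefault(table, Result(table_name=table))
        result_map[table].findings = error_map.get(table, [])

    # Preserve the caller's table order.
    return [result_map[t] for t in tables]


if __name__ == "__main__":
    merged = merge_results(["genres", "books"], [], [Finding("books", "genre (3)")])
    for r in merged:
        print(r.table_name, r.rows_updated, [f.details for f in r.findings])
```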

commit

commit() -> PgUpsert

Commit the transaction to the database and show a summary of changes.

Changes are committed if the following criteria are met:

  • The do_commit flag is set to True.
  • All QA checks have passed (i.e., the qa_passed flag is set to True).
  • The summary of changes shows that rows have been updated or inserted.
  • If the interactive flag is set to True and the do_commit flag is set to False, the user is prompted to commit the changes and must select "Continue".

If qa_passed is False, the transaction is rolled back and no commit is attempted. This guards against accidentally persisting data that failed QA checks when using the step-by-step API.

Source code in src/pg_upsert/upsert.py
def commit(self: PgUpsert) -> PgUpsert:
    """Commits the transaction to the database and show a summary of changes.

    Changes are committed if the following criteria are met:

    - The `do_commit` flag is set to `True`.
    - All QA checks have passed (i.e., the `qa_passed` flag is set to `True`).
    - The summary of changes shows that rows have been updated or inserted.
    - If the `interactive` flag is set to `True` and the `do_commit` flag is is set to `False`, the user is prompted to commit the changes and the user selects "Continue".

    If ``qa_passed`` is ``False``, the transaction is rolled back and no
    commit is attempted.  This guards against accidentally persisting data
    that failed QA checks when using the step-by-step API.
    """  # noqa: E501
    if not self.qa_passed:
        display.console.print(
            "  [bold yellow]QA checks have not passed — refusing to commit. Rolling back.[/bold yellow]",
        )
        _file_logger.warning("QA checks have not passed — refusing to commit. Rolling back.")
        self.db.rollback()
        return self
    self._do_commit()
    return self
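The commit criteria listed above can be read as a single predicate. The sketch below is one interpretation, not pg_upsert's code. It assumes that an interactive confirmation substitutes for do_commit=True and that a run with zero changed rows is never committed. should_commit and its arguments are hypothetical.

```python
def should_commit(
    qa_passed: bool,
    rows_changed: int,
    do_commit: bool,
    interactive: bool = False,
    user_confirmed: bool = False,
) -> bool:
    """Return True if a run should be committed (hypothetical reading of the criteria)."""
    # Failed QA always means rollback, regardless of any other flag.
    if not qa_passed:
        return False
    # Nothing was updated or inserted, so there is nothing to commit.
    if rows_changed <= 0:
        return False
    # Explicit opt-in via do_commit.
    if do_commit:
        return True
    # Otherwise an interactive user may still choose "Continue".
    return interactive and user_confirmed
```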

Result Objects

These dataclasses are returned from PgUpsert.run() and its facade methods. They provide structured access to QA outcomes and upsert statistics, and are JSON-serialisable via to_dict() / to_json().

pg_upsert.UpsertResult dataclass

UpsertResult(tables: list[TableResult] = list(), committed: bool = False, staging_schema: str = '', base_schema: str = '', upsert_method: str = '', started_at: str = '', finished_at: str = '', duration_seconds: float = 0.0)

Structured result from a PgUpsert.run() call.

Provides programmatic access to QA results and upsert statistics for all tables processed.

Attributes:

  • tables (list[TableResult]): Per-table results.
  • committed (bool): Whether the transaction was committed.
  • staging_schema (str): Name of the staging schema.
  • base_schema (str): Name of the base schema.
  • upsert_method (str): The upsert method used (upsert, update, or insert).
  • started_at (str): ISO 8601 timestamp when the run started.
  • finished_at (str): ISO 8601 timestamp when the run finished.
  • duration_seconds (float): Elapsed time in seconds.

qa_passed property

qa_passed: bool

True if all tables passed QA checks.

total_updated property

total_updated: int

Total rows updated across all tables.

total_inserted property

total_inserted: int

Total rows inserted across all tables.
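The three UpsertResult properties are aggregates over the per-table results. Below is a minimal sketch of how such aggregates can be computed. It uses hypothetical stand-in classes rather than pg_upsert's own models, so the real implementation may differ, for example in how an empty tables list is treated.

```python
from dataclasses import dataclass, field


@dataclass
class TableStats:
    """Hypothetical stand-in for TableResult."""
    table_name: str
    rows_updated: int = 0
    rows_inserted: int = 0
    qa_passed: bool = True


@dataclass
class RunStats:
    """Hypothetical stand-in for UpsertResult, limited to its aggregate properties."""
    tables: list[TableStats] = field(default_factory=list)

    @property
    def qa_passed(self) -> bool:
        # True only if every table passed (vacuously True when there are no tables).
        return all(t.qa_passed for t in self.tables)

    @property
    def total_updated(self) -> int:
        return sum(t.rows_updated for t in self.tables)

    @property
    def total_inserted(self) -> int:
        return sum(t.rows_inserted for t in self.tables)
```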

to_dict

to_dict() -> dict

Serialize to a dictionary for JSON output.

Source code in src/pg_upsert/models.py
def to_dict(self) -> dict:
    """Serialize to a dictionary for JSON output."""
    return {
        "staging_schema": self.staging_schema,
        "base_schema": self.base_schema,
        "upsert_method": self.upsert_method,
        "qa_passed": self.qa_passed,
        "committed": self.committed,
        "total_updated": self.total_updated,
        "total_inserted": self.total_inserted,
        "started_at": self.started_at,
        "finished_at": self.finished_at,
        "duration_seconds": self.duration_seconds,
        "tables": [t.to_dict() for t in self.tables],
    }

to_json

to_json(indent: int = 2) -> str

Serialize to a JSON string.

Source code in src/pg_upsert/models.py
def to_json(self, indent: int = 2) -> str:
    """Serialize to a JSON string."""
    return json.dumps(self.to_dict(), indent=indent)
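Because to_dict() emits a fixed set of top-level keys, scripts can consume the JSON from to_json() without importing pg_upsert. The sketch below is a minimal CI gate. check_report is a hypothetical helper that reads only the top-level keys shown in to_dict() above. The per-table entries come from TableResult.to_dict(), whose layout is not documented here, so the sketch leaves them alone.

```python
import json


def check_report(report_json: str) -> int:
    """Return a process exit code for a serialized UpsertResult (hypothetical helper)."""
    report = json.loads(report_json)
    if not report["qa_passed"]:
        print(f"QA failed for {report['staging_schema']} -> {report['base_schema']}")
        return 1
    print(
        f"{report['upsert_method']}: {report['total_updated']} updated, "
        f"{report['total_inserted']} inserted, committed={report['committed']}"
    )
    return 0
```

With a real run, this could be called as sys.exit(check_report(result.to_json())), where result is the UpsertResult returned by PgUpsert.run().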

export_failures

export_failures(directory: str | Path, fmt: str = 'csv') -> Path | None

Export QA violations as a "fix sheet" grouped by table.

Writes one row per unique violating staging row to the given directory, with an _issues column summarising every problem found on that row. The format is selected by fmt (csv, json, or xlsx); see pg_upsert.export.export_failures for the exact file layout per format.

Returns the directory written, or None if there are no exportable violations.

Source code in src/pg_upsert/models.py
def export_failures(self, directory: str | Path, fmt: str = "csv") -> Path | None:
    """Export QA violations as a "fix sheet" grouped by table.

    Writes one row per unique violating staging row to *directory*,
    with an ``_issues`` column summarising every problem found on
    that row.  Format is selected by *fmt* (``csv``, ``json``, or
    ``xlsx``); see :func:`pg_upsert.export.export_failures` for the
    exact file layout per format.

    Returns the directory written, or ``None`` if there are no
    exportable violations.
    """
    all_errors = [e for t in self.tables for e in t.qa_findings]
    from .export import export_failures

    return export_failures(all_errors, directory, fmt=fmt)

pg_upsert.TableResult dataclass

TableResult(table_name: str, rows_updated: int = 0, rows_inserted: int = 0, _qa_findings: list[QAError] = list())

Per-table result from a QA check or upsert operation.

Attributes:

  • table_name (str): The name of the table.
  • rows_updated (int): Number of rows updated during the upsert.
  • rows_inserted (int): Number of rows inserted during the upsert.
  • _qa_findings (list[QAError]): Internal list of all QA findings (errors and warnings).

qa_errors property

qa_errors: list[QAError]

ERROR-level QA findings only — issues that blocked the upsert.

qa_warnings property

qa_warnings: list[QAError]

WARNING-level QA findings only — informational, did not block.

qa_findings property

qa_findings: list[QAError]

All QA findings (errors and warnings combined).

qa_passed property

qa_passed: bool

True if no ERROR-level QA findings exist for this table.

Warnings do not count as failures.
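TableResult keeps every finding in one internal list and exposes filtered views by severity. Below is a minimal sketch of that split. Severity and Finding are hypothetical stand-ins for QASeverity and QAError, and the member values of the stand-in enum are assumptions.

```python
from dataclasses import dataclass, field
from enum import Enum


class Severity(Enum):
    """Hypothetical stand-in for QASeverity (member values are assumed)."""
    ERROR = "error"
    WARNING = "warning"


@dataclass
class Finding:
    """Hypothetical stand-in for QAError, reduced to details and severity."""
    details: str
    severity: Severity = Severity.ERROR


@dataclass
class TableFindings:
    """Hypothetical stand-in for the QA part of TableResult."""
    table_name: str
    findings: list[Finding] = field(default_factory=list)

    @property
    def qa_errors(self) -> list[Finding]:
        return [f for f in self.findings if f.severity is Severity.ERROR]

    @property
    def qa_warnings(self) -> list[Finding]:
        return [f for f in self.findings if f.severity is Severity.WARNING]

    @property
    def qa_passed(self) -> bool:
        # Warnings never fail a table; only ERROR-level findings do.
        return not self.qa_errors
```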

QA Error Models

Every QA check produces QAError instances. When --export-failures is active, each error is also populated with per-row RowViolation objects (for data checks) or SchemaIssue objects (for column existence and type mismatch checks). These feed the fix-sheet exporter.

pg_upsert.QASeverity

Bases: Enum

Severity level for a QA finding.

ERROR findings block the upsert pipeline — qa_passed is set to False when any ERROR-severity finding is present, and commit() / upsert_all() will refuse to proceed.

WARNING findings are informational only. They are displayed in yellow but do not set qa_passed to False and do not block the upsert. Use qa_warnings or qa_findings to access them.

pg_upsert.QAError dataclass

QAError(table: str, check_type: QACheckType, details: str, severity: QASeverity = ERROR, violations: list[RowViolation] = list(), schema_issues: list[SchemaIssue] = list())

A single QA check finding.

Attributes:

  • table (str): The table where the finding was reported.
  • check_type (QACheckType): The type of QA check that produced this finding.
  • details (str): Human-readable summary, e.g. "genre (3)" for 3 null values in the genre column.
  • severity (QASeverity): ERROR (the default) blocks the upsert; WARNING is informational only.
  • violations (list[RowViolation]): Per-row violations captured when --export-failures is active. Used by the export module to build fix sheets. Excluded from to_dict() to keep the --output json API stable.
  • schema_issues (list[SchemaIssue]): For column-existence and type-mismatch checks, structured metadata that the export module writes to the _schema output. Excluded from to_dict().

pg_upsert.QACheckType

Bases: Enum

Types of QA checks performed on staging data.

pg_upsert.RowViolation dataclass

RowViolation(pk_values: tuple, row_data: dict[str, Any], issue_type: str, pk_columns: list[str] = list(), issue_column: str | None = None, constraint_name: str | None = None, description: str = '')

One problem detected on one staging row.

Multiple violations may reference the same staging row (same pk_values); pg_upsert.export deduplicates and merges them into a single fix-sheet entry per row.

Attributes:

  • pk_values (tuple): Primary key tuple for this staging row. Used as the dedup key when building the fix sheet. Tables without a PK fall back to a tuple of all column values.
  • pk_columns (list[str]): PK column names in declared order, parallel to pk_values. Empty for tables with no primary key. Used by the export layer to sort the fix sheet by PK.
  • row_data (dict[str, Any]): Full staging row contents as a column -> value dict.
  • issue_type (str): Short identifier: "null", "pk", "fk", "unique", or "ck".
  • issue_column (str | None): For NULL/FK/UNIQUE, the column (or comma-joined columns) responsible for the violation.
  • constraint_name (str | None): For PK/FK/UNIQUE/CK, the constraint that failed.
  • description (str): Human-readable phrase used in the fix sheet's _issues column, e.g. "NULL in 'genre'".
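The fix-sheet layout (one row per violating staging row, with an _issues column) can be sketched with the standard csv module. This is an illustration, not pg_upsert's exporter. Violation is a hypothetical stand-in for RowViolation. The "; " separator inside _issues and the sort by pk_values are assumptions.

```python
import csv
import io
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Violation:
    """Hypothetical stand-in for RowViolation."""
    pk_values: tuple
    row_data: dict[str, Any]
    issue_type: str
    description: str = ""
    pk_columns: list[str] = field(default_factory=list)


def build_fix_sheet(violations: list[Violation]) -> str:
    """Merge violations per staging row and render them as CSV text."""
    # Deduplicate on pk_values, keeping the first copy of each row's data.
    rows: dict[tuple, dict[str, Any]] = {}
    issues: dict[tuple, list[str]] = {}
    for v in violations:
        rows.setdefault(v.pk_values, dict(v.row_data))
        issues.setdefault(v.pk_values, []).append(v.description)

    if not rows:
        return ""

    # Column order: the row's own columns, then the _issues summary.
    first = next(iter(rows.values()))
    fieldnames = [*first.keys(), "_issues"]

    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    for pk in sorted(rows):  # assumes PK tuples are mutually comparable
        writer.writerow({**rows[pk], "_issues": "; ".join(issues[pk])})
    return buf.getvalue()
```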

pg_upsert.SchemaIssue dataclass

SchemaIssue(check_type: str, column_name: str, staging_type: str | None = None, base_type: str | None = None, description: str = '')

One schema-level problem detected by column existence / type checks.

Schema issues have no row data — they describe a structural mismatch between the staging and base tables. They are written to a dedicated _schema output separate from the row-level fix sheets.

Attributes:

  • check_type (str): "column" (missing) or "type" (mismatch).
  • column_name (str): Column with the issue.
  • staging_type (str | None): Staging type (type mismatch only).
  • base_type (str | None): Base type (type mismatch only).
  • description (str): Human-readable description.
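Column existence and type checks compare the column sets of the staging and base tables. The minimal sketch below uses plain dicts of column name to type name in place of the catalogue queries pg_upsert actually runs. Issue is a hypothetical stand-in for SchemaIssue. Treating base columns that are absent from staging as "column" issues is an assumption about the direction of the check.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Issue:
    """Hypothetical stand-in for SchemaIssue."""
    check_type: str  # "column" (missing) or "type" (mismatch)
    column_name: str
    staging_type: str | None = None
    base_type: str | None = None
    description: str = ""


def compare_columns(staging: dict[str, str], base: dict[str, str]) -> list[Issue]:
    """Report base columns missing from staging and columns whose types differ."""
    issues: list[Issue] = []
    for col, base_type in base.items():
        if col not in staging:
            issues.append(Issue("column", col, description=f"Column '{col}' missing from staging"))
        elif staging[col] != base_type:
            issues.append(
                Issue(
                    "type",
                    col,
                    staging_type=staging[col],
                    base_type=base_type,
                    description=f"'{col}' is {staging[col]} in staging but {base_type} in base",
                )
            )
    return issues
```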

Pipeline Callbacks and Context

CheckContext is passed through QA check methods so display and callback hooks can report progress. CallbackEvent, PipelineEvent, and PipelineCallback form the callback protocol used by PgUpsert(callback=...).

pg_upsert.CheckContext dataclass

CheckContext(table_num: int, total_tables: int)

Progress context passed through QA check methods to display functions.

Attributes:

  • table_num (int): Current table number (1-based).
  • total_tables (int): Total number of tables being checked.

pg_upsert.CallbackEvent

Bases: Enum

Events fired during the pg-upsert pipeline.

pg_upsert.PipelineEvent dataclass

PipelineEvent(event: CallbackEvent, table: str, qa_passed: bool | None = None, rows_updated: int = 0, rows_inserted: int = 0, qa_findings: list[QAError] = list())

Data passed to the pipeline callback at each event.

Attributes:

  • event (CallbackEvent): The type of event.
  • table (str): The table name this event relates to.
  • qa_passed (bool | None): Whether QA passed for this table (None if not yet determined). True when there are no findings or only WARNING-level findings; False only when ERROR-level findings are present.
  • rows_updated (int): Rows updated (0 if not applicable yet).
  • rows_inserted (int): Rows inserted (0 if not applicable yet).
  • qa_findings (list[QAError]): All QA findings (errors and warnings) for this table.

qa_errors property

qa_errors: list[QAError]

ERROR-level QA findings only.

qa_warnings property

qa_warnings: list[QAError]

WARNING-level QA findings only.

pg_upsert.PipelineCallback module-attribute

PipelineCallback = Callable[[PipelineEvent], bool | None]

Callback type for pipeline events.

Return False to abort the pipeline (triggers rollback). Return True or None to continue.
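A callback receives a PipelineEvent at each stage and can stop the run by returning False. The sketch below is standalone. Event is a hypothetical stand-in for PipelineEvent, and its event names are made-up strings because the members of CallbackEvent are not listed here. run_pipeline only imitates how a caller might honour the return value.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in for PipelineEvent (event is a plain string here)."""
    event: str
    table: str
    qa_passed: bool | None = None
    rows_updated: int = 0
    rows_inserted: int = 0


def stop_on_qa_failure(ev: Event) -> bool | None:
    """Abort as soon as any table is known to have failed QA."""
    if ev.qa_passed is False:
        print(f"QA failed on {ev.table}; aborting")
        return False
    return None  # None (or True) means "continue"


def run_pipeline(events: list[Event], callback: Callable[[Event], bool | None]) -> bool:
    """Feed events to the callback; return False if it asked to abort."""
    for ev in events:
        if callback(ev) is False:
            return False  # the real pipeline rolls back at this point
    return True
```

With pg_upsert itself, a function shaped like stop_on_qa_failure would be passed as PgUpsert(callback=...) and would receive real PipelineEvent objects.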

Database Connection

pg_upsert.PostgresDB

PostgresDB(uri: None | str = None, conn: None | connection = None, encoding: str = 'utf-8', **kwargs)

Base database object for connecting and executing SQL queries on a PostgreSQL database.

Parameters:

  • conn (connection, default None): An existing connection object to a PostgreSQL database. If both conn and uri are provided, conn is used and uri is ignored.
  • uri (str, default None): A connection URI for a PostgreSQL database. One of conn or uri is required.
  • encoding (str, default 'utf-8'): The encoding to use for the database connection.
  • **kwargs: Additional keyword arguments passed to psycopg2.connect().

Returns:

  • PostgresDB: A new PostgresDB object for connecting to a PostgreSQL database and executing queries.

Raises:

  • AttributeError: If neither a connection URI nor an existing connection object is provided.
  • psycopg2.Error: If an error occurs while connecting to the database or executing a query.

Source code in src/pg_upsert/postgres.py
def __init__(
    self,
    uri: None | str = None,
    conn: None | psycopg2.extensions.connection = None,
    encoding: str = "utf-8",
    **kwargs,
):
    if conn is None and uri is None:
        raise AttributeError(
            "Either a connection URI or an existing connection object must be provided.",
        )
    if conn and uri:
        logger.warning(
            "Connection URI ignored as an existing connection object is provided.",
        )
        uri = None
    # If a URI is supplied, extract the password separately to avoid storing
    # it in the dsn (which would be exposed via conn.dsn / repr).
    self._password: str | None = None
    self._sanitized_uri: str | None = None
    self._connect_uri: str | None = None  # full URI with password for reconnection
    self._owns_connection = conn is None  # only manage connections we created
    if uri:
        uri, self._password, self._sanitized_uri = self._extract_password(uri)
        self._connect_uri = uri
    self.conn = conn or psycopg2.connect(uri, **kwargs)
    self.encoding = encoding
    self.in_transaction = False
    self._in_savepoint = False
    self.kwargs = kwargs
    if not self._is_valid_connection():
        raise psycopg2.Error(f"Error connecting to {self.conn.dsn}")
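PostgresDB.__init__ separates the password from the URI so that it is not exposed via conn.dsn or repr(). The helper it calls, _extract_password, is not shown here and returns three values. The following is an independent sketch of the same idea using urllib.parse from the standard library. split_password is a hypothetical name, and it only handles postgresql:// style URIs that carry the password in the userinfo part.

```python
from __future__ import annotations

from urllib.parse import unquote, urlsplit, urlunsplit


def split_password(uri: str) -> tuple[str, str | None]:
    """Return (uri_without_password, decoded_password) for a postgresql:// URI."""
    parts = urlsplit(uri)
    userinfo, sep, hostport = parts.netloc.rpartition("@")
    if not sep or ":" not in userinfo:
        # No userinfo at all, or a user name without a password.
        return uri, None
    user, _, raw_password = userinfo.partition(":")
    netloc = f"{user}@{hostport}"
    sanitized = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
    # Userinfo in a URI is percent-encoded, so decode it before use.
    return sanitized, unquote(raw_password)
```

A URI without a password is returned unchanged, which keeps the sanitized form safe to log.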

__repr__

__repr__() -> str

Return a string representation of the object.

Source code in src/pg_upsert/postgres.py
def __repr__(self) -> str:
    """Return a string representation of the object."""
    params = self.conn.get_dsn_parameters() if self.conn else "No connection"
    return f"{self.__class__.__name__}({params})"

__del__

__del__()

Ensure the database connection is closed when the object is deleted, if open.

Source code in src/pg_upsert/postgres.py
def __del__(self):
    """Ensure the database connection is closed when the object is deleted, if open."""
    if getattr(self, "_owns_connection", False) and hasattr(self, "conn") and self.conn and not self.conn.closed:
        self.close()

open_db

open_db() -> None

Ensure the database connection is open.

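Reconnects using the original URI (stored at init time) if the connection was created by this instance. For an external connection (passed via conn=), a best-effort reconnect is attempted using that connection's DSN, which may not include the password. Otherwise, psycopg2.OperationalError is raised.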

Source code in src/pg_upsert/postgres.py
def open_db(self) -> None:
    """Ensure the database connection is open.

    Reconnects using the original URI (stored at init time) if the
    connection was created by this instance.  External connections
    (passed via ``conn=``) cannot be reopened.
    """
    if not self.conn or self.conn.closed:
        logger.debug("Opening database connection.")
        if self._connect_uri:
            self.conn = psycopg2.connect(self._connect_uri, **self.kwargs)
        elif not self._owns_connection and self.conn:
            # External connection — try DSN (may lack password).
            self.conn = psycopg2.connect(self.conn.dsn, **self.kwargs)
        else:
            raise psycopg2.OperationalError(
                "Cannot reopen connection: no stored credentials. "
                "Connections passed via conn= cannot be automatically reopened.",
            )
        self.in_transaction = False
        self.conn.set_client_encoding(self.encoding)
        self.conn.set_session(autocommit=False)

cursor

cursor()

Return a cursor for executing database queries.

Source code in src/pg_upsert/postgres.py
def cursor(self):
    """Return a cursor for executing database queries."""
    self.open_db()
    return self.conn.cursor(cursor_factory=DictCursor)

close

close() -> None

Close the database connection if open and owned by this instance.

Connections provided externally via conn= are never closed — the caller retains ownership and is responsible for closing them.

Source code in src/pg_upsert/postgres.py
def close(self) -> None:
    """Close the database connection if open and owned by this instance.

    Connections provided externally via ``conn=`` are never closed —
    the caller retains ownership and is responsible for closing them.
    """
    if not self._owns_connection:
        return
    if self.conn and not self.conn.closed:
        self.rollback()
        self.conn.close()
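close() and __del__ only close connections that the instance created itself. A connection passed in via conn= remains the caller's responsibility. The pattern does not depend on PostgreSQL, so the sketch below illustrates it with the standard-library sqlite3 module. OwnedDB is a hypothetical class, not part of pg_upsert.

```python
from __future__ import annotations

import sqlite3


class OwnedDB:
    """Hypothetical wrapper that closes only the connections it opened."""

    def __init__(self, path: str | None = None, conn: sqlite3.Connection | None = None):
        if conn is None and path is None:
            raise ValueError("Provide either a path or an existing connection.")
        # Ownership is decided once, from whether a connection was supplied.
        self._owns_connection = conn is None
        self.conn = conn if conn is not None else sqlite3.connect(path)
        self.closed = False  # sqlite3 connections expose no .closed flag

    def close(self) -> None:
        if not self._owns_connection or self.closed:
            return  # borrowed connection: the caller closes it
        self.conn.rollback()  # discard uncommitted work first, as PostgresDB.close() does
        self.conn.close()
        self.closed = True
```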

commit

commit() -> None

Commit the current transaction.

Source code in src/pg_upsert/postgres.py
def commit(self) -> None:
    """Commit the current transaction."""
    if self.conn and self.in_transaction:
        self.conn.commit()
        self.in_transaction = False

rollback

rollback() -> None

Roll back the current transaction.

Source code in src/pg_upsert/postgres.py
def rollback(self) -> None:
    """Rollback the current transaction."""
    if self.conn and self.in_transaction:
        self.conn.rollback()
        self.in_transaction = False

execute

execute(sql: str | Composable, params=None) -> cursor

A shortcut to self.cursor().execute() that handles encoding.

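Handles queries as well as inserts, updates, and deletes. The instance is marked as being in a transaction; on a psycopg2.Error, the transaction is rolled back (unless a savepoint is active) and the exception is re-raised.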

Parameters:

  • sql (str | Composable, required): The SQL statement to execute, as a str or a psycopg2.sql.Composable object.
  • params (tuple, default None): Parameters to bind to placeholders in a str query. Parameters are ignored when sql is a Composable, so values must be composed into the Composable itself (for example with psycopg2.sql.Literal).

Returns:

  • psycopg2.extensions.cursor: A cursor object for the executed query.

Source code in src/pg_upsert/postgres.py
def execute(
    self: PostgresDB,
    sql: str | Composable,
    params=None,
) -> psycopg2.extensions.cursor:
    """A shortcut to self.cursor().execute() that handles encoding.

    Handles insert, updates, deletes

    Args:
        sql (str | psycopg2.sql.Composable): The SQL query to execute. Accepts a `str` or `Composable` object.
        params (tuple, optional): A tuple of parameters to pass to the query.
            Note that a `Composable` object should not have parameters passed separately. Default is None.
    Returns:
        psycopg2.extensions.cursor: A cursor object for the executed query.
    """
    self.in_transaction = True
    try:
        curs = self.cursor()
        if isinstance(sql, Composable):
            logger.debug(f"\n{sql.as_string(curs)}")
            curs.execute(sql)
        else:
            if params is None:
                logger.debug(f"\n{sql}")
                curs.execute(sql.encode(self.encoding))
            else:
                logger.debug(f"\nSQL:\n{sql}\nParameters:\n{params}")
                curs.execute(sql.encode(self.encoding), params)
    except psycopg2.Error:
        if not self._in_savepoint:
            self.rollback()
        raise
    return curs
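execute() marks the instance as being in a transaction before running a statement. On a database error it rolls back (unless a savepoint is active) before re-raising, so a failed statement never leaves a half-applied transaction behind. The sketch below mirrors that control flow with sqlite3 so it can run without a PostgreSQL server. TxnDB and its attributes are hypothetical, and sqlite3.Error stands in for psycopg2.Error. Note that sqlite3 uses ? placeholders where psycopg2 uses %s.

```python
import sqlite3


class TxnDB:
    """Hypothetical wrapper imitating PostgresDB.execute()'s error handling."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.in_transaction = False
        self._in_savepoint = False

    def rollback(self) -> None:
        if self.in_transaction:
            self.conn.rollback()
            self.in_transaction = False

    def execute(self, sql: str, params=()) -> sqlite3.Cursor:
        self.in_transaction = True
        try:
            return self.conn.execute(sql, params)
        except sqlite3.Error:
            # Inside a savepoint, the caller decides how far to roll back.
            if not self._in_savepoint:
                self.rollback()
            raise
```

In this sketch, a NOT NULL violation on a second insert also undoes the first insert, because both ran in the same implicit transaction.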

rowdict

rowdict(sql: str | Composable, params=None) -> tuple

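Execute a query and return its rows as a lazily evaluated iterator of dictionaries.

The return value is a 3-tuple:

  • dict_row (iterator): yields one dict per row, mapping column names to values; bytes values are decoded with the connection encoding (using backslashreplace on errors).
  • headers (list): the column names.
  • rowcount (int): the number of rows returned by the query.

Statements that return no result set produce an empty iterator, an empty header list, and a rowcount of 0.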

Source code in src/pg_upsert/postgres.py
def rowdict(self: PostgresDB, sql: str | Composable, params=None) -> tuple:
    """Convert a cursor object to an iterable that yields dictionaries of row data.

    yields dictionaries of row data with the following structure:
        0) dict_row (iterator) - an iterator that yields dictionaries of row data
        1) headers (list) - a list of column names
        2) rowcount (int) - the number of rows returned by the query
    """
    curs = self.execute(sql, params)
    if not curs.description:
        # No data returned
        return (iter([]), [], 0)
    headers = [d[0] for d in curs.description]

    def dict_row():
        """Convert a data row to a dictionary."""
        row = curs.fetchone()
        if row:
            r = [(c.decode(self.encoding, "backslashreplace") if isinstance(c, bytes) else c) for c in row]
            return dict(zip(headers, r, strict=True))
        return None

    return (iter(dict_row, None), headers, curs.rowcount)
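rowdict() builds its row iterator with the two-argument form of iter(), which calls dict_row() repeatedly until it returns the sentinel None. Rows are therefore fetched one at a time. The sketch below reproduces that shape with sqlite3. rows_as_dicts is a hypothetical helper. sqlite3 reports rowcount as -1 for SELECT statements, so the sketch returns only the iterator and headers rather than the full 3-tuple.

```python
import sqlite3


def rows_as_dicts(conn: sqlite3.Connection, sql: str, params=()):
    """Return (row_iterator, headers) for a query, fetching rows lazily."""
    curs = conn.execute(sql, params)
    if not curs.description:
        return iter([]), []  # the statement returned no result set
    headers = [d[0] for d in curs.description]

    def dict_row():
        row = curs.fetchone()
        # Returning None ends the iteration started by iter(dict_row, None).
        return dict(zip(headers, row)) if row is not None else None

    return iter(dict_row, None), headers
```

With pg_upsert, the documented equivalent unpacks all three values: rows, headers, rowcount = db.rowdict(sql).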

Exceptions

pg_upsert.UserCancelledError

Bases: Exception

Raised when the user cancels an interactive operation.