API Reference¶
This page documents the public classes, dataclasses, and type aliases
exported by pg_upsert. Each entry below is generated from the module
docstrings. The class names in the right-hand table of contents are
symbol headings generated by mkdocstrings, not plain markdown headings.
Main Entry Point¶
pg_upsert.PgUpsert ¶
PgUpsert(uri: None | str = None, conn: None | connection = None, encoding: str = 'utf-8', tables: list | tuple | None = (), staging_schema: str | None = None, base_schema: str | None = None, do_commit: bool = False, interactive: bool = False, upsert_method: str = 'upsert', exclude_cols: list | tuple | None = (), exclude_null_check_cols: list | tuple | None = (), control_table: str = 'ups_control', ui_mode: str = 'auto', compact: bool = False, callback: PipelineCallback | None = None, capture_detail_rows: bool = False, max_export_rows: int = 1000, strict_columns: bool = False)
Perform one or all of the following operations on a set of PostgreSQL tables:
- Perform QA checks on data in a staging table or set of staging tables. QA checks include not-null, primary key, foreign key, and check constraint checks.
- Perform updates and inserts (upserts) on a base table or set of base tables from the staging table(s) of the same name.
PgUpsert utilizes temporary tables and views inside the PostgreSQL database to dynamically generate SQL for QA checks and upserts. All temporary objects are initialized with the ups_ prefix.
The upsert process is transactional. If any part of the process fails, the transaction will be rolled back. Committing changes to the database is optional and can be controlled with the do_commit flag.
All SQL statements are generated using the psycopg2.sql module.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uri` | `str or None` | Connection URI for the PostgreSQL database. Defaults to None. Note: If a connection URI is not provided, an existing connection object must be provided. | `None` |
| `conn` | `connection or None` | An existing connection object to the PostgreSQL database. Defaults to None. Note: If a connection object is not provided, a connection URI must be provided. If both are provided, the connection object will be used. | `None` |
| `encoding` | `str` | The encoding to use for the database connection. Defaults to "utf-8". | `'utf-8'` |
| `tables` | `list or tuple or None` | List of table names to perform QA checks on and upsert. Defaults to (). | `()` |
| `staging_schema` | `str or None` | Name of the staging schema where tables are located which will be used for QA checks and upserts. Tables in the staging schema must have the same name as the tables in the base schema that they will be upserted to. Defaults to None. | `None` |
| `base_schema` | `str or None` | Name of the base schema where tables are located which will be updated or inserted into. Defaults to None. | `None` |
| `do_commit` | `bool` | If True, changes will be committed to the database once the upsert process has completed successfully. If False, changes will be rolled back. Defaults to False. | `False` |
| `interactive` | `bool` | If True, the user will be prompted with multiple dialogs to confirm various steps during the upsert process. If False, the upsert process will run without user intervention. Defaults to False. | `False` |
| `upsert_method` | `str` | The method to use for upserting data. Must be one of "upsert", "update", or "insert". Defaults to "upsert". | `'upsert'` |
| `exclude_cols` | `list or tuple or None` | List of column names to exclude from the upsert process. These columns will not be updated or inserted; however, they will still be checked during the QA process. | `()` |
| `exclude_null_check_cols` | `list or tuple or None` | List of column names to exclude from the not-null check during the QA process. You may wish to exclude certain columns from null checks, such as auto-generated timestamps or serial columns, as they may not be populated until after records are inserted or updated. Defaults to (). | `()` |
| `control_table` | `str` | Name of the temporary control table that will be used to track changes during the upsert process. Defaults to "ups_control". | `'ups_control'` |
| `ui_mode` | `str` | Interactive UI backend to use when … | `'auto'` |
| `compact` | `bool` | If … | `False` |
| `callback` | `PipelineCallback or None` | Optional callable invoked at key pipeline events (… | `None` |
| `capture_detail_rows` | `bool` | If … | `False` |
| `max_export_rows` | `int` | Maximum number of violating rows captured per check per table when … | `1000` |
| `strict_columns` | `bool` | If … | `False` |
Example:
from pg_upsert import PgUpsert

ups = PgUpsert(
    # Note the missing password. pg_upsert will prompt for the password.
    uri="postgresql://user@localhost:5432/database",
    tables=("genres", "books", "publishers", "authors", "book_authors"),
    staging_schema="staging",
    base_schema="public",
    do_commit=False,
    upsert_method="upsert",
    interactive=False,
    exclude_cols=("rev_user", "rev_time", "created_at", "updated_at"),
    exclude_null_check_cols=("rev_user", "rev_time", "created_at", "updated_at", "alias"),
)
Source code in src/pg_upsert/upsert.py
qa_errors
property
¶
qa_errors: list[QAError]
ERROR-level QA findings only — issues that blocked the upsert.
This is the backwards-compatible view: it contains the same items
that qa_errors contained before severity was introduced. Use
qa_findings for the full list (errors + warnings).
qa_warnings
property
¶
qa_warnings: list[QAError]
WARNING-level QA findings only — informational, did not block the upsert.
cleanup ¶
cleanup() -> PgUpsert
Drop all temporary tables and views created by the upsert pipeline.
Issues DROP ... IF EXISTS ... CASCADE for every known ups_*
temporary object, including the configurable control table. Safe to
call multiple times or when no temporary objects exist.
This is useful for long-lived connections where you want to reclaim temporary object space without closing the connection.
Returns:
| Type | Description |
|---|---|
| `PgUpsert` | self, for method chaining. |
Source code in src/pg_upsert/upsert.py
show_control ¶
Display contents of the control table.
If the interactive flag is set to True, the control table will be displayed in a Tkinter window. Otherwise, the results will be logged.
The control table definition is as follows:
| column name | data type | required | description |
|---|---|---|---|
| `table_name` | text | yes | The name of the table to process. |
| `exclude_cols` | text | no | A comma-separated list of columns to exclude from the upsert process. |
| `exclude_null_checks` | text | no | A comma-separated list of columns to exclude from the not-null check during the QA process. |
| `interactive` | boolean | yes | A flag to indicate whether the QA and upsert processes should be interactive. |
| `null_errors` | text | no | A comma-separated list of columns with null values. |
| `pk_errors` | text | no | A comma-separated list of primary key errors. |
| `fk_errors` | text | no | A comma-separated list of foreign key errors. |
| `ck_errors` | text | no | A comma-separated list of check constraint errors. |
| `rows_updated` | integer | no | The number of rows updated during the upsert process. |
| `rows_inserted` | integer | no | The number of rows inserted during the upsert process. |
Source code in src/pg_upsert/upsert.py
qa_all ¶
qa_all() -> PgUpsert
Performs QA checks for nulls in non-null columns, for duplicated primary key values, for invalid foreign keys, and invalid check constraints in a set of staging tables to be loaded into base tables. If there are failures in the QA checks, loading is not attempted. If the loading step is carried out, it is done within a transaction.
The null_errors, pk_errors, fk_errors, ck_errors columns of the
control table will be updated to identify any errors that occur,
so that this information is available to the caller.
The rows_updated and rows_inserted columns of the control table
will be updated with counts of the number of rows affected by the
upsert operation for each table.
When the upsert operation updates the base table, all columns of the base table that are also in the staging table are updated. The update operation does not test to see if column contents are different, and so does not update only those values that are different.
This method runs PgUpsert methods in the following order:
Example:
PgUpsert(
    uri="postgresql://user@localhost:5432/database",
    tables=("genres", "books", "publishers", "authors", "book_authors"),
    staging_schema="staging",
    base_schema="public",
    do_commit=False,
    interactive=False,
    exclude_cols=("rev_user", "rev_time", "created_at", "updated_at"),
    exclude_null_check_cols=("rev_user", "rev_time", "created_at", "updated_at", "alias"),
).qa_all()
Source code in src/pg_upsert/upsert.py
qa_all_null ¶
qa_all_null() -> PgUpsert
Performs null checks for non-null columns in selected staging tables.
Source code in src/pg_upsert/upsert.py
qa_one_null ¶
qa_one_null(table: str) -> PgUpsert
Performs null checks for non-null columns in a single staging table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `str` | The name of the staging table to check for null values. | *required* |
Source code in src/pg_upsert/upsert.py
qa_all_pk ¶
qa_all_pk() -> PgUpsert
Performs primary key checks for duplicated primary key values in selected staging tables.
Source code in src/pg_upsert/upsert.py
qa_one_pk ¶
qa_one_pk(table: str) -> PgUpsert
Performs primary key checks for duplicated primary key values in a single staging table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `str` | The name of the staging table to check for duplicate primary key values. | *required* |
Source code in src/pg_upsert/upsert.py
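As a rough illustration of what a duplicate primary key check looks for, the sketch below runs a `GROUP BY ... HAVING` query against an in-memory SQLite table. This is not the SQL that pg_upsert generates; the table `staging_books` and its columns are hypothetical, and SQLite is used only because it ships with Python.

```python
import sqlite3

# Hypothetical staging table, created without a PK constraint so that
# duplicate key values can be staged (as they could be in a real staging schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_books (book_id INTEGER, title TEXT)")
conn.executemany(
    "INSERT INTO staging_books VALUES (?, ?)",
    [(1, "Dune"), (2, "Emma"), (2, "Emma (duplicate)")],
)

# Any key value that appears more than once would violate the base table's PK.
duplicates = conn.execute(
    """
    SELECT book_id, COUNT(*) AS n_rows
    FROM staging_books
    GROUP BY book_id
    HAVING COUNT(*) > 1
    """
).fetchall()

for book_id, n_rows in duplicates:
    print(f"book_id={book_id} appears {n_rows} times")
```

A non-empty result corresponds to the kind of finding that would be recorded in the `pk_errors` column of the control table.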
qa_all_fk ¶
qa_all_fk() -> PgUpsert
Performs foreign key checks for invalid foreign key values in selected staging tables.
Source code in src/pg_upsert/upsert.py
qa_one_fk ¶
qa_one_fk(table: str) -> PgUpsert
Performs foreign key checks for invalid foreign key values in a single staging table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `str` | The name of the staging table to check for invalid foreign key values. | *required* |
Source code in src/pg_upsert/upsert.py
qa_all_ck ¶
qa_all_ck() -> PgUpsert
Performs check constraint checks for invalid check constraint values in selected staging tables.
Source code in src/pg_upsert/upsert.py
qa_one_ck ¶
qa_one_ck(table: str) -> PgUpsert
Performs check constraint checks for invalid check constraint values in a single staging table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `str` | The name of the staging table to check for invalid check constraint values. | *required* |
Source code in src/pg_upsert/upsert.py
qa_all_unique ¶
qa_all_unique() -> PgUpsert
Performs unique constraint checks on all selected staging tables.
Source code in src/pg_upsert/upsert.py
qa_one_unique ¶
qa_one_unique(table: str) -> PgUpsert
Performs unique constraint checks on a single staging table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `str` | The name of the staging table to check. | *required* |
Source code in src/pg_upsert/upsert.py
qa_column_existence ¶
qa_column_existence() -> PgUpsert
Checks that all base table columns exist in the staging tables.
Respects the exclude_cols setting — excluded columns are not flagged.
Source code in src/pg_upsert/upsert.py
qa_type_mismatch ¶
qa_type_mismatch() -> PgUpsert
Checks for hard type incompatibilities between staging and base columns.
Only flags mismatches where PostgreSQL has no implicit or assignment cast.
Source code in src/pg_upsert/upsert.py
upsert_all ¶
upsert_all() -> PgUpsert
Performs upsert operations on all selected tables in the base schema.
Objects created:
| table / view | description |
|---|---|
| `ups_dependencies` | Temporary table containing the dependencies of the base schema. |
| `ups_ordered_tables` | Temporary table containing the selected tables ordered by dependency. |
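To make "ordered by dependency" concrete, here is a small sketch using Python's standard `graphlib` module. It is only an analogy for what `ups_ordered_tables` holds: pg_upsert computes the order inside PostgreSQL, and the foreign key relationships below are assumptions made up for the example tables.

```python
from graphlib import TopologicalSorter

# Hypothetical foreign key dependencies: each table maps to the set of
# tables it references, which therefore have to be loaded first.
fk_dependencies = {
    "genres": set(),
    "publishers": set(),
    "authors": set(),
    "books": {"genres", "publishers"},
    "book_authors": {"books", "authors"},
}

# static_order() yields every table after all of the tables it depends on.
load_order = list(TopologicalSorter(fk_dependencies).static_order())
print(load_order)
```

Loading parents before children means that rows inserted into a child table can already find the parent rows their foreign keys point to.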
Source code in src/pg_upsert/upsert.py
upsert_one ¶
upsert_one(table: str) -> PgUpsert
Performs an upsert operation on a single table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `str` | The name of the table to upsert. | *required* |
Source code in src/pg_upsert/upsert.py
run ¶
run() -> UpsertResult
Run all QA checks and upsert operations.
This method runs PgUpsert methods in the following order:
Returns:
| Name | Type | Description |
|---|---|---|
| `UpsertResult` | `UpsertResult` | Structured result containing QA outcomes and row counts. |
Source code in src/pg_upsert/upsert.py
commit ¶
commit() -> PgUpsert
Commits the transaction to the database and shows a summary of changes.
Changes are committed if the following criteria are met:
- The do_commit flag is set to True.
- All QA checks have passed (i.e., the qa_passed flag is set to True).
- The summary of changes shows that rows have been updated or inserted.
- If the interactive flag is set to True and the do_commit flag is set to False, the user is prompted to commit the changes and the user selects "Continue".
If qa_passed is False, the transaction is rolled back and no
commit is attempted. This guards against accidentally persisting data
that failed QA checks when using the step-by-step API.
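One possible reading of the criteria above, written as a plain function. This is an interpretation for illustration, not the library's implementation: the function name `should_commit` and the `user_confirmed` argument (standing in for the user selecting "Continue") are hypothetical.

```python
def should_commit(
    *,
    qa_passed: bool,
    rows_changed: int,
    do_commit: bool,
    interactive: bool = False,
    user_confirmed: bool = False,
) -> bool:
    """Return True when the documented commit criteria appear to be met."""
    if not qa_passed:
        # Documented guard: failed QA means rollback, never a commit.
        return False
    if rows_changed <= 0:
        # Nothing was updated or inserted, so there is nothing to commit.
        return False
    if do_commit:
        return True
    # Without do_commit, only an interactive confirmation allows a commit.
    return interactive and user_confirmed


print(should_commit(qa_passed=True, rows_changed=12, do_commit=True))
print(should_commit(qa_passed=False, rows_changed=12, do_commit=True))
```

Under this reading, the QA guard takes precedence over every other flag, which matches the note about the step-by-step API.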
Source code in src/pg_upsert/upsert.py
Result Objects¶
These dataclasses are returned from PgUpsert.run() and its facade
methods. They provide structured access to QA outcomes and upsert
statistics, and are JSON-serialisable via to_dict() / to_json().
pg_upsert.UpsertResult
dataclass
¶
UpsertResult(tables: list[TableResult] = list(), committed: bool = False, staging_schema: str = '', base_schema: str = '', upsert_method: str = '', started_at: str = '', finished_at: str = '', duration_seconds: float = 0.0)
Structured result from a PgUpsert.run() call.
Provides programmatic access to QA results and upsert statistics for all tables processed.
Attributes:
| Name | Type | Description |
|---|---|---|
| `tables` | `list[TableResult]` | Per-table results. |
| `committed` | `bool` | Whether the transaction was committed. |
| `staging_schema` | `str` | Name of the staging schema. |
| `base_schema` | `str` | Name of the base schema. |
| `upsert_method` | `str` | The upsert method used (upsert, update, insert). |
| `started_at` | `str` | ISO 8601 timestamp when the run started. |
| `finished_at` | `str` | ISO 8601 timestamp when the run finished. |
| `duration_seconds` | `float` | Elapsed time in seconds. |
to_dict ¶
Serialize to a dictionary for JSON output.
Source code in src/pg_upsert/models.py
to_json ¶
export_failures ¶
Export QA violations as a "fix sheet" grouped by table.
Writes one row per unique violating staging row to directory,
with an _issues column summarising every problem found on
that row. Format is selected by fmt (csv, json, or
xlsx); see pg_upsert.export.export_failures for the
exact file layout per format.
Returns the directory written, or None if there are no
exportable violations.
Source code in src/pg_upsert/models.py
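A result object can be reduced to a short summary by reading only the documented attributes (`tables`, `committed`, `rows_updated`, `rows_inserted`, `qa_passed`, `table_name`). The sketch below assumes nothing beyond those names; the `summarize` helper is hypothetical, and `SimpleNamespace` objects stand in for a real `UpsertResult` so the example runs without a database.

```python
from types import SimpleNamespace


def summarize(result):
    """Collapse an UpsertResult-like object into a small dictionary."""
    return {
        "committed": result.committed,
        "rows_updated": sum(t.rows_updated for t in result.tables),
        "rows_inserted": sum(t.rows_inserted for t in result.tables),
        "failed_tables": [t.table_name for t in result.tables if not t.qa_passed],
    }


# Stand-in data for demonstration only; a real run would return these
# objects from PgUpsert.run().
fake_result = SimpleNamespace(
    committed=False,
    tables=[
        SimpleNamespace(table_name="genres", rows_updated=0, rows_inserted=4, qa_passed=True),
        SimpleNamespace(table_name="books", rows_updated=3, rows_inserted=2, qa_passed=False),
    ],
)

summary = summarize(fake_result)
print(summary)
```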
pg_upsert.TableResult
dataclass
¶
TableResult(table_name: str, rows_updated: int = 0, rows_inserted: int = 0, _qa_findings: list[QAError] = list())
Per-table result from a QA check or upsert operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| `table_name` | `str` | The name of the table. |
| `rows_updated` | `int` | Number of rows updated during the upsert. |
| `rows_inserted` | `int` | Number of rows inserted during the upsert. |
| `_qa_findings` | `list[QAError]` | Internal list of all QA findings (errors + warnings). |
qa_errors
property
¶
qa_errors: list[QAError]
ERROR-level QA findings only — issues that blocked the upsert.
qa_warnings
property
¶
qa_warnings: list[QAError]
WARNING-level QA findings only — informational, did not block.
qa_passed
property
¶
True if no ERROR-level QA findings exist for this table.
Warnings do not count as failures.
QA Error Models¶
Every QA check produces QAError instances. When --export-failures
is active, each error is also populated with per-row RowViolation
objects (for data checks) or SchemaIssue objects (for column
existence and type mismatch checks). These feed the fix-sheet exporter.
pg_upsert.QASeverity ¶
Bases: Enum
Severity level for a QA finding.
ERROR findings block the upsert pipeline — qa_passed is set to
False when any ERROR-severity finding is present, and commit() /
upsert_all() will refuse to proceed.
WARNING findings are informational only. They are displayed (yellow
⚠) but do not set qa_passed to False and do not block the
upsert. Use qa_warnings or qa_findings to access them.
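The relationship between severities and `qa_passed` can be sketched with stand-in types. The `Severity` enum and `Finding` dataclass below are hypothetical simplifications of `QASeverity` and `QAError` (the enum member values in particular are assumptions); only the rule itself, that errors block and warnings do not, comes from the description above.

```python
from dataclasses import dataclass
from enum import Enum


class Severity(Enum):  # hypothetical stand-in for QASeverity
    ERROR = "error"  # member values are assumptions
    WARNING = "warning"


@dataclass
class Finding:  # hypothetical stand-in for QAError
    table: str
    details: str
    severity: Severity = Severity.ERROR


def split_findings(findings):
    """Return (errors, warnings, qa_passed) for a list of findings."""
    errors = [f for f in findings if f.severity is Severity.ERROR]
    warnings = [f for f in findings if f.severity is Severity.WARNING]
    # Only ERROR-level findings cause QA to fail; warnings are informational.
    return errors, warnings, not errors


only_warning = [Finding("books", "column type differs", Severity.WARNING)]
with_error = only_warning + [Finding("books", "null values in title")]

print(split_findings(only_warning)[2])
print(split_findings(with_error)[2])
```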
pg_upsert.QAError
dataclass
¶
QAError(table: str, check_type: QACheckType, details: str, severity: QASeverity = ERROR, violations: list[RowViolation] = list(), schema_issues: list[SchemaIssue] = list())
A single QA check finding.
Attributes:
| Name | Type | Description |
|---|---|---|
| `table` | `str` | The table where the error was found. |
| `check_type` | `QACheckType` | The type of QA check that produced this error. |
| `details` | `str` | Human-readable error summary, e.g. … |
| `violations` | `list[RowViolation]` | Per-row violations captured when … |
| `schema_issues` | `list[SchemaIssue]` | For column-existence and type-mismatch checks, structured metadata that the export module writes to the … |
pg_upsert.QACheckType ¶
Bases: Enum
Types of QA checks performed on staging data.
pg_upsert.RowViolation
dataclass
¶
RowViolation(pk_values: tuple, row_data: dict[str, Any], issue_type: str, pk_columns: list[str] = list(), issue_column: str | None = None, constraint_name: str | None = None, description: str = '')
One problem detected on one staging row.
Multiple violations may reference the same staging row (same
pk_values); pg_upsert.export deduplicates and merges
them into a single fix-sheet entry per row.
Attributes:
| Name | Type | Description |
|---|---|---|
| `pk_values` | `tuple` | Primary key tuple for this staging row. Used as the dedup key when building the fix sheet. Tables without a PK fall back to a tuple of all column values. |
| `pk_columns` | `list[str]` | PK column names in declared order, parallel to … |
| `row_data` | `dict[str, Any]` | Full staging row contents as a column -> value dict. |
| `issue_type` | `str` | Short identifier: … |
| `issue_column` | `str \| None` | For NULL/FK/UNIQUE, the column (or comma-joined columns) responsible for the violation. |
| `constraint_name` | `str \| None` | For PK/FK/UNIQUE/CK, the constraint that failed. |
| `description` | `str` | Human-readable phrase used in the fix sheet's … |
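The deduplication described for `RowViolation` can be pictured as grouping by `pk_values` and joining the per-violation descriptions into one `_issues` value. The sketch below is a guess at the general shape, not the code in pg_upsert.export: the `Violation` class is a hypothetical subset of `RowViolation`, and the `"; "` separator is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class Violation:  # hypothetical subset of RowViolation's fields
    pk_values: tuple
    row_data: dict[str, Any]
    issue_type: str
    description: str = ""


def merge_by_row(violations: list[Violation]) -> list[dict[str, Any]]:
    """Build one fix-sheet style entry per staging row, keyed by pk_values."""
    merged: dict[tuple, dict[str, Any]] = {}
    for v in violations:
        # First violation for a row seeds the entry with the row's data.
        entry = merged.setdefault(v.pk_values, {**v.row_data, "_issues": []})
        entry["_issues"].append(v.description or v.issue_type)
    # Collapse each row's list of issues into a single string column.
    return [{**row, "_issues": "; ".join(row["_issues"])} for row in merged.values()]


row_1 = {"book_id": 1, "title": None}
fix_sheet = merge_by_row(
    [
        Violation((1,), row_1, "null", "title is NULL"),
        Violation((1,), row_1, "fk", "genre not found"),
        Violation((2,), {"book_id": 2, "title": "Emma"}, "ck"),
    ]
)
for entry in fix_sheet:
    print(entry)
```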
pg_upsert.SchemaIssue
dataclass
¶
SchemaIssue(check_type: str, column_name: str, staging_type: str | None = None, base_type: str | None = None, description: str = '')
One schema-level problem detected by column existence / type checks.
Schema issues have no row data — they describe a structural mismatch
between the staging and base tables. They are written to a dedicated
_schema output separate from the row-level fix sheets.
Attributes:
| Name | Type | Description |
|---|---|---|
| `check_type` | `str` | … |
| `column_name` | `str` | Column with the issue. |
| `staging_type` | `str \| None` | Staging type (type mismatch only). |
| `base_type` | `str \| None` | Base type (type mismatch only). |
| `description` | `str` | Human-readable description. |
Pipeline Callbacks and Context¶
CheckContext is passed through QA check methods so display and
callback hooks can report progress. CallbackEvent, PipelineEvent,
and PipelineCallback form the callback protocol used by
PgUpsert(callback=...).
pg_upsert.CheckContext
dataclass
¶
Progress context passed through QA check methods to display functions.
Attributes:
| Name | Type | Description |
|---|---|---|
| `table_num` | `int` | Current table number (1-based). |
| `total_tables` | `int` | Total number of tables being checked. |
pg_upsert.CallbackEvent ¶
Bases: Enum
Events fired during the pg-upsert pipeline.
pg_upsert.PipelineEvent
dataclass
¶
PipelineEvent(event: CallbackEvent, table: str, qa_passed: bool | None = None, rows_updated: int = 0, rows_inserted: int = 0, qa_findings: list[QAError] = list())
Data passed to the pipeline callback at each event.
Attributes:
| Name | Type | Description |
|---|---|---|
| `event` | `CallbackEvent` | The type of event. |
| `table` | `str` | The table name this event relates to. |
| `qa_passed` | `bool \| None` | Whether QA passed for this table (… |
| `rows_updated` | `int` | Rows updated (0 if not applicable yet). |
| `rows_inserted` | `int` | Rows inserted (0 if not applicable yet). |
| `qa_findings` | `list[QAError]` | All QA findings (errors + warnings) for this table. |
pg_upsert.PipelineCallback
module-attribute
¶
PipelineCallback = Callable[[PipelineEvent], bool | None]
Callback type for pipeline events.
Return False to abort the pipeline (triggers rollback).
Return True or None to continue.
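A callback that follows this contract might look like the sketch below. It reads only the documented `table` and `qa_passed` attributes of the event; the function name `abort_on_qa_failure` is hypothetical, and `SimpleNamespace` objects stand in for real `PipelineEvent` instances so the example runs without a database.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def abort_on_qa_failure(event: Any) -> bool | None:
    """Stop the pipeline as soon as a table reports a failed QA check.

    `event` is expected to behave like a PipelineEvent; only its
    `table` and `qa_passed` attributes are used here.
    """
    if event.qa_passed is False:
        print(f"QA failed for {event.table}; aborting")
        return False  # documented: False aborts and triggers a rollback
    return None  # documented: True or None lets the pipeline continue


# Stand-in events for demonstration only.
failed_event = SimpleNamespace(table="books", qa_passed=False)
pending_event = SimpleNamespace(table="genres", qa_passed=None)

result_failed = abort_on_qa_failure(failed_event)
result_pending = abort_on_qa_failure(pending_event)
```

Such a function would be passed as `PgUpsert(callback=abort_on_qa_failure, ...)`.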
Database Connection¶
pg_upsert.PostgresDB ¶
PostgresDB(uri: None | str = None, conn: None | connection = None, encoding: str = 'utf-8', **kwargs)
Base database object for connecting and executing SQL queries on a PostgreSQL database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `conn` | `connection` | An existing connection object to a PostgreSQL database. | `None` |
| `uri` | `str` | A connection URI for a PostgreSQL database. | `None` |
| `encoding` | `str` | The encoding to use for the database connection. | `'utf-8'` |
| `**kwargs` | | Additional keyword arguments passed to … | `{}` |
Returns:
| Name | Type | Description |
|---|---|---|
| `PostgresDB` | | A new PostgresDB object for connecting to a PostgreSQL database and executing queries. |
Raises:
| Type | Description |
|---|---|
| `AttributeError` | If neither a connection URI nor an existing connection object is provided. |
| `Error` | If an error occurs while connecting to the database or executing a query. |
Source code in src/pg_upsert/postgres.py
__repr__ ¶
__del__ ¶
Ensure the database connection is closed when the object is deleted, if open.
open_db ¶
Ensure the database connection is open.
Reconnects using the original URI (stored at init time) if the
connection was created by this instance. External connections
(passed via conn=) cannot be reopened.
Source code in src/pg_upsert/postgres.py
cursor ¶
close ¶
Close the database connection if open and owned by this instance.
Connections provided externally via conn= are never closed —
the caller retains ownership and is responsible for closing them.
Source code in src/pg_upsert/postgres.py
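The ownership rule described for `close()` can be illustrated with a small stand-in class. This is not `PostgresDB`: the `OwnedConnection` class and its `_owns_connection` flag are hypothetical, and SQLite replaces PostgreSQL so the sketch runs with the standard library alone.

```python
import sqlite3


class OwnedConnection:  # hypothetical stand-in, not the real PostgresDB
    def __init__(self, conn=None, path=":memory:"):
        # The wrapper owns the connection only if it created it itself.
        self._owns_connection = conn is None
        self.conn = conn if conn is not None else sqlite3.connect(path)

    def close(self):
        """Close the connection only when this instance created it."""
        if self._owns_connection and self.conn is not None:
            self.conn.close()
            self.conn = None


# Externally supplied connection: close() leaves it open for the caller.
external = sqlite3.connect(":memory:")
wrapper = OwnedConnection(conn=external)
wrapper.close()
still_usable = external.execute("SELECT 1").fetchone()

# Internally created connection: close() releases it.
owned = OwnedConnection()
owned.close()

external.close()  # the caller remains responsible for its own connection
```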
commit ¶
rollback ¶
execute ¶
A shortcut to self.cursor().execute() that handles encoding.
Handles inserts, updates, and deletes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `sql` | `str \| Composable` | The SQL query to execute. Accepts a … | *required* |
| `params` | `tuple` | A tuple of parameters to pass to the query. Note that a … | `None` |
Returns: psycopg2.extensions.cursor: A cursor object for the executed query.
Source code in src/pg_upsert/postgres.py
rowdict ¶
Convert a cursor object to an iterable that yields dictionaries of row data.
The return value has the following structure:
0) dict_row (iterator) - an iterator that yields dictionaries of row data
1) headers (list) - a list of column names
2) rowcount (int) - the number of rows returned by the query
Source code in src/pg_upsert/postgres.py
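The general cursor-to-dictionary technique can be sketched against any DB-API cursor. The version below assumes the three-part structure listed above is returned as a tuple, which is an interpretation rather than a confirmed signature; `rowdict_sketch` is a hypothetical name, and SQLite is used so the example runs without PostgreSQL (SQLite reports `rowcount` as -1 for SELECT statements, unlike psycopg2).

```python
import sqlite3


def rowdict_sketch(cursor):
    """Return (row iterator, headers, rowcount) for an executed cursor."""
    # DB-API cursors expose column names as the first item of each
    # entry in cursor.description.
    headers = [column[0] for column in cursor.description]

    def rows():
        for row in cursor:
            yield dict(zip(headers, row))

    return rows(), headers, cursor.rowcount


conn = sqlite3.connect(":memory:")
cur = conn.execute("SELECT 1 AS book_id, 'Dune' AS title")
row_iter, headers, rowcount = rowdict_sketch(cur)
first_row = next(row_iter)
print(headers, first_row)
```

Because the rows are produced lazily, the cursor must stay open until the iterator has been consumed.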
Exceptions¶
pg_upsert.UserCancelledError ¶
Bases: Exception
Raised when the user cancels an interactive operation.