Skip to content

Fix: psycopg Not Working — psycopg2 to psycopg3 Migration, Connection Pool, and Async Errors

FixDevs · (Updated: )

Part of:  Python Errors

Quick Answer

How to fix psycopg errors — psycopg2 to psycopg 3 import migration, connection pool setup, row factory tuple vs dict, COPY protocol changes, async psycopg pool, server-side cursor confusion, and binary mode performance.

The Error

You upgrade and old psycopg2 imports break:

import psycopg2
from psycopg2.extras import RealDictCursor   # Different in psycopg 3

Or you migrate to psycopg 3 and the API has changed:

import psycopg
conn = psycopg.connect("postgresql://...")
cur = conn.cursor()
cur.execute("SELECT * FROM users")
rows = cur.fetchall()
# Returns list of tuples — used to be RealDictRow in psycopg2 with cursor_factory

Or your async code fails because you used the wrong import:

import psycopg
conn = await psycopg.connect("postgresql://...")
# AttributeError: object NoneType has no attribute __aenter__

Or the pool exhausts under load:

psycopg_pool.PoolTimeout: couldn't get a connection after 30.0 sec

Or COPY syntax from psycopg2 no longer works:

cur.copy_from(file, "users")   # AttributeError in psycopg 3

psycopg is the most widely-used PostgreSQL driver for Python. psycopg2 has been the standard since 2007; psycopg 3 (released October 2021) is a complete rewrite with native async support, better performance, and a cleaner API. Many tutorials still use psycopg2, and the migration has specific patterns developers need to learn. This guide covers both psycopg2 quirks and the v2 → v3 migration.

Why This Happens

psycopg2 and psycopg 3 are different packages — import psycopg2 and import psycopg resolve to different libraries. They co-exist, so you can install both. The APIs overlap substantially but differ on row formats, async support, connection pools, and COPY semantics.

psycopg 3 introduced first-class async support with AsyncConnection — distinct from the sync Connection. Mixing them produces confusing errors because the method signatures look identical until you await.

In Production: Incident Lens

psycopg 3’s async pool failures show up as connection lifecycle leaks — connections are checked out but never returned, the pool gradually drains, and eventually PoolTimeout fires across every request path. The blast radius is all DB writes block: read replicas might absorb some load if you’ve configured them, but writes funnel through the primary pool and stop entirely. Unlike asyncpg’s instant-failure mode, psycopg’s leaks are often slow burns — the pool degrades over hours, p99 latency creeps up, and the on-call wakes up when the alarm finally fires at 4 AM.

Monitoring signals. AsyncConnectionPool exposes get_stats() — export pool_available, pool_size, requests_waiting, and requests_errors to your metrics system on a 10s cadence. The leading indicator is requests_waiting > 0 for sustained periods; in a healthy pool that gauge spikes during traffic bursts but returns to 0 immediately. PostgreSQL-side, watch pg_stat_activity for connections in state idle in transaction longer than 60s, and for the count of connections grouped by application_name creeping upward — every active pool’s connections should be roughly constant in steady state. A useful query for diagnosis:

SELECT application_name, state, COUNT(*), MAX(now() - query_start) AS oldest
FROM pg_stat_activity
WHERE application_name LIKE 'myapp%'
GROUP BY application_name, state;

If oldest for idle in transaction keeps growing, you have a leak.

Blast radius and triage. Pool exhaustion takes down everything that writes to the primary: order processing, auth state updates, queue consumers committing offsets, even background workers that “should be” async-safe. Read paths that bypass the pool (replica routing, cached reads) stay up but quickly diverge from the stuck primary. If your service is behind a load balancer with health checks that hit the database, expect the autoscaler to also kick in and add pods — each one opens its own pool, each one fails identically. Cap the replica count during recovery so you don’t blow past max_connections on the primary.

Recovery sequence. First, identify the leak shape. If it’s a bare pool.connection() without async with, every exception in the request handler abandons the connection. If it’s a Jsonb insert failing repeatedly and the surrounding code doesn’t roll back, you accumulate idle in transaction rows. If it’s LISTEN/NOTIFY code that captured a pool connection and never returned it (LISTEN needs a dedicated connection, not a pooled one), that one connection is permanently lost. Mitigate immediately by force-closing leaked backends:

SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE state = 'idle in transaction'
  AND now() - query_start > interval '60 seconds';

Then ship a code fix that wraps every acquire in async with and audits LISTEN connections to use a dedicated, non-pooled AsyncConnection.

Postmortem preventives. Set max_idle and max_lifetime on the pool so stale connections recycle instead of accumulating slow leaks. Run a PostgreSQL-side idle_in_transaction_session_timeout so the server kills runaway sessions even if the application misbehaves. Add a startup probe that calls pool.get_stats() and refuses readiness if the pool isn’t healthy — see Kubernetes CrashLoopBackOff for how pods recover when readiness gates correctly. For tracing async DB calls so leaks are observable per-endpoint, see Sentry not working for the integration patterns that propagate context across await points.

Fix 1: psycopg2 vs psycopg 3 — Which to Use

psycopg2 (mature, sync-only):

pip install psycopg2-binary   # Pre-built wheels (recommended for dev)
# or
pip install psycopg2           # Build from source (requires PostgreSQL dev headers)
import psycopg2
from psycopg2.extras import RealDictCursor

conn = psycopg2.connect("postgresql://user:pass@localhost/mydb")
cur = conn.cursor(cursor_factory=RealDictCursor)
cur.execute("SELECT id, name FROM users")
rows = cur.fetchall()
# [{"id": 1, "name": "Alice"}, ...]

psycopg 3 (modern, sync + async):

pip install "psycopg[binary]"        # Pre-built (recommended)
# or
pip install "psycopg[binary,pool]"   # With connection pool
import psycopg
from psycopg.rows import dict_row

with psycopg.connect("postgresql://user:pass@localhost/mydb") as conn:
    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT id, name FROM users")
        rows = cur.fetchall()
        # [{"id": 1, "name": "Alice"}, ...]

Use psycopg 3 for new projects. Faster, async support, better type coverage, actively developed. psycopg2 is in maintenance mode — fine for existing apps, but new development should use psycopg 3.

Common Mistake: Installing both psycopg2 and psycopg and then mixing imports across files. Pick one. Mixing creates connection pool incompatibility, doubles your dependency footprint, and confuses static analysis tools that look at import patterns.

Fix 2: Migration from psycopg2 to psycopg 3

Most code translates with simple search/replace:

# OLD (psycopg2)
import psycopg2
from psycopg2.extras import RealDictCursor
import psycopg2.errors

conn = psycopg2.connect("postgresql://...")
try:
    cur = conn.cursor(cursor_factory=RealDictCursor)
    cur.execute("SELECT * FROM users WHERE id = %s", (1,))
    rows = cur.fetchall()
except psycopg2.errors.UniqueViolation:
    ...
finally:
    cur.close()
    conn.close()

# NEW (psycopg 3)
import psycopg
from psycopg.rows import dict_row
import psycopg.errors

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor(row_factory=dict_row) as cur:
        try:
            cur.execute("SELECT * FROM users WHERE id = %s", (1,))
            rows = cur.fetchall()
        except psycopg.errors.UniqueViolation:
            ...

Key changes:

psycopg2psycopg 3
from psycopg2.extras import RealDictCursorfrom psycopg.rows import dict_row
cursor(cursor_factory=RealDictCursor)cursor(row_factory=dict_row)
psycopg2.connect(...)psycopg.connect(...)
psycopg2.errors.XYZpsycopg.errors.XYZ (mostly same names)
cur.copy_from(file, "table")with cur.copy("COPY table FROM STDIN") as copy:
cur.copy_to(file, "table")with cur.copy("COPY table TO STDOUT") as copy:
cur.mogrify(sql, params)cur.mogrify(sql, params) (same)

Row factories in psycopg 3:

from psycopg.rows import tuple_row, dict_row, namedtuple_row, class_row

# Tuple (default)
with conn.cursor() as cur:
    cur.execute("SELECT id, name FROM users")
    rows = cur.fetchall()   # [(1, "Alice"), (2, "Bob")]

# Dict
with conn.cursor(row_factory=dict_row) as cur:
    cur.execute("SELECT id, name FROM users")
    rows = cur.fetchall()   # [{"id": 1, "name": "Alice"}, ...]

# Namedtuple
with conn.cursor(row_factory=namedtuple_row) as cur:
    cur.execute("SELECT id, name FROM users")
    for row in cur:
        print(row.id, row.name)

# Custom class
from dataclasses import dataclass

@dataclass
class User:
    id: int
    name: str

with conn.cursor(row_factory=class_row(User)) as cur:
    cur.execute("SELECT id, name FROM users")
    users = cur.fetchall()   # List[User]

When SQLAlchemy uses psycopg 3 as its driver (postgresql+psycopg://), psycopg’s row factory is bypassed — SQLAlchemy handles row construction itself, so registering dict_row at the psycopg layer has no effect on ORM results.

Fix 3: Connection Pools

pip install "psycopg[pool]"

Sync pool:

from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    "postgresql://user:pass@localhost/mydb",
    min_size=5,
    max_size=20,
    timeout=30,          # Acquire timeout
    max_idle=600,         # Close idle connections after 10 min
    max_lifetime=3600,    # Recycle connections after 1 hour
)

with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users")
        rows = cur.fetchall()

pool.close()

Async pool:

from psycopg_pool import AsyncConnectionPool
import asyncio

async def main():
    async with AsyncConnectionPool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
    ) as pool:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT * FROM users")
                rows = await cur.fetchall()

asyncio.run(main())

Pool sizing — same as asyncpg’s rule of thumb: max_size = 2 × DB CPU cores. PostgreSQL max_connections (default 100) caps total connections across all apps.

Pro Tip: Open the pool once at app startup, not per request. For FastAPI:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from psycopg_pool import AsyncConnectionPool

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.db_pool = AsyncConnectionPool(
        "postgresql://...",
        min_size=5, max_size=20,
        open=False,
    )
    await app.state.db_pool.open()
    yield
    await app.state.db_pool.close()

app = FastAPI(lifespan=lifespan)

@app.get("/users")
async def list_users():
    async with app.state.db_pool.connection() as conn:
        async with conn.cursor(row_factory=dict_row) as cur:
            await cur.execute("SELECT id, name FROM users")
            return await cur.fetchall()

open=False lets the pool be constructed before the event loop is running; await pool.open() initializes it at the right time.

Fix 4: Sync vs Async API

# Sync
import psycopg

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        result = cur.fetchone()

# Async
import psycopg
import asyncio

async def main():
    async with await psycopg.AsyncConnection.connect("postgresql://...") as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            result = await cur.fetchone()

asyncio.run(main())

Critical syntax difference for AsyncConnection:

# WRONG
async with psycopg.AsyncConnection.connect(...) as conn:   # No await
    ...

# CORRECT
async with await psycopg.AsyncConnection.connect(...) as conn:   # Note the double await
    ...

The connect() method returns a coroutine that resolves to an async context manager — you need both await (to get the connection) and async with (to manage its lifecycle).

Common Mistake: Using psycopg.connect() in async code and not awaiting. The sync connect returns immediately but blocks the event loop while doing the actual connection. Always use AsyncConnection.connect() (with awaits) in async code.

For asyncpg as a faster alternative to psycopg 3’s async support, see asyncpg not working.

Fix 5: COPY for Bulk Operations

PostgreSQL’s COPY is the fastest way to load bulk data. psycopg 3 redesigned the API:

# psycopg2 (OLD)
with open("data.csv") as f:
    cur.copy_from(f, "users", sep=",")
    cur.copy_expert("COPY users FROM STDIN WITH CSV HEADER", f)

# psycopg 3 (NEW)
with cur.copy("COPY users FROM STDIN WITH (FORMAT CSV, HEADER)") as copy:
    with open("data.csv", "rb") as f:
        for line in f:
            copy.write(line)

Or use the write_row method for structured data:

import psycopg

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor() as cur:
        with cur.copy("COPY users (id, name, email) FROM STDIN") as copy:
            for row in records:
                copy.write_row(row)   # Pass a tuple per row

write_row handles escaping automatically — no need to format CSV manually.

Read with COPY:

with cur.copy("COPY users TO STDOUT WITH CSV HEADER") as copy:
    with open("users.csv", "wb") as f:
        for data in copy:
            f.write(data)

Binary mode for max speed:

with cur.copy("COPY users FROM STDIN WITH BINARY") as copy:
    copy.set_types(["int4", "text", "text"])
    for record in records:
        copy.write_row(record)

Binary mode skips text-format conversion overhead — significantly faster for large inserts but harder to debug. Use text mode while developing, binary mode for production hot paths.

Fix 6: Server-Side Cursors

# Default — client-side cursor (loads all results into memory)
with conn.cursor() as cur:
    cur.execute("SELECT * FROM huge_table")
    rows = cur.fetchall()   # Loads everything

# Server-side cursor (named) — streams results
with conn.cursor(name="streaming_cur") as cur:
    cur.execute("SELECT * FROM huge_table")
    for row in cur:
        process(row)   # Fetches in batches as you iterate

Named cursors are server-side — PostgreSQL keeps the result set on the server and streams rows on demand. Essential for tables with millions of rows that won’t fit in client memory.

itersize parameter for batched fetches:

with conn.cursor(name="cur") as cur:
    cur.itersize = 1000   # Fetch 1000 rows per network round trip
    cur.execute("SELECT * FROM huge_table")
    for row in cur:
        process(row)

Common Mistake: Using a server-side cursor outside a transaction. PostgreSQL named cursors only exist within a transaction — autocommit mode (no transaction) makes the cursor close immediately. Wrap server-side cursor work in with conn: or with conn.transaction():.

Fix 7: Type Adaptation and Custom Types

psycopg 3 has built-in support for many PostgreSQL types beyond the basics:

import psycopg
from psycopg.types.json import Jsonb
import json

conn = psycopg.connect("postgresql://...")

# JSON / JSONB — passed as dict, returned as dict
conn.execute("INSERT INTO events (data) VALUES (%s)", [Jsonb({"key": "value"})])
row = conn.execute("SELECT data FROM events WHERE id = 1").fetchone()
print(type(row[0]))   # dict

# UUID, date, datetime — returned as their Python equivalents
# (uuid.UUID, datetime.date, datetime.datetime)

# Arrays
conn.execute("INSERT INTO posts (tags) VALUES (%s)", [["python", "psycopg"]])
row = conn.execute("SELECT tags FROM posts WHERE id = 1").fetchone()
print(row[0])   # ['python', 'psycopg'] — Python list

Custom enum:

from enum import Enum

class Status(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"

# Pass as string
conn.execute("INSERT INTO users (status) VALUES (%s)", [Status.ACTIVE.value])

# Or register adapter (advanced)
from psycopg.adapt import Loader, Dumper

class StatusDumper(Dumper):
    def dump(self, value):
        return value.value.encode("utf-8")

conn.adapters.register_dumper(Status, StatusDumper)

Pro Tip: For JSON columns, always use Jsonb (or Json) explicitly when inserting. Without the wrapper, psycopg sends the dict as Python’s repr — which isn’t valid JSON. Common silent bug; the row appears in the database with a malformed string instead of a parseable JSON value.

# WRONG — sends dict as Python repr, not JSON
conn.execute("INSERT INTO events (data) VALUES (%s)", [{"key": "value"}])
# Stored as: '{"key": "value"}' — actually fine for VARCHAR but wrong for JSONB

# CORRECT — explicit JSONB wrapper
from psycopg.types.json import Jsonb
conn.execute("INSERT INTO events (data) VALUES (%s)", [Jsonb({"key": "value"})])

Fix 8: Connection String and Authentication

# Various formats
conn = psycopg.connect("postgresql://user:pass@localhost:5432/mydb")
conn = psycopg.connect("postgres://user:pass@localhost:5432/mydb")
conn = psycopg.connect("host=localhost dbname=mydb user=postgres password=pass")
conn = psycopg.connect(
    host="localhost",
    dbname="mydb",
    user="postgres",
    password="pass",
    port=5432,
)

SSL:

conn = psycopg.connect(
    "postgresql://user:pass@host/db",
    sslmode="require",        # require, prefer, allow, disable, verify-ca, verify-full
    sslrootcert="/path/to/ca.pem",
)

Connection options:

conn = psycopg.connect(
    "postgresql://...",
    options="-c statement_timeout=30000",   # PG session config
    application_name="myapp",
    connect_timeout=10,
)

application_name shows in pg_stat_activity — invaluable for debugging which app is holding connections:

SELECT application_name, COUNT(*) FROM pg_stat_activity GROUP BY application_name;

Environment variables auto-read by psycopg (and libpq generally):

export PGHOST=localhost
export PGPORT=5432
export PGUSER=postgres
export PGPASSWORD=secret
export PGDATABASE=mydb

# Now this works without arguments
python -c "import psycopg; psycopg.connect()"

If connect() fails with Connection refused, the issue is below psycopg — usually pg_hba.conf rejecting the client IP, listen_addresses not including the interface, or a firewall blocking the port.

Still Not Working?

psycopg2 vs psycopg 3 vs asyncpg

  • psycopg2 — Mature, sync-only. Maintenance mode.
  • psycopg 3 — Sync + async, broader type support, modern API. Choose for new projects.
  • asyncpg — Fastest async option, lower-level API. Choose for performance-critical async paths.

For most projects, psycopg 3 is the best balance. For performance-critical async services hitting PostgreSQL hard, asyncpg edges it out.

Transactions

# Implicit transaction with `with conn`
with psycopg.connect("...") as conn:
    with conn.cursor() as cur:
        cur.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
        cur.execute("INSERT INTO orders (user_id) VALUES (currval('users_id_seq'))")
# Auto-commits on exit (or rolls back on exception)

# Explicit transaction control
conn.autocommit = False   # Default is False — txn until commit/rollback

with conn.cursor() as cur:
    cur.execute("...")

conn.commit()
# Or
conn.rollback()

# Autocommit mode (each statement in its own txn)
conn.autocommit = True

For pgvector integration — pgvector uses psycopg (2 or 3) as the standard driver and registers its own vector type adapter, so import pgvector.psycopg.register_vector(conn) after each new connection or via the pool’s configure hook.

Listen/Notify

import psycopg

with psycopg.connect("postgresql://...", autocommit=True) as conn:
    cur = conn.cursor()
    cur.execute("LISTEN my_channel")

    # Poll for notifications
    import select
    while True:
        ready = select.select([conn], [], [], 5)
        if not ready[0]:
            continue   # Timeout, loop again
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            print(f"Got: {notify.payload}")

Autocommit is required for LISTEN — notifications don’t deliver inside an open transaction.

Testing with Real PostgreSQL

import pytest
import psycopg

@pytest.fixture
def db_conn():
    conn = psycopg.connect("postgresql://localhost/test_db", autocommit=False)
    yield conn
    conn.rollback()
    conn.close()

def test_insert(db_conn):
    with db_conn.cursor() as cur:
        cur.execute("INSERT INTO users (name) VALUES (%s)", ["Test"])
        cur.execute("SELECT name FROM users WHERE name = %s", ["Test"])
        assert cur.fetchone()[0] == "Test"
# Each test rolls back, so DB stays clean

Wrapping every test in a transaction that rolls back at teardown is faster than truncating tables and works for both sync Connection and AsyncConnection fixtures.

Still Not Working? AsyncConnection.connect() and await confusion

You wrote async with psycopg.AsyncConnection.connect(dsn) as conn and got AttributeError: object NoneType has no attribute __aenter__. The connect() classmethod is itself a coroutine — you need async with await psycopg.AsyncConnection.connect(dsn) as conn. The pool API does not have this quirk (pool.connection() already returns an async context manager), so prefer the pool in production code and reserve direct AsyncConnection.connect() for short-lived scripts.

Still Not Working? LISTEN/NOTIFY works locally, fails through PgBouncer

PgBouncer in transaction-pooling mode breaks LISTEN because the listening connection is returned to the pool between commits. The notification then arrives on a different connection that nobody is reading. Either route LISTEN traffic to a dedicated PgBouncer pool in session mode, or bypass PgBouncer entirely for the listener service and connect directly to PostgreSQL.

Still Not Working? Jsonb insert fails with cannot adapt type 'datetime'

You’re inserting a Python dict that contains a datetime into a JSONB column. Jsonb uses json.dumps by default, which can’t serialize datetime. Pass a custom encoder: Jsonb(payload, dumps=lambda v: json.dumps(v, default=str)) for a quick fix, or use orjson (handles datetime natively) by registering a custom Jsonb subclass.

Combining with Alembic

Alembic uses psycopg (2 or 3) as its driver. To run migrations with psycopg 3:

# alembic.ini
sqlalchemy.url = postgresql+psycopg://user:pass@localhost/mydb

For Alembic-specific migration errors, see Alembic not working.

F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles