Skip to content

Fix: Python contextmanager Not Working — GeneratorExit, Missing yield, or Cleanup Not Running

FixDevs ·

Quick Answer

How to fix Python context manager issues — @contextmanager generator, __enter__ and __exit__, exception handling inside with blocks, async context managers, and common pitfalls.

The Problem

A @contextmanager decorator raises GeneratorExit or doesn’t run cleanup code:

from contextlib import contextmanager

@contextmanager
def open_resource():
    resource = acquire_resource()
    yield resource
    # This never runs if an exception occurs in the with block
    release_resource(resource)

with open_resource() as r:
    do_work(r)
    raise ValueError("Something went wrong")
# resource is never released!

Or a class-based context manager raises AttributeError:

class DatabaseConnection:
    def __init__(self, url):
        self.url = url

    def connect(self):
        self.conn = create_connection(self.url)

# AttributeError: __enter__
with DatabaseConnection('postgresql://...') as conn:
    conn.execute("SELECT 1")

Or an async context manager doesn’t work with async with:

# TypeError: 'async_generator' object does not support the asynchronous context manager protocol
async with my_context() as value:
    pass

Why This Happens

Context managers have strict protocols that must be followed exactly:

  • @contextmanager doesn’t handle exceptions by default — if an exception is raised inside the with block, it’s thrown into the generator at the yield point. Unless you wrap yield in try/finally, cleanup code after yield doesn’t run on exceptions.
  • Class-based context managers need __enter__ and __exit__ methods — without both, the with statement fails with AttributeError.
  • __exit__ return value controls exception propagation — returning True from __exit__ suppresses the exception. Returning None (the default) lets it propagate. This is a common source of unexpected swallowed exceptions.
  • Async context managers require __aenter__/__aexit__ — regular context managers don’t work with async with. Use @asynccontextmanager from contextlib.

Fix 1: Use try/finally in @contextmanager

Always wrap yield in try/finally to ensure cleanup runs even when exceptions occur:

from contextlib import contextmanager

# WRONG — cleanup doesn't run on exceptions
@contextmanager
def managed_resource():
    resource = acquire()
    yield resource
    release(resource)  # Skipped if exception is raised

# CORRECT — try/finally ensures cleanup
@contextmanager
def managed_resource():
    resource = acquire()
    try:
        yield resource
    finally:
        release(resource)  # Always runs

# Example: managed file with custom logic
@contextmanager
def temp_directory():
    import tempfile
    import shutil
    tmpdir = tempfile.mkdtemp()
    try:
        yield tmpdir
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

with temp_directory() as tmpdir:
    # Work with tmpdir
    with open(f"{tmpdir}/file.txt", "w") as f:
        f.write("data")
# tmpdir is deleted even if an exception occurs above

Handling specific exceptions inside the context manager:

@contextmanager
def transaction(db):
    """Context manager that commits on success and rolls back on error."""
    tx = db.begin()
    try:
        yield tx
    except Exception:
        tx.rollback()
        raise  # Re-raise — don't swallow the exception
    else:
        tx.commit()  # Only runs if no exception
    finally:
        tx.close()   # Always runs

Fix 2: Implement Class-Based Context Managers

For complex context managers, use a class with __enter__ and __exit__:

class DatabaseConnection:
    def __init__(self, url: str):
        self.url = url
        self.conn = None

    def __enter__(self):
        # Called when entering the 'with' block
        # Return value is assigned to 'as' variable
        self.conn = create_connection(self.url)
        return self.conn  # or return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Called when leaving the 'with' block
        # exc_type, exc_val, exc_tb are None if no exception occurred
        if self.conn:
            if exc_type is not None:
                self.conn.rollback()
            else:
                self.conn.commit()
            self.conn.close()

        # Return True to suppress the exception
        # Return False (or None) to let it propagate
        return False

# Usage
with DatabaseConnection('postgresql://localhost/mydb') as conn:
    conn.execute("INSERT INTO users VALUES ('alice')")
# conn.commit() and conn.close() called automatically

__exit__ exception parameters:

def __exit__(self, exc_type, exc_val, exc_tb):
    # exc_type: the exception class (e.g., ValueError) or None
    # exc_val:  the exception instance or None
    # exc_tb:   the traceback object or None

    if exc_type is None:
        # No exception — normal exit
        self.commit()
    elif issubclass(exc_type, KeyboardInterrupt):
        # Don't suppress Ctrl+C
        self.rollback()
        return False  # Let KeyboardInterrupt propagate
    else:
        # Other exceptions — rollback and let them propagate
        self.rollback()
        return False  # False = don't suppress

    return True  # Suppress only if we explicitly decide to

Fix 3: Reuse Context Managers with contextlib

contextlib provides utilities for building context managers:

from contextlib import contextmanager, suppress, nullcontext, ExitStack

# suppress — silently ignore specific exceptions
from contextlib import suppress

with suppress(FileNotFoundError):
    os.remove('temp.txt')  # No error if file doesn't exist
# Equivalent to try/except FileNotFoundError: pass

# nullcontext — a no-op context manager for optional context managers
def process(file_path, lock=None):
    context = lock if lock is not None else nullcontext()
    with context:
        with open(file_path) as f:
            return f.read()

# ExitStack — combine multiple context managers dynamically
from contextlib import ExitStack

def process_files(file_paths):
    with ExitStack() as stack:
        files = [
            stack.enter_context(open(path)) for path in file_paths
        ]
        # All files open — process them
        for f in files:
            print(f.read())
    # All files closed automatically, even on exception

# ExitStack for conditional context managers
def connect(use_tls: bool):
    with ExitStack() as stack:
        if use_tls:
            stack.enter_context(ssl_context())
        conn = stack.enter_context(create_connection())
        return conn

Fix 4: Async Context Managers

For async with, use @asynccontextmanager or implement __aenter__/__aexit__:

from contextlib import asynccontextmanager
import asyncio

# @asynccontextmanager — async version of @contextmanager
@asynccontextmanager
async def async_db_connection(url: str):
    conn = await create_async_connection(url)
    try:
        yield conn
    finally:
        await conn.close()

# Usage
async def main():
    async with async_db_connection('postgresql://...') as conn:
        result = await conn.fetch("SELECT 1")

# Class-based async context manager
class AsyncCache:
    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.client = None

    async def __aenter__(self):
        import aioredis
        self.client = await aioredis.from_url(self.redis_url)
        return self.client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.close()
        return False

async def handler():
    async with AsyncCache('redis://localhost') as cache:
        await cache.set('key', 'value')
        value = await cache.get('key')

Mixing async and sync context managers in async code:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_task(coro):
    """Run a background task and cancel it on exit."""
    task = asyncio.create_task(coro)
    try:
        yield task
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

async def main():
    async with managed_task(background_worker()) as task:
        await do_main_work()
    # Background task cancelled here

Fix 5: Context Managers for Resource Management Patterns

Common patterns where context managers shine:

# Temporary working directory
import os
from contextlib import contextmanager

@contextmanager
def working_directory(path: str):
    old_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(old_dir)

with working_directory('/tmp'):
    os.system('ls')  # Runs in /tmp
# Back to original directory

# Timer context manager
import time
from contextlib import contextmanager
from dataclasses import dataclass

@dataclass
class Timer:
    elapsed: float = 0.0

@contextmanager
def timer():
    t = Timer()
    start = time.perf_counter()
    try:
        yield t
    finally:
        t.elapsed = time.perf_counter() - start

with timer() as t:
    time.sleep(0.1)
    heavy_computation()
print(f"Elapsed: {t.elapsed:.3f}s")

# Mocking for tests
from unittest.mock import patch
from contextlib import contextmanager

@contextmanager
def mock_current_time(dt):
    with patch('mymodule.datetime') as mock_dt:
        mock_dt.now.return_value = dt
        yield mock_dt

with mock_current_time(datetime(2026, 1, 1)):
    assert get_current_year() == 2026

Fix 6: Nesting and Combining Context Managers

Clean syntax for multiple context managers:

# Python 3.10+ — parenthesized with statement
with (
    open('input.txt') as src,
    open('output.txt', 'w') as dst,
    timer() as t,
):
    dst.write(src.read())
print(f"Copy took {t.elapsed:.3f}s")

# Python 3.9 and earlier — comma syntax
with open('input.txt') as src, open('output.txt', 'w') as dst:
    dst.write(src.read())

# Dynamic number of context managers — use ExitStack
resources = ['db', 'cache', 'queue']
with ExitStack() as stack:
    connections = {
        name: stack.enter_context(connect(name))
        for name in resources
    }
    process(connections)
# All connections closed automatically

Still Not Working?

@contextmanager generator must yield exactly once — if the generator yields more than once, RuntimeError: generator didn't stop after throw() is raised. The generator must yield exactly one value, then return (or reach the end of the function).

__exit__ called even when __enter__ fails — if __enter__ raises an exception, __exit__ is NOT called. Only wrap __exit__ cleanup for resources acquired in __enter__. For @contextmanager, code before yield is __enter__, after yield is __exit__.

Context managers and generators — if you have a generator function and try to use it as a context manager, it won’t work. Decorate it with @contextmanager to make it work with with.

Returning a value from __exit__ — the return value of __exit__ must be truthy to suppress the exception. return False and return None both let the exception propagate. Only return True when you intentionally want to suppress the exception.

For related Python issues, see Fix: Python Async/Sync Mix and Fix: Python Decorator Not Working.

F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles