Fix: Structlog Not Working — Processor Chain, Context Variables, and Stdlib Integration
Quick Answer
How to fix Structlog errors — output is dict not JSON, context vars not propagating, stdlib logging not unified, async context loss, configure_once, KeyValueRenderer vs JSONRenderer, and async filtering.
The Error
You configure Structlog and the output is a human-readable console line, not JSON:
import structlog
log = structlog.get_logger()
log.info("user_created", user_id=123)
# 2025-04-09 10:00:00 [info ] user_created user_id=123
# Wanted: {"timestamp": "...", "level": "info", "event": "user_created", "user_id": 123}
Or bind_contextvars() values disappear in nested code:
structlog.contextvars.bind_contextvars(request_id="abc")
log.info("processing") # OK — has request_id
async def nested():
    log.info("nested")  # Missing request_id in async context
Or stdlib logging and Structlog produce inconsistent output:
import logging
logging.getLogger("sqlalchemy.engine").info("SQL: SELECT ...")
# Plain text format, different from Structlog's JSON
Or logging setup runs multiple times in tests, causing duplicate log lines:
# pytest fixture configures Structlog and attaches a stdlib handler
# Test imports a module that runs the same setup again
# Result: every log line printed 2-3 times
Or async tasks lose their bound context variables across await points:
async def request(req_id):
    structlog.contextvars.bind_contextvars(request_id=req_id)
    await some_task()
    log.info("done")  # request_id still present? Sometimes yes, sometimes no
Structlog is the structured logging library favored for production observability: JSON output, processor pipelines, context propagation. Unlike Loguru (which optimizes for simplicity), Structlog gives you composable processor chains for transforming log records. This power has costs: misconfigured processors change the output format without warning, context vars need careful setup, and the dual stdlib/Structlog model surprises newcomers. This guide covers each.
Why This Happens
Structlog’s processor chain is a list of functions, each taking the previous processor’s output and returning a transformed event dict. By default, the chain ends with ConsoleRenderer, which prints colored, human-friendly output. To get JSON, you swap the renderer. Structlog does not validate the chain: omitting a processor silently drops a field, and misordering produces either unexpected output or a confusing exception deep inside a later processor.
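The ordering rule is easier to see in a toy model. The sketch below is not Structlog's implementation; run_chain, add_level, and render_json are hypothetical stand-ins that only mimic the (logger, method_name, event_dict) processor signature.

```python
import json


def add_level(logger, method_name, event_dict):
    # Mimics what a level-adding processor does: mutate and return the dict.
    event_dict["level"] = method_name
    return event_dict


def render_json(logger, method_name, event_dict):
    # A renderer turns the dict into a string, so it has to come last.
    return json.dumps(event_dict)


def run_chain(processors, method_name, event, **fields):
    """Toy model of a chain: feed each processor the previous result."""
    result = {"event": event, **fields}
    for processor in processors:
        result = processor(None, method_name, result)
    return result


# Renderer last: every processor before it sees a dict.
good = run_chain([add_level, render_json], "info", "user_created", user_id=123)
print(good)

# Renderer first: add_level receives a str and fails on item assignment.
try:
    run_chain([render_json, add_level], "info", "user_created", user_id=123)
    failure = None
except TypeError as exc:
    failure = str(exc)
print(f"broken chain: {failure}")
```

The failure surfaces inside add_level, not inside the misplaced renderer, which is why a misordered chain can be hard to diagnose from the traceback alone.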
The standard library’s logging module is entirely separate from Structlog. Without explicit integration, third-party libraries (SQLAlchemy, Uvicorn, requests) log through stdlib in their own format. Unifying both takes deliberate setup.
Fix 1: Configuration and Renderer Selection
import logging
import sys
import structlog
# LoggerFactory() hands the rendered string to stdlib logging, which must be
# configured too: by default stdlib only emits WARNING and above.
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
log = structlog.get_logger()
log.info("user_created", user_id=123)
# {"timestamp": "2025-04-09T10:00:00.000000Z", "level": "info", "event": "user_created", "user_id": 123}  (key order may differ)
The processor chain runs in order: each function takes the previous output as a dict and returns the transformed dict. The final processor in the list is the renderer (converts the dict to a string).
Common Mistake: Forgetting that the last processor must be a renderer. If you put JSONRenderer in the middle of the chain, subsequent processors receive a string instead of a dict and fail, usually with a TypeError that points at the later processor rather than at the misplaced renderer.
# WRONG: JSONRenderer is not last
processors = [
    structlog.processors.JSONRenderer(),  # Wrong position
    structlog.processors.add_log_level,   # Receives a str, not a dict
]
# CORRECT
processors = [
    structlog.processors.add_log_level,
    structlog.processors.JSONRenderer(),  # Last position
]
Common renderers:
| Renderer | Use case |
|---|---|
| ConsoleRenderer | Dev: colored, human-readable |
| JSONRenderer | Production: log aggregators |
| KeyValueRenderer | key=value pairs, values rendered with repr() |
| LogfmtRenderer | logfmt for Heroku, Honeycomb |
Environment-based config:
import sys
import structlog
renderer = (
    structlog.processors.JSONRenderer()
    if not sys.stdout.isatty()
    else structlog.dev.ConsoleRenderer(colors=True)
)
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        renderer,
    ],
)
Dev terminals get color; CI and production get JSON.
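The isatty() check can guess wrong, for example when a container is started with a pseudo-TTY (docker run -t) or when you want JSON locally to debug a parser. A minimal sketch of an explicit override follows; the LOG_FORMAT variable name and the choose_format helper are assumptions for illustration, not part of Structlog.

```python
import io
import os
import sys


def choose_format(stream=None, environ=None):
    """Return "json" or "console" for the given stream and environment."""
    stream = sys.stdout if stream is None else stream
    environ = os.environ if environ is None else environ
    # LOG_FORMAT is a hypothetical variable name; pick whatever fits your deployment.
    override = environ.get("LOG_FORMAT", "").strip().lower()
    if override in ("json", "console"):
        return override
    # Fall back to TTY detection when no explicit override is set.
    return "console" if stream.isatty() else "json"


auto_choice = choose_format(io.StringIO(), {})  # StringIO is not a TTY
forced_choice = choose_format(io.StringIO(), {"LOG_FORMAT": "console"})
print(auto_choice, forced_choice)
```

The returned string can then select between JSONRenderer() and ConsoleRenderer() when building the processor list.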
Fix 2: Context Variables for Request-Scoped Data
import structlog
from structlog.contextvars import bind_contextvars, unbind_contextvars, clear_contextvars
log = structlog.get_logger()
def handle_request(req_id, user_id):
    bind_contextvars(request_id=req_id, user_id=user_id)
    try:
        log.info("request_started")  # Has request_id, user_id
        do_work()
        log.info("request_complete")
    finally:
        clear_contextvars()
def do_work():
    log.info("processing")  # Also has request_id, user_id from the caller's context
The merge_contextvars processor is required. Without it, the context isn’t included in logs:
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # MUST be in the chain
        ...
    ],
)
Context vars work across async boundaries because Python’s contextvars module is async-aware:
import asyncio
import structlog
from structlog.contextvars import bind_contextvars
log = structlog.get_logger()
async def request(req_id):
    bind_contextvars(request_id=req_id)
    log.info("start")
    await asyncio.sleep(0.1)
    log.info("after_await")  # Still has request_id
async def main():
    await asyncio.gather(
        request("abc"),
        request("xyz"),
    )
# gather() wraps each coroutine in its own task with its own copy of the context, so abc and xyz don't leak across
asyncio.run(main())
Use the bound_contextvars() context manager for scoped binding:
from structlog.contextvars import bound_contextvars
def handler():
    with bound_contextvars(request_id="abc", trace_id="xyz"):
        log.info("inside")  # Has the bound vars
    log.info("outside")  # Doesn't
Pro Tip: Bind context vars at the request boundary (middleware, route handler, message consumer entrypoint) so everything downstream gets the context for free. This is the single biggest improvement Structlog brings over plain logging: correlation IDs propagate automatically through deeply nested code without threading them through every function signature.
Fix 3: Unifying Stdlib Logging with Structlog
Third-party libraries use stdlib logging. Unify both formats so logs look consistent:
import logging
import structlog
# Shared processors run for both Structlog and stdlib logs
shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
]
structlog.configure(
    processors=shared_processors + [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)
# Configure stdlib to also use Structlog's formatter
formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ],
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)
# Now both produce identical JSON output
log = structlog.get_logger()
log.info("structlog message", user_id=123)
logging.getLogger("requests").info("HTTP request")
# Both lines: same JSON format with timestamp, level, event, and extra fields
foreign_pre_chain runs only on records that did not come from Structlog (plain stdlib calls), so third-party loggers get the same context vars, level, and timestamp before the shared renderer formats them.
Set log levels per logger:
logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
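logging.getLogger("httpx").setLevel(logging.WARNING)
This reduces noise from chatty libraries. structlog.configure() does not touch stdlib levels, so what matters is that nothing reconfigures stdlib logging afterwards (for example, a later logging.config.dictConfig() call).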
Fix 4: Configuring Once vs Multiple Times
structlog.configure(...)  # Replaces any prior config
configure() can be called repeatedly: each call replaces the configuration entirely. But in tests where modules import each other, you may want first-config-wins:
structlog.configure_once(...)  # Only configures if not already configured; otherwise emits a RuntimeWarning
configure_once for libraries:
# mylib/__init__.py
import structlog
# Only set default config if the user hasn't configured Structlog
structlog.configure_once(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)
Library code shouldn’t override user configuration. configure_once is the polite default for libraries.
Common Mistake: Calling structlog.configure() inside a function that runs multiple times (e.g., a test fixture, a Flask route handler). Each call replaces the global config. Symptoms include intermittent format changes between requests and logs that look correct in tests but wrong in production. Always configure Structlog once at startup, not in request handlers.
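One way to make "configure once at startup" hard to violate is a guarded setup function. The sketch below uses only stdlib logging so it runs anywhere; setup_logging and the _configured flag are hypothetical names, and the comment marks where a structlog.configure(...) call would sit in a real project.

```python
import logging

_configured = False  # module-level guard; hypothetical name


def setup_logging(level=logging.INFO):
    """Attach one handler to the root logger; later calls are no-ops."""
    global _configured
    if _configured:
        return False
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)
    # structlog.configure(...) would go here, next to the handler setup.
    _configured = True
    return True


handlers_before = len(logging.getLogger().handlers)
first_call = setup_logging()
second_call = setup_logging()  # e.g. a fixture and an imported module both call it
handlers_added = len(logging.getLogger().handlers) - handlers_before
print(first_call, second_call, handlers_added)
```

Because the second call returns early, the root logger ends up with one extra handler no matter how many modules or fixtures invoke the setup.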
Fix 5: Async-Aware Logging
Structlog’s stdlib BoundLogger has async variants of every log method (ainfo(), adebug(), and so on, added in structlog 23.1.0). They supersede the older AsyncBoundLogger wrapper, which is deprecated and exposes awaitable info()/debug() rather than ainfo():
import logging
import sys
import structlog
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.DEBUG)
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,  # Provides both info() and ainfo()
    logger_factory=structlog.stdlib.LoggerFactory(),
)
log = structlog.get_logger()
async def handler():
    await log.ainfo("async log")  # Note: ainfo, not info
    await log.adebug("debug log")
    await log.awarning("warn log")
Why async-aware? Sync log calls in async code work but block the event loop briefly. For high-throughput async services (FastAPI, aiohttp), the a-prefixed methods run the processor chain and the output in a thread pool, with the current contextvars copied over.
For FastAPI specifically:
from fastapi import FastAPI, Request
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars
import uuid
app = FastAPI()
log = structlog.get_logger()
@app.middleware("http")
async def correlation_id_middleware(request: Request, call_next):
    clear_contextvars()
    bind_contextvars(
        request_id=str(uuid.uuid4()),
        method=request.method,
        path=request.url.path,
    )
    response = await call_next(request)
    await log.ainfo("request_complete", status=response.status_code)
    return response
For FastAPI middleware patterns that interact with Structlog context, see FastAPI dependency injection error.
Platform Differences and Aggregator Targets
Structlog’s processor chain is portable, but every cloud log aggregator expects a slightly different JSON shape. The stdlib bridge, the async context model, and the Sentry integration also each have quirks worth knowing before you ship.
Stdlib bridge: mismatched and duplicate log lines. When you wire structlog.stdlib.ProcessorFormatter into the root logger, both Structlog calls (log.info(...)) and stdlib calls (logging.getLogger("uvicorn").info(...)) are meant to flow through the same JSON renderer. That only happens if Structlog’s logger_factory is structlog.stdlib.LoggerFactory(). The default PrintLoggerFactory() prints directly to stdout and bypasses stdlib, so Structlog lines skip the ProcessorFormatter and come out in a different shape than stdlib lines. Duplicates usually originate on the stdlib side: the same handler attached more than once (setup code that runs twice), or a handler on both a named logger and the root logger while propagate is still True. Fix by setting logger_factory=structlog.stdlib.LoggerFactory() and keeping exactly one handler on the root logger, so each call produces one line.
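One stdlib-side cause of duplicates worth ruling out is a record reaching the same handler twice through propagation. The sketch below reproduces it with stdlib logging only; the logger name demo.noisy is made up for the demonstration.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s:%(message)s"))

root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

noisy = logging.getLogger("demo.noisy")  # hypothetical logger name
noisy.addHandler(handler)                # same handler attached a second time

# The record is handled by noisy's handler, then propagates to the root handler.
noisy.info("hello")
duplicated = stream.getvalue().count("demo.noisy:hello")

# Stop propagation (or remove the extra handler) and log again.
noisy.propagate = False
stream.seek(0)
stream.truncate(0)
noisy.info("hello")
fixed = stream.getvalue().count("demo.noisy:hello")
print(duplicated, fixed)

# Clean up so the demo does not affect other logging in the process.
noisy.removeHandler(handler)
root.removeHandler(handler)
```

Printing logging.getLogger().handlers and the handlers of any named logger you configured is usually enough to spot this in a real application.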
Async context with contextvars: works across await, needs care across threads. Python’s contextvars module propagates context across await boundaries automatically, and asyncio.create_task copies the current context into each new task. asyncio.to_thread also copies the current context into the worker thread. But if you offload work with loop.run_in_executor() or submit directly to a ThreadPoolExecutor, the worker thread does not see the caller’s context and Structlog loses your request_id. Capture and re-enter the context explicitly:
import asyncio
import contextvars
async def offload_with_context(fn, *args):
    ctx = contextvars.copy_context()
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, ctx.run, fn, *args)
contextvars.copy_context() takes a snapshot, ctx.run(...) re-enters it inside the worker thread, and Structlog’s merge_contextvars processor finds your fields again.
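To check which offloading calls carry context on your Python version, a stdlib-only sketch can help. It needs no Structlog: request_id here is a plain ContextVar standing in for Structlog's bound fields, and read_request_id is a hypothetical worker function.

```python
import asyncio
import contextvars

# Stand-in for a field bound with bind_contextvars().
request_id = contextvars.ContextVar("request_id", default=None)


def read_request_id():
    # Runs in a worker thread and reports what that thread can see.
    return request_id.get()


async def main():
    request_id.set("abc")
    loop = asyncio.get_running_loop()

    # Plain executor call: the worker thread does not get this task's context.
    bare = await loop.run_in_executor(None, read_request_id)

    # Explicit snapshot: ctx.run re-enters the copied context in the worker.
    ctx = contextvars.copy_context()
    wrapped = await loop.run_in_executor(None, ctx.run, read_request_id)

    # asyncio.to_thread copies the current context by itself.
    via_to_thread = await asyncio.to_thread(read_request_id)
    return bare, wrapped, via_to_thread


bare, wrapped, via_to_thread = asyncio.run(main())
print(bare, wrapped, via_to_thread)
```

On standard CPython builds the first value is typically None while the other two are "abc"; the first result can differ on interpreters where new threads inherit context.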
Datadog. Datadog ingests JSON logs and expects the message in message and severity in status. Add a field remapper after add_log_level:
def datadog_remap(_, method, ev):
    ev["message"] = ev.pop("event", "")
    ev["status"] = ev.pop("level", method).lower()  # add_log_level writes "level"
    return ev
Datadog’s tracer knows the active trace and span IDs; add them to the event dict as dd.trace_id and dd.span_id in a processor so logs link to traces.
AWS CloudWatch Logs. No field remapping is required — CloudWatch parses any valid JSON. But it expects each log event on a single line. If a Structlog message contains an embedded newline (e.g., a multi-line traceback), CloudWatch splits it into separate events. Set processors=[..., structlog.processors.format_exc_info, ...] so tracebacks render as a single escaped string, not raw \n characters.
Grafana Loki. Loki indexes log streams by labels (a small set of key-value pairs) and keeps the rest as the line. Don’t dump every field into Loki labels, because high cardinality kills its index. Emit only service, env, and level as labels and keep request_id, user_id, and other high-cardinality fields inside the JSON line. At query time, LogQL’s json parser extracts them from the line.
GCP Cloud Logging expects severity (uppercase string) and message. The gcp_field_mapping example under Cloud Logging Integration covers this. GCP also recognizes logging.googleapis.com/trace as a top-level field for trace correlation if you’re using Cloud Trace.
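A processor for the trace field might look like the sketch below. It assumes the trace ID is already present in the event dict under a trace_id key (for example, bound at the request boundary) and that the field value follows the projects/PROJECT_ID/traces/TRACE_ID form; make_gcp_trace_processor, the key name, and the project ID are all hypothetical.

```python
def make_gcp_trace_processor(project_id, trace_key="trace_id"):
    """Build a processor that moves a trace ID into GCP's trace field."""

    def add_gcp_trace(logger, method_name, event_dict):
        trace_id = event_dict.pop(trace_key, None)
        if trace_id:
            # Assumed value format: projects/<project>/traces/<trace id>
            event_dict["logging.googleapis.com/trace"] = (
                f"projects/{project_id}/traces/{trace_id}"
            )
        return event_dict

    return add_gcp_trace


add_trace = make_gcp_trace_processor("my-project")  # hypothetical project ID
with_trace = add_trace(
    None, "info", {"event": "done", "trace_id": "0af7651916cd43dd8448eb211c80319c"}
)
without_trace = add_trace(None, "info", {"event": "done"})
print(with_trace)
print(without_trace)
```

Place it before the renderer, like any other processor that rewrites keys; events without a trace ID pass through unchanged.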
Sentry integration. Sentry’s Python SDK captures logging records at ERROR and above by default. If you’ve routed Structlog through stdlib, Sentry sees them automatically. But the structured fields (your request_id, user_id) are inside the JSON-rendered message string by the time Sentry reads it, so Sentry can’t index them as tags. Attach them as Sentry tags explicitly:
import sentry_sdk
# Structlog method names mapped to Sentry levels ("exception" is not a Sentry level)
SENTRY_LEVELS = {"error": "error", "exception": "error", "critical": "fatal"}
def to_sentry(logger, method_name, event_dict):
    level = SENTRY_LEVELS.get(method_name)
    if level is not None:
        # new_scope() is the sentry-sdk 2.x API; on 1.x use push_scope()
        with sentry_sdk.new_scope() as scope:
            for k, v in event_dict.items():
                if k not in ("event", "level", "timestamp"):
                    scope.set_tag(k, str(v))
            sentry_sdk.capture_message(event_dict.get("event", ""), level=level)
    return event_dict
Insert before the renderer. Now request_id becomes a Sentry tag, queryable in the dashboard. If Sentry’s logging integration is also capturing the same records, expect each error twice unless you disable one of the two paths.
Container stdout vs file. In Kubernetes, write JSON to stdout/stderr and let the container runtime ship logs to your aggregator. Writing to a file inside the container means logs disappear when the pod dies. The only exception is sidecar-based shipping (a sidecar tails a shared volume) — rare and usually unnecessary.
Performance under load. Structlog’s chain runs per log call. At 50,000 logs/second, the JSON renderer becomes a hot path. Enable cache_logger_on_first_use=True, drop debug logs early in the chain, and use orjson as the serializer (processors=[..., structlog.processors.JSONRenderer(serializer=orjson.dumps)]). orjson.dumps returns bytes rather than str, so pair it with structlog.BytesLoggerFactory() instead of the stdlib LoggerFactory(). orjson is typically several times faster than stdlib json and serializes datetime and UUID values natively.
Fix 6: Filtering and Sampling
import os
import structlog
# Drop debug logs in production
def filter_by_level(logger, method_name, event_dict):
    if method_name == "debug" and os.getenv("ENV") == "production":
        raise structlog.DropEvent
    return event_dict
structlog.configure(
    processors=[
        filter_by_level,
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)
DropEvent signals “skip this log entirely”: Structlog stops processing and returns nothing.
Sample high-volume logs:
import random
import structlog
def sample_debug(logger, method_name, event_dict):
    if method_name == "debug" and random.random() > 0.01:
        raise structlog.DropEvent  # Drop about 99% of debug logs
    return event_dict
Redact sensitive fields:
SENSITIVE = {"password", "api_key", "credit_card", "ssn"}
def redact_sensitive(logger, method_name, event_dict):
    for key in list(event_dict.keys()):
        if key.lower() in SENSITIVE:
            event_dict[key] = "[REDACTED]"
    return event_dict
structlog.configure(
    processors=[
        redact_sensitive,  # Run early in the chain
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ],
)
log = structlog.get_logger()
log.info("login_attempt", username="alice", password="secret123")
# {"event": "login_attempt", "username": "alice", "password": "[REDACTED]", "level": "info"}  (key order may differ)
Fix 7: Exception Logging
import structlog
log = structlog.get_logger()
try:
    risky_operation()
except Exception:
    log.exception("operation_failed")  # Includes full traceback
The format_exc_info processor must be in the chain for tracebacks to render in JSON output:
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.format_exc_info,  # Required for exception tracebacks
        structlog.processors.JSONRenderer(),
    ],
)
Add exception type and message as fields:
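import sys
def add_exception_info(logger, method_name, event_dict):
    exc_info = event_dict.get("exc_info")
    if exc_info is True:  # log.exception() passes exc_info=True, not a tuple
        exc_info = sys.exc_info()
    elif isinstance(exc_info, BaseException):
        exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
    if isinstance(exc_info, tuple) and exc_info[0] is not None:
        event_dict["exc_type"] = exc_info[0].__name__
        event_dict["exc_message"] = str(exc_info[1])
    return event_dict
structlog.configure(
    processors=[
        add_exception_info,  # Before format_exc_info, which consumes exc_info
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
)
With manual exception: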
try:
    1 / 0
except ZeroDivisionError as e:
    log.error("calculation_failed", error=str(e), error_type=type(e).__name__)
    # Or use log.exception("calculation_failed") to auto-include the traceback
Fix 8: Testing with Structlog
Structlog provides testing.capture_logs for inspecting logs in tests:
import structlog
def fn():
    log = structlog.get_logger()
    log.info("event", user_id=123)
def test_fn():
    with structlog.testing.capture_logs() as cap_logs:
        fn()
    assert cap_logs == [{"event": "event", "user_id": 123, "log_level": "info"}]
For pytest, configure Structlog with a LogCapture processor:
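# conftest.py
import pytest
import structlog
from structlog.testing import LogCapture
@pytest.fixture(name="log_output")
def fixture_log_output():
    return LogCapture()
@pytest.fixture(autouse=True)
def fixture_configure_structlog(log_output):
    # LogCapture records each event dict in log_output.entries and drops the event
    structlog.configure(processors=[log_output])
# In a test: assert log_output.entries == [{"event": "event", "user_id": 123, "log_level": "info"}]
For pytest fixture patterns that pair with structlog testing, see pytest fixture not found.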
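Custom processors are plain callables, so they can also be unit-tested without configuring Structlog at all: call them with a dict and assert on the result. The sketch below does that for a hypothetical redact_nested processor, a variant of the redaction idea that also walks nested dicts and lists; the names and the SENSITIVE set are assumptions for illustration.

```python
SENSITIVE = {"password", "api_key", "credit_card", "ssn"}


def redact_nested(logger, method_name, event_dict):
    """Redact sensitive keys at any depth of nested dicts and lists."""

    def scrub(value):
        if isinstance(value, dict):
            return {
                key: "[REDACTED]" if str(key).lower() in SENSITIVE else scrub(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            # Tuples come back as lists, which is fine for JSON output.
            return [scrub(item) for item in value]
        return value

    return scrub(event_dict)


def test_redact_nested():
    event = {
        "event": "login",
        "user": {"name": "alice", "password": "secret123"},
        "attempts": [{"api_key": "k-1"}, {"ip": "10.0.0.1"}],
    }
    result = redact_nested(None, "info", event)
    assert result["event"] == "login"
    assert result["user"] == {"name": "alice", "password": "[REDACTED]"}
    assert result["attempts"] == [{"api_key": "[REDACTED]"}, {"ip": "10.0.0.1"}]


test_redact_nested()
```

The first two arguments are unused here, so passing None and a method name string is enough; pytest would collect test_redact_nested as-is.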
Still Not Working?
Structlog vs Loguru
- Structlog — Processor chain, more configurable, first-class context vars, production observability focus.
- Loguru — Simpler API, fewer concepts, drop-in replacement for stdlib logging. See Loguru not working.
Use Loguru for simplicity. Use Structlog when you need processor chains (custom transformations, redaction, sampling) or strict JSON output with context propagation.
Performance Considerations
Structlog is generally fast but the chain runs on every log call. Optimizations:
# 1. Cache logger on first use (avoids re-creation per call)
structlog.configure(cache_logger_on_first_use=True, ...)
# 2. Filter early in the chain (skip expensive processors for dropped logs)
processors = [
    filter_by_level,  # Cheap; drops first
    structlog.processors.add_log_level,
    expensive_enrichment,  # Only runs if not dropped
    structlog.processors.JSONRenderer(),
]
# 3. Defer expensive values. Structlog does not call lambdas for you, so
#    resolve them in a processor placed after the filter.
def resolve_callables(logger, method_name, event_dict):
    for key, value in event_dict.items():
        if callable(value):
            event_dict[key] = value()  # Evaluated only if the event was not dropped
    return event_dict
log.info("done", expensive=lambda: compute_summary())
Integration with FastAPI/Django/Flask
For FastAPI, use middleware as shown above. For Django:
# settings.py
import structlog
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processors": [
                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
                structlog.processors.JSONRenderer(),
            ],
        },
    },
    "handlers": {
        "default": {"class": "logging.StreamHandler", "formatter": "json"},
    },
    "root": {"handlers": ["default"], "level": "INFO"},
}
Django’s LOGGING dict accepts the formatter class directly through the "()" key. You still need a structlog.configure(...) call whose chain ends with ProcessorFormatter.wrap_for_formatter, as in Fix 3.
Multiple Output Sinks
Sometimes you want different log destinations — JSON to a file for aggregation, colored output to stdout for humans. Configure stdlib logging with two handlers:
import logging
import structlog
import sys
shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
]
# Console handler: colored
console_formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.dev.ConsoleRenderer(colors=True),
    ],
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
# File handler: JSON
file_formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.processors.JSONRenderer(),
    ],
)
file_handler = logging.FileHandler("app.log")
file_handler.setFormatter(file_formatter)
root = logging.getLogger()
root.addHandler(console_handler)
root.addHandler(file_handler)
root.setLevel(logging.INFO)
This pattern leverages stdlib’s multi-handler model: Structlog handles record formatting per handler, stdlib handles routing.
Cloud Logging Integration
GCP, AWS, and Datadog all consume JSON logs with specific field names:
# Rename fields to GCP convention
def gcp_field_mapping(logger, method_name, event_dict):
    event_dict["severity"] = event_dict.pop("level", "INFO").upper()  # add_log_level writes "level"
    event_dict["message"] = event_dict.pop("event", "")
    return event_dict
structlog.configure(processors=[..., gcp_field_mapping, structlog.processors.JSONRenderer()])
For Loguru-based cloud logging patterns that compare to this Structlog approach, see Loguru not working.
Combining with Tenacity for Retry Logging
import structlog
from tenacity import retry, stop_after_attempt, before_sleep_log
import logging
log = structlog.get_logger()
# before_sleep_log logs through the stdlib logger it is given; with the
# stdlib bridge from Fix 3, those records come out in the same format.
@retry(
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logging.getLogger(), logging.INFO),
)
def fetch():
    ...
For Tenacity-specific retry patterns, see Tenacity not working.
Debugging Misconfigured Chains
If logs come out wrong (missing fields, wrong format), add a print processor in the middle to inspect the dict at that point:
def debug_processor(logger, method_name, event_dict):
    print(f"DEBUG processor state: {event_dict}")
    return event_dict
structlog.configure(processors=[
    structlog.processors.add_log_level,
    debug_processor,  # Inspect here
    structlog.contextvars.merge_contextvars,
    debug_processor,  # And here
    structlog.processors.JSONRenderer(),
])
This makes the chain’s transformations visible, which is invaluable when a custom processor isn’t producing the expected output. Remove debug processors before deploying.
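When the same debug processor appears at several positions, it helps to label each one and to write to stderr so the dumps do not interleave with JSON on stdout. A small sketch, where make_debug_processor is a hypothetical helper and not a Structlog API:

```python
import io
import sys


def make_debug_processor(label, stream=None):
    """Return a pass-through processor that dumps the event dict with a label."""

    def debug_processor(logger, method_name, event_dict):
        out = sys.stderr if stream is None else stream
        print(f"[{label}] {method_name}: {event_dict!r}", file=out)
        return event_dict  # unchanged, so the chain behaves as before

    return debug_processor


# Demonstration with an in-memory stream instead of stderr.
buffer = io.StringIO()
after_level = make_debug_processor("after add_log_level", stream=buffer)
returned = after_level(None, "info", {"event": "user_created", "level": "info"})
print(buffer.getvalue(), end="")
```

In a chain, each position gets its own instance, for example make_debug_processor("after merge_contextvars") placed right after that processor.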
Bound Logger Reuse vs get_logger() Per Call
Calling structlog.get_logger() at module top-level is fine and recommended — with cache_logger_on_first_use=True, the logger is constructed once. Calling it inside a tight loop allocates a new wrapper each iteration; it’s not catastrophic but it’s wasteful. For the request-scoped pattern, bind a contextual logger once at the boundary:
log = structlog.get_logger().bind(request_id=req_id)
log.info("step_one")
log.info("step_two")
Both lines share request_id without going through bind_contextvars, which is useful when you want the binding scoped to the local function and not visible to nested coroutines.
Renderer Choice Inside Tests
The pytest caplog fixture only captures stdlib logging records, not Structlog’s direct output. If your tests assert on log content, use structlog.testing.capture_logs() instead — it returns a list of dicts ([{"event": "...", "log_level": "info", ...}]) you can assert on directly. Don’t rely on caplog.records unless you’ve set up the full stdlib bridge in the test config; the bridge order matters and is easy to get subtly wrong.
Time Zone Choice for TimeStamper
structlog.processors.TimeStamper(fmt="iso") defaults to utc=True and emits UTC with a Z suffix, for example 2026-05-22T10:30:00.123456Z. With utc=False it emits local time with no offset, which is ambiguous across regions, so keep the default in production. Avoid fmt="%Y-%m-%d %H:%M:%S" (no timezone, no fractional seconds), which causes ordering bugs in CloudWatch and Loki when bursts of logs arrive in the same second.
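The same-second ordering problem can be reproduced with the standard library alone. This sketch formats two fixed instants 400 ms apart both ways; it does not call TimeStamper, it only illustrates why the coarse format loses ordering.

```python
from datetime import datetime, timedelta, timezone

first = datetime(2026, 5, 22, 10, 30, 0, 123456, tzinfo=timezone.utc)
second = first + timedelta(milliseconds=400)

# Second-resolution format without timezone: both events look identical.
coarse = [t.strftime("%Y-%m-%d %H:%M:%S") for t in (first, second)]

# ISO 8601 with microseconds and a UTC marker: distinct and sortable as strings.
precise = [t.isoformat().replace("+00:00", "Z") for t in (first, second)]

print(coarse)
print(precise)
```

With the coarse format an aggregator has nothing to sort the two events by, while the ISO strings compare correctly even as plain text.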
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.