Phase 3: The Async Worker (Postgres Queue)

Overview

Queued messages are delivered by a PostgreSQL-backed async worker using a claim → deliver → finalize pattern. A row is moved to processing before the SMTP call, then to sent on success or back to queued (with attempts incremented) on a transient failure. Claiming the row first keeps two workers from sending the same message, and a reaper loop reclaims rows left in processing after a worker crash so they are not stranded. SMTP 5xx responses are permanent errors and fail immediately without retries; transient failures retry up to 10 times with a growing back-off delay. All error details are PII-scrubbed before being stored. Operators get visibility through the CLI and Prometheus metrics.
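
The finalize step described above can be sketched as a small pure function. This is an illustration only, not the worker's actual code: the names Outcome and finalize, and the convention of passing smtp_code=None for failures without an SMTP reply (for example a connection timeout), are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Optional

MAX_ATTEMPTS = 10  # taken from the Exponential Backoff section


@dataclass
class Outcome:
    status: str    # 'sent', 'queued', or 'failed'
    attempts: int  # attempts counter after this delivery attempt


def finalize(attempts: int, delivered: bool, smtp_code: Optional[int] = None) -> Outcome:
    """Decide the next row state after one delivery attempt (hypothetical helper)."""
    if delivered:
        return Outcome("sent", attempts)

    attempts += 1  # every failed attempt is counted

    # SMTP 5xx replies are permanent: fail fast, no retry.
    if smtp_code is not None and 500 <= smtp_code < 600:
        return Outcome("failed", attempts)

    # Transient failure: retry until the attempt budget is exhausted.
    if attempts > MAX_ATTEMPTS:
        return Outcome("failed", attempts)
    return Outcome("queued", attempts)
```

Keeping the decision separate from the database and SMTP calls makes the fast-fail and retry rules easy to unit test.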

Goal

Implement a background worker that securely fetches queued messages from the database and delivers them via the configured self-hosted SMTP server.

TDD Acceptance Criteria

  1. pytest tests/workers/test_queue.py::test_select_for_update_skip_locked MUST PASS
  2. pytest tests/workers/test_smtp_delivery.py::test_successful_smtp_dispatch MUST PASS
  3. pytest tests/workers/test_smtp_delivery.py::test_mime_multipart_construction MUST PASS
  4. pytest tests/workers/test_backoff.py::test_failed_smtp_call_updates_next_retry MUST PASS

Technical Specifications

Worker Polling Loop

  • Uses asyncpg or SQLAlchemy async session.
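  • Query: SELECT * FROM outbound_messages WHERE status = 'queued' AND (next_retry_at IS NULL OR next_retry_at <= NOW()) FOR UPDATE SKIP LOCKED LIMIT 10
  • Runs the query inside a transaction, because the row locks are released when the transaction ends; claimed rows are set to status = 'processing' before it commits.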
  • Fetches the associated mailbox to decrypt smtp_password_encrypted.
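
One way to combine the SELECT above with the transition to processing is a single UPDATE ... RETURNING statement. The sketch below assumes asyncpg and a primary key column named id; neither the column name nor the helper claim_batch comes from the spec.

```python
from typing import Any, List

# Claim up to $1 due rows and mark them 'processing' in one statement.
# Assumption: outbound_messages has a primary key column called "id".
CLAIM_SQL = """
UPDATE outbound_messages
   SET status = 'processing'
 WHERE id IN (
        SELECT id
          FROM outbound_messages
         WHERE status = 'queued'
           AND (next_retry_at IS NULL OR next_retry_at <= NOW())
         LIMIT $1
           FOR UPDATE SKIP LOCKED
       )
RETURNING *
"""


async def claim_batch(conn: Any, limit: int = 10) -> List[Any]:
    """Claim a batch of queued rows (hypothetical helper).

    conn is expected to behave like an asyncpg connection, whose
    fetch(query, *args) method returns the rows produced by RETURNING.
    """
    return await conn.fetch(CLAIM_SQL, limit)
```

Because the statement is atomic, rows skipped by one worker because another holds the lock are never returned to both, and the lock is held only for the duration of the statement.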

SMTP Delivery

  • Construct email.message.EmailMessage.
  • Attach text_body and html_body as multipart/alternative.
  • Add standard headers (Subject, From, To, Date, Message-ID).
  • Use aiosmtplib to connect to smtp_host:smtp_port, STARTTLS if required, login, and send_message.
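
The delivery steps above might look roughly like the following. The function names build_message and deliver, and the use_starttls flag, are hypothetical; the message construction uses only the standard library, and aiosmtplib is imported lazily so the builder can be tested without it installed.

```python
from email.message import EmailMessage
from email.utils import formatdate, make_msgid


def build_message(sender: str, recipient: str, subject: str,
                  text_body: str, html_body: str) -> EmailMessage:
    """Build a multipart/alternative message with the standard headers."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = recipient
    msg["Date"] = formatdate()
    msg["Message-ID"] = make_msgid()
    msg.set_content(text_body)                      # text/plain part
    msg.add_alternative(html_body, subtype="html")  # converts to multipart/alternative
    return msg


async def deliver(msg: EmailMessage, host: str, port: int,
                  username: str, password: str, use_starttls: bool) -> None:
    """Send one message via aiosmtplib (sketch; error handling omitted)."""
    import aiosmtplib  # third-party dependency, imported only when sending

    await aiosmtplib.send(
        msg,
        hostname=host,
        port=port,
        username=username,
        password=password,
        start_tls=use_starttls,
    )
```

Putting the plain-text part first and the HTML part second follows the multipart/alternative convention that the last part is the preferred rendering.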

Exponential Backoff

  • Max attempts: 10
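  • Formula: next_retry_at = now() + (attempts ** 2) * 1 minute (e.g. 1m, 4m, 9m…). The delay grows quadratically with the attempt count rather than strictly exponentially.
  • If attempts > max attempts, set status = 'failed' and populate error_log.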
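
As a worked example of the formula, here is a minimal sketch. The function name next_retry_at and the convention of returning None once the attempt budget is spent are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_ATTEMPTS = 10


def next_retry_at(attempts: int, now: Optional[datetime] = None) -> Optional[datetime]:
    """Return the next retry time, or None when the row should be marked failed."""
    if attempts > MAX_ATTEMPTS:
        return None  # caller sets status='failed' and populates error_log
    now = now or datetime.now(timezone.utc)
    return now + timedelta(minutes=attempts ** 2)


# Delays for the first three attempts: 1, 4 and 9 minutes.
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
delays = [(next_retry_at(n, t0) - t0).total_seconds() / 60 for n in (1, 2, 3)]
```

With this formula the tenth attempt waits 100 minutes before the final retry.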

PII Scrubbing in error_log (CONTRIBUTING.md rule)

SMTP server responses frequently include recipient addresses (e.g. 550 5.1.1 <alice@example.com> user unknown). Before any string is persisted to error_log, it must pass through redact_pii (src/app/utils/), which replaces email addresses, Message-IDs, and other PII with <redacted> plus a stable correlation hash.
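
To make the rule concrete, here is a simplified stand-in for the scrubber. It is not the project's redact_pii: it handles email-shaped tokens only, and the <redacted:hash> output format, the HMAC key parameter, and the name redact_pii_sketch are all assumptions.

```python
import hashlib
import hmac
import re

# Matches an email address, optionally wrapped in angle brackets.
_EMAIL_RE = re.compile(r"<?([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,})>?")


def redact_pii_sketch(text: str, key: bytes = b"example-key") -> str:
    """Replace email-like tokens with <redacted:HASH> (illustrative only)."""

    def _replace(match: "re.Match[str]") -> str:
        address = match.group(1).lower().encode("utf-8")
        # A keyed hash gives a stable correlation token without storing the address.
        digest = hmac.new(key, address, hashlib.sha256).hexdigest()[:12]
        return f"<redacted:{digest}>"

    return _EMAIL_RE.sub(_replace, text)


scrubbed = redact_pii_sketch("550 5.1.1 <alice@example.com> user unknown")
```

The same address always maps to the same token, so repeated failures for one recipient can still be correlated in error_log without exposing the address.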

error_log is sensitive: treat it as such for backups, support dumps, and Prometheus scrape exposure. Do not surface raw error_log values in metrics labels or API responses.

Sourced from docs/features/03_async_worker_queue in the repo. Edits go through the same review as code.