Phase 3: The Async Worker (Postgres Queue)
Overview
Queued messages are delivered by a PostgreSQL-backed async worker using a
claim → deliver → finalize pattern: the row is transitioned to processing
before the SMTP call, then to sent or back to queued (with incremented
attempts) after. A reaper loop reclaims rows stuck in processing after a
worker crash so they are retried rather than stranded; claiming each row
before the SMTP call is what guards against double-sends. SMTP 5xx responses are fast-failed
immediately (no pointless retries of permanent errors); transient failures
retry up to 10 times with exponential back-off. All error details are
PII-scrubbed before being stored. Operators get full visibility via CLI and
Prometheus metrics.
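The finalize step described above can be sketched as a small decision function. This is an illustration under stated assumptions, not the project's implementation: the name finalize_status and the convention of passing None for a successful send are invented here. Only the status names, the 5xx fast-fail rule, and the cap of 10 attempts come from the text.

```python
from typing import Optional

MAX_ATTEMPTS = 10  # retry cap stated in the overview


def finalize_status(smtp_code: Optional[int], attempts: int) -> str:
    """Pick the next row status after one delivery attempt.

    smtp_code is None on success (an assumption of this sketch),
    otherwise the SMTP reply code of the failure. attempts is the
    attempt counter after it has been incremented.
    """
    if smtp_code is None:
        return "sent"
    if 500 <= smtp_code <= 599:
        # Permanent error: fail fast instead of retrying.
        return "failed"
    if attempts > MAX_ATTEMPTS:
        # Transient error, but the retry budget is spent.
        return "failed"
    # Transient error with budget left: back to the queue.
    return "queued"
```

For example, a 451 reply on the third attempt sends the row back to queued, while a 550 reply fails it on the first attempt.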
Goal
Implement a background worker that securely fetches queued messages from the database and delivers them via the configured self-hosted SMTP server.
TDD Acceptance Criteria
- pytest tests/workers/test_queue.py::test_select_for_update_skip_locked MUST PASS
- pytest tests/workers/test_smtp_delivery.py::test_successful_smtp_dispatch MUST PASS
- pytest tests/workers/test_smtp_delivery.py::test_mime_multipart_construction MUST PASS
- pytest tests/workers/test_backoff.py::test_failed_smtp_call_updates_next_retry MUST PASS
Technical Specifications
Worker Polling Loop
- Uses asyncpg or SQLAlchemy async session.
- Query: SELECT * FROM outbound_messages WHERE status = 'queued' AND (next_retry_at IS NULL OR next_retry_at <= NOW()) FOR UPDATE SKIP LOCKED LIMIT 10
- Fetches the associated mailbox to decrypt smtp_password_encrypted.
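One way to combine the query above with the transition to processing is a single UPDATE over a locking subquery. The sketch below is a variation on the spec's SELECT, not a copy of it. It assumes the table has an id primary key, and the helper name claim_batch is hypothetical. The conn argument only needs to behave like an asyncpg connection, that is, to offer an awaitable fetch(query, *args).

```python
from typing import Any, List

# Assumption: outbound_messages has an `id` primary key. The other
# column names and status values are taken from the spec.
CLAIM_SQL = """
UPDATE outbound_messages
SET status = 'processing'
WHERE id IN (
    SELECT id
    FROM outbound_messages
    WHERE status = 'queued'
      AND (next_retry_at IS NULL OR next_retry_at <= NOW())
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
RETURNING *
"""


async def claim_batch(conn: Any, limit: int = 10) -> List[Any]:
    """Claim up to `limit` due rows and mark them as processing.

    SKIP LOCKED makes concurrent workers skip rows that another
    worker has already locked instead of waiting on them.
    """
    return await conn.fetch(CLAIM_SQL, limit)
```

Loading the mailbox and decrypting smtp_password_encrypted is left out here, since the spec does not say how the decryption is done.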
SMTP Delivery
- Construct email.message.EmailMessage.
- Attach text_body and html_body as multipart/alternative.
- Add standard headers (Subject, From, To, Date, Message-ID).
- Use aiosmtplib to connect to smtp_host:smtp_port, STARTTLS if required, login, and send_message.
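The delivery steps above might look roughly like this. The function names build_message and deliver and their parameters are assumptions for illustration. The deliver half also assumes aiosmtplib's top-level send() coroutine with its hostname, port, username, password, and start_tls keyword arguments, and uses it in place of an explicit connect, login, and send_message sequence.

```python
from email.message import EmailMessage
from email.utils import formatdate, make_msgid, parseaddr


def build_message(sender: str, recipient: str, subject: str,
                  text_body: str, html_body: str) -> EmailMessage:
    """Build a multipart/alternative message with the standard headers."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = recipient
    msg["Date"] = formatdate(usegmt=True)
    # Use the sender's domain for the Message-ID when one can be parsed;
    # make_msgid falls back to the local host name otherwise.
    domain = parseaddr(sender)[1].rpartition("@")[2] or None
    msg["Message-ID"] = make_msgid(domain=domain)
    msg.set_content(text_body)                      # text/plain part
    msg.add_alternative(html_body, subtype="html")  # text/html part
    return msg


async def deliver(msg: EmailMessage, host: str, port: int,
                  username: str, password: str, use_starttls: bool) -> None:
    """Send msg through the mailbox's SMTP server."""
    import aiosmtplib  # third-party dependency, imported lazily

    await aiosmtplib.send(
        msg,
        hostname=host,
        port=port,
        username=username,
        password=password,
        start_tls=use_starttls,
    )
```

Calling set_content before add_alternative keeps the plain-text part first, so clients that prefer the last alternative will render the HTML.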
Exponential Backoff
- Max attempts: 10
- Formula: now() + (attempts ** 2) * 1 minute (e.g. 1m, 4m, 9m…)
- If attempts > max, set status='failed' and populate error_log.
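As a worked example of the formula, the helper below computes the next status and retry time from the attempt count. The name plan_retry and its return shape are assumptions of this sketch; the cap of 10, the attempts ** 2 minutes delay, and the attempts > max rule are taken from the list above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

MAX_ATTEMPTS = 10


def plan_retry(attempts: int, now: datetime) -> Tuple[str, Optional[datetime]]:
    """Return (status, next_retry_at) after a failed transient attempt.

    attempts is the counter after it has been incremented.
    """
    if attempts > MAX_ATTEMPTS:
        # Retry budget spent: no further retry is scheduled.
        return "failed", None
    # Delay of attempts squared, in minutes: 1, 4, 9, ...
    return "queued", now + timedelta(minutes=attempts ** 2)


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
status, next_retry_at = plan_retry(3, now)
```

Because attempts ** 2 grows quadratically, the delay before the last permitted retry (attempts = 10) is 100 minutes.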
PII Scrubbing in error_log (CONTRIBUTING.md rule)
SMTP server responses frequently include recipient addresses
(e.g. 550 5.1.1 <alice@example.com> user unknown). Before any string is
persisted to error_log, it must pass through redact_pii
(src/app/utils/), which replaces email addresses, Message-IDs, and other
PII with <redacted> plus a stable correlation hash.
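To make the idea concrete, here is a deliberately simplified stand-in for that kind of scrubbing. It is not the project's redact_pii: the name redact_pii_sketch, the regular expression, the <redacted:hash> output format, and the use of a truncated SHA-256 digest are all assumptions. A real implementation would likely cover more PII types and might prefer a keyed hash.

```python
import hashlib
import re

# Simplified pattern: matches local@domain tokens, optionally wrapped in
# angle brackets. Message-IDs share this shape, so they match as well.
_ADDR_RE = re.compile(
    r"<?[A-Za-z0-9._%+\-]+@[A-Za-z0-9\-]+(?:\.[A-Za-z0-9\-]+)+>?"
)


def redact_pii_sketch(text: str) -> str:
    """Replace address-like tokens with a redaction marker and short hash."""

    def _replace(match: "re.Match[str]") -> str:
        # Normalise so the same address always yields the same hash.
        value = match.group(0).strip("<>").lower()
        digest = hashlib.sha256(value.encode("utf-8")).hexdigest()[:8]
        return f"<redacted:{digest}>"

    return _ADDR_RE.sub(_replace, text)


scrubbed = redact_pii_sketch("550 5.1.1 <alice@example.com> user unknown")
```

The hash is stable for a given address, so repeated failures for the same recipient can still be correlated without storing the address itself.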
error_log is sensitive: treat it as such for backups, support dumps, and
Prometheus scrape exposure. Do not surface raw error_log values in metrics
labels or API responses.