SignalFlow
Pipeline: Ingestion → Kafka → Processing → Postgres → Topic Agg → Refresh → WebSocket → Grafana
STEP 01
Ingestion — New Post Detected
poll_subreddit() · priority: aggressive · dual producer.send()
asyncio aiokafka asyncpraw
# Post r/technology ID: xyz789 — first seen
poll_subreddit() → xyz789 not in seen_ids → priority = "aggressive"

── Legacy path (unchanged) ──────────────────────
producer.send("reddit.posts.raw", post)

── NEW unified path ─────────────────────────────
signal = normalise(post)   # schema-agnostic signal object
producer.send("signals.normalised", signal)
Legacy Path
reddit.posts.raw
raw Reddit schema · unchanged
title, score, author, subreddit, url
New Unified Path NEW
signals.normalised
platform-agnostic schema
source_id, platform, title, body, raw_score, ts
Both topics produced in the same ingestion task. signals.normalised is platform-agnostic — Reddit, HackerNews, Bluesky, YouTube all write the same schema. Downstream consumers read one topic instead of four.
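A minimal sketch of what normalise() might look like — only the fields listed above (source_id, platform, title, body, raw_score, ts) come from the source; the Reddit-specific input keys and defaults are assumptions:

```python
import time

def normalise(post: dict, platform: str = "reddit") -> dict:
    """Map a platform-specific payload onto the shared signal schema.

    Sketch only — SignalFlow's real normalise() may differ; the input
    key names (id, selftext, created_utc) are assumptions.
    """
    return {
        "source_id": post["id"],
        "platform": platform,
        "title": post.get("title", ""),
        "body": post.get("selftext", ""),       # Reddit's body field
        "raw_score": post.get("score", 0),
        "ts": post.get("created_utc", time.time()),
    }

# Every platform-specific poller would call normalise() before the
# second producer.send(), so downstream consumers see one schema:
signal = normalise({"id": "xyz789", "title": "Quantum breakthrough", "score": 45})
```

Because each poller normalises before sending, adding a fifth platform means writing one new normalise mapping, not a new downstream consumer.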
kafka partition assignment
STEP 02
Kafka — Partition Routing
hash(xyz789) % N · RF=3 · minISR=2 · Snappy compression
3 brokers RF=3 minISR=2
reddit.posts.raw   → hash("xyz789") % 3 → partition 1
signals.normalised → hash("xyz789") % 6 → partition 3
topic                    partitions
reddit.posts.raw         3
reddit.posts.refresh     3
hackernews.stories.raw   3
bluesky.posts.raw        6
youtube.comments.raw     3
signals.normalised       6 · unified ★
*.dlq                    1 each
Same post ID hashes to the same partition every time, so per-post ordering is guaranteed within each topic across its lifecycle (raw → refresh). signals.normalised has 6 partitions because all 4 sources write to it — it needs higher throughput than the per-platform topics. RF=3 with min.insync.replicas=2 means that (with acks=all) no acknowledged write ever exists on fewer than 2 brokers.
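The routing property above — equal keys always land on the same partition — can be sketched in a few lines. Kafka's default partitioner actually uses murmur2 on the key bytes; the md5 stand-in here is an assumption chosen only because it is deterministic and available in the stdlib:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic key → partition mapping.

    Kafka's default partitioner uses murmur2; this md5 stand-in
    illustrates the same property — equal keys always map to the
    same partition, so per-key ordering holds within a topic.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Same post ID → same partition, every time:
assert partition_for("xyz789", 3) == partition_for("xyz789", 3)
```

Note the concrete partition numbers differ from Kafka's (different hash), but the invariant the step relies on is identical.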
consumer group picks up · micro-batch window opens
STEP 03
Processing — Dual Path
×3 replicas · flush_batches() legacy + flush_signal_batch() new · parallel
VADER spaCy NER ×3 replicas
Legacy Path — reddit.posts.raw
flush_batches()
sentiment(title) → 0.34
keywords → ["breakthrough", "quantum", "computing"]
bulk_upsert_posts()
bulk_upsert_nlp_features()
New Path — signals.normalised NEW
flush_signal_batch()
sentiment(title+body) → VADER compound
extract_topics_batch() → spaCy NER → ["quantum computing", "ibm"]
keywords → word frequency filter
bulk_upsert_signals() → signals table
bulk_upsert_signal_nlp() → topics + sentiment
calculate_velocity()
compute_trending()
bulk_insert_signal_metrics_history()
Both paths run in parallel — same consumer group, different topics. Legacy path keeps existing reddit pipeline untouched. New path processes title + body (not just title) for richer sentiment. spaCy NER extracts named entities as structured topics, not just keyword frequency.
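The micro-batch pattern behind flush_signal_batch() can be sketched without the real NLP models — sentiment_fn and topics_fn below stand in for VADER and spaCy NER, and the batch-size threshold is an assumption:

```python
class SignalBatcher:
    """Accumulate signals, flush them to the DB in one bulk write.

    Sketch of the micro-batch pattern only; max_batch and the injected
    sentiment_fn / topics_fn (stand-ins for VADER and spaCy NER) are
    assumptions, not SignalFlow's actual values.
    """

    def __init__(self, sentiment_fn, topics_fn, max_batch=100):
        self.sentiment_fn = sentiment_fn
        self.topics_fn = topics_fn
        self.max_batch = max_batch
        self.buffer = []
        self.flushed = []            # stands in for bulk_upsert_signals()

    def add(self, signal: dict):
        # Richer sentiment: title + body, not just title
        text = f"{signal['title']} {signal.get('body', '')}"
        signal["sentiment_compound"] = self.sentiment_fn(text)
        signal["topics"] = self.topics_fn(text)
        self.buffer.append(signal)
        if len(self.buffer) >= self.max_batch:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flushed.extend(self.buffer)   # one bulk write per batch
            self.buffer = []
```

Batching amortises the Postgres round-trip: one bulk upsert per window instead of one INSERT per message.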
WAL streams to replica within milliseconds
STEP 04 🗄
PostgreSQL — Signals Table Populated
TimescaleDB primary · WAL → replica · ReadReplicaRouter for ORM reads
TimescaleDB WAL stream
table                   column               type        populated by
signals                 topics               jsonb       bulk_upsert_signal_nlp() ← spaCy NER
signals                 sentiment_compound   float       VADER on title+body
signals                 trending_score       float       compute_trending()
signals                 velocity             float       calculate_velocity()
signal_metrics_history  score, velocity, ts  hypertable  bulk_insert_signal_metrics_history()
S3 archive              signal batches       Parquet     raw signal payloads archived to S3 after flush · eu-north-1 · 90-day retention mirrors TimescaleDB
ReadReplicaRouter in Django routes all db_for_read() calls to the replica and db_for_write() to the primary. The Django ORM never touches the primary for reads — the replica absorbs all API query load. WAL replication lag is typically <50ms on a local network.
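Django database routers are plain classes, so the behaviour described above needs very little code. A minimal sketch — the database alias names ("default", "replica") are assumptions and must match the keys in settings.DATABASES:

```python
class ReadReplicaRouter:
    """Route Django ORM reads to the replica, writes to the primary.

    Sketch only — alias names ("default", "replica") are assumptions;
    register via settings.DATABASE_ROUTERS.
    """

    def db_for_read(self, model, **hints):
        return "replica"      # all SELECTs hit the streaming replica

    def db_for_write(self, model, **hints):
        return "default"      # INSERT/UPDATE/DELETE go to the primary

    def allow_relation(self, obj1, obj2, **hints):
        return True           # both aliases hold the same data via WAL
```

One caveat of this pattern: a read issued immediately after a write may race the replication lag, which is why the <50ms figure above matters.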
topic aggregator polls every 60s · watermark advances
STEP 05
Topic Aggregator — Separate Container
every 60s · watermark-based · cross-platform event detection
60s cycle watermark cross-platform
── Every 60 seconds ─────────────────────────────────────
_poll_signals_from_db()
    → SELECT * FROM signals WHERE last_updated_at >= watermark
    → watermark advances after each successful flush
_accumulate()
    → buckets by (topic, platform, 15min window)
    → aggregates: count, avg_score, avg_sentiment, avg_velocity
_flush_to_db()
    → UPSERT topic_timeseries
_detect_cross_platform_events()
    → IF topic seen on 2+ platforms in same window
    → INSERT cross_platform_events (topic, platforms[], delta_score, ts)
Runs as an independent container — not inside Django, not inside the processing service. Failure here doesn't affect ingestion or API serving. Watermark prevents reprocessing already-aggregated signals. Cross-platform events only fire when the same topic surfaces on 2+ platforms in the same 15-minute bucket.
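The watermark and bucketing logic above can be sketched as pure functions. Field names mirror the step; the exact aggregate shape returned is an assumption:

```python
from collections import defaultdict

WINDOW = 15 * 60   # 15-minute buckets, per the aggregator above

def accumulate(signals, watermark):
    """One _accumulate() pass: skip signals already covered by the
    watermark, bucket the rest by (topic, platform, 15-min window),
    and return aggregates plus the advanced watermark."""
    buckets = defaultdict(list)
    new_watermark = watermark
    for s in signals:
        if s["last_updated_at"] < watermark:
            continue                          # already aggregated — skip
        window = int(s["ts"] // WINDOW) * WINDOW
        for topic in s["topics"]:
            buckets[(topic, s["platform"], window)].append(s)
        new_watermark = max(new_watermark, s["last_updated_at"])
    rows = {
        key: {
            "count": len(group),
            "avg_score": sum(g["raw_score"] for g in group) / len(group),
        }
        for key, group in buckets.items()
    }
    return rows, new_watermark

def cross_platform_events(rows):
    """Topics seen on 2+ platforms in the same 15-min window."""
    seen = defaultdict(set)
    for (topic, platform, window) in rows:
        seen[(topic, window)].add(platform)
    return [(t, sorted(p)) for (t, _w), p in seen.items() if len(p) >= 2]
```

Because the watermark only advances after a successful flush, a crash between poll and flush re-reads the same signals rather than losing them.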
5 minutes pass · post score climbs to 1200
STEP 06
Ingestion Refresh + Velocity Calculation
5 min later · score 45→1200 · comment_velocity computed · dual path again
+5 min velocity
── Ingestion picks up updated post ──────────────────────
producer.send("reddit.posts.refresh", updated_post)
producer.send("signals.normalised", updated_signal)

── Processing calculates velocity ───────────────────────
score_velocity   = (1200 - 45) / 300 = 3.85/s
comment_velocity = (45 - 2) / 300 = 0.14/s
trending_score   → compute_trending(velocity, normalised_score, recency_decay)

── Writes back to signals table ─────────────────────────
bulk_upsert_signals()   # updates velocity + trending_score
Same dual-path logic as Step 3 — refresh topic triggers the same processing pipeline. velocity = Δscore / Δtime in seconds. trending_score weights velocity heavily — a post gaining 1000 points in 5 minutes ranks above a post with 5000 points gained over 3 days.
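The arithmetic behind the ranking claim can be made concrete. calculate_velocity is exactly Δscore/Δtime from the step above; compute_trending is a sketch — the velocity weight and half-life are assumptions, only the "velocity-heavy with recency decay" shape comes from the source:

```python
def calculate_velocity(new_score, old_score, elapsed_s):
    """Δscore / Δtime — points gained per second."""
    return (new_score - old_score) / elapsed_s

def compute_trending(velocity, normalised_score, age_s,
                     w_velocity=10.0, half_life_s=6 * 3600):
    """Velocity-weighted trending score with exponential recency decay.

    Sketch only — w_velocity and half_life_s are assumptions; the real
    compute_trending() may combine its terms differently.
    """
    recency_decay = 0.5 ** (age_s / half_life_s)
    return (w_velocity * velocity + normalised_score) * recency_decay

# The worked example from the step above:
v = calculate_velocity(1200, 45, 300)    # (1200 - 45) / 300 = 3.85/s
```

Under any weighting of this shape, a post gaining 1000 points in 5 minutes (velocity ≈ 3.3/s, decay ≈ 1) outranks 5000 points over 3 days (velocity ≈ 0.02/s, decay ≪ 1).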
Redis publish · Channels layer picks up
STEP 07
Django / Daphne ASGI — Live Push
Redis DB2 pub/sub → Django Channels → WebSocket clients
ASGI Channels WS push
── Processing service publishes after bulk upsert ────────
redis.publish("asgi:group:posts_feed", signal_payload)

── Django Channels consumer receives ─────────────────────
class SignalConsumer(AsyncWebsocketConsumer):
    async def signal_update(self, event):
        await self.send(json.dumps(event["payload"]))

── Client dashboard updates ──────────────────────────────
score: 45 → 1200
velocity badge: 3.85/s
trending: ★ RISING
AUTH — JWT required for all endpoints below
POST /api/token/ → {access, refresh} · access expires 60min · POST /api/token/refresh/ to rotate

GET  /api/v1/signals/                 cross-platform feed · paginated · 30s Redis cache
GET  /api/v1/pulse/                   topic sentiment summary · VADER + NER entities
GET  /api/v1/trending/                velocity-ranked cross-platform trending
GET  /api/v1/compare/                 divergence events · platform delta scores
WS   ws://localhost:8000/ws/signals/  real-time push · no polling · Redis DB2 pub/sub
GET  /health/                         health check · no auth required · no DB query
Daphne serves HTTP and WebSocket on the same ASGI process. REST responses cached in Redis DB1 at 30s TTL — expires automatically, not explicitly invalidated on flush. WebSocket bypasses the cache entirely — always live from Redis DB2 pub/sub channel layer. All REST endpoints require Authorization: Bearer <access_token> header.
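The "expires automatically, not explicitly invalidated" cache semantics are worth pinning down. In production this is a Redis SETEX with a 30s TTL; the in-memory sketch below (clock injected only to make expiry testable) shows the same behaviour:

```python
import time

class TTLCache:
    """In-memory stand-in for the 30s Redis response cache.

    Sketch only — production uses Redis DB1 with a TTL; this class
    exists to show the expiry semantics: entries lapse on their own
    rather than being invalidated when the pipeline flushes.
    """

    def __init__(self, ttl=30, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self.store = {}

    def get(self, key):
        entry = self.store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self.store[key]   # expired — next read refills from DB
            return None
        return value

    def set(self, key, value):
        self.store[key] = (value, self.clock() + self.ttl)
```

The trade-off: REST clients can see data up to 30s stale, which is why the WebSocket path bypasses the cache entirely.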
Grafana scrapes every 60s · dashboards refresh
STEP 08 📊
Grafana — Observability Layer
1 min refresh · Postgres replica + Prometheus · 6 panels
1min refresh · 6 panels
📈 Topic Traction Timeline
source: topic_timeseries · 15min buckets · line chart per topic
🌡 Lead/Lag Heatmap
source: cross_platform_events · platform × topic matrix · colour = delta_score
🔥 Trending Topic Cards
source: trending_topics_24h VIEW · stat panels · velocity + score
📊 Top Topics by Platform
source: topic_leaderboard_2h VIEW · bar chart · grouped by platform
⚡ Pipeline Throughput
source: Prometheus · scrapes ×3 processing replicas port 8000 every 15s · messages/sec · batch flush latency P99
🚨 DLQ Rate
source: Prometheus · reddit_processor_dlq_messages_total · should be 0 sustained — any non-zero rate = data loss in progress
Grafana has two datasources: Postgres replica for topic/signal panels and Prometheus for pipeline health panels (scrapes all 3 processing replicas on port 8000 every 15s). Both provisioned automatically via grafana/provisioning/ — no manual setup needed. trending_topics_24h and topic_leaderboard_2h are regular views — they recompute on each query. Promote to MATERIALIZED VIEW with a refresh job if query latency becomes an issue at scale.
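If the views are promoted to materialized views, they need a periodic refresh job. A minimal sketch — the interval, the injected run_sql callable (e.g. a psycopg cursor.execute wrapper), and the use of CONCURRENTLY (which requires a unique index on each materialized view) are all assumptions:

```python
import time

REFRESH_SQL = [
    # View names come from the panels above; CONCURRENTLY avoids
    # blocking Grafana reads during the refresh.
    "REFRESH MATERIALIZED VIEW CONCURRENTLY trending_topics_24h",
    "REFRESH MATERIALIZED VIEW CONCURRENTLY topic_leaderboard_2h",
]

def refresh_loop(run_sql, interval_s=60, cycles=None):
    """Run each refresh statement every interval_s seconds.

    run_sql is injected so the sketch stays database-agnostic;
    cycles=None means run forever (pass a number for testing).
    """
    n = 0
    while cycles is None or n < cycles:
        for stmt in REFRESH_SQL:
            run_sql(stmt)
        n += 1
        if cycles is None or n < cycles:
            time.sleep(interval_s)
```

Matching the refresh interval to Grafana's 1min panel refresh keeps the materialized data at most one dashboard tick stale.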