Building Real-time IoT Pipelines with NATS and InfluxDB
How we designed a 15-minute aggregation pipeline for smart building telemetry — ingesting thousands of sensor events per minute and making them queryable in seconds.
At Lutron, we run a smart building platform that ingests sensor telemetry from energy meters, occupancy sensors, HVAC controllers, and DALI emergency lighting systems — continuously, across multiple on-prem deployments. The challenge: turn that raw stream of device events into clean, queryable 15-minute aggregates without losing a single data point.
This post walks through the architecture we built using NATS for stream ingestion and InfluxDB for time-series storage and aggregation.
The Problem
A typical smart building deployment generates hundreds of sensor readings per second. Each reading is a small JSON payload — device ID, timestamp, value, unit. Raw storage of every event is expensive and queries over it are slow.
What facilities teams actually need:
- "What was the average energy consumption in Zone A for the last 8 hours?"
- "Show me 15-minute occupancy intervals for the last 7 days"
- "Alert me when energy spikes above threshold in any zone"
This means we need pre-aggregated 15-minute windows, aligned to clock boundaries (00:00, 00:15, 00:30...).
Architecture Overview
IoT Device → NATS Stream → Django Ingest Endpoint
↓
InfluxDB (raw metrics)
↓
Scheduler (every 15 min) → Aggregator
↓
InfluxDB (aggregated)
↓
REST API → Client
The stack:
- NATS JetStream — durable message broker, device events land here first
- Django — ingest endpoints that consume NATS subjects and write to InfluxDB
- InfluxDB v2 — time-series database for both raw and aggregated data
- Celery Beat — scheduler that triggers aggregation every 15 minutes
- Python — aggregation logic, Flux query composition
NATS Ingestion
NATS subjects follow a hierarchical pattern:
iot.energy.{site_id}.{device_id}
iot.occupancy.{site_id}.{area_id}
iot.hvac.{site_id}.{zone_id}
The Django ingest view subscribes to these subjects and writes raw points to InfluxDB:
@api_view(["POST"])
def stream_energy_data(request):
payload = request.data
point = (
Point("energy_metrics")
.tag("device_id", payload["device_id"])
.tag("site_id", payload["site_id"])
.field("power_w", payload["power_w"])
.field("energy_kwh", payload["energy_kwh"])
.time(payload["timestamp"], WritePrecision.SECONDS)
)
influx_write_api.write(bucket="iot_data", record=point)
return Response({"status": "ok"})
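On the NATS side, a small JetStream consumer drains the energy subjects and pushes each event through the same write path. A minimal sketch using nats-py; the connection URL, credentials, durable name, and client setup are illustrative assumptions rather than our exact configuration:

import asyncio
import json

import nats
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# Illustrative client setup; URL, token, and org come from deployment config in practice.
influx_client = InfluxDBClient(url="http://localhost:8086", token="...", org="...")
influx_write_api = influx_client.write_api(write_options=SYNCHRONOUS)

async def consume_energy_events():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    async def handle(msg):
        payload = json.loads(msg.data)
        point = (
            Point("energy_metrics")
            .tag("device_id", payload["device_id"])
            .tag("site_id", payload["site_id"])
            .field("power_w", payload["power_w"])
            .field("energy_kwh", payload["energy_kwh"])
            .time(payload["timestamp"], WritePrecision.S)
        )
        influx_write_api.write(bucket="iot_data", record=point)
        await msg.ack()

    # ">" matches every site_id/device_id under the energy subject tree.
    await js.subscribe("iot.energy.>", durable="energy-ingest", cb=handle, manual_ack=True)
    await asyncio.Event().wait()  # keep the consumer alive

asyncio.run(consume_energy_events())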
The Aggregation Pipeline
Every 15 minutes, Celery Beat triggers the energy aggregator. The key insight: we use Flux queries in tall format (not pivot), which handles multi-series grouping correctly.
flux_query = f"""
from(bucket: "iot_data")
|> range(start: {window_start}, stop: {window_end})
|> filter(fn: (r) => r._measurement == "energy_metrics")
|> filter(fn: (r) => r._field == "energy_kwh")
|> group(columns: ["device_id", "site_id"])
|> sum()
|> map(fn: (r) => ({{r with _time: {scheduled_time}}}))
"""
The scheduled_time is critical — it must snap down to the most recent 15-minute boundary:
from datetime import datetime

def get_scheduled_time(run_time: datetime) -> datetime:
    # Floor the run time to the 15-minute boundary that starts its window.
    minutes = run_time.minute
    snapped = (minutes // 15) * 15
    return run_time.replace(minute=snapped, second=0, microsecond=0)
Without this, timestamps drift 1–9 minutes off the boundary and aggregates become unqueryable by time.
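On the scheduling side, Celery Beat needs a single entry firing on the quarter hour; the app name and task path below are illustrative:

from celery import Celery
from celery.schedules import crontab

app = Celery("iot_pipeline")  # illustrative app name

app.conf.beat_schedule = {
    "aggregate-energy-15min": {
        # Fires at :00, :15, :30, :45; the task then calls get_scheduled_time()
        # on its own run time so the aggregate timestamp lands on the boundary.
        "task": "aggregation.tasks.aggregate_energy",  # illustrative task path
        "schedule": crontab(minute="*/15"),
    },
}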
Tall vs Wide Format: The Gotcha
Early versions used Flux pivot() to reshape the data before summing:
// WRONG — pivot breaks multi-series grouping
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> sum(column: "energy_kwh")  // silently returns wrong values
The correct approach is tall format — group, then aggregate without pivot:
// RIGHT — each field stays in its own row, grouped correctly
|> group(columns: ["device_id", "site_id"])
|> sum()
This was a production bug that caused aggregates to silently undercount. Flux's pivot changes the data shape in ways that break downstream sum() when multiple devices share the same timestamp.
Work Hours Boundary Edge Case
We tag aggregated points with is_work_hours_data — whether the 15-minute window falls within configured work hours (e.g., 08:00–18:00 UTC). This sounds simple, but the 08:00–08:15 window was consistently missing.
Root cause: the aggregator's window_start was calculated as scheduled_time - 15 minutes. For the 08:00 run, window_start landed at 07:45, but the Flux range stop was exclusive — so 08:00:00 events were never included.
Fix: use scheduled_time itself as range stop, not scheduled_time - epsilon.
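Putting the two fixes together, the window math and the work-hours flag look roughly like this; the 08:00–18:00 UTC bounds match the example above, and the function name is illustrative rather than a library API:

from datetime import datetime, time, timedelta

# Illustrative work-hours bounds in UTC (08:00–18:00), matching the example above.
WORK_HOURS_START = time(8, 0)
WORK_HOURS_END = time(18, 0)

def aggregation_window(scheduled_time: datetime) -> tuple:
    # Flux range() treats stop as exclusive, so using scheduled_time itself as the stop
    # makes consecutive windows [start, stop) with no gap and no overlap.
    window_start = scheduled_time - timedelta(minutes=15)
    window_stop = scheduled_time
    # Classify the window by its start time; the exact boundary rule is site-configurable.
    is_work_hours = WORK_HOURS_START <= window_start.time() < WORK_HOURS_END
    return window_start, window_stop, is_work_hours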
Results
After these fixes:
- Zero gap in 15-minute aggregates across all deployment environments
- Sub-100ms REST API response times for 7-day energy queries (InfluxDB column store is fast)
- Consistent timestamps — every aggregate aligns exactly to a 15-minute boundary
- BDD test coverage using Behave — feature files describe expected aggregate counts and values, catching environment-specific discrepancies before they hit production
What's Next
We're exploring:
- Streaming anomaly detection — flag energy spikes in real time via NATS before they're aggregated
- Multi-site rollups — aggregate across sites for portfolio-level dashboards
- Retention policies — auto-downsample 15-min aggregates to 1-hour for data older than 90 days
If you're building IoT data pipelines and hitting similar issues, feel free to reach out. The combination of NATS + InfluxDB + Python is underrated for on-prem deployments where cloud dependency isn't an option.