IoT Data Pipeline: From Raw Sensor Reading to Live Dashboard in Under 100ms
"The dashboard shows data from 5 minutes ago" is a complaint we hear constantly from clients inheriting IoT systems built without latency in mind. A sub-100ms sensor-to-dashboard pipeline is achievable — it just requires understanding where time goes at each layer and making deliberate choices.
The Full Stack
[ESP32 Sensor] ~1ms sampling + JSON encode
↓ WiFi + MQTT TLS
[AWS IoT Core] ~8ms broker receive + rules engine
↓ IoT Rule → Lambda / direct MQTT subscription
[Node.js Backend] ~2ms parse + fan-out
├─→ [InfluxDB] ~5ms async write (non-blocking)
└─→ [WebSocket] ~1ms push to subscribed dashboards
↓ WebSocket frame
[React Dashboard] ~5ms React state update + renderTotal budget: ~22ms (LAN) to ~80ms (cross-region internet)
Layer 1: ESP32 Firmware — Minimize On-Device Latency
The most common firmware latency killer: formatting a JSON string character-by-character with sprintf. Use a fixed pre-allocated buffer.
#include
#include
#include // Pre-allocate JSON document (stack-allocated, no heap fragmentation)
StaticJsonDocument<256> doc;
char pubBuffer[256];
void publishReading(float temperature, float humidity, uint8_t battery) {
doc.clear()
doc["deviceId"] = DEVICE_ID;
doc["temperature"] = serialized(String(temperature, 2));
doc["humidity"] = serialized(String(humidity, 1));
doc["battery"] = battery;
doc["ts"] = millis(); // relative — server adds absolute timestamp
size_t len = serializeJson(doc, pubBuffer, sizeof(pubBuffer));
// QoS 0 for telemetry: no ack overhead, lowest latency
mqttClient.publish("devices/" DEVICE_ID "/telemetry", pubBuffer, len);
}
Key ESP32 firmware decisions for latency:
WIFI_PS_NONE (disable power save) for 10-20ms latency reduction in exchange for ~30mA extra drawLayer 2: Node.js MQTT Subscriber — Fan-Out Without Blocking
The backend subscribes to all device telemetry topics and fans data to both the database write path and the WebSocket push path. The critical rule: never await the database write before pushing to WebSocket.
// server.js — MQTT to WebSocket fan-out
const mqtt = require('mqtt')
const { WebSocketServer } = require('ws')
const { InfluxDB, Point } = require('@influxdata/influxdb-client')const influxWriteApi = new InfluxDB({
url: process.env.INFLUXDB_URL,
token: process.env.INFLUXDB_TOKEN,
}).getWriteApi('org', 'sensors', 'ms')
// Batch InfluxDB writes: flush every 500ms or 1000 points
influxWriteApi.writeOptions = {
batchSize: 1000,
flushInterval: 500,
maxRetries: 3,
}
const wss = new WebSocketServer({ port: 8080 })
// Track subscriptions: deviceId → Set of WebSocket clients
const subscriptions = new Map()
wss.on('connection', (ws) => {
ws.on('message', (msg) => {
const { action, deviceId } = JSON.parse(msg)
if (action === 'subscribe') {
if (!subscriptions.has(deviceId)) subscriptions.set(deviceId, new Set())
subscriptions.get(deviceId).add(ws)
ws.deviceSubscriptions = ws.deviceSubscriptions || new Set()
ws.deviceSubscriptions.add(deviceId)
}
})
ws.on('close', () => {
ws.deviceSubscriptions?.forEach(id => subscriptions.get(id)?.delete(ws))
})
})
const mqttClient = mqtt.connect(process.env.MQTT_BROKER_URL, {
// TLS options...
})
mqttClient.subscribe('devices/+/telemetry')
mqttClient.on('message', (topic, buffer) => {
const deviceId = topic.split('/')[1]
let payload
try { payload = JSON.parse(buffer) } catch { return }
// 1. Push to WebSocket immediately (synchronous, ~0.1ms)
const subscribers = subscriptions.get(deviceId)
if (subscribers?.size) {
const frame = JSON.stringify({ deviceId, ...payload, serverTs: Date.now() })
subscribers.forEach(ws => {
if (ws.readyState === ws.OPEN) ws.send(frame)
})
}
// 2. Write to InfluxDB asynchronously (does NOT block fan-out)
const point = new Point('environment')
.tag('deviceId', deviceId)
.floatField('temperature', payload.temperature)
.floatField('humidity', payload.humidity)
.intField('battery', payload.battery)
.timestamp(new Date())
influxWriteApi.writePoint(point) // non-blocking, batched internally
})
Layer 3: React Dashboard — WebSocket State Management
// hooks/useDeviceStream.ts
import { useEffect, useRef, useCallback, useState } from 'react'interface Reading {
deviceId: string
temperature: number
humidity: number
battery: number
serverTs: number
}
export function useDeviceStream(deviceIds: string[]) {
const [readings, setReadings] = useState>({})
const wsRef = useRef(null)
const connect = useCallback(() => {
const ws = new WebSocket(process.env.NEXT_PUBLIC_WS_URL!)
wsRef.current = ws
ws.onopen = () => {
deviceIds.forEach(id =>
ws.send(JSON.stringify({ action: 'subscribe', deviceId: id }))
)
}
ws.onmessage = (event) => {
const reading: Reading = JSON.parse(event.data)
setReadings(prev => ({ ...prev, [reading.deviceId]: reading }))
}
ws.onclose = () => setTimeout(connect, 3000) // reconnect
ws.onerror = () => ws.close()
}, [deviceIds])
useEffect(() => {
connect()
return () => wsRef.current?.close()
}, [connect])
return readings
}
React rendering optimization: Use useMemo and React.memo on individual gauge components so a reading update for device A doesn't re-render device B's gauge. At 50+ devices, unoptimized rendering becomes the bottleneck.
Latency Budget Analysis
| Stage | LAN (same region) | Cross-region (e.g., EU device, US cloud) | |-------|-------------------|------------------------------------------| | ESP32 WiFi + MQTT publish | 5–15ms | 5–15ms | | IoT Core broker processing | 2–5ms | 2–5ms | | Network (device → cloud) | 1–5ms | 50–150ms | | Node.js MQTT → WebSocket | <1ms | <1ms | | Network (cloud → browser) | 1–10ms | 20–80ms | | React state update + render | 3–8ms | 3–8ms | | Total | 12–44ms | 80–259ms |
For cross-region deployments, deploy a regional Node.js relay close to the devices. The relay receives MQTT locally and forwards aggregates to the central cloud. This cuts the device → cloud leg from 150ms to 5ms.
Backpressure Handling
When sensor message rate exceeds WebSocket push capacity (rare but possible at 1000+ devices):
// Rate-limit per-device WebSocket pushes to max 10/second
const lastPushed = new Map()function pushIfNotThrottled(ws, deviceId, frame) {
const now = Date.now()
const last = lastPushed.get(deviceId) || 0
if (now - last >= 100) { // 10 Hz max per device
ws.send(frame)
lastPushed.set(deviceId, now)
}
}
For InfluxDB write backpressure, the client library handles batching internally. Monitor influxWriteApi.writeOptions.maxBufferLines and alert if the buffer approaches capacity.
Buffering Strategies for Offline Recovery
If the MQTT subscriber process restarts, messages published during downtime are lost (QoS 0). For QoS 1 subscriptions, the broker queues messages during reconnection. Choose based on your acceptable data loss window:
For the storage layer details, see [Time-Series Databases for IoT](/blog/timeseries-databases-iot-influxdb-vs-timestream). For the Flutter mobile dashboard side of this architecture, see [Flutter IoT Real-Time Dashboard Architecture](/blog/flutter-iot-real-time-dashboard-architecture).
Need help building a low-latency IoT data pipeline? [Contact Code Caracal](/contact) — we've shipped these systems for clients across 15+ countries.