The trap is believing that environmental monitoring is an enterprise problem that needs enterprise hardware. Facility managers buy $5,000 environmental monitoring stations with proprietary gateways, per-sensor licensing fees, and cloud dashboards that take three months to configure. Data scientists evaluate cloud-based IoT platforms at $500/month before they've even proven that the readings they want to collect are useful. The result is always the same: by the time the system is installed, the budget is spent, and nobody has validated whether the data actually answers the question they started with.
The worst environmental monitoring system is the one that costs so much to deploy that nobody validates the data before committing to it.
A Raspberry Pi with a $4 temperature sensor does in an afternoon what those enterprise systems do in a fiscal quarter. It reads the sensor, publishes to MQTT, stores readings in a database, renders a Grafana dashboard, and alerts when thresholds are breached. The entire stack runs on the Pi itself or in Docker containers on the same board. Total cost: under $50. Total configuration time: less than the meeting where someone would normally propose the enterprise solution.
This chapter builds that system end to end. It uses patterns from Chapter 16 (The MQTT Contract), Chapter 17 (The Edge API Pattern), and Chapter 22 (The Dashboard Axiom). The architecture scales to any sensor type — temperature, humidity, air quality, light levels, soil moisture. The pattern is the same. Only the sensor library changes.
The system has four stages:
Every code example in this chapter includes a simulated-data fallback. If you don't have a DHT22 sensor, the system generates realistic temperature and humidity readings with natural variation (sinusoidal daily pattern plus random noise). The architecture, MQTT messages, database schema, and Grafana dashboard are identical whether the data comes from a real sensor or the simulator. Build the whole system first, buy the sensor later.
The separation between these stages matters. The sensor reader doesn't know about the database. The database writer doesn't know about Grafana. Each component communicates through MQTT — a published message on a topic. That means you can replace any component independently. Swap SQLite for InfluxDB? Only the subscriber changes. Add a second sensor on another Pi? Only the publisher changes. This is the MQTT Contract from Chapter 16, applied to a real system.
Every environmental monitoring system is the same four-stage pipeline: sense, publish, store, visualize. The stages communicate through a message broker, never directly. This decoupling means any stage can be replaced, scaled, or debugged independently. If you find yourself writing code where the sensor reader also writes to the database, you've coupled two stages and made both harder to change.
The sensor reader is a standalone Python script that reads the DHT22 and publishes to MQTT. It runs in a loop, sleeps between readings, and handles sensor errors gracefully.
#!/usr/bin/env python3
"""Environmental sensor reader — publishes temperature/humidity to MQTT."""
import json
import math
import random
import time
from datetime import datetime
import paho.mqtt.client as mqtt
# ── Configuration ─────────────────────────────────────────────────────────
MQTT_BROKER = "localhost"
MQTT_TOPIC_TEMP = "environment/temperature"
MQTT_TOPIC_HUMIDITY = "environment/humidity"
MQTT_TOPIC_STATUS = "environment/status"
READING_INTERVAL = 30 # seconds between readings
SENSOR_LOCATION = "office"
USE_REAL_SENSOR = False # Set True if DHT22 is connected
# ── Sensor initialization ────────────────────────────────────────────────
if USE_REAL_SENSOR:
import adafruit_dht
import board
sensor = adafruit_dht.DHT22(board.D4) # GPIO pin 4
# ── Simulated sensor (realistic daily temperature curve) ──────────────────
def simulated_reading():
"""Generate realistic temperature/humidity with daily variation."""
hour = datetime.now().hour + datetime.now().minute / 60.0
# Temperature: 20°C base, ±3°C daily swing, peak at 2pm
temp_base = 22.0
temp_swing = 3.0 * math.sin((hour - 8) * math.pi / 12)
temp_noise = random.gauss(0, 0.3)
temperature = round(temp_base + temp_swing + temp_noise, 1)
# Humidity: inverse correlation with temperature
humidity_base = 55.0
humidity_swing = -8.0 * math.sin((hour - 8) * math.pi / 12)
humidity_noise = random.gauss(0, 1.5)
humidity = round(max(20, min(90, humidity_base + humidity_swing + humidity_noise)), 1)
return temperature, humidity
def read_sensor():
"""Read from real sensor or simulator."""
if USE_REAL_SENSOR:
try:
temperature = sensor.temperature
humidity = sensor.humidity
if temperature is not None and humidity is not None:
return round(temperature, 1), round(humidity, 1)
return None, None
except RuntimeError as e:
# DHT sensors occasionally fail to read — this is normal
print(f"Sensor read error (retrying next cycle): {e}")
return None, None
else:
return simulated_reading()
# ── MQTT setup ────────────────────────────────────────────────────────────
client = mqtt.Client(client_id=f"env-sensor-{SENSOR_LOCATION}")
client.connect(MQTT_BROKER, 1883, 60)
client.loop_start()
# Publish a birth message so subscribers know we're alive
client.publish(MQTT_TOPIC_STATUS, json.dumps({
"status": "online",
"location": SENSOR_LOCATION,
"interval": READING_INTERVAL,
"simulated": not USE_REAL_SENSOR,
"timestamp": datetime.now().isoformat()
}), qos=1, retain=True)
# ── Main loop ─────────────────────────────────────────────────────────────
print(f"Environmental monitor started — {SENSOR_LOCATION}")
print(f"Reading interval: {READING_INTERVAL}s")
print(f"Sensor mode: {'REAL DHT22' if USE_REAL_SENSOR else 'SIMULATED'}")
consecutive_failures = 0
while True:
temperature, humidity = read_sensor()
if temperature is not None and humidity is not None:
consecutive_failures = 0
timestamp = datetime.now().isoformat()
temp_payload = json.dumps({
"value": temperature,
"unit": "celsius",
"location": SENSOR_LOCATION,
"timestamp": timestamp
})
humidity_payload = json.dumps({
"value": humidity,
"unit": "percent",
"location": SENSOR_LOCATION,
"timestamp": timestamp
})
client.publish(MQTT_TOPIC_TEMP, temp_payload, qos=1)
client.publish(MQTT_TOPIC_HUMIDITY, humidity_payload, qos=1)
print(f"[{timestamp}] {temperature}°C, {humidity}% RH — {SENSOR_LOCATION}")
else:
consecutive_failures += 1
if consecutive_failures >= 5:
print(f"WARNING: {consecutive_failures} consecutive read failures")
time.sleep(READING_INTERVAL)
Two design decisions worth noting. First, the simulated readings aren't random noise — they follow a sinusoidal daily temperature curve with Gaussian noise added. This produces dashboard graphs that look like real sensor data, which matters when you're testing threshold alerts and visualization. Random flat noise doesn't teach you anything about how your dashboard handles trends. Second, the consecutive_failures counter prevents log spam. A single failed DHT22 read is normal — the sensor's one-wire protocol occasionally drops a bit. Five failures in a row means the sensor is disconnected or damaged, and that deserves attention.
Separate the sensor from the pipeline. The reader script's only job is to produce structured MQTT messages. Everything downstream — storage, visualization, alerting — subscribes to those messages and has no idea whether the data came from a real sensor or a simulator.
Each reading publishes two messages to separate topics:
environment/temperature → {"value": 23.4, "unit": "celsius", "location": "office", "timestamp": "..."}
environment/humidity → {"value": 52.1, "unit": "percent", "location": "office", "timestamp": "..."}
Splitting temperature and humidity into separate topics is intentional. A subscriber that only cares about temperature — a thermostat controller, for example — subscribes to environment/temperature and ignores humidity entirely. A dashboard subscribes to environment/# and gets both. A multi-room deployment publishes to environment/temperature/office, environment/temperature/warehouse, and subscribers choose their scope with topic wildcards.
MQTT topic design is API design. Get the hierarchy right and every future subscriber writes itself.
The retain flag on the status message means new subscribers immediately receive the last-known status without waiting for the next cycle. When Grafana reconnects after a restart, it knows whether the sensor is online before the next reading arrives. Small detail, large operational impact.
You have two storage options. SQLite is simpler and runs on the Pi with zero additional infrastructure. InfluxDB is purpose-built for time-series data and pairs naturally with Grafana. I'll show both.
#!/usr/bin/env python3
"""MQTT subscriber that writes environmental readings to SQLite."""
import json
import sqlite3
from datetime import datetime
import paho.mqtt.client as mqtt
DB_PATH = "/home/pi/environment/readings.db"
# ── Database setup ────────────────────────────────────────────────────────
conn = sqlite3.connect(DB_PATH)
conn.execute("""
CREATE TABLE IF NOT EXISTS readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
location TEXT NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL,
unit TEXT NOT NULL,
received_at TEXT DEFAULT (datetime('now'))
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_ts ON readings(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_readings_loc ON readings(location, metric)")
conn.commit()
# ── MQTT callback ─────────────────────────────────────────────────────────
def on_message(client, userdata, msg):
try:
data = json.loads(msg.payload.decode())
metric = msg.topic.split("/")[-1] # "temperature" or "humidity"
conn.execute(
"INSERT INTO readings (timestamp, location, metric, value, unit) VALUES (?, ?, ?, ?, ?)",
(data["timestamp"], data["location"], metric, data["value"], data["unit"])
)
conn.commit()
except (json.JSONDecodeError, KeyError) as e:
print(f"Bad message on {msg.topic}: {e}")
# ── MQTT setup ────────────────────────────────────────────────────────────
client = mqtt.Client(client_id="env-db-writer")
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.subscribe("environment/#")
print("Database writer started — subscribing to environment/#")
client.loop_forever()
SQLite handles one writer at a time. For a single sensor publishing every 30 seconds, this is a non-issue — the write completes in microseconds. For ten sensors publishing every second, you'll start seeing database is locked errors. At that scale, switch to InfluxDB or PostgreSQL. The threshold is lower than most engineers expect: SQLite struggles above roughly 50 writes per second in WAL mode.
For production deployments, InfluxDB is the right storage engine. It's designed for time-series data — it compresses timestamps efficiently, supports downsampling (store per-second data for a week, per-minute data for a month, per-hour data forever), and Grafana speaks its query language natively.
The MQTT-to-InfluxDB bridge is part of the Docker Compose stack below. Telegraf handles it — it subscribes to MQTT topics and writes to InfluxDB without custom code.
This is where the system comes together. One docker-compose.yml runs the entire monitoring infrastructure:
# docker-compose.yml — Environmental monitoring stack
version: "3.8"
services:
mosquitto:
image: eclipse-mosquitto:2
ports:
- "1883:1883"
volumes:
- ./mosquitto/config:/mosquitto/config
- mosquitto_data:/mosquitto/data
restart: unless-stopped
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=changeme123
- DOCKER_INFLUXDB_INIT_ORG=pi-monitor
- DOCKER_INFLUXDB_INIT_BUCKET=environment
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=pi-monitor-token
volumes:
- influxdb_data:/var/lib/influxdb2
restart: unless-stopped
telegraf:
image: telegraf:1.30
depends_on:
- mosquitto
- influxdb
volumes:
- ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
restart: unless-stopped
grafana:
image: grafana/grafana:10.4.0
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=changeme123
- GF_INSTALL_PLUGINS=
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
restart: unless-stopped
volumes:
mosquitto_data:
influxdb_data:
grafana_data:
The Telegraf configuration bridges MQTT to InfluxDB:
# telegraf/telegraf.conf
[agent]
interval = "10s"
flush_interval = "10s"
[[inputs.mqtt_consumer]]
servers = ["tcp://mosquitto:1883"]
topics = ["environment/#"]
data_format = "json"
json_time_key = "timestamp"
json_time_format = "2006-01-02T15:04:05.999999"
topic_tag = "topic"
tag_keys = ["location"]
[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "pi-monitor-token"
organization = "pi-monitor"
bucket = "environment"
And the Mosquitto config:
# mosquitto/config/mosquitto.conf
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
Create the directory structure and launch:
# Create project structure
mkdir -p ~/env-monitor/{mosquitto/config,telegraf,grafana/provisioning/datasources}
# Save mosquitto.conf, telegraf.conf, and docker-compose.yml
# to their respective paths (shown above)
# Create Grafana datasource provisioning
cat > ~/env-monitor/grafana/provisioning/datasources/influxdb.yml << 'EOF'
apiVersion: 1
datasources:
- name: InfluxDB
type: influxdb
access: proxy
url: http://influxdb:8086
jsonData:
version: Flux
organization: pi-monitor
defaultBucket: environment
secureJsonData:
token: pi-monitor-token
isDefault: true
EOF
# Launch the stack
cd ~/env-monitor
docker compose up -d
# Verify all containers are running
docker compose ps
# Start the sensor reader (outside Docker — it needs GPIO access)
python3 sensor_reader.py
Within 60 seconds of starting the sensor reader, data flows through the pipeline: sensor → MQTT → Telegraf → InfluxDB → Grafana. Open http://<pi-ip>:3000 in a browser, log in with admin/changeme123, and the InfluxDB datasource is already configured.
Docker Compose turns a four-service monitoring stack into a single docker compose up -d. The sensor reader stays outside Docker because it needs GPIO access — everything else runs containerized.
Create a dashboard with two panels: temperature over time and humidity over time. In Grafana, click "New Dashboard" → "Add Visualization" → select the InfluxDB datasource, then use this Flux query for temperature:
from(bucket: "environment")
|> range(start: -24h)
|> filter(fn: (r) => r["topic"] == "environment/temperature")
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
Duplicate the panel for humidity, changing the topic filter to environment/humidity. Add threshold lines at your alert boundaries — 28 degrees Celsius for high temperature, 70% for high humidity.
For readers who want a pre-built dashboard, here is a Grafana dashboard JSON you can import directly via Settings → JSON Model:
{
"title": "Environmental Monitor",
"panels": [
{
"title": "Temperature (°C)",
"type": "timeseries",
"gridPos": {"h": 10, "w": 12, "x": 0, "y": 0},
"targets": [{
"query": "from(bucket: \"environment\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r[\"topic\"] == \"environment/temperature\") |> filter(fn: (r) => r[\"_field\"] == \"value\") |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)",
"refId": "A"
}],
"fieldConfig": {
"defaults": {
"unit": "celsius",
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 26},
{"color": "red", "value": 30}
]
}
}
}
},
{
"title": "Humidity (%RH)",
"type": "timeseries",
"gridPos": {"h": 10, "w": 12, "x": 12, "y": 0},
"targets": [{
"query": "from(bucket: \"environment\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r[\"topic\"] == \"environment/humidity\") |> filter(fn: (r) => r[\"_field\"] == \"value\") |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)",
"refId": "A"
}],
"fieldConfig": {
"defaults": {
"unit": "percent",
"thresholds": {
"steps": [
{"color": "green", "value": null},
{"color": "yellow", "value": 65},
{"color": "red", "value": 75}
]
}
}
}
}
]
}
Chapter 22's Dashboard Axiom says: a dashboard that nobody opens is worse than no dashboard, because it creates a false sense of monitoring. Set Grafana's alert rules to send notifications — MQTT messages, emails, or webhook calls — when thresholds are breached. Don't rely on someone staring at a graph. The graph is for diagnosis. The alert is for detection.
Grafana handles alerting natively. Set up an alert rule for high temperature:
For MQTT-based alerting without Grafana's alert engine, add this to a separate subscriber:
#!/usr/bin/env python3
"""Threshold alerter — subscribes to readings, publishes alerts."""
import json
import paho.mqtt.client as mqtt
TEMP_HIGH = 28.0
TEMP_LOW = 16.0
HUMIDITY_HIGH = 70.0
alert_client = mqtt.Client(client_id="env-alerter")
alert_client.connect("localhost", 1883, 60)
def on_message(client, userdata, msg):
data = json.loads(msg.payload.decode())
value = data["value"]
location = data["location"]
metric = msg.topic.split("/")[-1]
alert = None
if metric == "temperature" and value > TEMP_HIGH:
alert = f"HIGH TEMP: {value}°C at {location} (threshold: {TEMP_HIGH}°C)"
elif metric == "temperature" and value < TEMP_LOW:
alert = f"LOW TEMP: {value}°C at {location} (threshold: {TEMP_LOW}°C)"
elif metric == "humidity" and value > HUMIDITY_HIGH:
alert = f"HIGH HUMIDITY: {value}% at {location} (threshold: {HUMIDITY_HIGH}%)"
if alert:
print(f"ALERT: {alert}")
alert_client.publish("environment/alerts", json.dumps({
"alert": alert,
"metric": metric,
"value": value,
"location": location,
"timestamp": data["timestamp"]
}), qos=1)
client = mqtt.Client(client_id="env-monitor-alerter")
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.subscribe("environment/+")
print("Alert monitor started")
client.loop_forever()
This alerter is itself an MQTT subscriber and publisher. It subscribes to sensor readings, checks thresholds, and publishes to environment/alerts. A phone notification service, a Home Assistant automation, or even a relay-controlling Pi could subscribe to that alerts topic and act on it. Every component in the chain is replaceable because every component communicates through the broker.
Every component in the monitoring stack is replaceable because every component communicates through the broker, not through direct function calls.
The architecture handles multiple sensors without modification. Add a second sensor — say, a soil moisture probe in a greenhouse:
SENSOR_LOCATION = "greenhouse" and publish to environment/soil-moistureenvironment/# — the new topic is ingested automaticallyenvironment/soil-moistureNo code changes to existing components. No database migrations. No configuration file edits on the storage or visualization layer. The MQTT topic hierarchy absorbed the new sensor type with zero friction. This is the Sensor Pipeline framework in action — each stage is independent, and new data sources plug in at the publish stage without touching anything downstream.
This pattern scales to any sensor type. Temperature, humidity, soil moisture, air quality, light level, sound level — the pipeline is identical. Only the reader script and the MQTT topic change.
Create the directory structure, save the configuration files, and run docker compose up -d. Verify all four containers (Mosquitto, InfluxDB, Telegraf, Grafana) are running with docker compose ps. Open Grafana at http://<pi-ip>:3000 and confirm the InfluxDB datasource is provisioned.
Save sensor_reader.py with USE_REAL_SENSOR = False and run it. Watch MQTT messages arrive with mosquitto_sub -t "environment/#" -v. Confirm data appears in InfluxDB by querying the environment bucket in the InfluxDB UI at http://<pi-ip>:8086.
Import the dashboard JSON from this chapter or create panels manually with the Flux queries provided. Let the system run for an hour and watch the simulated daily temperature curve form. Set a low threshold alert (say, 21 degrees) and confirm it fires.
Save the alerter script, run it alongside the sensor reader, and subscribe to environment/alerts in a separate terminal. Lower the temperature threshold until the simulated readings trigger it. Confirm the alert message arrives on the alerts topic.
Create service files for the sensor reader and the alerter (following the pattern from Chapter 23). Enable them, reboot the Pi, and confirm both services start automatically. The Docker Compose stack already restarts via restart: unless-stopped. After a reboot, the entire monitoring system should be running without manual intervention.
Wire a DHT22 to GPIO 4 with a 10K pull-up resistor to 3.3V. Install adafruit-circuitpython-dht and libgpiod2. Set USE_REAL_SENSOR = True in the sensor reader. The MQTT messages, database schema, dashboard, and alerter remain identical — only the source of the data changes.
The enterprise monitoring vendors aren't selling hardware. They're selling the integration between sensors, databases, and dashboards. You just built that integration with a Docker Compose file and two Python scripts. The sensor cost $4. The infrastructure cost $0. The pattern works for any physical quantity you can measure.
The enterprise vendors aren't selling hardware — they're selling the integration. You just built that integration with a Docker Compose file and two Python scripts.