Factories can reduce energy costs and hit sustainability targets by monitoring consumption at the machine and line level in real time, not from monthly utility bills. RisingWave, a PostgreSQL-compatible streaming database, joins energy-meter streams with production-count streams continuously to compute energy-per-unit, idle-waste alerts, and carbon-intensity metrics as materialized views that update with every new reading.
## Why Real-Time Energy Monitoring Matters
Energy is the third-largest cost in most manufacturing operations, after labor and materials. Yet most factories still measure it through monthly utility invoices or, at best, hourly substation readings. That lag makes root-cause analysis nearly impossible—a compressed-air leak that ran for three shifts looks identical to legitimate production ramp-up when you only see daily totals.
Modern factories are increasingly required to report Scope 2 emissions and energy intensity (kWh per unit) to customers, regulators, and ESG frameworks. That means the energy data pipeline must be accurate, auditable, and granular—down to the machine and shift, not the building and month.
The opportunity is also significant. Studies across discrete manufacturing consistently show 20–30% of factory energy consumption occurs during idle or standby periods. Real-time visibility into machine states—running, idle, standby, off—is the prerequisite for acting on that waste before the shift ends.
## The Streaming SQL Approach
Smart meters and IoT gateways emit energy readings every 15 seconds to one minute. Production counters emit a unit-completion event after every cycle. RisingWave treats both as Kafka streams, joins them by machine ID and timestamp, and maintains energy-per-unit materialized views that dashboards and EMS platforms query in real time.
The architecture separates concerns cleanly:
- Source views normalize raw meter readings and production events
- Core materialized views compute energy intensity, idle consumption, and shift totals
- Alert views flag threshold violations and sink them to downstream systems
## Building It Step by Step

### Step 1: Data Sources
```sql
-- Energy meter readings from IoT gateway (every 30 seconds per meter)
CREATE SOURCE energy_meter_readings (
    meter_id VARCHAR,
    machine_id VARCHAR,
    line_id VARCHAR,
    plant_id VARCHAR,
    active_power_kw NUMERIC,   -- instantaneous kW draw
    kwh_cumulative NUMERIC,    -- cumulative kWh since meter reset
    power_factor NUMERIC,
    machine_state VARCHAR,     -- 'running', 'idle', 'standby', 'off'
    reading_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'factory.energy.meter-readings',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
```sql
-- Production completion events from MES
CREATE SOURCE production_completions (
    machine_id VARCHAR,
    line_id VARCHAR,
    plant_id VARCHAR,
    part_number VARCHAR,
    units_completed INT,
    shift_id VARCHAR,
    completion_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'mes.production.completions',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
```
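For reference, a message matching the energy_meter_readings schema can be built like this (a minimal Python sketch; the helper name is made up, and any Kafka client can send the result, since RisingWave only sees the JSON):

```python
import json
from datetime import datetime, timezone

def meter_reading_payload(meter_id, machine_id, line_id, plant_id,
                          active_power_kw, kwh_cumulative, power_factor,
                          machine_state, reading_ts=None):
    """Build a JSON message matching the energy_meter_readings source schema."""
    ts = reading_ts or datetime.now(timezone.utc)
    return json.dumps({
        "meter_id": meter_id,
        "machine_id": machine_id,
        "line_id": line_id,
        "plant_id": plant_id,
        "active_power_kw": active_power_kw,
        "kwh_cumulative": kwh_cumulative,
        "power_factor": power_factor,
        "machine_state": machine_state,
        "reading_ts": ts.isoformat(),
    })

# Send with any Kafka client, e.g.:
# producer.send("factory.energy.meter-readings",
#               meter_reading_payload("m-001", "cnc-01", "L1", "P1",
#                                     12.5, 10432.7, 0.92, "running").encode())
```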
### Step 2: Core Materialized Views
```sql
-- Hourly energy consumption summary per machine
CREATE MATERIALIZED VIEW energy_hourly AS
SELECT
    machine_id,
    line_id,
    plant_id,
    window_start,
    window_end,
    AVG(active_power_kw) AS avg_power_kw,
    MAX(active_power_kw) AS peak_power_kw,
    -- kWh for the hour: each 30-second sample contributes kW * (30/3600) h
    SUM(active_power_kw) * (30.0 / 3600.0) AS kwh_consumed,
    AVG(power_factor) AS avg_power_factor,
    COUNT(*) FILTER (WHERE machine_state = 'idle') AS idle_readings,
    COUNT(*) FILTER (WHERE machine_state = 'running') AS running_readings,
    COUNT(*) FILTER (WHERE machine_state = 'standby') AS standby_readings,
    COUNT(*) AS total_readings,
    -- Share of readings spent idle or in standby
    ROUND(
        COUNT(*) FILTER (WHERE machine_state IN ('idle', 'standby')) * 100.0
        / NULLIF(COUNT(*), 0), 1
    ) AS idle_pct
FROM TUMBLE(energy_meter_readings, reading_ts, INTERVAL '1 hour')
GROUP BY machine_id, line_id, plant_id, window_start, window_end;
```
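The kwh_consumed expression treats each 30-second sample as holding for its full interval, so the sum of sampled kW times 30/3600 gives kilowatt-hours. The same arithmetic in Python, for a quick offline check (the helper is illustrative, not part of the pipeline):

```python
def kwh_from_samples(power_kw_samples, interval_s=30.0):
    """Rectangle-rule energy estimate: each sampled kW is held for one interval."""
    return sum(power_kw_samples) * (interval_s / 3600.0)

# A machine drawing a steady 12 kW for one hour (120 samples at 30 s):
print(round(kwh_from_samples([12.0] * 120), 3))  # 12.0, exact for a constant load
```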
```sql
-- Hourly production totals per machine
CREATE MATERIALIZED VIEW production_hourly AS
SELECT
    machine_id,
    line_id,
    plant_id,
    part_number,
    window_start,
    window_end,
    SUM(units_completed) AS units_produced
FROM TUMBLE(production_completions, completion_ts, INTERVAL '1 hour')
GROUP BY machine_id, line_id, plant_id, part_number, window_start, window_end;
```
```sql
-- Energy intensity: kWh per unit produced (the key sustainability KPI)
-- Note: kwh_consumed is the machine's full hourly total; when a machine runs
-- several part numbers in the same hour, apportion energy downstream.
CREATE MATERIALIZED VIEW energy_intensity AS
SELECT
    e.machine_id,
    e.line_id,
    e.plant_id,
    p.part_number,
    e.window_start,
    e.window_end,
    e.kwh_consumed,
    p.units_produced,
    ROUND(e.kwh_consumed / NULLIF(p.units_produced, 0), 4) AS kwh_per_unit,
    e.idle_pct,
    e.avg_power_factor
FROM energy_hourly e
JOIN production_hourly p
    ON e.machine_id = p.machine_id
    AND e.window_start = p.window_start;
```
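The kwh_per_unit expression is plain division with a NULL guard. A tiny Python sketch of the same logic (sample numbers are illustrative):

```python
def kwh_per_unit(kwh_consumed, units_produced):
    """Energy intensity; None when no units were produced (mirrors NULLIF)."""
    if not units_produced:
        return None
    return round(kwh_consumed / units_produced, 4)

print(kwh_per_unit(18.6, 240))  # 0.0775 kWh per unit
print(kwh_per_unit(4.2, 0))     # None: energy consumed, nothing produced
```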
### Step 3: Alerts and Aggregations
```sql
-- Idle-waste alert: machine averaging > 5 kW across a 10-minute window
-- of idle/standby readings
CREATE MATERIALIZED VIEW idle_waste_alerts AS
SELECT
    machine_id,
    line_id,
    machine_state,
    AVG(active_power_kw) AS avg_idle_power_kw,
    COUNT(*) AS readings_count,
    window_start,
    window_end,
    -- Estimated waste: each 30-second idle reading contributes kW * (30/3600) h
    ROUND(SUM(active_power_kw) * (30.0 / 3600.0), 3) AS estimated_waste_kwh
FROM TUMBLE(energy_meter_readings, reading_ts, INTERVAL '10 minutes')
WHERE machine_state IN ('idle', 'standby')
GROUP BY machine_id, line_id, machine_state, window_start, window_end
HAVING AVG(active_power_kw) > 5.0;
```
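The window arithmetic can be prototyped outside the database before wiring the alert to a sink. A Python sketch (30-second sampling and the view's 5 kW threshold are assumed; like the view, it thresholds on the window average):

```python
def idle_waste(idle_power_kw, interval_s=30.0, threshold_kw=5.0):
    """Summarize a window of idle/standby power readings and decide
    whether the average draw crosses the alert threshold."""
    if not idle_power_kw:
        return None
    avg_kw = sum(idle_power_kw) / len(idle_power_kw)
    return {
        "avg_idle_power_kw": avg_kw,
        # Each reading contributes kW * (interval / 3600) hours of waste
        "estimated_waste_kwh": round(sum(idle_power_kw) * (interval_s / 3600.0), 3),
        "alert": avg_kw > threshold_kw,
    }

# 20 idle readings (10 minutes at 30 s) averaging 7 kW: alert fires,
# roughly 1.167 kWh wasted in the window
print(idle_waste([7.0] * 20))
```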
```sql
CREATE SINK idle_waste_sink
AS SELECT * FROM idle_waste_alerts
WITH (
    connector = 'kafka',
    topic = 'factory.alerts.energy',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
```sql
-- Plant-level energy summary for ESG reporting (hourly windows; roll up to
-- shifts downstream). Sourcing totals from energy_hourly and production_hourly
-- avoids double-counting machine kWh across per-part rows in energy_intensity.
CREATE MATERIALIZED VIEW shift_energy_summary AS
SELECT
    e.plant_id,
    e.window_start AS hour_start,
    e.window_end AS hour_end,
    e.total_kwh,
    p.total_units,
    ROUND(e.total_kwh / NULLIF(p.total_units, 0), 4) AS plant_kwh_per_unit,
    -- CO2e estimate using a grid emission factor (configurable)
    ROUND(e.total_kwh * 0.233, 2) AS co2e_kg  -- 0.233 kg CO2e/kWh example
FROM (
    SELECT plant_id, window_start, window_end, SUM(kwh_consumed) AS total_kwh
    FROM energy_hourly GROUP BY plant_id, window_start, window_end
) e
JOIN (
    SELECT plant_id, window_start, window_end, SUM(units_produced) AS total_units
    FROM production_hourly GROUP BY plant_id, window_start, window_end
) p ON e.plant_id = p.plant_id AND e.window_start = p.window_start;
```
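The 0.233 kg CO2e/kWh in the view is a placeholder; in practice the factor is looked up per grid region (and often per hour) from published emission-factor tables. The Scope 2 arithmetic itself is a one-liner, sketched here with the factor as a parameter:

```python
def scope2_co2e_kg(total_kwh, grid_factor_kg_per_kwh=0.233):
    """Location-based Scope 2 estimate: energy times a grid emission factor."""
    return round(total_kwh * grid_factor_kg_per_kwh, 2)

print(scope2_co2e_kg(1250.0))        # 291.25 kg with the example factor
print(scope2_co2e_kg(1250.0, 0.05))  # 62.5 kg on a low-carbon grid
```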
## Comparison Table

| Approach | Granularity | Alert Latency | ESG Reporting Ready | Effort |
|---|---|---|---|---|
| Monthly utility bills | Building/month | None | No | Low |
| Hourly SCADA historian | Machine/hour | ~1 hour | Partial | Medium |
| Custom IoT pipeline | Machine/minute | Minutes | Partial | High |
| RisingWave streaming SQL | Machine/30 seconds | Sub-minute | Yes | Low (SQL only) |
## FAQ

### How do I handle different meter sampling rates across the plant?

Meters with different sampling rates can share the same topic, and the tumbling windows simply aggregate whatever readings arrive. Note, however, that the fixed `30.0 / 3600.0` factor in the kWh approximation assumes one reading every 30 seconds. With mixed rates, either derive a per-meter interval (e.g., from the gap between consecutive reading_ts values) or compute energy from deltas of the kwh_cumulative register, which is exact at any sampling rate.
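Where intervals vary, the kwh_cumulative register from the source schema is the sturdier basis: the delta between consecutive readings is exact regardless of rate. A Python sketch of that per-meter delta logic (assuming the register only ever jumps down on a meter reset):

```python
def kwh_from_register(readings):
    """Sum positive deltas of a cumulative kWh register.
    readings: (timestamp, kwh_cumulative) pairs sorted by timestamp.
    A negative delta means the meter reset, so it is skipped, not counted."""
    total = 0.0
    for (_, prev), (_, curr) in zip(readings, readings[1:]):
        if curr >= prev:
            total += curr - prev
    return total

# A 15-second meter: register deltas give exact energy at any sampling rate
print(round(kwh_from_register([(0, 100.0), (15, 100.05), (30, 100.11)]), 5))  # 0.11
```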
### Can I correlate energy spikes with specific production orders?

Yes. Add a production_order_id field to the production-completions source, then join energy_meter_readings with production completions on machine_id and a time-proximity condition. RisingWave's stream-stream join with an interval condition supports this pattern.
### How accurate is the kWh approximation using average power?

For 30-second readings, the rectangle-rule approximation (each sampled kW held for one interval) is typically accurate to within 1–2% for factory load profiles, since loads rarely change much between samples. If your meters report a cumulative kWh register directly, the delta between consecutive readings gives exact values instead.
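That figure is easy to check on a synthetic profile. For a load ramping linearly from 10 kW to 20 kW over an hour (true energy 15 kWh), 30-second rectangle-rule sampling lands well inside the quoted band (a sketch; real profiles with faster swings will err more):

```python
def kwh_rectangle(power_kw_samples, interval_s=30.0):
    """Rectangle-rule energy estimate from fixed-interval power samples."""
    return sum(power_kw_samples) * (interval_s / 3600.0)

# Linear ramp 10 -> 20 kW over one hour, one sample every 30 seconds:
samples = [10.0 + 10.0 * (i * 30.0) / 3600.0 for i in range(120)]
true_kwh = 15.0  # average of 10 and 20 kW over one hour
approx = kwh_rectangle(samples)
error_pct = abs(approx - true_kwh) / true_kwh * 100
print(f"{approx:.3f} kWh, {error_pct:.2f}% error")  # 14.958 kWh, 0.28% error
```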
## Key Takeaways
- Stream energy-meter readings and MES production events into RisingWave to compute energy-per-unit continuously rather than from batch reports.
- Idle-waste alerts fire within 10 minutes of a machine drawing significant power in idle state—actionable within the same shift.
- The energy_intensity view joins energy and production hourly windows, providing the kWh-per-unit metric needed for ESG reporting and customer sustainability disclosures.
- Plant-level CO2e estimates update every hour using configurable grid emission factors, enabling real-time Scope 2 tracking.

