Utilities face growing regulatory and voluntary pressure to report accurate, near-real-time carbon emissions data. With RisingWave, a PostgreSQL-compatible streaming database, you can continuously compute emissions in tonnes of CO₂e from generation telemetry and grid import data, enabling live dashboards, regulatory feeds, and carbon budget alerts without batch-processing delays.
Why Real-Time Carbon Tracking Matters
Environmental, Social, and Governance (ESG) commitments and emerging regulations such as the EU Carbon Border Adjustment Mechanism and US SEC climate disclosure rules demand granular, auditable emissions data. The challenge for utilities is threefold:
- Generation fleet diversity — a single utility may operate gas turbines, hydro, wind, and solar assets, each with different emission factors.
- Grid purchases — imported electricity carries a marginal emission rate that fluctuates with the dispatch stack hour by hour.
- Reporting latency — monthly or quarterly batch reporting is too coarse for carbon budget management and for intraday trading decisions tied to carbon prices.
Streaming SQL enables a continuously updated emissions ledger. Every megawatt-hour generated or imported triggers an incremental calculation in RisingWave's materialized views.
The Streaming SQL Approach
The pipeline ingests generation telemetry from the EMS and grid import data from the ISO/RTO API via Kafka. Emission factors are maintained in a reference table updated via Postgres CDC. RisingWave joins the streams, applies emission factors, and computes rolling totals. Alerts fire when hourly budgets are exceeded.
Step-by-Step Tutorial
Step 1: Data Source Setup
-- Generation asset telemetry (MW output, fuel type, heat rate)
CREATE SOURCE generation_output (
asset_id VARCHAR,
plant_id VARCHAR,
fuel_type VARCHAR, -- NATURAL_GAS | COAL | OIL | BIOMASS | HYDRO | WIND | SOLAR
gross_output_mw DOUBLE PRECISION,
aux_load_mw DOUBLE PRECISION,
heat_rate_mmbtu_mwh DOUBLE PRECISION,
telemetry_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'ems.generation.output',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Grid import data from ISO/RTO (net interchange from adjacent balancing areas)
CREATE SOURCE grid_imports (
interchange_id VARCHAR,
from_balancing_area VARCHAR,
import_mw DOUBLE PRECISION,
marginal_emission_rate_kg_mwh DOUBLE PRECISION, -- published by EPA eGRID or the ISO
interval_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'iso.interchange.imports',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Emission factors reference table via Postgres CDC (kg CO2e per MMBtu, updated periodically)
-- CDC-backed relations are created as tables with a primary key so upstream updates and deletes apply cleanly
CREATE TABLE emission_factors (
fuel_type VARCHAR,
co2_kg_per_mmbtu DOUBLE PRECISION,
ch4_kg_per_mmbtu DOUBLE PRECISION,
n2o_kg_per_mmbtu DOUBLE PRECISION,
gwp_co2e_factor DOUBLE PRECISION, -- 100-year GWP weighting
effective_date DATE,
PRIMARY KEY (fuel_type, effective_date)
) WITH (
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'risingwave',
password = 'password',
database.name = 'emissions_ref',
schema.name = 'public',
table.name = 'emission_factors'
);
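Because emission_factors is a CDC-backed table, you can query it directly to confirm the initial snapshot has loaded; the Kafka sources, by contrast, only start ingesting once a materialized view or sink references them in Step 2. A quick sanity check:

SELECT fuel_type, co2_kg_per_mmbtu, effective_date
FROM emission_factors
ORDER BY fuel_type, effective_date DESC;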
Step 2: Core Materialized View
-- Latest emission factor per fuel type
CREATE MATERIALIZED VIEW latest_emission_factors AS
SELECT fuel_type, co2_kg_per_mmbtu, gwp_co2e_factor, effective_date
FROM (
SELECT
fuel_type,
co2_kg_per_mmbtu,
gwp_co2e_factor,
effective_date,
ROW_NUMBER() OVER (PARTITION BY fuel_type ORDER BY effective_date DESC) AS rn
FROM emission_factors
) ranked
WHERE rn = 1; -- group top-N pattern keeps the most recent factor per fuel type
-- Per-asset scope 1 emissions in 5-minute intervals (tonne CO2e)
-- Assumes one telemetry record per asset per 5-minute interval; adjust the (5.0/60.0)
-- factor if the EMS publishes at a different cadence.
-- Window columns are renamed so the hourly HOP view below can emit its own
-- window_start/window_end without a name clash.
CREATE MATERIALIZED VIEW asset_scope1_emissions AS
SELECT
g.asset_id,
g.plant_id,
g.fuel_type,
window_start AS interval_start,
window_end AS interval_end,
AVG(g.gross_output_mw - g.aux_load_mw) AS net_output_mw,
AVG(g.heat_rate_mmbtu_mwh) AS avg_heat_rate,
-- energy input (MMBtu) = net MW * heat rate * (5 min / 60)
SUM((g.gross_output_mw - g.aux_load_mw)
* g.heat_rate_mmbtu_mwh * (5.0/60.0)) AS heat_input_mmbtu,
-- kg CO2e converted to tonnes via / 1000
SUM((g.gross_output_mw - g.aux_load_mw)
* g.heat_rate_mmbtu_mwh * (5.0/60.0)
* COALESCE(ef.co2_kg_per_mmbtu, 0.0) -- zero-emission fuels may have no factor row
* COALESCE(ef.gwp_co2e_factor, 1.0) / 1000.0) AS scope1_tco2e
FROM TUMBLE(generation_output, telemetry_ts, INTERVAL '5 MINUTES') g
LEFT JOIN latest_emission_factors ef ON g.fuel_type = ef.fuel_type
GROUP BY g.asset_id, g.plant_id, g.fuel_type, window_start, window_end;
-- Scope 2 emissions from grid imports (tonne CO2e)
CREATE MATERIALIZED VIEW grid_scope2_emissions AS
SELECT
interchange_id,
from_balancing_area,
window_start,
window_end,
AVG(import_mw) AS avg_import_mw,
SUM(import_mw * marginal_emission_rate_kg_mwh
* (5.0/60.0) / 1000.0) AS scope2_tco2e
FROM TUMBLE(grid_imports, interval_ts, INTERVAL '5 MINUTES')
GROUP BY interchange_id, from_balancing_area, window_start, window_end;
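If a dashboard needs a single fleet-wide number, the two scope views can be rolled up in a further materialized view. This is a minimal sketch; the view name total_emissions_5min is illustrative, and it assumes both upstream views share the same 5-minute window boundaries:

-- Combined scope 1 + scope 2 emissions per 5-minute interval (tonne CO2e)
CREATE MATERIALIZED VIEW total_emissions_5min AS
SELECT window_start, window_end, SUM(tco2e) AS total_tco2e
FROM (
SELECT interval_start AS window_start, interval_end AS window_end, scope1_tco2e AS tco2e
FROM asset_scope1_emissions
UNION ALL
SELECT window_start, window_end, scope2_tco2e AS tco2e
FROM grid_scope2_emissions
) combined
GROUP BY window_start, window_end;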
-- Rolling hourly fleet-wide emissions (HOP window for trend dashboards)
CREATE MATERIALIZED VIEW hourly_fleet_emissions AS
SELECT
window_start,
window_end,
SUM(scope1_tco2e) AS total_scope1_tco2e,
SUM(scope1_tco2e) FILTER (WHERE fuel_type IN ('NATURAL_GAS','COAL','OIL'))
AS fossil_tco2e,
SUM(scope1_tco2e) FILTER (WHERE fuel_type = 'NATURAL_GAS') AS gas_tco2e
FROM HOP(asset_scope1_emissions, interval_start, INTERVAL '5 MINUTES', INTERVAL '1 HOUR')
GROUP BY window_start, window_end;
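Because RisingWave speaks the PostgreSQL wire protocol, any SQL client or BI tool can poll these views directly. For example, a trend dashboard might fetch the most recent hourly windows like this:

SELECT window_start, total_scope1_tco2e, fossil_tco2e, gas_tco2e
FROM hourly_fleet_emissions
ORDER BY window_start DESC
LIMIT 24;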
Step 3: Alerting Logic
-- Alert when hourly fleet emissions exceed budget threshold (tonne CO2e)
CREATE MATERIALIZED VIEW emissions_budget_alerts AS
SELECT
window_start,
window_end,
total_scope1_tco2e,
fossil_tco2e,
window_end AS alert_ts, -- NOW() is not allowed in a materialized view's SELECT list
'EMISSIONS_BUDGET_BREACH' AS alert_type
FROM hourly_fleet_emissions
WHERE total_scope1_tco2e > 500; -- configure per fleet capacity
-- Sink alert to Kafka for ESG dashboard and regulatory notification
CREATE SINK emissions_alert_sink AS
SELECT * FROM emissions_budget_alerts
WITH (
connector = 'kafka',
topic = 'emissions.alerts',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true' -- alert rows can be updated upstream; emit each change as an insert
) FORMAT PLAIN ENCODE JSON;
-- Sink detailed 5-minute asset emissions to Iceberg for audit trail
CREATE SINK emissions_audit_sink AS
SELECT
asset_id, plant_id, fuel_type,
interval_start, interval_end,
heat_input_mmbtu, scope1_tco2e
FROM asset_scope1_emissions
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true', -- keep every emitted 5-minute record for the audit trail
catalog.type = 'rest',
catalog.uri = 'http://iceberg-rest:8181',
warehouse.path = 's3://emissions-lake/scope1/',
database.name = 'emissions',
table.name = 'asset_scope1_5min'
);
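To verify the alerting path before wiring up a Kafka consumer, connect with any PostgreSQL client (RisingWave listens on port 4566 by default; host and credentials depend on your deployment) and query the alert view directly:

SELECT window_start, window_end, total_scope1_tco2e, alert_type
FROM emissions_budget_alerts
ORDER BY window_start DESC
LIMIT 5;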
Comparison Table
| Approach | Emissions Granularity | Latency | Audit Trail | Regulatory Grade |
|---|---|---|---|---|
| Monthly utility report | Monthly | 30–60 days | Manual | Basic |
| Hourly batch ETL | Hourly | 1–2 hours | Limited | Moderate |
| Custom stream processor | 5-minute | Minutes (complex) | Requires separate storage | Good |
| RisingWave streaming SQL | 5-minute | Seconds | Iceberg sink | High |
FAQ
Q: How do I handle emission factor updates mid-month without reprocessing historical data?
A: Emission factors are stored in Postgres and ingested via CDC. When a factor changes, RisingWave propagates the update incrementally through latest_emission_factors and downstream views. For regulatory audit purposes, the Iceberg sink preserves each 5-minute record with the factor that was in effect at computation time.
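For example, publishing a revised natural gas factor to the upstream Postgres database (the emissions_ref database from Step 1; the values below are illustrative, not official factors) propagates without any reprocessing step:

-- Run against the upstream Postgres database, not RisingWave
INSERT INTO public.emission_factors
(fuel_type, co2_kg_per_mmbtu, ch4_kg_per_mmbtu, n2o_kg_per_mmbtu, gwp_co2e_factor, effective_date)
VALUES ('NATURAL_GAS', 53.06, 0.001, 0.0001, 1.0, '2025-07-01');

-- Shortly afterwards, the new row is visible in RisingWave
SELECT * FROM latest_emission_factors WHERE fuel_type = 'NATURAL_GAS';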
Q: Can I compute scope 3 emissions (e.g., from fuel supply chains) in RisingWave?
A: Yes. Scope 3 categories that have structured upstream data—such as fuel procurement records arriving via Kafka or a CDC source—can be joined with generation data in a materialized view. Categories that require manual supplier surveys are better handled in the data warehouse layer downstream of the Iceberg sink.
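As a rough sketch, a structured fuel-procurement stream could feed a scope 3 view much like the scope 1 logic above; the fuel_deliveries topic, its columns, and the supplier-reported upstream factor are hypothetical placeholders:

-- Hypothetical fuel procurement stream; schema and topic are illustrative
CREATE SOURCE fuel_deliveries (
delivery_id VARCHAR,
plant_id VARCHAR,
fuel_type VARCHAR,
delivered_mmbtu DOUBLE PRECISION,
upstream_kg_co2e_per_mmbtu DOUBLE PRECISION, -- supplier-reported well-to-gate factor
delivery_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'procurement.fuel.deliveries',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Daily scope 3 fuel-and-energy emissions per plant (tonne CO2e)
CREATE MATERIALIZED VIEW plant_scope3_fuel_emissions AS
SELECT
plant_id,
window_start,
window_end,
SUM(delivered_mmbtu * upstream_kg_co2e_per_mmbtu / 1000.0) AS scope3_tco2e
FROM TUMBLE(fuel_deliveries, delivery_ts, INTERVAL '1 DAY')
GROUP BY plant_id, window_start, window_end;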
Q: Does RisingWave support the GHG Protocol accounting methodology natively?
A: RisingWave is a general-purpose streaming SQL database. The GHG Protocol methodology is implemented through your SQL logic—choosing market-based vs. location-based emission factors, applying GWP multipliers, and aggregating by organizational boundary. The platform enforces no specific methodology; your SQL defines it.
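As an illustration, a market-based scope 2 variant could substitute contracted supplier rates for the grid's marginal rate where a contract exists; the supplier_contract_rates table and its columns are hypothetical:

-- Hypothetical table of contracted (market-based) emission rates per balancing area
CREATE TABLE supplier_contract_rates (
from_balancing_area VARCHAR PRIMARY KEY,
contract_emission_rate_kg_mwh DOUBLE PRECISION
);

-- Market-based scope 2: fall back to the location-based marginal rate when no contract exists
CREATE MATERIALIZED VIEW grid_scope2_market_based AS
SELECT
i.from_balancing_area,
window_start,
window_end,
SUM(i.import_mw * COALESCE(c.contract_emission_rate_kg_mwh, i.marginal_emission_rate_kg_mwh)
* (5.0/60.0) / 1000.0) AS scope2_market_tco2e
FROM TUMBLE(grid_imports, interval_ts, INTERVAL '5 MINUTES') i
LEFT JOIN supplier_contract_rates c ON i.from_balancing_area = c.from_balancing_area
GROUP BY i.from_balancing_area, window_start, window_end;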
Key Takeaways
- RisingWave enables continuously updated scope 1 and scope 2 carbon accounting without batch jobs or manual aggregation.
- Streaming joins against a CDC-backed emission factor table keep calculations aligned with the latest regulatory values.
- The Iceberg sink creates a tamper-evident audit trail suitable for regulatory disclosure and third-party verification.
- Streaming SQL replaces fragile Python or Spark pipelines with declarative, version-controlled SQL materialized views.

