Energy companies now require real-time data processing technologies more than ever to continuously optimize operations, anticipate maintenance needs, and guarantee grid stability to prevent outages. In particular, real-time analytics can enable:
- Grid monitoring and optimization
- Predictive maintenance
- Renewable energy integration
- Customized customer experiences
While energy companies recognize this need, few have made the transition from legacy batch systems to stream processing systems. The primary barrier is that stream processing systems tend to be difficult to deploy and maintain.
RisingWave is a streaming database purpose-built to make stream processing more accessible. Using SQL, you can set up streaming data pipelines within RisingWave to ingest, filter, and transform event streams on the fly.
In this post, we will explore how RisingWave supports real-time analytics for the energy sector. We will also demonstrate how to construct a data pipeline for monitoring power grids in real time.
Typical Technical Stack for Real-Time Energy Grid Monitoring
To effectively monitor a power grid in real time, a data pipeline tailored for stream processing is essential.
The typical technical stack includes the following systems:
- Apache Kafka: Event Streaming Platform
Kafka serves as an event streaming platform for logging, managing, and organizing data streams. Alternatives like Pulsar and NATS can also be used.
- PostgreSQL: Relational Database Management System
PostgreSQL is employed to store historical and customer data, providing a robust relational database solution.
- RisingWave: SQL Streaming Database
RisingWave is a PostgreSQL-compatible, distributed streaming database that operates using SQL queries, ensuring a user-friendly experience. While stream processors like Flink and additional data stores can be used, they often introduce greater complexity in deployment.
- Grafana: Monitoring and Visualization Platform
Grafana enables the creation of interactive dashboards for tracking metrics and receiving alerts on anomalies or specific events.
In summary, RisingWave ingests data from Kafka topics and historical data from PostgreSQL. It then joins these data streams to create an enriched data stream, transforms the data for specific metrics, and streams the results to Grafana for live dashboarding.
Tutorial: Monitoring a Power Grid in Real Time with RisingWave
To see real-time data analysis in action, check out our runnable demo available in our GitHub repository, which incorporates all the platforms discussed.
To run the demo, follow these steps after cloning the repository to your device:
- Ensure Docker is installed and running. Navigate to the energy_grid folder, then run the following command in a terminal window to start all services:
docker-compose up -d
- In a separate terminal window, connect to the RisingWave instance using this command:
psql -h localhost -p 4566 -d dev -U root
Continue reading to learn how to process, join, and transform streaming data effectively. You can run the included SQL queries exactly as written.
Data sources
We utilize several data sources to simulate a power grid's energy production and consumption, as well as to track individual households. Below is a brief overview of the data schema and how to ingest it into RisingWave.
PostgreSQL Table
A PostgreSQL table stores information about each household, featuring a unique customer ID and electric meter ID. Households can choose between two price plans: tiered or time-of-use.
- Tiered Plan: The price per unit remains constant until the household’s net power usage exceeds a specific threshold.
- Time-of-Use Plan: Energy prices fluctuate throughout the day, with higher rates during peak hours (4 PM - 8 PM).
Field | Type
Customer ID | Unique integer
Meter ID | Unique integer
Price plan | String: 'tier' or 'time of use'
Address | String
Kafka Topics
Two data generators write to two Kafka topics, representing households consuming and generating electricity. Consecutive messages are timestamped one minute apart, simulating extended monitoring of power usage.
- Consumption Topic: Logs how much power a household has consumed in the past minute. Each record includes a timestamp, meter ID, and energy consumed in kilowatt-hours (kWh).
Field | Type
Timestamp | Timestamptz
Meter ID | Integer
Energy consumed | Float
- Production Topic: Logs how much energy a household has produced in the past minute, assuming a single energy production source, also measured in kWh.
Field | Type
Timestamp | Timestamptz
Meter ID | Integer
Energy produced | Float
This setup provides a comprehensive view of energy dynamics within the simulated power grid.
Ingest data into RisingWave
To ingest data from PostgreSQL and Kafka, we run a few SQL queries in RisingWave.
Ingest CDC data from PostgreSQL
First, we use the CREATE SOURCE command to establish a connection with the PostgreSQL database. Since the demo is deployed through Docker, the hostname is host.docker.internal and the port is set to 5433.
Next, the CREATE TABLE command ingests data from a specific table. In this case, the customers table created in RisingWave consumes CDC data from the customers table in PostgreSQL. When defining the table's schema, a primary key must be specified, and it must match the upstream table.
CREATE SOURCE pg_mydb WITH (
connector = 'postgres-cdc',
hostname = 'host.docker.internal',
port = '5433',
username = 'myuser',
password = '123456',
database.name = 'mydb'
);
CREATE TABLE customers (
customer_id int,
meter_id int,
address varchar,
price_plan varchar,
PRIMARY KEY (customer_id)
) FROM pg_mydb TABLE 'public.customers';
For more details on ingesting CDC data from PostgreSQL, see Ingest data from PostgreSQL CDC.
Ingest data from Kafka topics
We can use either the CREATE SOURCE or CREATE TABLE command. Either works here; the difference is that a table persists the ingested data in RisingWave while a source does not. For more details on the difference between a source and a table when ingesting data streams, see Ingest data from external systems.
The bootstrap server of the Kafka instance is set to kafka:9092. For simplicity, no security protocols are configured in this demo, but TLS/SSL encryption and SASL authentication are supported.
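As a rough sketch of a secured variant (the property names follow RisingWave's Kafka connector conventions, but the source name, broker address, and credentials here are hypothetical; verify the exact options against the RisingWave docs for your version):
CREATE SOURCE energy_consume_secure (
consumption_time timestamptz,
meter_id integer,
energy_consumed double precision
) WITH (
connector = 'kafka',
topic = 'energy_consumed',
-- Hypothetical SASL/SSL settings; adjust to your broker's configuration.
properties.bootstrap.server = 'kafka:9093',
properties.security.protocol = 'SASL_SSL',
properties.sasl.mechanism = 'PLAIN',
properties.sasl.username = 'myuser',
properties.sasl.password = 'mypassword'
) FORMAT PLAIN ENCODE JSON;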
CREATE SOURCE energy_consume (
consumption_time timestamptz,
meter_id integer,
energy_consumed double precision
) WITH (
connector = 'kafka',
topic = 'energy_consumed',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
CREATE SOURCE energy_produce (
production_time timestamptz,
meter_id integer,
energy_produced double precision
) WITH (
connector = 'kafka',
topic = 'energy_produced',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
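If you would rather have RisingWave persist the raw consumption events, the same definition can be written as a table instead of a source. A minimal sketch, using a hypothetical name to avoid clashing with the source above:
-- Identical schema and connector settings, but ingested rows are stored in RisingWave.
CREATE TABLE energy_consume_persisted (
consumption_time timestamptz,
meter_id integer,
energy_consumed double precision
) WITH (
connector = 'kafka',
topic = 'energy_consumed',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;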
For more details on ingesting data from Kafka topics, see Ingest data from Kafka.
Create materialized views to monitor and analyze the power grid
Once all our sources are set up, we can start creating materialized views in RisingWave to process, join, and transform the data streams. A materialized view stores the results of a view query and the results are updated whenever new, relevant events arrive in the system. It relies on incremental computation, which significantly saves on computing costs and resources, and allows you to instantly access the results.
Monitor energy usage patterns
To make our data easier to manage, we create a materialized view that joins the energy_consume and energy_produce sources and calculates the net energy consumed per household. The net energy consumed is found by subtracting the amount of energy produced from the amount of energy consumed. We also use the TUMBLE function to assign all records to five-minute time windows, then group by time window to find the net energy consumed in each window.
CREATE MATERIALIZED VIEW energy_per_house AS
SELECT
consumed.meter_id,
energy_consumed,
energy_produced,
energy_consumed - energy_produced AS total_energy,
consumed.window_end
FROM
(
SELECT
meter_id,
SUM(energy_consumed) AS energy_consumed,
window_end
FROM
TUMBLE(
energy_consume,
consumption_time,
INTERVAL '5' MINUTE)
GROUP BY
meter_id,
window_end
) AS consumed
JOIN (
SELECT
meter_id,
SUM(energy_produced) AS energy_produced,
window_end
FROM
TUMBLE(
energy_produce,
production_time,
INTERVAL '5' MINUTE)
GROUP BY
meter_id,
window_end
) AS produced ON consumed.meter_id = produced.meter_id
AND consumed.window_end = produced.window_end;
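To sanity-check the join, you can query the view directly; the exact rows will vary with the generated data:
SELECT * FROM energy_per_house LIMIT 5;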
Next, we can monitor the monthly power usage per household. As new data is streamed in, these values will update to show how much energy each household has consumed so far this month.
CREATE MATERIALIZED VIEW energy_per_month AS
SELECT
meter_id,
SUM(total_energy) AS total_energy,
date_trunc('month', window_end) AS month,
date_trunc('year', window_end) AS year
FROM energy_per_house
GROUP BY meter_id, date_trunc('month', window_end), date_trunc('year', window_end);
To check the results of the materialized view, query from it. These results will automatically and continuously update as new data is ingested.
SELECT * FROM energy_per_month LIMIT 5;
meter_id | total_energy | month | year
----------+--------------------+---------------------------+---------------------------
8 | 17.261999999999983 | 1997-05-01 00:00:00+00:00 | 1997-01-01 00:00:00+00:00
13 | 17.245999999999974 | 1997-05-01 00:00:00+00:00 | 1997-01-01 00:00:00+00:00
18 | 17.22899999999998 | 1997-05-01 00:00:00+00:00 | 1997-01-01 00:00:00+00:00
2 | 17.263999999999974 | 1997-05-01 00:00:00+00:00 | 1997-01-01 00:00:00+00:00
7 | 17.177999999999972 | 1997-05-01 00:00:00+00:00 | 1997-01-01 00:00:00+00:00
(5 rows)
Now that we have mapped the usage pattern for each household, we can further transform the data to keep track of the monthly energy bills.
Estimate monthly bills
For this demo, the billing cycle is set per calendar month rather than a fixed number of days. To do so, we need a function that returns the number of days in each month. Since RisingWave has no built-in function for this task, we can create a user-defined function (UDF). We define the count_days function, which takes a timestamptz value as input and returns the number of days in that month.
UDFs allow us to perform calculations and transformations that existing functions do not support. Within RisingWave, UDFs can also be defined in Python, JavaScript, or Rust. Alternatively, you can define them externally in Python or Java to employ external services, such as APIs or libraries. For more information on UDFs, see User-defined functions.
CREATE FUNCTION count_days(input_date timestamptz)
RETURNS NUMERIC LANGUAGE SQL AS
$$SELECT EXTRACT(DAY FROM (DATE_TRUNC('month', input_date) + INTERVAL '1 month' - INTERVAL '1 day'))$$;
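As a quick usage check (the literal date is just an example), the function can be called directly:
SELECT count_days('2024-02-15 00:00:00+00'::timestamptz);
-- Returns 29, since February 2024 falls in a leap year.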
Since there are two payment plans, we split the customers into two groups based on their plan to simplify the process. We create two materialized views by joining the energy_per_house materialized view with the customers table and filtering on the specified payment plan.
CREATE MATERIALIZED VIEW tiered_meters AS
SELECT
customers.meter_id,
total_energy,
window_end
FROM energy_per_house
LEFT JOIN customers ON energy_per_house.meter_id = customers.meter_id
WHERE customers.price_plan = 'tier';
CREATE MATERIALIZED VIEW tou_meters AS
SELECT
customers.meter_id,
total_energy,
window_end
FROM energy_per_house
LEFT JOIN customers ON energy_per_house.meter_id = customers.meter_id
WHERE customers.price_plan = 'time of use';
Tiered customers
Let us calculate the electricity bill for each household on the tiered payment plan. Under this plan, each kWh consumed up to 200 kWh is priced at 20 cents, and each kWh consumed above 200 kWh is priced at 40 cents. Feel free to adjust these parameters as you see fit or add more energy usage tiers.
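For example, a household that consumes 250 kWh in a month is billed 200 × $0.20 + 50 × $0.40 = $60.00.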
CREATE MATERIALIZED VIEW current_bill_tiered AS
WITH monthly_consumption AS (
SELECT
meter_id,
date_trunc('month', window_end) AS month,
date_trunc('year', window_end) AS year,
SUM(total_energy) AS total_monthly_energy
FROM
tiered_meters
GROUP BY
meter_id,
date_trunc('month', window_end),
date_trunc('year', window_end)
),
estimated_bills AS (
SELECT
meter_id,
total_monthly_energy,
month,
year,
CASE
WHEN total_monthly_energy <= 200 THEN total_monthly_energy * 0.2
ELSE (200 * 0.20) + ((total_monthly_energy - 200) * 0.4)
END AS estimated_bill_amount
FROM
monthly_consumption
)
SELECT
meter_id,
SUM(estimated_bill_amount) AS current_bill,
month,
year
FROM
estimated_bills
GROUP BY
meter_id,
month, year;
To estimate what the bill might be at the end of the month, we extrapolate from current energy usage patterns. First, we exclude the current date, since it is incomplete, for a more accurate projection. Next, we find the average daily energy usage and scale it to the full month. From there, we calculate the total monthly bill. For simplicity, we assume that daily usage patterns stay constant.
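For example, a household that consumed 60 kWh over the first 12 complete days of a 30-day month averages 5 kWh per day, projecting to 150 kWh by month's end.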
CREATE MATERIALIZED VIEW estimated_tier_cost AS
WITH truncated_month AS (
SELECT * FROM tiered_meters
WHERE DATE_TRUNC('day', window_end) < (SELECT MAX(date_trunc('day', window_end)) FROM energy_per_house)
),
daily_consumption AS (
SELECT
meter_id,
DATE_TRUNC('day', window_end) AS days,
SUM(total_energy) AS daily_energy
FROM
truncated_month
GROUP BY
meter_id,
DATE_TRUNC('day', window_end)
),
projected_monthly_consumption AS (
SELECT
meter_id,
SUM(daily_energy) AS total_energy_so_far,
(SUM(daily_energy) / DATE_PART('day', MAX(days))) * count_days(DATE_TRUNC('month', days)) AS estimated_monthly_energy,
DATE_TRUNC('month', days) AS month,
DATE_TRUNC('year', days) AS year
FROM
daily_consumption
GROUP BY
meter_id, DATE_TRUNC('month', days), DATE_TRUNC('year', days)
),
estimated_bills AS (
SELECT
meter_id,
estimated_monthly_energy,
CASE
WHEN estimated_monthly_energy <= 200 THEN estimated_monthly_energy * 0.2
ELSE (200 * 0.20) + ((estimated_monthly_energy - 200) * 0.4)
END AS estimated_bill_amount,
month,
year
FROM
projected_monthly_consumption
)
SELECT
meter_id,
SUM(estimated_bill_amount) AS estimated_total_bill,
SUM(estimated_monthly_energy) AS estimated_total_energy,
month
FROM
estimated_bills
GROUP BY
meter_id, month;
Time-of-use customers
To calculate the monthly bill for time-of-use customers, we create a materialized view that tracks the hourly expenditure of each household. Each kWh consumed between 4 pm and 8 pm is priced at 40 cents; at all other times of the day, each kWh is priced at 20 cents. Then we group by month to find the monthly bill.
CREATE MATERIALIZED VIEW current_bill_tou AS
WITH hourly_cost AS (
SELECT
meter_id,
date_trunc('month', window_end) AS month,
date_trunc('year', window_end) AS year,
CASE
WHEN date_part('hour', window_end) BETWEEN 16 AND 20 THEN total_energy * 0.4
ELSE total_energy * 0.2
END AS cost
FROM
tou_meters
),
month_cost AS (
SELECT
meter_id,
month,
year,
SUM(cost) AS monthly_cost
FROM
hourly_cost
GROUP BY
meter_id, month, year
)
SELECT
month_cost.meter_id,
monthly_cost,
month_cost.month,
month_cost.year
FROM
month_cost
LEFT JOIN energy_per_month
ON month_cost.meter_id = energy_per_month.meter_id
AND month_cost.month = energy_per_month.month
AND month_cost.year = energy_per_month.year;
To estimate each household's bill at the end of the month, we calculate the average amount it pays per day and extrapolate to the full month, assuming its energy usage patterns stay consistent. As before, we exclude the most recent date from the calculation.
CREATE MATERIALIZED VIEW estimated_tou_cost AS
WITH truncated_month AS (
SELECT * FROM tou_meters
WHERE DATE_TRUNC('day', window_end) < (SELECT MAX(DATE_TRUNC('day', window_end)) FROM energy_per_house)
),
daily_energy_consumption_hourly AS (
SELECT
meter_id,
SUM(total_energy) AS daily_energy,
DATE_PART('hour', window_end) AS hour,
DATE_PART('day', window_end) AS day,
DATE_TRUNC('month', window_end) AS month,
DATE_TRUNC('year', window_end) AS year
FROM
truncated_month
GROUP BY
meter_id,
DATE_PART('hour', window_end),
DATE_PART('day', window_end),
DATE_TRUNC('month', window_end),
DATE_TRUNC('year', window_end)
)
SELECT
meter_id,
(SUM(
CASE
WHEN hour BETWEEN 16 AND 20 THEN daily_energy * 0.4
ELSE daily_energy * 0.2
END
) / MAX(day)) * count_days(month) AS estimated_monthly_bill,
(SUM(daily_energy) / MAX(day)) * count_days(month) AS estimated_total_energy,
month
FROM
daily_energy_consumption_hourly
GROUP BY
meter_id,
month;
To view the results of any materialized view we just created, query it with a SELECT statement. While RisingWave processes the data streams, we still need a way to present the results in a digestible form.
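For instance, to spot-check the time-of-use projections (row contents will vary with the generated data):
SELECT * FROM estimated_tou_cost ORDER BY meter_id LIMIT 5;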
Gain insights by creating a dashboard
We use Grafana as our visualization platform to easily monitor the grid and keep track of energy usage patterns.
Open Grafana at localhost:3000 and navigate to the pre-built dashboard titled ‘Grid Monitoring’. In the Docker Compose demo, Grafana is already configured to read data from RisingWave. To learn more about the setup process, see Configure Grafana to read data from RisingWave.
The pre-built dashboard includes several panels; let's walk through what they display.
The first panel monitors the energy usage pattern of the household with meter ID 8. Around noon, the net energy consumed by this household is close to zero. Since this demo includes a renewable energy component, we can assume that the amount of energy generated during this time roughly equals the amount consumed.
Additionally, we can monitor the grid as a whole, keeping track of when energy usage peaks and dips throughout the month. If energy consumption starts rising too high, we can be immediately alerted and take necessary actions.
This panel provides a quick overview of how much total energy all households consume in each hour, day, and month. The energy usage pattern is consistent, with dips around noon, and higher net usage from 6 pm to 6 am.
Lastly, we can observe the current bill and the estimated monthly bill of all households. Based on the current usage patterns, the time-of-use payment plan would be more appealing to customers. These insights help customers stay up to date on their energy usage patterns and bill information.
These are some basic insights to gain from the dashboard. Feel free to continue experimenting with the data. You can also track the rate of change in the amount of energy used by the grid to see when there is more strain on the power grid.
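As a rough sketch of that last idea (the view name is illustrative, not part of the demo), the LAG window function can compare each five-minute window against the previous one for every household:
CREATE MATERIALIZED VIEW energy_rate_of_change AS
SELECT
meter_id,
window_end,
-- Change in net energy versus this household's previous five-minute window.
total_energy - LAG(total_energy) OVER (
PARTITION BY meter_id
ORDER BY window_end
) AS energy_delta
FROM energy_per_house;
Summing these deltas across households would surface grid-wide spikes in demand.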
By combining Kafka for data ingestion, PostgreSQL for storage, RisingWave for real-time processing, and Grafana for visualization, we showed how energy companies can build a robust, scalable system for monitoring power grids in real time. This integrated approach enables faster responses to grid issues, more efficient operations, and a better customer experience.
If you would like to test out this process yourself, see our runnable demo in our GitHub repository. Follow the instructions included to get started.
To stay up to date on what RisingWave is up to, sign up for our monthly newsletter. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts.