Importance of real-time data processing in manufacturing
In the Industrial Internet of Things (IIoT) space, real-time data processing is vital for unlocking operational efficiencies. By combining platforms like EMQX for industrial data streaming and RisingWave for real-time analytics, manufacturers can tap into machine-generated data as it happens, enabling predictive maintenance, reduced downtime, and improved efficiency. This integrated approach allows industries to respond swiftly to equipment failures, optimize production, and make data-driven decisions that boost overall equipment effectiveness (OEE) and operational agility.
Overview of technologies used
EMQ is a global leader in providing edge-cloud connectivity and data platform solutions. EMQ facilitates data collection, transmission, transformation, storage, analysis, and control between the physical devices and digital systems with its high-performance MQTT messaging platform EMQX and other software solutions. EMQ helps businesses tackle data challenges across various scenarios, such as connected vehicles, smart factories, smart buildings, retail, robotics, drones, maritime shipping, and large-scale distributed energy device networks (including renewable energy grids, distributed energy storage, transmission and distribution power grid, gas and water networks).
RisingWave is a SQL-based platform for event-driven workloads, designed to handle large volumes of real-time data efficiently. It offers robust connectors for various data systems, Postgres compatibility, and delivers low-latency results using real-time materialized views. With simple scaling and seamless integration, it is ideal for use cases such as predictive maintenance, quality control, supply chain optimization, energy management, and production line optimization.
Understanding the technical stack
In this blog post, we develop a real-time monitoring, predictive maintenance, and anomaly detection system for PBL86-80 motors used in robotic solutions in manufacturing. This system collects data from the motors on a factory-floor, sends it to an EMQX broker running in EMQX Cloud, and ingests it into RisingWave for advanced real-time analytics for monitoring, predictive maintenance and anomaly detection. Then, Grafana is used to create charts and real-time dashboard that monitors the factory shop floor.
Set up EMQX
First, create an EMQX broker on EMQX Cloud and connect it to RisingWave for data ingestion. For more information on data ingestion from EMQX, please refer to this guide to get started with EMQX.
With EMQX running on EMQX Cloud, we are now ready to ingest shop-floor data from the factory floor into the EMQX broker using the Paho Python client.
Here is a sample of the shop-floor data for electric motors in JSON format:
{
"machine_id": "machine_1",
"winding_temperature": 80,
"ambient_temperature": 40,
"vibration_level": 1.97,
"current_draw": 14.43,
"voltage_level": 50.37,
"nominal_speed": 4207.69,
"power_consumption": 646.32,
"efficiency": 82.88,
"ts": "2024-09-09 09:57:51"
}
Data ingestion process
We’ll use RisingWave to ingest and analyze the events from EMQX broker. In this blog, we use RisingWave Cloud, which provides a user-friendly experience and simplifies the operational aspects of managing and utilizing RisingWave for our real-time monitoring and predictive maintenance system.
Sign up for RisingWave Cloud
To create a RisingWave cluster in RisingWave Cloud and explore the various features it offers, you can sign up for the free plan available. The free plan allows you to test the functionalities of RisingWave without any cost.
Create a RisingWave cluster
RisingWave supports various options for creating clusters, select the Developer cluster type that free tier for users.
- Cluster Name: Give your RisingWave cluster a unique name for identification.
- Cloud Provider: Select either AWS or Google Cloud for the RisingWave cluster.
- Region: Choose the region where your RisingWave cluster will be hosted.
With the RisingWave cluster up and running, navigate to the Workspace and connect to the data streams using the SQL statements to create tables, materialized views and sinks.
Once you have deployed the RisingWave cluster, create a source in the Workspace using the following SQL query:
CREATE TABLE shop_floor_machine_data (
machine_id VARCHAR,
winding_temperature INT,
ambient_temperature INT,
vibration_level FLOAT,
current_draw FLOAT,
voltage_level FLOAT,
nominal_speed FLOAT,
power_consumption FLOAT,
efficiency FLOAT,
ts TIMESTAMP
)
WITH (
connector='mqtt',
url='ssl://xxxxxxxxx.us-east-1.emqxsl.com:8883',
topic= 'factory/machine_data',
username='xxxxxx',
password='xxxxxx',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON;
Data transformation in RisingWave
We create a set of materialized views in RisingWave based on the shop_floor_machine_data
source table to facilitate monitoring, predictive maintenance, and anomaly detection. In RisingWave, materialized views are automatically and incrementally updated with each new event, ensuring minimal computing overhead. The RisingWave engine continuously monitors for relevant events from the source after the materialized view is created.
Materialized view for machine metrics aggregation
This query creates a materialized view monitoring_mv
that provides one-minute aggregated metrics for each machine on the shop floor. It calculates the average values for winding temperature, ambient temperature, vibration level, current draw, voltage level, nominal speed, power consumption, and efficiency, grouped by machine ID and time windows (window start and window end).
CREATE MATERIALIZED VIEW monitoring_mv AS
SELECT
machine_id,
AVG(winding_temperature) AS avg_winding_temperature,
AVG(ambient_temperature) AS avg_ambient_temperature,
AVG(vibration_level) AS avg_vibration_level,
AVG(current_draw) AS avg_current_draw,
AVG(voltage_level) AS avg_voltage_level,
AVG(nominal_speed) AS avg_nominal_speed,
AVG(power_consumption) AS avg_power_consumption,
AVG(efficiency) AS avg_efficiency,
window_start,
window_end
FROM TUMBLE (shop_floor_machine_data,ts, INTERVAL '1 MINUTE')
GROUP BY machine_id, window_start,window_end;
Materialized view for maintenance alerts based on recent machine metrics
This query creates a materialized view named maintenance_mv
that combines recent and historical averages for machine metrics, such as winding temperature, vibration level, current draw, power consumption, and efficiency. It generates alerts for potential maintenance issues by comparing recent statistics against historical averages, flagging conditions like overheating, increased vibration, overcurrent, or efficiency drops. The results are ordered by the end of the time window for easy monitoring.
CREATE MATERIALIZED VIEW maintenance_mv AS
WITH Historical_Averages AS (
SELECT
machine_id,
AVG(winding_temperature) AS avg_winding_temp,
AVG(vibration_level) AS avg_vibration,
AVG(current_draw) AS avg_current_draw,
AVG(power_consumption) AS avg_power_consumption,
AVG(efficiency) AS avg_efficiency
FROM shop_floor_machine_data
WHERE ts < NOW() - INTERVAL '1' HOUR -- Historical data for the last 1 hour
GROUP BY machine_id
),
Recent_Stats AS (
SELECT
machine_id,
COUNT(*) AS event_count,
window_start,
window_end,
AVG(winding_temperature) AS avg_winding_temp,
AVG(vibration_level) AS avg_vibration,
AVG(current_draw) AS avg_current_draw,
AVG(power_consumption) AS avg_power_consumption,
AVG(efficiency) AS avg_efficiency
FROM TUMBLE (shop_floor_machine_data,ts, INTERVAL '1 MINUTES')
GROUP BY machine_id, window_start,window_end
)
SELECT
r.machine_id,
r.window_start,
r.window_end,
r.avg_winding_temp,
r.avg_vibration,
r.avg_current_draw,
r.avg_power_consumption,
r.avg_efficiency,
CASE
WHEN r.avg_winding_temp > h.avg_winding_temp + 5 THEN 'Potential Overheating'
WHEN r.avg_vibration > h.avg_vibration + 0.1 THEN 'Increased Vibration'
WHEN r.avg_current_draw > h.avg_current_draw + 2 THEN 'Possible overcurrent condition'
WHEN r.avg_efficiency < h.avg_efficiency - 5 THEN 'Efficiency Drop'
ELSE 'Normal'
END AS maintenance_alert
FROM
Recent_Stats r
JOIN
Historical_Averages h
ON
r.machine_id = h.machine_id
WHERE
r.avg_winding_temp > h.avg_winding_temp + 5 OR
r.avg_vibration > h.avg_vibration + 0.1 OR
r.avg_current_draw > h.avg_current_draw + 2 OR
r.avg_efficiency < h.avg_efficiency - 5
ORDER BY
r.window_end DESC;
Materialized view for anomaly detection in machine metrics
This query creates a materialized view named anomalies_mv
that identifies anomalies in machine metrics by analyzing recent data. It computes average values, standard deviations, and maximums for metrics such as winding temperature, vibration level, current draw, voltage level, and power consumption over one-minute intervals. By comparing current metrics with historical data, it generates alerts for significant deviations and rising trends, filtering the results to highlight only notable anomalies and ordering them by the time window's end.
CREATE MATERIALIZED VIEW anomalies_mv AS
WITH Anomaly_Metrics AS (
SELECT
machine_id,
window_start,
window_end,
AVG(winding_temperature) AS avg_winding_temp,
AVG(vibration_level) AS avg_vibration,
AVG(current_draw) AS avg_current_draw,
AVG(voltage_level) AS avg_voltage_level,
AVG(power_consumption) AS avg_power_consumption,
STDDEV_POP(winding_temperature) AS stddev_winding_temp,
STDDEV_POP(vibration_level) AS stddev_vibration,
STDDEV_POP(current_draw) AS stddev_current_draw,
STDDEV_POP(voltage_level) AS stddev_voltage_level,
STDDEV_POP(power_consumption) AS stddev_power_consumption,
MAX(winding_temperature) AS max_winding_temp,
MAX(vibration_level) AS max_vibration,
MAX(current_draw) AS max_current_draw,
MAX(voltage_level) AS max_voltage_level,
MAX(power_consumption) AS max_power_consumption
FROM TUMBLE (shop_floor_machine_data,ts, INTERVAL '1 MINUTES')
GROUP BY machine_id, window_start,window_end
),
Trend_Analysis AS (
SELECT
machine_id,
window_start,
window_end,
avg_winding_temp,
avg_vibration,
avg_current_draw,
avg_voltage_level,
avg_power_consumption,
stddev_winding_temp,
stddev_vibration,
stddev_current_draw,
stddev_voltage_level,
stddev_power_consumption,
max_winding_temp,
max_vibration,
max_current_draw,
max_voltage_level,
max_power_consumption,
LAG(avg_winding_temp, 1) OVER (PARTITION BY machine_id ORDER BY window_end) AS prev_avg_winding_temp,
LAG(avg_vibration, 1) OVER (PARTITION BY machine_id ORDER BY window_end) AS prev_avg_vibration,
LAG(avg_current_draw, 1) OVER (PARTITION BY machine_id ORDER BY window_end) AS prev_avg_current_draw,
LAG(avg_voltage_level, 1) OVER (PARTITION BY machine_id ORDER BY window_end) AS prev_avg_voltage_level,
LAG(avg_power_consumption, 1) OVER (PARTITION BY machine_id ORDER BY window_end) AS prev_avg_power_consumption
FROM Anomaly_Metrics
)
SELECT
machine_id,
window_start,
window_end,
avg_winding_temp,
avg_vibration,
avg_current_draw,
avg_voltage_level,
avg_power_consumption,
stddev_winding_temp,
stddev_vibration,
stddev_current_draw,
stddev_voltage_level,
stddev_power_consumption,
max_winding_temp,
max_vibration,
max_current_draw,
max_voltage_level,
max_power_consumption,
CASE
WHEN max_winding_temp > avg_winding_temp + 3 * stddev_winding_temp THEN 'Anomalous Winding Temperature'
WHEN max_vibration > avg_vibration + 3 * stddev_vibration THEN 'Anomalous Vibration Level'
WHEN max_current_draw > avg_current_draw + 3 * stddev_current_draw THEN 'Anomalous Current Draw'
WHEN max_voltage_level > avg_voltage_level + 3 * stddev_voltage_level THEN 'Anomalous Voltage Level'
WHEN max_power_consumption > avg_power_consumption + 3 * stddev_power_consumption THEN 'Anomalous Power Consumption'
WHEN (avg_winding_temp - prev_avg_winding_temp) > 2 * stddev_winding_temp THEN 'Rising Winding Temperature'
WHEN (avg_vibration - prev_avg_vibration) > 2 * stddev_vibration THEN 'Increasing Vibration'
WHEN (avg_current_draw - prev_avg_current_draw) > 2 * stddev_current_draw THEN 'Rising Current Draw'
WHEN (avg_voltage_level - prev_avg_voltage_level) > 2 * stddev_voltage_level THEN 'Rising Voltage Level'
WHEN (avg_power_consumption - prev_avg_power_consumption) > 2 * stddev_power_consumption THEN 'Rising Power Consumption'
ELSE 'Normal'
END AS anomaly_alert
FROM Trend_Analysis
WHERE
max_winding_temp > avg_winding_temp + 3 * stddev_winding_temp OR
max_vibration > avg_vibration + 3 * stddev_vibration OR
max_current_draw > avg_current_draw + 3 * stddev_current_draw OR
max_voltage_level > avg_voltage_level + 3 * stddev_voltage_level OR
max_power_consumption > avg_power_consumption + 3 * stddev_power_consumption OR
(avg_winding_temp - prev_avg_winding_temp) > 2 * stddev_winding_temp OR
(avg_vibration - prev_avg_vibration) > 2 * stddev_vibration OR
(avg_current_draw - prev_avg_current_draw) > 2 * stddev_current_draw OR
(avg_voltage_level - prev_avg_voltage_level) > 2 * stddev_voltage_level OR
(avg_power_consumption - prev_avg_power_consumption) > 2 * stddev_power_consumption
ORDER BY
window_end DESC;
Visualizing data with Grafana
To use RisingWave as a data source in Grafana and create visualizations and dashboards, follow the steps in Configure Grafana to read data from RisingWave.
Once the connection between RisingWave and Grafana is set up, you can use materialized views from RisingWave as tables to design charts and build a complete dashboard.
Table, charts, and dashboards
This table is generated from the previously created shop_floor_machine_data
table, which stores sensor readings and operational metrics from machines on the shop floor. Each record represents a machine's performance at a specific point in time.
This chart is generated from the anomalies_mv
materialized view, which displays alerts like "Anomalous Vibration Level" or "Rising Power Consumption" triggered when deviations exceed predefined thresholds. These alerts highlight unusual machine behavior, enabling real-time monitoring and predictive maintenance.
This chart is based on the maintenance_mv
materialized view, which tracks real-time machine performance metrics and compares them against historical averages to detect potential maintenance issues. When specific thresholds are exceeded — such as a winding temperature increase of more than 5 degrees or an efficiency drop of over 5% — the system triggers alerts like "Potential Overheating" or "Efficiency Drop." This enables proactive maintenance, helping prevent equipment failures.
This chart is based on the shop_floor_machine_data
table and displays the winding temperatures of the machines on the shop floor. If the winding temperature exceeds the threshold of 81°C, the corresponding data points are highlighted in red to indicate a potential issue.
This chart is based on the shop_floor_machine_data
table and illustrates the vibration levels of the machines on the shop floor. If the vibration level exceeds the threshold of 2.05, it indicates potential mechanical issues that may require attention.
This pie chart is generated using data from the shop_floor_machine_data
table and represents the distribution of ambient temperatures across the factory floor.
This bar chart is generated using data from the shop_floor_machine_data
table and displays the nominal speed and power consumption of the machines. A red line is included to indicate the threshold of 4210, serving as a reference point for comparison.
This chart is based on the shop_floor_machine_data
table and displays the efficiency of all the machines. If the efficiency drops below the threshold of 80, the affected data points are highlighted in red to indicate suboptimal performance.
This is a real-time unified dashboard that monitors the factory shop floor, providing a comprehensive view of operational performance. It displays alerts related to predictive maintenance and anomaly detection, enabling quick identification of potential issues and facilitating proactive management of equipment health.
>
In this blog, we demonstrated a real-time monitoring and predictive maintenance system for PBL86-80 motors, leveraging EMQX Cloud and RisingWave for real-time analytics. By integrating these platforms, manufacturers can collect motor data, detect anomalies, and perform predictive maintenance, all while visualizing insights through Grafana dashboards. > >
>
The combination of EMQX and RisingWave offers a powerful solution for IIoT and manufacturing use cases, enabling scalable and cost-effective anomaly detection, predictive maintenance, and real-time monitoring. We invite readers to explore these platforms and unlock their potential for building innovative IIoT solutions. > >
>
To get started with RisingWave, create a cluster in RisingWave Cloud and explore its features with the free plan. For step-by-step guidance, refer to the official RisingWave documentation. If you need further assistance with setting up this integration, join our active Slack community for support. > >