Predictive maintenance transforms sensor data into maintenance schedules — but only if that data is analyzed fast enough to act on. RisingWave, a PostgreSQL-compatible streaming database, continuously evaluates equipment signals against degradation models in SQL, triggering work orders hours or days before failures occur.
Why Predictive Maintenance Requires Real-Time Analytics
The promise of predictive maintenance is compelling: rather than fixing equipment after it breaks (reactive) or replacing parts on a calendar schedule regardless of condition (preventive), you service equipment exactly when the data says it needs servicing. This minimizes both unplanned downtime and the labor and parts cost of unnecessary maintenance.
Achieving that promise depends on how quickly sensor data is analyzed. Most industrial facilities already collect sensor data at high frequency — the problem is latency in the analytical pipeline. When sensor data is batch-loaded into a data lake overnight and analyzed by a Python model the following morning, you have a 24-hour analysis lag. A bearing that begins showing early-stage failure signatures at 2 AM is not flagged until the next afternoon. By then, the degradation may have advanced to the point where intervention requires a full replacement rather than a simple bearing swap.
Real-time predictive maintenance closes this loop. The same degradation signals that appear in batch analysis appear in streaming analysis within seconds. A maintenance planner receives a work order recommendation while the equipment is still running smoothly, with days of lead time to procure parts and schedule the downtime window.
How Streaming SQL Supports Predictive Maintenance
RisingWave ingests multi-parameter sensor streams from Kafka and applies feature-computation SQL to produce the inputs that degradation models need: rolling means, rate-of-change, peak-to-peak amplitude, frequency of threshold exceedance. These features are computed continuously by materialized views and exposed through the PostgreSQL interface.
Window functions handle the temporal aggregation: TUMBLE windows for fixed-period features (hourly energy consumption, daily vibration average), HOP windows for overlapping trend analysis (rolling 7-day vibration trend updated hourly). SESSION windows detect event clusters — bursts of high-vibration readings that individually might not trigger a threshold but collectively indicate a developing fault pattern.
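As a sketch of the difference, the same vibration average can be computed in fixed hourly buckets with TUMBLE or as an overlapping seven-day trend with HOP (the `equipment_sensors` source is defined in Step 1 below):

```sql
-- Fixed, non-overlapping hourly buckets
SELECT window_start, equipment_id, AVG(value) AS hourly_mean
FROM TUMBLE(equipment_sensors, event_ts, INTERVAL '1 HOUR')
WHERE parameter = 'VIBRATION_RMS'
GROUP BY window_start, equipment_id;

-- Overlapping 7-day windows, advancing every hour
SELECT window_start, equipment_id, AVG(value) AS rolling_7d_mean
FROM HOP(equipment_sensors, event_ts, INTERVAL '1 HOUR', INTERVAL '7 DAYS')
WHERE parameter = 'VIBRATION_RMS'
GROUP BY window_start, equipment_id;
```

The only structural difference is the table function: TUMBLE assigns each row to exactly one window, while HOP assigns it to every window that covers its timestamp.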
Building It Step by Step
Step 1: Create the Data Source
-- Equipment sensor telemetry (multi-parameter)
CREATE SOURCE equipment_sensors (
equipment_id VARCHAR,
plant_id VARCHAR,
sensor_id VARCHAR,
parameter VARCHAR, -- VIBRATION_RMS, TEMP_BEARING, CURRENT_A/B/C, OIL_PRESSURE, SPEED_RPM
value DOUBLE PRECISION,
unit VARCHAR,
event_ts TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'iot.equipment_sensors',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Historical failure events for supervised learning feature validation
CREATE SOURCE failure_events (
equipment_id VARCHAR,
failure_mode VARCHAR, -- BEARING_WEAR, OVERHEATING, ELECTRICAL_FAULT, LUBRICATION
severity VARCHAR, -- WARNING, CRITICAL, FAILURE
failure_ts TIMESTAMPTZ,
work_order_id VARCHAR
) WITH (
connector = 'kafka',
topic = 'maintenance.failure_events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
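For reference, a message on the `iot.equipment_sensors` topic matching this schema would look like the following (the identifiers and values are illustrative, not from a real plant):

```json
{
  "equipment_id": "PUMP-104",
  "plant_id": "PLANT-EAST",
  "sensor_id": "VIB-104-DE",
  "parameter": "VIBRATION_RMS",
  "value": 4.82,
  "unit": "mm/s",
  "event_ts": "2024-05-01T02:13:45Z"
}
```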
Step 2: Build the Core Materialized View
-- 1-hour feature set per equipment-parameter (TUMBLE)
CREATE MATERIALIZED VIEW equipment_features_1h AS
SELECT
window_start,
window_end,
equipment_id,
plant_id,
parameter,
AVG(value) AS mean_value,
STDDEV(value) AS std_value,
MIN(value) AS min_value,
MAX(value) AS max_value,
MAX(value) - MIN(value) AS peak_to_peak,
-- Exceedance count vs. a static limit (100.0 is an illustrative threshold);
-- an adaptive baseline such as mean + 2*sigma must come from a separate
-- view or lookup table, since one aggregate cannot reference another
COUNT(*) FILTER (WHERE value > 100.0) AS exceedance_count,
COUNT(*) AS sample_count
FROM TUMBLE(equipment_sensors, event_ts, INTERVAL '1 HOUR')
GROUP BY window_start, window_end, equipment_id, plant_id, parameter;
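Because materialized views are queryable over the PostgreSQL interface, a maintenance planner or an external inference service can pull the freshest features with an ordinary query. A sketch, using a hypothetical equipment id:

```sql
SELECT window_start, parameter, mean_value, std_value, peak_to_peak
FROM equipment_features_1h
WHERE equipment_id = 'PUMP-104'
ORDER BY window_start DESC
LIMIT 10;
```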
-- 24-hour trend: rate of change in mean value (HOP: 1-hour slide, 24-hour width)
CREATE MATERIALIZED VIEW equipment_trend_24h AS
SELECT
window_start,
window_end,
equipment_id,
plant_id,
parameter,
AVG(value) AS daily_mean,
STDDEV(value) AS daily_std,
COUNT(*) AS sample_count,
-- Compare the first and second half of the window
AVG(value) FILTER (WHERE event_ts >= window_start + INTERVAL '12 HOURS') -
AVG(value) FILTER (WHERE event_ts < window_start + INTERVAL '12 HOURS') AS half_period_delta
FROM HOP(equipment_sensors, event_ts, INTERVAL '1 HOUR', INTERVAL '24 HOURS')
GROUP BY window_start, window_end, equipment_id, plant_id, parameter;
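A quick way to sanity-check the trend view is to rank equipment by how fast vibration is rising. This ad-hoc query is a sketch for exploration, not part of the pipeline:

```sql
SELECT equipment_id, window_end, daily_mean, half_period_delta
FROM equipment_trend_24h
WHERE parameter = 'VIBRATION_RMS'
ORDER BY half_period_delta DESC
LIMIT 20;
```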
Step 3: Add Alerts and Aggregations
-- Bearing failure precursor: vibration RMS rising AND bearing temperature rising
CREATE MATERIALIZED VIEW bearing_precursor_alerts AS
SELECT
v.equipment_id,
v.plant_id,
v.window_end AS detection_ts,
v.daily_mean AS vibration_rms,
v.half_period_delta AS vibration_trend,
t.daily_mean AS bearing_temp,
t.half_period_delta AS temp_trend,
CASE
WHEN v.half_period_delta > 0.5 AND t.half_period_delta > 2 THEN 'HIGH_RISK'
WHEN v.half_period_delta > 0.2 AND t.half_period_delta > 1 THEN 'MEDIUM_RISK'
ELSE 'LOW_RISK'
END AS risk_level
FROM equipment_trend_24h v
JOIN equipment_trend_24h t
ON t.equipment_id = v.equipment_id
AND t.window_end = v.window_end
AND t.parameter = 'TEMP_BEARING'
WHERE v.parameter = 'VIBRATION_RMS'
AND (v.half_period_delta > 0.2 OR t.half_period_delta > 1);
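Downstream consumers can also query the alert view directly; for example, a dashboard might list only the highest-risk rows:

```sql
SELECT equipment_id, detection_ts, vibration_rms, vibration_trend,
       bearing_temp, temp_trend
FROM bearing_precursor_alerts
WHERE risk_level = 'HIGH_RISK'
ORDER BY detection_ts DESC;
```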
-- Lubrication alert: oil pressure declining trend
CREATE MATERIALIZED VIEW lubrication_alerts AS
SELECT
equipment_id,
plant_id,
window_end AS detection_ts,
daily_mean AS oil_pressure,
half_period_delta AS pressure_trend,
daily_std
FROM equipment_trend_24h
WHERE parameter = 'OIL_PRESSURE'
AND half_period_delta < -5 -- pressure dropping more than 5 units per 12hr period
AND mean_value < 80; -- already below nominal range
-- Electrical fault precursor: current imbalance between phases
-- (assumes per-phase parameters CURRENT_A/B/C are reported)
CREATE MATERIALIZED VIEW current_imbalance AS
SELECT
window_start,
window_end,
equipment_id,
plant_id,
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_A') AS phase_a,
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_B') AS phase_b,
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_C') AS phase_c,
-- Imbalance = (max phase - min phase) / average phase current * 100
(GREATEST(
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_A'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_B'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_C')
) - LEAST(
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_A'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_B'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_C')
)) / NULLIF(AVG(mean_value), 0) * 100 AS current_imbalance_pct
FROM equipment_features_1h
WHERE parameter IN ('CURRENT_A', 'CURRENT_B', 'CURRENT_C')
GROUP BY window_start, window_end, equipment_id, plant_id
HAVING (GREATEST(
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_A'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_B'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_C')
) - LEAST(
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_A'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_B'),
MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_C')
)) / NULLIF(AVG(mean_value), 0) * 100 > 10;
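For plant-level triage, the alert views roll up with ordinary aggregation. This sketch counts bearing alerts per plant over the last day:

```sql
SELECT plant_id, COUNT(*) AS bearing_alerts_24h
FROM bearing_precursor_alerts
WHERE detection_ts > NOW() - INTERVAL '24 HOURS'
GROUP BY plant_id
ORDER BY bearing_alerts_24h DESC;
```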
Step 4: Sink Results Downstream
-- Stream maintenance recommendations to CMMS
CREATE SINK maintenance_recommendation_sink
FROM bearing_precursor_alerts
WITH (
connector = 'kafka',
topic = 'maintenance.recommendations.bearing',
properties.bootstrap.server = 'kafka:9092',
force_append_only = 'true' -- the alert view is an updating stream; emit inserts only
) FORMAT PLAIN ENCODE JSON;
-- Archive features for ML model retraining
CREATE SINK feature_archive_sink
FROM equipment_features_1h
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true', -- hourly features can update as late data arrives
catalog.type = 'rest',
catalog.uri = 'http://iceberg-catalog:8181',
database.name = 'predictive_maintenance',
table.name = 'equipment_features_1h'
);
How This Compares to Traditional Approaches
| Aspect | Batch ML Pipeline | Streaming SQL (RisingWave) |
| --- | --- | --- |
| Analysis latency | 12–24 hours | Sub-second feature computation |
| Feature engineering | Scheduled Python jobs | Declarative SQL materialized views |
| Alert lead time | Hours after degradation begins | Minutes after degradation begins |
| Model input freshness | Yesterday's features | Current features |
| Ops complexity | Orchestrator + model server | Single SQL deployment |
| Historical retention | Data lake | Iceberg sink from RisingWave |
FAQ
What is predictive maintenance?
Predictive maintenance uses sensor data and analytical models to identify equipment degradation before failure occurs. Unlike time-based preventive maintenance, it schedules interventions based on actual condition signals, reducing both unplanned downtime and unnecessary maintenance activity.
How does RisingWave support machine learning models?
RisingWave continuously computes the feature set (rolling means, standard deviations, rate-of-change, exceedance counts) that ML models need as inputs. These features are exposed through the PostgreSQL interface and can be consumed by external inference services, or rule-based degradation logic can be implemented directly in SQL as shown above.
Can I integrate RisingWave with my existing stack?
Yes. RisingWave connects to Kafka, PostgreSQL via CDC, and MySQL via CDC. It writes computed features and alerts to JDBC databases, Kafka topics, and Iceberg tables. CMMS platforms and ERP systems that expose a Kafka consumer or JDBC interface can receive automated maintenance recommendations.
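As an integration sketch, alerts can also be written straight into a relational CMMS staging table with a JDBC sink; the connection string, credentials, and table name below are hypothetical:

```sql
CREATE SINK cmms_recommendations_sink
FROM bearing_precursor_alerts
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://cmms-db:5432/cmms?user=rw&password=secret',
    table.name = 'maintenance_recommendations',
    type = 'upsert',
    primary_key = 'equipment_id,detection_ts'
);
```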
How far in advance can streaming SQL detect equipment failures?
Detection lead time depends on the degradation rate and the feature-computation window. In practice, vibration and temperature trends computed over rolling 24-hour windows can surface bearing wear signatures 48 to 72 hours before alarm thresholds are reached.
Key Takeaways
- Batch-based predictive maintenance pipelines have a 12–24 hour analysis lag; streaming SQL shrinks that lag to seconds, converting early degradation signals into days of actionable lead time.
- RisingWave's TUMBLE and HOP window functions compute the feature set (rolling mean, standard deviation, rate-of-change) that degradation models require without custom application code.
- Multi-parameter precursor patterns — rising vibration combined with rising bearing temperature — are expressible as SQL JOINs between materialized views.
- Computed features are archived to Iceberg for ML model retraining, creating a closed feedback loop between real-time detection and model improvement.
Ready to try this? Get started with RisingWave. Join our Slack community.

