Predictive Maintenance with Streaming SQL

Predictive maintenance transforms sensor data into maintenance schedules — but only if that data is analyzed fast enough to act on. RisingWave, a PostgreSQL-compatible streaming database, continuously evaluates equipment signals against degradation models in SQL, triggering work orders hours or days before failures occur.

Why Predictive Maintenance Requires Real-Time Analytics

The promise of predictive maintenance is compelling: rather than fixing equipment after it breaks (reactive) or replacing parts on a calendar schedule regardless of condition (preventive), you service equipment exactly when the data says it needs servicing. This minimizes unplanned downtime while also cutting unnecessary labor and parts costs.

Achieving that promise depends on how quickly sensor data is analyzed. Most industrial facilities already collect sensor data at high frequency — the problem is latency in the analytical pipeline. When sensor data is batch-loaded into a data lake overnight and analyzed by a Python model the following morning, you have a 24-hour analysis lag. A bearing that begins showing early-stage failure signatures at 2 AM is not flagged until the next afternoon. By then, the degradation may have advanced to the point where intervention requires a full replacement rather than a simple bearing swap.

Real-time predictive maintenance closes this loop. The same degradation signals that appear in batch analysis appear in streaming analysis within seconds. A maintenance planner receives a work order recommendation while the equipment is still running smoothly, with days of lead time to procure parts and schedule the downtime window.

How Streaming SQL Supports Predictive Maintenance

RisingWave ingests multi-parameter sensor streams from Kafka and applies feature-computation SQL to produce the inputs that degradation models need: rolling means, rate-of-change, peak-to-peak amplitude, frequency of threshold exceedance. These features are computed continuously by materialized views and exposed through the PostgreSQL interface.
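
To make the feature names concrete, here is a rough Python sketch of what one window's feature set contains. The sample readings and the rate-of-change definition (last minus first reading, per sample) are illustrative, not part of RisingWave's API.

```python
# Illustrative computation of the per-window features named above.
from statistics import mean, stdev

def window_features(values):
    """Features for one sensor parameter over one time window."""
    m = mean(values)
    s = stdev(values) if len(values) > 1 else 0.0
    return {
        "mean_value": m,
        "std_value": s,
        "peak_to_peak": max(values) - min(values),
        # rate of change: last reading minus first, per sample
        "rate_of_change": (values[-1] - values[0]) / (len(values) - 1),
        # how often the signal exceeded mean + 2 sigma within the window
        "exceedance_count": sum(1 for v in values if v > m + 2 * s),
    }

readings = [4.1, 4.0, 4.2, 4.1, 4.3, 5.9]  # vibration RMS, mm/s
feats = window_features(readings)
```

The single 5.9 mm/s spike registers as one exceedance while barely moving the mean, which is why degradation models want both kinds of feature.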

Window functions handle the temporal aggregation: TUMBLE windows for fixed-period features (hourly energy consumption, daily vibration average), HOP windows for overlapping trend analysis (rolling 7-day vibration trend updated hourly). SESSION windows detect event clusters — bursts of high-vibration readings that individually might not trigger a threshold but collectively indicate a developing fault pattern.
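
The difference between the two main window types is easy to see in plain Python: a tumbling window assigns each event to exactly one window, while a hopping window of width `size` sliding by `slide` assigns it to `size / slide` overlapping windows. Timestamps are in seconds here purely for brevity.

```python
# Which windows does an event at timestamp ts belong to?

def tumble_window(ts, size):
    """The single tumbling window containing ts, as (start, end)."""
    start = ts - (ts % size)
    return (start, start + size)

def hop_windows(ts, slide, size):
    """Every hopping window containing ts, as (start, end) pairs."""
    first = ts - (ts % slide) - (size - slide)
    last = ts - (ts % slide)
    return [(s, s + size) for s in range(first, last + slide, slide)
            if s <= ts < s + size]

ts = 9000                    # 02:30:00 as seconds since midnight
one_hour_window = tumble_window(ts, 3600)
rolling_windows = hop_windows(ts, 3600, 14400)  # 1h slide, 4h width
```

With a 1-hour slide and 4-hour width, the 02:30 event lands in four windows, so a rolling trend re-evaluates it every hour for four hours.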

Building It Step by Step

Step 1: Create the Data Source

-- Equipment sensor telemetry (multi-parameter)
CREATE SOURCE equipment_sensors (
    equipment_id    VARCHAR,
    plant_id        VARCHAR,
    sensor_id       VARCHAR,
    parameter       VARCHAR,   -- VIBRATION_RMS, TEMP_BEARING, CURRENT_A/B/C, OIL_PRESSURE, SPEED_RPM
    value           DOUBLE PRECISION,
    unit            VARCHAR,
    event_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'iot.equipment_sensors',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

-- Historical failure events for supervised learning feature validation
CREATE SOURCE failure_events (
    equipment_id    VARCHAR,
    failure_mode    VARCHAR,   -- BEARING_WEAR, OVERHEATING, ELECTRICAL_FAULT, LUBRICATION
    severity        VARCHAR,   -- WARNING, CRITICAL, FAILURE
    failure_ts      TIMESTAMPTZ,
    work_order_id   VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'maintenance.failure_events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
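
On the producer side, each Kafka message is simply a JSON object matching the `equipment_sensors` schema. The sketch below builds such a payload; the equipment and sensor IDs are made up, and actually publishing to the `iot.equipment_sensors` topic (e.g. with a Kafka client library) is left out.

```python
# Build a JSON sensor event matching the equipment_sensors source schema.
import json
from datetime import datetime, timezone

def sensor_event(equipment_id, plant_id, sensor_id, parameter, value, unit):
    return json.dumps({
        "equipment_id": equipment_id,
        "plant_id": plant_id,
        "sensor_id": sensor_id,
        "parameter": parameter,
        "value": value,
        "unit": unit,
        "event_ts": datetime.now(timezone.utc).isoformat(),
    })

msg = sensor_event("PUMP-017", "PLANT-A", "VIB-03", "VIBRATION_RMS", 4.2, "mm/s")
```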

Step 2: Build the Core Materialized View

-- 1-hour feature set per equipment-parameter (TUMBLE)
CREATE MATERIALIZED VIEW equipment_features_1h AS
SELECT
    window_start,
    window_end,
    equipment_id,
    plant_id,
    parameter,
    AVG(value)                                         AS mean_value,
    STDDEV(value)                                      AS std_value,
    MIN(value)                                         AS min_value,
    MAX(value)                                         AS max_value,
    MAX(value) - MIN(value)                            AS peak_to_peak,
    -- Counting exceedances against a dynamic mean + 2*sigma baseline would
    -- nest aggregates, which SQL disallows; a fixed per-parameter
    -- engineering threshold (placeholder value) serves as a proxy here.
    COUNT(*) FILTER (WHERE value > 100.0)              AS exceedance_count,
    COUNT(*)                                           AS sample_count
FROM TUMBLE(equipment_sensors, event_ts, INTERVAL '1 HOUR')
GROUP BY window_start, window_end, equipment_id, plant_id, parameter;

-- 24-hour trend: rate of change in mean value (HOP: 1-hour slide, 24-hour width)
CREATE MATERIALIZED VIEW equipment_trend_24h AS
SELECT
    window_start,
    window_end,
    equipment_id,
    plant_id,
    parameter,
    AVG(value)                              AS daily_mean,
    STDDEV(value)                           AS daily_std,
    COUNT(*)                                AS sample_count,
    -- Compare the second half of the window against the first
    AVG(value) FILTER (WHERE event_ts >= window_start + INTERVAL '12 HOURS') -
    AVG(value) FILTER (WHERE event_ts < window_start + INTERVAL '12 HOURS')  AS half_period_delta
FROM HOP(equipment_sensors, event_ts, INTERVAL '1 HOUR', INTERVAL '24 HOURS')
GROUP BY window_start, window_end, equipment_id, plant_id, parameter;
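
`half_period_delta` is just the mean of the window's second 12 hours minus the mean of its first 12 hours, so a sustained upward drift produces a positive value. A small sketch with made-up readings:

```python
# What half_period_delta measures, on one 24-hour window of readings.
from statistics import mean

def half_period_delta(readings, window_start, half=12 * 3600):
    """readings: (ts_seconds, value) pairs within one 24-hour window."""
    first = [v for ts, v in readings if ts < window_start + half]
    second = [v for ts, v in readings if ts >= window_start + half]
    return mean(second) - mean(first)

# vibration drifting upward by 0.05 mm/s per hour -> positive delta
readings = [(h * 3600, 4.0 + 0.05 * h) for h in range(24)]
delta = half_period_delta(readings, window_start=0)
```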

Step 3: Add Alerts and Aggregations

-- Bearing failure precursor: vibration RMS rising AND bearing temperature rising
CREATE MATERIALIZED VIEW bearing_precursor_alerts AS
SELECT
    v.equipment_id,
    v.plant_id,
    v.window_end                                    AS detection_ts,
    v.daily_mean                                    AS vibration_rms,
    v.half_period_delta                             AS vibration_trend,
    t.daily_mean                                    AS bearing_temp,
    t.half_period_delta                             AS temp_trend,
    CASE
        WHEN v.half_period_delta > 0.5 AND t.half_period_delta > 2 THEN 'HIGH_RISK'
        WHEN v.half_period_delta > 0.2 AND t.half_period_delta > 1 THEN 'MEDIUM_RISK'
        ELSE 'LOW_RISK'
    END AS risk_level
FROM equipment_trend_24h v
JOIN equipment_trend_24h t
    ON  t.equipment_id = v.equipment_id
    AND t.window_end   = v.window_end
    AND t.parameter    = 'TEMP_BEARING'
WHERE v.parameter = 'VIBRATION_RMS'
  AND (v.half_period_delta > 0.2 OR t.half_period_delta > 1);
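
The CASE expression is plain threshold logic, which makes it easy to unit-test outside the database before committing thresholds to the view. A direct Python mirror (threshold values copied from the SQL above):

```python
# The bearing-risk CASE expression as a testable function.

def bearing_risk(vibration_trend, temp_trend):
    """Classify risk from the 12-hour vibration and temperature deltas."""
    if vibration_trend > 0.5 and temp_trend > 2:
        return "HIGH_RISK"
    if vibration_trend > 0.2 and temp_trend > 1:
        return "MEDIUM_RISK"
    return "LOW_RISK"
```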

-- Lubrication alert: oil pressure declining trend
CREATE MATERIALIZED VIEW lubrication_alerts AS
SELECT
    equipment_id,
    plant_id,
    window_end                                      AS detection_ts,
    daily_mean                                      AS oil_pressure,
    half_period_delta                               AS pressure_trend,
    daily_std
FROM equipment_trend_24h
WHERE parameter = 'OIL_PRESSURE'
  AND half_period_delta < -5  -- pressure dropping more than 5 units per 12hr period
  AND daily_mean < 80;        -- already below nominal range

-- Electrical fault precursor: current imbalance between phases.
-- Pivot the three phases in a subquery first; aggregates cannot nest.
CREATE MATERIALIZED VIEW current_imbalance AS
SELECT
    window_start,
    window_end,
    equipment_id,
    plant_id,
    phase_a,
    phase_b,
    phase_c,
    GREATEST(phase_a, phase_b, phase_c)
        - LEAST(phase_a, phase_b, phase_c) AS imbalance_spread
FROM (
    SELECT
        window_start,
        window_end,
        equipment_id,
        plant_id,
        MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_A') AS phase_a,
        MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_B') AS phase_b,
        MAX(mean_value) FILTER (WHERE parameter = 'CURRENT_C') AS phase_c
    FROM equipment_features_1h
    WHERE parameter IN ('CURRENT_A', 'CURRENT_B', 'CURRENT_C')
    GROUP BY window_start, window_end, equipment_id, plant_id
) phases
WHERE GREATEST(phase_a, phase_b, phase_c)
    - LEAST(phase_a, phase_b, phase_c) > 10;  -- spread in amperes, not percent
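
The imbalance metric itself is just the spread between the highest and lowest phase current in the window. The phase values below are made up; in this sketch the > 10 threshold is in amperes.

```python
# Phase imbalance as the view computes it: max phase minus min phase.

def imbalance_spread(phase_a, phase_b, phase_c):
    phases = (phase_a, phase_b, phase_c)
    return max(phases) - min(phases)

spread = imbalance_spread(101.0, 99.5, 112.3)  # phase C running hot
alert = spread > 10
```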

Step 4: Sink Results Downstream

-- Stream maintenance recommendations to CMMS
CREATE SINK maintenance_recommendation_sink
FROM bearing_precursor_alerts
WITH (
    connector = 'kafka',
    topic = 'maintenance.recommendations.bearing',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Archive features for ML model retraining
CREATE SINK feature_archive_sink
FROM equipment_features_1h
WITH (
    connector = 'iceberg',
    type = 'append-only',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    database.name = 'predictive_maintenance',
    table.name = 'equipment_features_1h'
) FORMAT PLAIN ENCODE JSON;

How This Compares to Traditional Approaches

Aspect                | Batch ML Pipeline              | Streaming SQL (RisingWave)
Analysis latency      | 12–24 hours                    | Sub-second feature computation
Feature engineering   | Scheduled Python jobs          | Declarative SQL materialized views
Alert lead time       | Hours after degradation begins | Minutes after degradation begins
Model input freshness | Yesterday's features           | Current features
Ops complexity        | Orchestrator + model server    | Single SQL deployment
Historical retention  | Data lake                      | Iceberg sink from RisingWave

FAQ

What is predictive maintenance?

Predictive maintenance uses sensor data and analytical models to identify equipment degradation before failure occurs. Unlike time-based preventive maintenance, it schedules interventions based on actual condition signals, reducing both unplanned downtime and unnecessary maintenance activity.

How does RisingWave support machine learning models?

RisingWave continuously computes the feature set (rolling means, standard deviations, rate-of-change, exceedance counts) that ML models need as inputs. These features are exposed through the PostgreSQL interface and can be consumed by external inference services, or rule-based degradation logic can be implemented directly in SQL as shown above.
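
As a hedged sketch of that hand-off: rows read from `equipment_features_1h` through any PostgreSQL client (e.g. psycopg) can be flattened into a model input vector. The row tuples below are hard-coded stand-ins for fetched results, and the column order simply mirrors an assumed SELECT list.

```python
# Turn per-parameter feature rows into one flat model-input vector.

FEATURES = ("mean_value", "std_value", "peak_to_peak")

def feature_vector(rows):
    """rows: (parameter, mean_value, std_value, peak_to_peak) tuples."""
    vec = {}
    for parameter, *values in rows:
        for name, v in zip(FEATURES, values):
            vec[f"{parameter}.{name}"] = v
    return vec

# stand-ins for rows fetched from equipment_features_1h
rows = [
    ("VIBRATION_RMS", 4.43, 0.73, 1.90),
    ("TEMP_BEARING", 61.2, 1.10, 4.50),
]
vec = feature_vector(rows)
```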

Can I integrate RisingWave with my existing stack?

Yes. RisingWave connects to Kafka, PostgreSQL via CDC, and MySQL via CDC. It writes computed features and alerts to JDBC databases, Kafka topics, and Iceberg tables. CMMS platforms and ERP systems that expose a Kafka consumer or JDBC interface can receive automated maintenance recommendations.

How far in advance can streaming SQL detect equipment failures?

Detection lead time depends on the degradation rate and the feature-computation window. In practice, vibration and temperature trends computed over rolling 24-hour windows can surface bearing wear signatures 48 to 72 hours before alarm thresholds are reached.

Key Takeaways

  • Batch-based predictive maintenance pipelines have a 12–24 hour analysis lag; streaming SQL shrinks that lag to seconds, turning days of lead time into actionable maintenance windows.
  • RisingWave's TUMBLE and HOP window functions compute the feature set (rolling mean, standard deviation, rate-of-change) that degradation models require without custom application code.
  • Multi-parameter precursor patterns — rising vibration combined with rising bearing temperature — are expressible as SQL JOINs between materialized views.
  • Computed features are archived to Iceberg for ML model retraining, creating a closed feedback loop between real-time detection and model improvement.

Ready to try this? Get started with RisingWave. Join our Slack community.
