Shop Floor Analytics: Processing Millions of Events Per Second

Modern factory floors generate tens of thousands of sensor and MES events per second across their lines. RisingWave, a PostgreSQL-compatible streaming database, ingests these events from Kafka at production scale, maintains continuously updated OEE, throughput, and defect materialized views, and serves query results to dashboards with sub-second latency, without a single line of custom streaming code.

Why Shop Floor Analytics at Scale Matters

A large assembly plant running 20 lines with 50 stations each generates roughly 1,000 state-change events per second per line: machine heartbeats, cycle completions, defect flags, downtime starts and ends, and tooling alerts. Across 20 lines that is 20,000 events per second sustained—with bursts during shift changes. Traditional historians and batch ETL pipelines were not designed for this volume at interactive query latency.

The consequences of delayed analytics are material: with a five-minute lag on an OEE dashboard, a quality escape that starts at minute one is not visible until minute six; by the time someone reacts, six minutes of production have passed. On a line making 300 units per minute, that is 1,800 potentially defective units. Real-time visibility turns quality escapes from post-shift findings into second-by-second interventions.

The data also does not live in one place. Cycle times come from PLCs via MQTT. Defect flags come from vision systems via HTTP. Downtime codes come from operator HMI entries. Tooling wear comes from CNC controllers via OPC-UA. Streaming SQL must join all of these streams coherently.

The Streaming SQL Approach

RisingWave is designed to handle exactly this pattern. It consumes multiple Kafka topics concurrently, maintains shared state across stream-stream and stream-table joins, and exposes results through a PostgreSQL-wire-protocol interface that Grafana, Tableau, and custom React dashboards can query directly.

Key design principles for shop floor analytics at scale:

  • Partition by machine ID in Kafka topics and in materialized views to keep aggregations local and avoid cross-partition shuffles.
  • Use tumbling windows for OEE calculations—OEE is inherently a windowed metric (planned production time, run time, quality rate over a shift or hour).
  • Separate high-frequency raw events from lower-frequency aggregates; dashboards read aggregates, not raw events.
  • Push critical alerts downstream via Kafka sinks so alert consumers are decoupled from the analytics layer.

Building It Step by Step

Step 1: Data Source

-- High-frequency machine cycle events
CREATE SOURCE machine_cycles (
    machine_id      VARCHAR,
    line_id         VARCHAR,
    plant_id        VARCHAR,
    cycle_start_ts  TIMESTAMPTZ,
    cycle_end_ts    TIMESTAMPTZ,
    cycle_time_ms   INT,
    part_number     VARCHAR,
    good_unit       BOOLEAN,
    defect_code     VARCHAR,
    operator_id     VARCHAR,
    event_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'shopfloor.machine.cycles',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
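
For reference, one event on the shopfloor.machine.cycles topic would look like this, matching the schema above (all identifiers and values illustrative):

```json
{
  "machine_id": "M-1204",
  "line_id": "L07",
  "plant_id": "PLT-01",
  "cycle_start_ts": "2024-05-14T08:15:02.100Z",
  "cycle_end_ts": "2024-05-14T08:15:02.850Z",
  "cycle_time_ms": 750,
  "part_number": "PN-4471",
  "good_unit": false,
  "defect_code": "D-SCRATCH",
  "operator_id": "OP-312",
  "event_ts": "2024-05-14T08:15:02.850Z"
}
```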

-- Machine downtime events
CREATE SOURCE machine_downtime (
    event_id        VARCHAR,
    machine_id      VARCHAR,
    line_id         VARCHAR,
    downtime_type   VARCHAR,  -- 'planned', 'unplanned', 'changeover'
    downtime_code   VARCHAR,
    start_ts        TIMESTAMPTZ,
    end_ts          TIMESTAMPTZ,
    duration_sec    INT,
    event_ts        TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'shopfloor.machine.downtime',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

Step 2: Core Materialized View

-- Per-machine OEE components in 1-hour tumbling windows
CREATE MATERIALIZED VIEW oee_hourly AS
SELECT
    machine_id,
    line_id,
    plant_id,
    window_start,
    window_end,
    COUNT(*)                                         AS total_cycles,
    COUNT(*) FILTER (WHERE good_unit = true)         AS good_units,
    COUNT(*) FILTER (WHERE good_unit = false)        AS defect_units,
    AVG(cycle_time_ms)                               AS avg_cycle_time_ms,
    MIN(cycle_time_ms)                               AS min_cycle_time_ms,
    MAX(cycle_time_ms)                               AS max_cycle_time_ms,
    STDDEV(cycle_time_ms)                            AS stddev_cycle_time_ms,
    -- Quality rate component of OEE
    ROUND(
        COUNT(*) FILTER (WHERE good_unit = true) * 100.0 / NULLIF(COUNT(*), 0),
        2
    )                                                AS quality_rate_pct,
    -- Performance: actual cycles vs ideal (ideal = 3600000 ms / ideal_cycle_ms)
    -- Availability computed separately via downtime join
    part_number
FROM TUMBLE(machine_cycles, event_ts, INTERVAL '1 hour')
GROUP BY machine_id, line_id, plant_id, window_start, window_end, part_number;
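
Once the view exists, a dashboard panel reads it with an ordinary batch query over the PostgreSQL interface. A sketch, with the machine id as a placeholder:

```sql
-- Last 24 hourly windows for one machine, newest first
SELECT window_start, part_number, total_cycles, good_units,
       quality_rate_pct, avg_cycle_time_ms
FROM oee_hourly
WHERE machine_id = 'M-1204'
ORDER BY window_start DESC
LIMIT 24;
```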

-- Downtime summary per machine per hour
CREATE MATERIALIZED VIEW downtime_hourly AS
SELECT
    machine_id,
    line_id,
    window_start,
    window_end,
    SUM(duration_sec) FILTER (WHERE downtime_type = 'unplanned') AS unplanned_downtime_sec,
    SUM(duration_sec) FILTER (WHERE downtime_type = 'planned')   AS planned_downtime_sec,
    SUM(duration_sec) FILTER (WHERE downtime_type = 'changeover') AS changeover_sec,
    COUNT(*) FILTER (WHERE downtime_type = 'unplanned')           AS unplanned_events,
    SUM(duration_sec)                                             AS total_downtime_sec
FROM TUMBLE(machine_downtime, event_ts, INTERVAL '1 hour')
GROUP BY machine_id, line_id, window_start, window_end;
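
The availability and performance components left out of oee_hourly can be recovered by joining the two hourly views. A sketch, assuming a reference table machine_ideal(machine_id, part_number, ideal_cycle_ms) that is not part of the pipeline above:

```sql
-- Sketch: full OEE components = availability * performance * quality
CREATE MATERIALIZED VIEW oee_full_hourly AS
SELECT
    o.machine_id,
    o.line_id,
    o.part_number,
    o.window_start,
    o.window_end,
    -- Availability: run time / planned production time
    -- (run time = 3600s minus all downtime; planned production time
    --  excludes planned downtime and changeovers)
    ROUND(
        (3600 - COALESCE(d.total_downtime_sec, 0)) * 100.0
        / NULLIF(3600 - COALESCE(d.planned_downtime_sec, 0)
                      - COALESCE(d.changeover_sec, 0), 0),
        2
    ) AS availability_pct,
    -- Performance: ideal production time / actual run time
    ROUND(
        mi.ideal_cycle_ms * o.total_cycles * 100.0
        / NULLIF((3600 - COALESCE(d.total_downtime_sec, 0)) * 1000.0, 0),
        2
    ) AS performance_pct,
    o.quality_rate_pct
FROM oee_hourly o
LEFT JOIN downtime_hourly d
  ON o.machine_id = d.machine_id
 AND o.window_start = d.window_start
JOIN machine_ideal mi
  ON mi.machine_id = o.machine_id
 AND mi.part_number = o.part_number;
```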

-- Defect Pareto: defect counts per code per line per shift.
-- The WHERE clause keeps only defective cycles, so a per-code defect rate
-- cannot be computed here; ranking is applied at query time because a bare
-- ORDER BY has no effect in a continuously maintained view.
CREATE MATERIALIZED VIEW defect_pareto_shift AS
SELECT
    line_id,
    plant_id,
    part_number,
    defect_code,
    window_start,
    window_end,
    COUNT(*) AS defect_count
FROM TUMBLE(machine_cycles, event_ts, INTERVAL '8 hours')
WHERE good_unit = false
GROUP BY line_id, plant_id, part_number, defect_code, window_start, window_end;
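
The Pareto ranking itself is then an ordinary batch query at dashboard time. A sketch, with the line id as a placeholder:

```sql
-- Top 10 defect codes for the most recent shift window on one line
SELECT defect_code, part_number, defect_count
FROM defect_pareto_shift
WHERE line_id = 'L07'
  AND window_start = (SELECT MAX(window_start) FROM defect_pareto_shift)
ORDER BY defect_count DESC
LIMIT 10;
```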

Step 3: Alerts and Aggregations

-- Real-time defect spike: defect rate > 5% in the last 10 minutes
CREATE MATERIALIZED VIEW defect_spike_alerts AS
SELECT
    machine_id,
    line_id,
    defect_code,
    COUNT(*) FILTER (WHERE good_unit = false) AS defect_count_10m,
    COUNT(*)                                   AS total_count_10m,
    ROUND(
        COUNT(*) FILTER (WHERE good_unit = false) * 100.0 / NULLIF(COUNT(*), 0),
        2
    )                                          AS defect_rate_pct,
    window_start,
    window_end
FROM TUMBLE(machine_cycles, event_ts, INTERVAL '10 minutes')
GROUP BY machine_id, line_id, defect_code, window_start, window_end
HAVING COUNT(*) FILTER (WHERE good_unit = false) * 100.0 / NULLIF(COUNT(*), 0) > 5.0;

CREATE SINK defect_spike_sink
AS SELECT * FROM defect_spike_alerts
WITH (
    connector = 'kafka',
    topic = 'shopfloor.alerts.quality',
    properties.bootstrap.server = 'kafka:9092',
    -- the aggregated view emits updates, not just inserts; force
    -- append-only output (alternatively, use FORMAT UPSERT with a
    -- primary_key)
    force_append_only = 'true'
) FORMAT PLAIN ENCODE JSON;

-- Cycle time anomaly: machine running > 20% over its hourly baseline
CREATE MATERIALIZED VIEW cycle_time_anomalies AS
SELECT
    mc.machine_id,
    mc.line_id,
    mc.cycle_time_ms,
    oh.avg_cycle_time_ms      AS baseline_avg_ms,
    ROUND(
        (mc.cycle_time_ms - oh.avg_cycle_time_ms) * 100.0 /
        NULLIF(oh.avg_cycle_time_ms, 0),
        1
    )                         AS pct_over_baseline,
    mc.event_ts
FROM machine_cycles mc
JOIN oee_hourly oh
  ON mc.machine_id = oh.machine_id
 AND mc.part_number = oh.part_number
 -- Restrict to the matching hourly window; without this condition every
 -- cycle would join against every historical window for the machine
 AND mc.event_ts >= oh.window_start
 AND mc.event_ts <  oh.window_end
WHERE mc.cycle_time_ms > oh.avg_cycle_time_ms * 1.2;

Comparison Table

Metric                      | Historian + Batch ETL | Spark Streaming      | RisingWave
Ingest throughput per node  | 50K events/s          | 200K events/s        | 500K+ events/s
Dashboard query latency     | Minutes               | Seconds              | Sub-second
OEE window freshness        | 1 hour                | 5 minutes            | Continuous
Stream-table join support   | No                    | Yes (complex)        | Yes (SQL)
Operational complexity      | High (ETL jobs)       | High (Spark cluster) | Low (SQL only)

FAQ

How does RisingWave handle the fan-out from thousands of machine IDs?

RisingWave partitions materialized view state by the GROUP BY keys. For a view grouped by machine_id, each partition holds state for a subset of machines. Adding more RisingWave compute nodes redistributes partitions automatically, so throughput scales horizontally.

Can I query individual machine cycle history alongside aggregates?

Yes. You can create one materialized view that retains recent raw events (e.g., the last 24 hours using a time-bounded filter) and another that computes hourly aggregates. Dashboards can join them at query time over the PostgreSQL interface.
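
A sketch of the raw-event retention view, using RisingWave's temporal filter support (a NOW()-based predicate in a materialized view); the view name is illustrative:

```sql
-- Keep roughly the last 24 hours of raw cycles queryable; rows that fall
-- outside the temporal filter are cleaned up automatically
CREATE MATERIALIZED VIEW recent_cycles AS
SELECT machine_id, line_id, part_number, cycle_time_ms,
       good_unit, defect_code, event_ts
FROM machine_cycles
WHERE event_ts > NOW() - INTERVAL '24 hours';
```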

Does RisingWave support exactly-once semantics when writing to Kafka sinks?

RisingWave provides at-least-once delivery for Kafka sinks by default, with idempotent producer settings available. For exactly-once guarantees end-to-end, you can enable Kafka transactions on the sink connector.

Key Takeaways

  • RisingWave ingests high-volume MES and machine-cycle events from Kafka and maintains OEE, defect, and downtime aggregates as continuously updated materialized views.
  • Tumbling windows on INTERVAL '1 hour' and INTERVAL '8 hours' match shift and hourly OEE reporting cadences without scheduled batch jobs.
  • Defect-spike and cycle-time-anomaly alerts are expressed as plain SQL HAVING clauses and join conditions, and reach Kafka sink consumers with sub-second latency.
  • Grafana or any PostgreSQL-compatible BI tool reads materialized views directly—no intermediate data warehouse required.
