Modern factory floors generate tens of thousands of sensor and MES events per second across their lines. RisingWave, a PostgreSQL-compatible streaming database, ingests these events from Kafka at production scale, maintains continuously updated OEE, throughput, and defect materialized views, and serves query results to dashboards with sub-second latency, without a single line of custom streaming code.
Why Shop Floor Analytics at Scale Matters
A large assembly plant running 20 lines with 50 stations each generates roughly 1,000 state-change events per second per line: machine heartbeats, cycle completions, defect flags, downtime starts and ends, and tooling alerts. Across 20 lines that is 20,000 events per second sustained—with bursts during shift changes. Traditional historians and batch ETL pipelines were not designed for this volume at interactive query latency.
The consequences of delayed analytics are material: with a five-minute lag on an OEE dashboard, a quality escape that starts at minute one is not visible until minute six. On a line making 300 units per minute, that is 1,500 potentially defective units before anyone can even begin to react. Real-time visibility turns quality escapes from post-shift findings into second-by-second interventions.
The data also does not live in one place. Cycle times come from PLCs via MQTT. Defect flags come from vision systems via HTTP. Downtime codes come from operator HMI entries. Tooling wear comes from CNC controllers via OPC-UA. Streaming SQL must join all of these streams coherently.
The Streaming SQL Approach
RisingWave is designed to handle exactly this pattern. It consumes multiple Kafka topics concurrently, maintains shared state across stream-stream and stream-table joins, and exposes results through a PostgreSQL-wire-protocol interface that Grafana, Tableau, and custom React dashboards can query directly.
Key design principles for shop floor analytics at scale:
- Partition by machine ID in Kafka topics and in materialized views to keep aggregations local and avoid cross-partition shuffles.
- Use tumbling windows for OEE calculations—OEE is inherently a windowed metric (planned production time, run time, quality rate over a shift or hour).
- Separate high-frequency raw events from lower-frequency aggregates; dashboards read aggregates, not raw events.
- Push critical alerts downstream via Kafka sinks so alert consumers are decoupled from the analytics layer.
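Before wiring up the SQL, it helps to pin down the arithmetic the views will maintain. A minimal Python sketch of the standard OEE composition (availability × performance × quality), using hypothetical numbers:

```python
def oee(planned_time_s, downtime_s, total_units, good_units, ideal_cycle_s):
    """OEE = availability * performance * quality."""
    run_time_s = planned_time_s - downtime_s
    availability = run_time_s / planned_time_s            # share of planned time actually running
    performance = (total_units * ideal_cycle_s) / run_time_s  # actual output vs ideal pace
    quality = good_units / total_units                    # good units per unit produced
    return availability * performance * quality

# One hour of planned production, 6 minutes of downtime,
# 540 units made at an ideal cycle of 5.5 s, 530 of them good.
print(round(oee(3600, 360, 540, 530, 5.5) * 100, 1))  # → 81.0
```

Each factor maps to a component in the views that follow: quality comes from the cycle stream, availability from the downtime stream, and performance from cycle counts against an ideal cycle time.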
Building It Step by Step
Step 1: Data Source
-- High-frequency machine cycle events
CREATE SOURCE machine_cycles (
    machine_id VARCHAR,
    line_id VARCHAR,
    plant_id VARCHAR,
    cycle_start_ts TIMESTAMPTZ,
    cycle_end_ts TIMESTAMPTZ,
    cycle_time_ms INT,
    part_number VARCHAR,
    good_unit BOOLEAN,
    defect_code VARCHAR,
    operator_id VARCHAR,
    event_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'shopfloor.machine.cycles',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
-- Machine downtime events
CREATE SOURCE machine_downtime (
    event_id VARCHAR,
    machine_id VARCHAR,
    line_id VARCHAR,
    downtime_type VARCHAR,  -- 'planned', 'unplanned', 'changeover'
    downtime_code VARCHAR,
    start_ts TIMESTAMPTZ,
    end_ts TIMESTAMPTZ,
    duration_sec INT,
    event_ts TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'shopfloor.machine.downtime',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
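For local testing it is useful to generate events matching this schema. A minimal Python sketch that builds one machine_cycles payload; all field values are hypothetical, and actually producing to Kafka is left to whichever client library you use:

```python
import json
from datetime import datetime, timedelta, timezone

def make_cycle_event(machine_id, line_id, plant_id, cycle_time_ms,
                     part_number, good_unit, defect_code=None, operator_id=None):
    """Build one JSON payload matching the machine_cycles source schema."""
    end = datetime.now(timezone.utc)
    start = end - timedelta(milliseconds=cycle_time_ms)
    return json.dumps({
        "machine_id": machine_id,
        "line_id": line_id,
        "plant_id": plant_id,
        "cycle_start_ts": start.isoformat(),
        "cycle_end_ts": end.isoformat(),
        "cycle_time_ms": cycle_time_ms,
        "part_number": part_number,
        "good_unit": good_unit,
        "defect_code": defect_code,
        "operator_id": operator_id,
        "event_ts": end.isoformat(),
    })

payload = make_cycle_event("M-001", "L-01", "P-DE-01", 5230, "PN-8841", True)
print(json.loads(payload)["machine_id"])  # → M-001
```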
Step 2: Core Materialized View
-- Per-machine OEE components in 1-hour tumbling windows
CREATE MATERIALIZED VIEW oee_hourly AS
SELECT
    machine_id,
    line_id,
    plant_id,
    part_number,
    window_start,
    window_end,
    COUNT(*) AS total_cycles,
    COUNT(*) FILTER (WHERE good_unit = true) AS good_units,
    COUNT(*) FILTER (WHERE good_unit = false) AS defect_units,
    AVG(cycle_time_ms) AS avg_cycle_time_ms,
    MIN(cycle_time_ms) AS min_cycle_time_ms,
    MAX(cycle_time_ms) AS max_cycle_time_ms,
    STDDEV(cycle_time_ms) AS stddev_cycle_time_ms,
    -- Quality rate component of OEE
    ROUND(
        COUNT(*) FILTER (WHERE good_unit = true) * 100.0 / NULLIF(COUNT(*), 0),
        2
    ) AS quality_rate_pct
    -- Performance: actual cycles vs ideal (ideal cycles per hour = 3,600,000 ms / ideal cycle time in ms)
    -- Availability is computed separately via the downtime join
FROM TUMBLE(machine_cycles, event_ts, INTERVAL '1 hour')
GROUP BY machine_id, line_id, plant_id, part_number, window_start, window_end;
-- Downtime summary per machine per hour
CREATE MATERIALIZED VIEW downtime_hourly AS
SELECT
    machine_id,
    line_id,
    window_start,
    window_end,
    SUM(duration_sec) FILTER (WHERE downtime_type = 'unplanned') AS unplanned_downtime_sec,
    SUM(duration_sec) FILTER (WHERE downtime_type = 'planned') AS planned_downtime_sec,
    SUM(duration_sec) FILTER (WHERE downtime_type = 'changeover') AS changeover_sec,
    COUNT(*) FILTER (WHERE downtime_type = 'unplanned') AS unplanned_events,
    SUM(duration_sec) AS total_downtime_sec
FROM TUMBLE(machine_downtime, event_ts, INTERVAL '1 hour')
GROUP BY machine_id, line_id, window_start, window_end;
-- Defect Pareto: top defect codes per line per shift
CREATE MATERIALIZED VIEW defect_pareto_shift AS
SELECT
    line_id,
    plant_id,
    part_number,
    defect_code,
    window_start,
    window_end,
    COUNT(*) AS defect_count
FROM TUMBLE(machine_cycles, event_ts, INTERVAL '8 hours')
WHERE good_unit = false
GROUP BY line_id, plant_id, part_number, defect_code, window_start, window_end;
-- Rank by defect_count at query time (ORDER BY in a streaming view is only
-- meaningful with LIMIT). Note that this view sees only defective units, so
-- defect rates must be computed against unit totals from oee_hourly.
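At query time a dashboard can combine a matched pair of oee_hourly and downtime_hourly rows into one OEE score. A client-side Python sketch of that arithmetic, assuming the rows were already fetched over the PostgreSQL interface and that an ideal cycle time is supplied from engineering data (a hypothetical input, not a column in either view):

```python
def hourly_oee(cycle_row, downtime_row, ideal_cycle_ms):
    """Combine one oee_hourly row with its matching downtime_hourly row
    (same machine_id and window_start) into a single OEE score."""
    window_ms = 3_600_000  # one 1-hour tumbling window
    downtime_ms = (downtime_row or {}).get("total_downtime_sec", 0) * 1000
    run_time_ms = window_ms - downtime_ms
    availability = run_time_ms / window_ms
    performance = cycle_row["total_cycles"] * ideal_cycle_ms / run_time_ms
    quality = cycle_row["good_units"] / cycle_row["total_cycles"]
    return availability * performance * quality

cycles = {"total_cycles": 600, "good_units": 594}   # row from oee_hourly
downtime = {"total_downtime_sec": 300}              # row from downtime_hourly
print(round(hourly_oee(cycles, downtime, 5000) * 100, 1))  # → 82.5
```

Passing `None` for the downtime row (a machine with no downtime events in the window) degrades gracefully to 100% availability.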
Step 3: Alerts and Aggregations
-- Real-time defect spike: defect rate > 5% in the last 10 minutes
CREATE MATERIALIZED VIEW defect_spike_alerts AS
SELECT
    machine_id,
    line_id,
    COUNT(*) FILTER (WHERE good_unit = false) AS defect_count_10m,
    COUNT(*) AS total_count_10m,
    ROUND(
        COUNT(*) FILTER (WHERE good_unit = false) * 100.0 / NULLIF(COUNT(*), 0),
        2
    ) AS defect_rate_pct,
    window_start,
    window_end
FROM TUMBLE(machine_cycles, event_ts, INTERVAL '10 minutes')
GROUP BY machine_id, line_id, window_start, window_end
HAVING COUNT(*) FILTER (WHERE good_unit = false) * 100.0 / NULLIF(COUNT(*), 0) > 5.0;
-- Grouping by machine (not defect_code) keeps the denominator at all cycles;
-- the per-code breakdown comes from defect_pareto_shift.
CREATE SINK defect_spike_sink AS
SELECT * FROM defect_spike_alerts
WITH (
    connector = 'kafka',
    topic = 'shopfloor.alerts.quality',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
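Consumers of shopfloor.alerts.quality see only the JSON payload, never the views behind it. A minimal Python sketch of a consumer-side handler; the 15% severity cutoff is a hypothetical example, not part of the view definition:

```python
import json

def classify_alert(payload: str) -> dict:
    """Parse one defect-spike alert from the Kafka sink and attach
    a severity derived from the defect rate."""
    alert = json.loads(payload)
    alert["severity"] = "critical" if alert["defect_rate_pct"] >= 15.0 else "warning"
    return alert

# A sample payload with the fields the alert view emits.
msg = json.dumps({
    "machine_id": "M-014",
    "line_id": "L-03",
    "defect_count_10m": 9,
    "total_count_10m": 52,
    "defect_rate_pct": 17.31,
})
print(classify_alert(msg)["severity"])  # → critical
```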
-- Cycle time anomaly: machine running > 20% over average cycle time
CREATE MATERIALIZED VIEW cycle_time_anomalies AS
SELECT
    mc.machine_id,
    mc.line_id,
    mc.cycle_time_ms,
    oh.avg_cycle_time_ms AS baseline_avg_ms,
    ROUND(
        (mc.cycle_time_ms - oh.avg_cycle_time_ms) * 100.0 /
        NULLIF(oh.avg_cycle_time_ms, 0),
        1
    ) AS pct_over_baseline,
    mc.event_ts
FROM machine_cycles mc
JOIN oee_hourly oh
    ON mc.machine_id = oh.machine_id
    AND mc.part_number = oh.part_number
    -- Match each cycle to its own hourly window; without these predicates
    -- every cycle joins every historical window for the machine/part pair.
    AND mc.event_ts >= oh.window_start
    AND mc.event_ts < oh.window_end
WHERE mc.cycle_time_ms > oh.avg_cycle_time_ms * 1.2;
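The 20% rule is easy to mirror client-side, for instance to unit-test threshold changes before altering the view. A small Python sketch with hypothetical helper names:

```python
def pct_over_baseline(cycle_time_ms: float, baseline_avg_ms: float) -> float:
    """Percent deviation of one cycle from its hourly baseline,
    mirroring the ROUND expression in cycle_time_anomalies."""
    return round((cycle_time_ms - baseline_avg_ms) * 100.0 / baseline_avg_ms, 1)

def is_anomaly(cycle_time_ms: float, baseline_avg_ms: float) -> bool:
    """True when a cycle runs more than 20% over its baseline."""
    return cycle_time_ms > baseline_avg_ms * 1.2

# A 6,600 ms cycle against a 5,000 ms baseline is 32% over and flags.
print(pct_over_baseline(6600, 5000), is_anomaly(6600, 5000))  # → 32.0 True
```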
Comparison Table
| Metric | Historian + Batch ETL | Spark Streaming | RisingWave |
|---|---|---|---|
| Ingest throughput per node | 50K events/s | 200K events/s | 500K+ events/s |
| Dashboard query latency | Minutes | Seconds | Sub-second |
| OEE window freshness | 1 hour | 5 minutes | Continuous |
| Stream-table join support | No | Yes (complex) | Yes (SQL) |
| Operational complexity | High (ETL jobs) | High (Spark cluster) | Low (SQL only) |
FAQ
How does RisingWave handle the fan-out from thousands of machine IDs?
RisingWave partitions materialized view state by the GROUP BY keys. For a view grouped by machine_id, each partition holds state for a subset of machines. Adding more RisingWave compute nodes redistributes partitions automatically, so throughput scales horizontally.
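The effect of key-based state partitioning can be pictured with a toy hash sketch; zlib.crc32 here is an illustration only, not RisingWave's actual internal hash function:

```python
import zlib

def owner_node(machine_id: str, num_nodes: int) -> int:
    """Deterministically map a machine's aggregation state to a compute node.
    zlib.crc32 stands in for the engine's internal hash."""
    return zlib.crc32(machine_id.encode()) % num_nodes

# 1,000 machine IDs spread across 4 nodes: each node owns the windowed
# state for its share of machines, so adding nodes rebalances the keys.
machines = [f"M-{i:03d}" for i in range(1000)]
counts = [0] * 4
for m in machines:
    counts[owner_node(m, 4)] += 1
print(counts)  # typically near-uniform across the 4 nodes
```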
Can I query individual machine cycle history alongside aggregates?
Yes. You can create one materialized view that retains recent raw events (e.g., the last 24 hours using a time-bounded filter) and another that computes hourly aggregates. Dashboards can join them at query time over the PostgreSQL interface.
Does RisingWave support exactly-once semantics when writing to Kafka sinks?
RisingWave provides at-least-once delivery for Kafka sinks by default, with idempotent producer settings available. For exactly-once guarantees end-to-end, you can enable Kafka transactions on the sink connector.
Key Takeaways
- RisingWave ingests high-volume MES and machine-cycle events from Kafka and maintains OEE, defect, and downtime aggregates as continuously updated materialized views.
- Tumbling windows on `INTERVAL '1 hour'` and `INTERVAL '8 hours'` match hourly and shift OEE reporting cadences without scheduled batch jobs.
- Defect-spike and cycle-time-anomaly alerts are pure SQL `HAVING` clauses and join predicates that produce Kafka sink output at sub-second latency.
- Grafana or any PostgreSQL-compatible BI tool reads materialized views directly; no intermediate data warehouse required.