Industrial monitoring platforms collect telemetry from thousands of devices simultaneously. You need two things at once: a real-time dashboard that tells you whether a device is behaving normally right now, and accurate historical reports that tell you what the monthly uptime was for a device in Q4. These two requirements pull in opposite directions, and patching one system to handle both usually makes each worse.
This post describes the RisingWave streaming-batch reconciliation pattern as applied to IoT and industrial telemetry. The core idea is a split-horizon architecture: RisingWave owns the intraday streaming lane, and a nightly batch pipeline owns the historical lane. Each lane serves different consumers, covers a different time horizon, and makes different accuracy promises. The two lanes cooperate through a shared Iceberg lakehouse, but they never compete to produce the same output.
The Fundamental Tension
Streaming pipelines process sensor readings as they arrive. That makes them fast, but it also means they process the world as it was when each reading was received. Calibration tables, alert thresholds, and device metadata all update on a schedule. If a calibration coefficient changes after a batch of readings has already been processed, the streaming pipeline has already enriched those readings with the old value.
Batch pipelines run against a complete, stable snapshot of all reference data. They produce results that reflect the authoritative state of every calibration and threshold at query time. But they are slow. A nightly job that finishes at 3 AM cannot tell you whether a motor on the factory floor is running hot at 11 AM.
The right response is to stop treating these as competing approaches and start treating them as a division of labor.
- Streaming handles recency: current device health, intraday anomaly counts, live dashboards.
- Batch handles accuracy: completed-day uptime summaries, monthly compliance reports, rolling reliability windows.
Neither lane retroactively corrects the other. That constraint is what keeps the architecture operationally simple.
The Split-Horizon Architecture
graph TD
subgraph Sources
K[Sensor Events - Kafka]
IC[Calibration + Thresholds - Iceberg]
end
subgraph Streaming Lane - RisingWave
S1[Temporal Join: readings + current calibration]
S2[Intraday device health - fast, approximate]
S3[Real-time anomaly detection]
end
subgraph Batch Lane - dbt + Spark
B1[Full join: readings + authoritative calibration]
B2[Daily uptime summaries - exact]
B3[Monthly reliability reports]
end
subgraph Consumers
DASH[Operations Dashboard - live]
RPT[Compliance Reports - historical]
end
K --> S1
IC -->|current snapshot| S1
S1 --> S2
S2 --> S3
S3 --> DASH
K --> B1
IC -->|full history| B1
B1 --> B2
B2 --> B3
B3 --> RPT
B3 -->|nightly Iceberg refresh| IC
The link between the two lanes is the nightly Iceberg refresh. After the batch job completes, it writes updated calibration tables and device metadata back to the lakehouse. RisingWave reloads those tables, and from that point forward, all new sensor readings are enriched with fresh reference data.
What "Approximate" Means for Telemetry
Streaming aggregates in this architecture are exact for devices whose reference data is already loaded and has not changed since the last refresh, and potentially incomplete for newly commissioned devices. When a reading arrives for a device that already exists in the calibration table, it is enriched with the values from the most recent refresh. When a reading arrives for a device that was just added to the fleet and has not yet appeared in the calibration table, enrichment fields will be NULL until the next reference data refresh.
This is not random noise. It has a specific cause (reference data lag) and a specific scope (only readings from recently commissioned devices, during the intraday window before the next batch refresh). For use cases like live dashboards and real-time anomaly detection, this is acceptable. You are not producing auditable maintenance records. You are producing signals for operators watching a screen.
For completed days where the batch job has already run, the batch output is the authoritative source. Streaming aggregates for those days are considered superseded.
Implementation
Step 1: Define Reference Tables with Full Reload
Calibration tables and device metadata in RisingWave are loaded from Iceberg after each nightly batch run. The refresh_mode = 'FULL_RELOAD' setting tells RisingWave to replace the table contents completely on each refresh, matching the authoritative state of the batch pipeline.
CREATE TABLE device_calibration (
device_id VARCHAR PRIMARY KEY,
calibration_offset DOUBLE,
alert_threshold_temp DOUBLE,
site_id VARCHAR,
calibration_date DATE
) WITH (
connector = 'iceberg',
catalog.type = 'glue',
database.name = 'iot_platform',
table.name = 'dim_device_calibration',
refresh_mode = 'FULL_RELOAD'
);
CREATE TABLE device_metadata (
device_id VARCHAR PRIMARY KEY,
device_type VARCHAR,
location VARCHAR,
owner_team VARCHAR,
commissioned_date DATE
) WITH (
connector = 'iceberg',
catalog.type = 'glue',
database.name = 'iot_platform',
table.name = 'dim_device_metadata',
refresh_mode = 'FULL_RELOAD'
);
See the RisingWave Iceberg source connector documentation for the full list of catalog and authentication parameters.
Step 2: Enrich the Sensor Stream with a Temporal Join
Raw sensor readings arriving in Kafka are enriched with the current snapshot of both reference tables. The FOR SYSTEM_TIME AS OF PROCTIME() syntax tells RisingWave to look up each reference table as it stands at processing time. Rows that have already been emitted are not revisited when a reference table later changes, so the join produces no retroactive corrections, which is the correct behavior for a streaming lane.
CREATE SOURCE sensor_readings (
device_id VARCHAR,
reading_ts TIMESTAMPTZ,
raw_temperature DOUBLE,
is_alert BOOLEAN,
uptime_minutes INT
) WITH (
connector = 'kafka',
topic = 'sensor-telemetry',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
CREATE MATERIALIZED VIEW sensor_readings_enriched AS
SELECT
r.device_id,
r.reading_ts,
r.raw_temperature + COALESCE(c.calibration_offset, 0) AS avg_temperature,
r.is_alert,
r.uptime_minutes,
c.alert_threshold_temp,
m.device_type,
m.location,
c.site_id
FROM sensor_readings r
LEFT JOIN device_calibration FOR SYSTEM_TIME AS OF PROCTIME() c
ON r.device_id = c.device_id
LEFT JOIN device_metadata FOR SYSTEM_TIME AS OF PROCTIME() m
ON r.device_id = m.device_id;
The LEFT JOIN is intentional. An inner join would silently drop readings for unknown devices. With a left join, those readings appear with NULL enrichment fields, which is the correct behavior: the event is visible and the NULL rate is observable. A spike in NULL enrichment is an operational signal that a new batch of devices was commissioned without a corresponding reference data refresh.
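The difference between the two join types can be illustrated outside the database. The following is a minimal Python sketch, not RisingWave code: the `CALIBRATION` dictionary, the `enrich` helper, and the device IDs are hypothetical stand-ins for the reference table and the temporal join.

```python
from typing import Optional

# Hypothetical in-memory stand-in for the device_calibration reference table.
CALIBRATION = {
    "dev-001": {"calibration_offset": 0.5, "alert_threshold_temp": 80.0},
}


def enrich(reading: dict, calibration: dict) -> dict:
    """Left-join style lookup: keep every reading, use None when the device is unknown."""
    ref: Optional[dict] = calibration.get(reading["device_id"])
    # Mirrors COALESCE(calibration_offset, 0) in the enriched view.
    offset = ref["calibration_offset"] if ref else 0.0
    return {
        **reading,
        "avg_temperature": reading["raw_temperature"] + offset,
        "alert_threshold_temp": ref["alert_threshold_temp"] if ref else None,
    }


readings = [
    {"device_id": "dev-001", "raw_temperature": 70.0},
    # Newly commissioned device that is not yet in the reference data.
    {"device_id": "dev-999", "raw_temperature": 65.0},
]

# Left join: both readings survive, the unknown device carries None fields.
enriched = [enrich(r, CALIBRATION) for r in readings]

# Inner join: the reading from the unknown device disappears without a trace.
inner_joined = [r for r in readings if r["device_id"] in CALIBRATION]
```

In this sketch the unknown device stays visible with a missing threshold, which is what makes the NULL rate measurable in the first place.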
Step 3: Build Intraday Device Health Aggregates
With enriched readings flowing through, you can build streaming aggregates for the current day.
CREATE MATERIALIZED VIEW device_health_current AS
SELECT
device_id,
DATE_TRUNC('day', reading_ts) AS reading_date,
AVG(avg_temperature) AS avg_temperature,
COUNT(CASE WHEN is_alert THEN 1 END) AS alert_count,
SUM(uptime_minutes) AS uptime_minutes,
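-- Count readings with no calibration match. avg_temperature cannot be used here:
-- the enriched view COALESCEs the offset to 0, so it is never NULL for that reason.
COUNT(CASE WHEN alert_threshold_temp IS NULL THEN 1 END) AS null_calibration_count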
FROM sensor_readings_enriched
WHERE reading_ts >= DATE_TRUNC('day', NOW())
GROUP BY device_id, DATE_TRUNC('day', reading_ts);
CREATE MATERIALIZED VIEW device_metrics_1d AS
SELECT
DATE_TRUNC('hour', reading_ts) AS reading_hour,
site_id,
device_type,
AVG(avg_temperature) AS avg_temperature,
COUNT(CASE WHEN is_alert THEN 1 END) AS alert_count,
SUM(uptime_minutes) AS uptime_minutes
FROM sensor_readings_enriched
WHERE reading_ts >= DATE_TRUNC('day', NOW())
GROUP BY DATE_TRUNC('hour', reading_ts), site_id, device_type;
The null_calibration_count column in device_health_current is your early warning system. Set an alert threshold (for example, more than 1% of readings with NULL calibration) and page on it.
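One way to turn that count into a paging decision is sketched below in Python. It assumes the alerting job can also obtain a total reading count per device or per site (for example from an extra COUNT(*) column, which the view above does not define); the function names and the 1% default are illustrative only.

```python
def null_calibration_rate(null_count: int, total_count: int) -> float:
    """Fraction of readings that arrived without calibration data."""
    if total_count == 0:
        # No readings yet: report 0 rather than dividing by zero.
        return 0.0
    return null_count / total_count


def should_page(null_count: int, total_count: int, threshold: float = 0.01) -> bool:
    """Page only when the NULL rate is strictly above the threshold (default 1%)."""
    return null_calibration_rate(null_count, total_count) > threshold
```

Using a rate rather than a raw count keeps the alert meaningful as the fleet grows, since a fixed number of NULL readings matters less on a busy site than on a quiet one.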
Step 4: The Incremental Append Pattern for Cumulative Metrics
For lifetime or multi-day metrics such as total uptime or cumulative alert counts, the most reliable approach is the incremental append pattern: an authoritative batch-produced historical summary that covers all completed days, combined with a streaming increment that covers activity since the historical cutoff.
-- Batch-produced historical summary (loaded from Iceberg after nightly run)
CREATE TABLE device_uptime_history (
device_id VARCHAR,
total_uptime_minutes BIGINT,
total_alerts INT,
history_through_date DATE,
PRIMARY KEY (device_id)
) WITH (
connector = 'iceberg',
catalog.type = 'glue',
database.name = 'iot_platform',
table.name = 'fact_device_uptime_history',
refresh_mode = 'FULL_RELOAD'
);
-- Streaming incremental layer: readings after the history cutoff
CREATE MATERIALIZED VIEW device_uptime_incremental AS
SELECT
device_id,
SUM(uptime_minutes) AS incremental_uptime_minutes,
COUNT(CASE WHEN is_alert THEN 1 END) AS incremental_alerts
FROM sensor_readings_enriched
WHERE reading_ts::DATE > (SELECT MAX(history_through_date) FROM device_uptime_history)
GROUP BY device_id;
-- Combined view: historical total + streaming increment
CREATE MATERIALIZED VIEW device_uptime_current AS
SELECT
COALESCE(h.device_id, i.device_id) AS device_id,
COALESCE(h.total_uptime_minutes, 0) + COALESCE(i.incremental_uptime_minutes, 0) AS total_uptime_minutes,
COALESCE(h.total_alerts, 0) + COALESCE(i.incremental_alerts, 0) AS total_alerts
FROM device_uptime_history h
FULL OUTER JOIN device_uptime_incremental i ON h.device_id = i.device_id;
The core formula is straightforward:
cumulative_metric = authoritative_historical_total + streaming_increment_since_cutoff
When the nightly batch job completes and updates device_uptime_history, the device_uptime_incremental view automatically shifts its date filter forward, and device_uptime_current reflects the new combined totals. No manual intervention is required.
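The arithmetic behind the cutoff shift can be checked with a small Python sketch. The numbers, dates, and the `cumulative_uptime` helper are made up for illustration; the only thing taken from the pattern is the rule that streaming contributes dates strictly after `history_through_date`.

```python
from datetime import date


def cumulative_uptime(history_total: int, history_through: date, daily_stream: dict) -> int:
    """Historical total plus streaming minutes for dates strictly after the cutoff."""
    increment = sum(
        minutes for day, minutes in daily_stream.items() if day > history_through
    )
    return history_total + increment


# Streaming per-day uptime minutes for one device (illustrative values).
stream = {
    date(2024, 1, 1): 1400,
    date(2024, 1, 2): 1380,
    date(2024, 1, 3): 600,
}

# Before the nightly run: history covers Jan 1, streaming adds Jan 2 and Jan 3.
before = cumulative_uptime(10_000, date(2024, 1, 1), stream)  # 10000 + 1380 + 600

# After the nightly run: the batch recomputed Jan 2 as 1385 minutes, so the
# historical total becomes 11385 and streaming only contributes Jan 3.
after = cumulative_uptime(11_385, date(2024, 1, 2), stream)  # 11385 + 600
```

Note how the streaming value for Jan 2 (1380) simply drops out once the cutoff advances, and the batch value (1385) takes its place inside the historical total.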
This pattern is covered in more detail in the RisingWave materialized view documentation.
Step 5: Routing Dashboards to the Right Lane
Dashboards that serve both historical and current-day data need to pull from the right lane for each time period.
-- Unified daily metrics view: batch for completed days, streaming for today
CREATE MATERIALIZED VIEW device_metrics_unified AS
-- Completed days: authoritative batch output
SELECT
reading_date,
device_id,
avg_temperature,
alert_count,
uptime_minutes,
'batch' AS source
FROM batch_daily_device_metrics -- loaded from Iceberg after nightly run
WHERE reading_date < CURRENT_DATE
UNION ALL
-- Current day: streaming aggregates
SELECT
reading_date,
device_id,
avg_temperature,
alert_count,
uptime_minutes,
'streaming' AS source
FROM device_health_current
WHERE reading_date = CURRENT_DATE;
Any BI tool connecting to RisingWave via the PostgreSQL wire protocol can query this view directly with no additional middleware.
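The routing rule in the view is small enough to express as a few lines of Python, which can be handy for unit-testing dashboard logic outside the database. This is a sketch under assumptions: rows are plain dictionaries, and `unified`, `batch_rows`, and `streaming_rows` are hypothetical names rather than part of any RisingWave API.

```python
from datetime import date


def unified(batch_rows: list, streaming_rows: list, today: date) -> list:
    """Batch rows for completed days, streaming rows for the current day only."""
    out = [dict(r, source="batch") for r in batch_rows if r["reading_date"] < today]
    out += [
        dict(r, source="streaming")
        for r in streaming_rows
        if r["reading_date"] == today
    ]
    return out


today = date(2024, 1, 2)
batch_rows = [
    {"reading_date": date(2024, 1, 1), "device_id": "d1", "uptime_minutes": 1440},
]
streaming_rows = [
    # Superseded: a batch row already exists for this completed day.
    {"reading_date": date(2024, 1, 1), "device_id": "d1", "uptime_minutes": 1430},
    {"reading_date": date(2024, 1, 2), "device_id": "d1", "uptime_minutes": 600},
]

rows = unified(batch_rows, streaming_rows, today)
```

Each day ends up with exactly one row per device, and the `source` field records which lane produced it.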
Operational Notes
A split-horizon architecture stays reliable when the following practices are in place.
Document the cutoff date for each historical table. Every Iceberg-backed table loaded from the batch pipeline should carry a history_through_date or equivalent column. This makes it easy to see which days are covered by the batch output and which days are covered by the streaming increment.
Monitor NULL rates in streaming enrichment. A sudden increase in NULL calibration values means a reference table is missing data or a refresh did not complete. Alert on it before operators start seeing blank fields in dashboards.
Trigger reference data reload after each nightly batch run, not on a fixed schedule. Batch job completion times vary. Reloading on a fixed schedule risks a race condition where RisingWave reloads before the batch write finishes. A completion signal (API call, Kafka control message, or direct SQL command) is more reliable.
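A completion-gated reload might look like the following Python sketch. Everything here is an assumption for illustration: `marker_date` stands for whatever date the batch job records when it finishes, and `reload` stands for whatever call actually triggers the refresh in your deployment.

```python
from datetime import date
from typing import Callable, Optional


def reload_if_batch_done(
    marker_date: Optional[date],
    expected_date: date,
    reload: Callable[[], None],
) -> bool:
    """Run the reload only when the batch completion marker covers the expected date."""
    if marker_date is None or marker_date < expected_date:
        # Batch has not finished for this date yet: do nothing, try again later.
        return False
    reload()
    return True


# Illustrative usage with a stub reload that just records that it was called.
calls = []
skipped = reload_if_batch_done(date(2024, 1, 1), date(2024, 1, 2), lambda: calls.append("reload"))
ran = reload_if_batch_done(date(2024, 1, 2), date(2024, 1, 2), lambda: calls.append("reload"))
```

Gating on a marker written as the last step of the batch job avoids the race a fixed schedule creates, because the reload cannot start before the write it depends on.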
Never show streaming values for completed days as the primary number. Once the batch output is available for a day, that becomes the authoritative value. The streaming value for that day is superseded and should be excluded from historical reports.
Test the cutover weekly. Verify that device_uptime_current produces the same result as a direct query against the batch output for devices that have no readings after the history cutoff. For those devices the streaming increment is empty, so any difference points to a problem with the historical table load or the incremental filter.
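The weekly check can be automated along these lines. This Python sketch assumes the totals have already been fetched into dictionaries keyed by device ID; the `cutover_mismatches` helper and the sample values are hypothetical.

```python
def cutover_mismatches(
    batch_totals: dict, combined_totals: dict, devices_with_increment: set
) -> list:
    """Devices with no streaming increment whose combined total differs from batch."""
    return sorted(
        device
        for device, total in batch_totals.items()
        if device not in devices_with_increment
        and combined_totals.get(device) != total
    )


batch_totals = {"d1": 10_000, "d2": 8_000, "d3": 5_000}
combined_totals = {"d1": 10_600, "d2": 8_000, "d3": 4_990}

# d1 has streamed readings since the cutoff, so it is expected to differ.
mismatches = cutover_mismatches(batch_totals, combined_totals, {"d1"})
```

An empty result means the historical load and the incremental filter agree; here `d3` would be flagged because it differs without any streaming increment to explain the gap.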
Frequently Asked Questions
Does this resemble the lambda architecture?
It does in structure, but without the complexity of a separate serving layer that merges results at query time. The two lanes serve different time ranges and different consumers. Streaming serves the current day; batch serves completed days. The merge is a UNION ALL or date-routing query in SQL, both of which run inside RisingWave as a materialized view.
What happens when the nightly batch job is delayed or fails?
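The streaming increment continues to grow, so cumulative metrics stay current but rest on streaming values for a longer window than usual. Those values carry the usual reference data lag caveat; they are not duplicated or lost. Note that the unified daily view in Step 5 routes only the current day to streaming, so a completed day whose batch output has not landed yet has no row there until the batch catches up. The history_through_date value in the historical table will show that the historical layer has not advanced. Alert on batch job failures independently of the streaming pipeline.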
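A staleness check on history_through_date is one simple way to detect a stalled historical layer. The Python sketch below assumes a healthy nightly run leaves history_through_date equal to yesterday; the function names and the zero-day tolerance are illustrative choices, not part of the pattern itself.

```python
from datetime import date


def history_lag_days(history_through: date, today: date) -> int:
    """Completed days not yet covered by the batch output (0 when up to date)."""
    # A healthy run covers everything through yesterday, hence the "- 1".
    return max((today - history_through).days - 1, 0)


def batch_is_stale(history_through: date, today: date, max_lag_days: int = 0) -> bool:
    """True when more completed days are missing than the tolerance allows."""
    return history_lag_days(history_through, today) > max_lag_days
```

Because this check reads only the historical table, it keeps working even when the batch scheduler itself is the component that failed.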
Can I use this pattern with a data warehouse instead of Iceberg?
Yes. RisingWave can load reference data from PostgreSQL, MySQL, and other relational sources in addition to Iceberg. The key requirement is that the historical tables reload atomically after the batch job completes, so the streaming increment's date filter shifts in one step rather than incrementally.
How does this interact with RisingWave's exactly-once guarantees?
RisingWave provides exactly-once semantics for all materialized views through internal checkpointing. Each sensor reading in the streaming increment is counted exactly once. The approximation in this architecture comes from reference data lag, not from event duplication or loss.
What is the latency of the streaming lane?
With Kafka as the source, end-to-end latency from event arrival to query-visible result is typically under one second, bounded by RisingWave's internal checkpoint interval. This is fast enough for live equipment dashboards and real-time anomaly detection.
Conclusion
The split-horizon architecture does not paper over the limitations of streaming or batch. It assigns each system the work it does best.
RisingWave handles the current day: joining sensor readings with calibration tables through temporal joins, maintaining intraday device health as materialized views, and serving sub-second queries to live operations dashboards. The batch pipeline handles completed days: running full joins against authoritative reference data, producing exact uptime and alert summaries, and writing results back to the lakehouse where RisingWave can reload them.
The incremental append pattern gives you a cumulative metric that is always current without requiring the streaming layer to carry the full historical burden. The nightly reference data refresh cycle gives you a predictable accuracy contract: readings after the last refresh are approximate; everything before it is exact.
For engineering teams building industrial monitoring platforms, predictive maintenance systems, or equipment reliability dashboards, this architecture provides both the real-time visibility and the historical accuracy that production operations require.
Try it yourself. Get RisingWave running in minutes with the quickstart guide. If you find it useful, a star on GitHub helps the project grow. For architecture questions, the RisingWave Slack community is active and the engineering team answers questions directly.

