Temporal filters and time travel queries let you interrogate streaming data at any point in time, not just the present snapshot. In streaming SQL, a temporal filter restricts a query to events within a specific time window using columns like event_time or processing_time. A time travel query goes further: it reconstructs what a materialized view contained at a past timestamp, enabling point-in-time analysis over continuously changing data. RisingWave, a PostgreSQL-compatible streaming database, supports both patterns natively with standard SQL.
Two Clocks in Every Streaming Pipeline
Every event in a streaming system carries at least two timestamps:
- Event time: when the event actually occurred in the real world (the click happened, the sensor fired, the transaction settled)
- Processing time: when the event arrived in the streaming system
The gap between these two timestamps is called event skew. On a fast network with low-latency producers, event skew is milliseconds. For mobile apps that go offline and sync later, skew can be hours or even days.
| Dimension | Event Time | Processing Time |
|---|---|---|
| Source | Application/device clock | Database ingestion clock |
| Stability | Fixed once the event occurs | Changes based on when data arrives |
| Late data | Common; offline devices, retries | Rare; reflects actual arrival order |
| Best for | Business logic correctness | Operational monitoring, throughput tracking |
| SQL reference | event_time, occurred_at | now(), proctime(), ingest_time |
Choosing the wrong time domain produces silently wrong results. Revenue reports filtered by processing time will include yesterday's mobile transactions that arrived this morning. Fraud detection filtered by event time will correctly group a user's actions as they actually happened, not in the arbitrary order they hit the system.
Setting Up the Example Tables
All SQL in this post was tested against RisingWave 2.8.0. Run these setup statements first:
```sql
-- Clickstream with both time columns
CREATE TABLE tmp_clickstream (
    click_id INT,
    user_id INT,
    page VARCHAR,
    event_time TIMESTAMP,   -- when the click happened
    ingest_time TIMESTAMP   -- when RisingWave received it
);

INSERT INTO tmp_clickstream VALUES
    -- On-time events: event_time close to ingest_time
    (1, 42, '/home', '2026-04-01 10:00:00', '2026-04-01 10:00:01'),
    (2, 42, '/products', '2026-04-01 10:00:30', '2026-04-01 10:00:31'),
    (3, 43, '/home', '2026-04-01 10:01:00', '2026-04-01 10:01:02'),
    -- Late events: mobile app was offline, synced at 10:01
    (4, 44, '/checkout', '2026-04-01 09:45:00', '2026-04-01 10:01:30'),
    (5, 44, '/confirm', '2026-04-01 09:47:00', '2026-04-01 10:01:32'),
    -- Very late: offline for over an hour
    (6, 45, '/home', '2026-04-01 09:00:00', '2026-04-01 10:02:00');
```
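With both timestamps in hand, you can inspect the skew of each event directly. A minimal sketch using standard PostgreSQL interval arithmetic on the table above:

```sql
-- Per-event skew: how long after it happened did each click arrive?
SELECT click_id,
       user_id,
       ingest_time - event_time AS skew
FROM tmp_clickstream
ORDER BY skew DESC;
```

The offline events (clicks 4-6) surface at the top with skews of minutes to over an hour, while the on-time events show one or two seconds.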
Temporal Filters: Event Time vs. Processing Time
A temporal filter adds a WHERE clause that bounds results to a time range. The choice of time column determines which question you are answering.
Processing-Time Filter
```sql
-- What records ARRIVED in the system after 09:02?
SELECT 'ingest_time' AS filter_type,
       click_id, user_id, page, event_time, ingest_time
FROM tmp_clickstream
WHERE ingest_time >= '2026-04-01 09:02:00'
ORDER BY ingest_time;
```

```
 filter_type | click_id | user_id | page      | event_time          | ingest_time
-------------+----------+---------+-----------+---------------------+---------------------
 ingest_time |        1 |      42 | /home     | 2026-04-01 10:00:00 | 2026-04-01 10:00:01
 ingest_time |        2 |      42 | /products | 2026-04-01 10:00:30 | 2026-04-01 10:00:31
 ingest_time |        3 |      43 | /home     | 2026-04-01 10:01:00 | 2026-04-01 10:01:02
 ingest_time |        4 |      44 | /checkout | 2026-04-01 09:45:00 | 2026-04-01 10:01:30
 ingest_time |        5 |      44 | /confirm  | 2026-04-01 09:47:00 | 2026-04-01 10:01:32
 ingest_time |        6 |      45 | /home     | 2026-04-01 09:00:00 | 2026-04-01 10:02:00
(6 rows)
```
All six rows are returned because all of them arrived after 09:02. Processing-time filters tell you what the pipeline ingested, regardless of when the events occurred.
Event-Time Filter
```sql
-- What events actually HAPPENED after 09:02?
SELECT 'event_time' AS filter_type,
       click_id, user_id, page, event_time, ingest_time
FROM tmp_clickstream
WHERE event_time >= '2026-04-01 09:02:00'
ORDER BY event_time;
```

```
 filter_type | click_id | user_id | page      | event_time          | ingest_time
-------------+----------+---------+-----------+---------------------+---------------------
 event_time  |        4 |      44 | /checkout | 2026-04-01 09:45:00 | 2026-04-01 10:01:30
 event_time  |        5 |      44 | /confirm  | 2026-04-01 09:47:00 | 2026-04-01 10:01:32
 event_time  |        1 |      42 | /home     | 2026-04-01 10:00:00 | 2026-04-01 10:00:01
 event_time  |        2 |      42 | /products | 2026-04-01 10:00:30 | 2026-04-01 10:00:31
 event_time  |        3 |      43 | /home     | 2026-04-01 10:01:00 | 2026-04-01 10:01:02
(5 rows)
```
Five rows. The 09:00 event from user 45 is excluded because it happened before 09:02, even though it arrived late. Event-time filters answer the business question: what did users actually do during this period?
Rolling Temporal Filters with now()
In a live streaming pipeline, you typically want a rolling filter relative to the current moment. RisingWave evaluates now() at query time, which means a materialized view using now() will return different rows as the clock advances.
```sql
-- Events from the last hour, evaluated at query time
SELECT order_id, user_id, amount, event_time
FROM tmp_orders
WHERE event_time > now()::TIMESTAMP - INTERVAL '1 hour'
ORDER BY event_time DESC;
```
When you embed this in a materialized view, RisingWave updates the result set continuously as time passes, automatically dropping events that fall outside the window without any cron jobs or manual refresh logic.
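As a sketch of that pattern (assuming a tmp_orders table with the columns queried above; the view name here is hypothetical):

```sql
-- Rolling one-hour view: rows age out automatically as now() advances,
-- with no cron job or manual refresh
CREATE MATERIALIZED VIEW tmp_mv_recent_orders AS
SELECT order_id, user_id, amount, event_time
FROM tmp_orders
WHERE event_time > now()::TIMESTAMP - INTERVAL '1 hour';
```

Queries against tmp_mv_recent_orders then always reflect the trailing hour, because RisingWave maintains the temporal filter incrementally rather than re-scanning the base table.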
Handling Late Data with Watermarks
Late data is the gap between event time and processing time. Without a policy for handling late events, your streaming aggregations will be perpetually incomplete.
RisingWave uses watermarks to track event-time progress. A watermark is a monotonically increasing timestamp that declares: "all events with an event_time before this point have arrived." Events that arrive after the watermark has advanced beyond their window are considered late.
You define watermarks at the source level:
```sql
-- 5-second watermark: tolerate up to 5 seconds of lateness
CREATE TABLE tmp_kafka_events (
    event_id INT,
    event_type VARCHAR,
    payload VARCHAR,
    event_time TIMESTAMP,
    WATERMARK FOR event_time AS event_time - INTERVAL '5 seconds'
) APPEND ONLY;

-- 30-second watermark: mobile apps have more network variability
CREATE TABLE tmp_mobile_events (
    event_id INT,
    device_id VARCHAR,
    action VARCHAR,
    event_time TIMESTAMP,
    WATERMARK FOR event_time AS event_time - INTERVAL '30 seconds'
) APPEND ONLY;
```
The watermark expression event_time - INTERVAL '5 seconds' means the watermark lags 5 seconds behind the maximum observed event_time. When an event arrives with event_time = 10:00:30, the watermark advances to 10:00:25. Any subsequent events with event_time < 10:00:25 are treated as late.
Choosing the Right Watermark Tolerance
| Data Source | Recommended Tolerance | Reason |
|---|---|---|
| Server-side logs over LAN | 1-5 seconds | Network latency is minimal |
| Kafka consumer with rebalancing | 5-15 seconds | Brief delays during partition reassignment |
| Mobile app events | 30-300 seconds | Connectivity gaps and batch sync |
| IoT devices with poor signal | 1-5 minutes | Intermittent connectivity |
| Offline-capable applications | 1-24 hours | Full offline sessions |
A tighter watermark means faster results but more late data dropped. A looser watermark captures more late events but increases result latency. Measure your actual event skew distribution before picking a value.
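One way to measure that distribution, sketched against the sample clickstream table from earlier (assuming standard PostgreSQL interval aggregates):

```sql
-- Summarize observed lateness as a starting point for the
-- watermark tolerance
SELECT COUNT(*) AS events,
       AVG(ingest_time - event_time) AS avg_skew,
       MAX(ingest_time - event_time) AS max_skew
FROM tmp_clickstream;
```

On real traffic you would run this over a representative window of data and set the tolerance somewhere between the typical and worst-case skew, depending on how much late data you can afford to drop.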
EMIT ON WINDOW CLOSE: Finalized Results Over Event Time
By default, RisingWave materialized views emit updated results as each new event arrives, including partial results for windows that have not closed yet. The EMIT ON WINDOW CLOSE clause changes this behavior: results appear only once the watermark has advanced past the window boundary, guaranteeing that the window is complete.
Here is the full pattern with a sensor stream:
```sql
-- Source with watermark
CREATE TABLE tmp_sensor_stream (
    sensor_id INT,
    temperature DOUBLE PRECISION,
    event_time TIMESTAMP,
    WATERMARK FOR event_time AS event_time - INTERVAL '10 seconds'
) APPEND ONLY;
```
After inserting six readings (including one late event at 09:59:45):
```sql
INSERT INTO tmp_sensor_stream VALUES
    (1, 22.5, '2026-04-01 10:00:00'),
    (2, 23.1, '2026-04-01 10:00:05'),
    (1, 22.8, '2026-04-01 10:00:10'),
    (2, 23.5, '2026-04-01 10:00:15'),
    (3, 21.0, '2026-04-01 09:59:45'),  -- late event
    (1, 23.0, '2026-04-01 10:00:20');
```
Build the aggregation with EMIT ON WINDOW CLOSE:
```sql
CREATE MATERIALIZED VIEW tmp_mv_sensor_1min AS
SELECT
    window_start,
    window_end,
    sensor_id,
    ROUND(AVG(temperature)::numeric, 2) AS avg_temp,
    COUNT(*) AS reading_count
FROM TUMBLE(tmp_sensor_stream, event_time, INTERVAL '1 minute')
GROUP BY window_start, window_end, sensor_id
EMIT ON WINDOW CLOSE;
```
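At this point neither window has been emitted, because the watermark (10:00:10, from the latest reading at 10:00:20) has not passed the 10:00-10:01 boundary. A hypothetical later reading closes it:

```sql
-- A reading at 10:01:15 advances the watermark to 10:01:05,
-- past the 10:00:00-10:01:00 window boundary
INSERT INTO tmp_sensor_stream VALUES (1, 22.9, '2026-04-01 10:01:15');
```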
After the watermark advances past 10:01:00 (which, given the 10-second tolerance, requires an event stamped 10:01:10 or later), the closed windows appear:
```sql
SELECT * FROM tmp_mv_sensor_1min ORDER BY window_start, sensor_id;
```

```
    window_start     |     window_end      | sensor_id | avg_temp | reading_count
---------------------+---------------------+-----------+----------+---------------
 2026-04-01 09:59:00 | 2026-04-01 10:00:00 |         3 |    21.00 |             1
 2026-04-01 10:00:00 | 2026-04-01 10:01:00 |         1 |    22.77 |             3
 2026-04-01 10:00:00 | 2026-04-01 10:01:00 |         2 |    23.30 |             2
(3 rows)
```
The late event (sensor 3 at 09:59:45) was correctly placed in the 09:59-10:00 window rather than discarded, because the watermark had not yet advanced past its timestamp at the point it was ingested.
Time Travel Queries on Materialized Views
"Time travel" in streaming SQL means querying what data looked like at a specific past point in time. RisingWave materialized views over windowed queries retain historical window results, making this pattern straightforward.
Building a Historical Revenue View
```sql
CREATE TABLE tmp_metrics (
    metric_name VARCHAR,
    value DOUBLE PRECISION,
    region VARCHAR,
    event_time TIMESTAMP
);

INSERT INTO tmp_metrics VALUES
    ('revenue', 1200.00, 'us-east', '2026-04-01 07:15:00'),
    ('revenue', 850.00, 'us-west', '2026-04-01 07:30:00'),
    ('revenue', 3400.00, 'us-east', '2026-04-01 08:20:00'),
    ('revenue', 920.00, 'eu-west', '2026-04-01 08:45:00'),
    ('revenue', 2100.00, 'us-east', '2026-04-01 09:10:00'),
    ('revenue', 1560.00, 'us-west', '2026-04-01 09:50:00'),
    ('revenue', 780.00, 'eu-west', '2026-04-01 09:55:00');
```
```sql
CREATE MATERIALIZED VIEW tmp_mv_hourly_revenue AS
SELECT
    window_start,
    window_end,
    region,
    SUM(value) AS total_revenue,
    COUNT(*) AS transaction_count
FROM TUMBLE(tmp_metrics, event_time, INTERVAL '1 hour')
WHERE metric_name = 'revenue'
GROUP BY window_start, window_end, region;
```
The materialized view now holds a complete record of hourly revenue by region. Query it like a regular table:
```sql
SELECT * FROM tmp_mv_hourly_revenue ORDER BY window_start, region;
```

```
    window_start     |     window_end      | region  | total_revenue | transaction_count
---------------------+---------------------+---------+---------------+-------------------
 2026-04-01 07:00:00 | 2026-04-01 08:00:00 | us-east |          1200 |                 1
 2026-04-01 07:00:00 | 2026-04-01 08:00:00 | us-west |           850 |                 1
 2026-04-01 08:00:00 | 2026-04-01 09:00:00 | eu-west |           920 |                 1
 2026-04-01 08:00:00 | 2026-04-01 09:00:00 | us-east |          3400 |                 1
 2026-04-01 09:00:00 | 2026-04-01 10:00:00 | eu-west |           780 |                 1
 2026-04-01 09:00:00 | 2026-04-01 10:00:00 | us-east |          2100 |                 1
 2026-04-01 09:00:00 | 2026-04-01 10:00:00 | us-west |          1560 |                 1
(7 rows)
```
Querying a Specific Past Window
To reconstruct what the 8 AM hour looked like, filter on window_start:
```sql
SELECT
    window_start,
    window_end,
    region,
    total_revenue,
    transaction_count
FROM tmp_mv_hourly_revenue
WHERE window_start = '2026-04-01 08:00:00'
ORDER BY region;
```

```
    window_start     |     window_end      | region  | total_revenue | transaction_count
---------------------+---------------------+---------+---------------+-------------------
 2026-04-01 08:00:00 | 2026-04-01 09:00:00 | eu-west |           920 |                 1
 2026-04-01 08:00:00 | 2026-04-01 09:00:00 | us-east |          3400 |                 1
(2 rows)
```
This is the fundamental time travel pattern: the materialized view accumulates one row per closed window, and you can filter to any past window using a simple WHERE window_start = '...' clause. No snapshot mechanism, no separate historical store, no batch recomputation.
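A variant of the same pattern answers "what was the cumulative picture as of a timestamp" by bounding window_end instead of matching a single window_start:

```sql
-- Revenue by region as of 09:00: include only windows fully closed by then
-- Expected: eu-west 920 (1 tx), us-east 4600 (2 tx), us-west 850 (1 tx)
SELECT region,
       SUM(total_revenue) AS revenue_to_date,
       SUM(transaction_count) AS transactions_to_date
FROM tmp_mv_hourly_revenue
WHERE window_end <= '2026-04-01 09:00:00'
GROUP BY region
ORDER BY region;
```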
Cross-Window Comparison
A common operational pattern is comparing two time windows side by side to detect trends or regressions. First, build the windowed clickstream MV:
```sql
CREATE MATERIALIZED VIEW tmp_mv_click_hourly AS
SELECT
    window_start,
    window_end,
    page,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(*) AS total_clicks
FROM TUMBLE(tmp_clickstream, event_time, INTERVAL '30 minutes')
GROUP BY window_start, window_end, page;
```
Then query across time buckets:
```sql
SELECT
    window_start,
    page,
    unique_users,
    total_clicks,
    CASE
        WHEN window_start = '2026-04-01 09:00:00' THEN '09:00-09:30'
        WHEN window_start = '2026-04-01 09:30:00' THEN '09:30-10:00'
        WHEN window_start = '2026-04-01 10:00:00' THEN '10:00-10:30'
    END AS time_bucket
FROM tmp_mv_click_hourly
ORDER BY window_start, total_clicks DESC;
```

```
    window_start     | page      | unique_users | total_clicks | time_bucket
---------------------+-----------+--------------+--------------+-------------
 2026-04-01 09:00:00 | /home     |            1 |            1 | 09:00-09:30
 2026-04-01 09:30:00 | /confirm  |            1 |            1 | 09:30-10:00
 2026-04-01 09:30:00 | /checkout |            1 |            1 | 09:30-10:00
 2026-04-01 10:00:00 | /home     |            2 |            2 | 10:00-10:30
 2026-04-01 10:00:00 | /products |            1 |            1 | 10:00-10:30
(5 rows)
```
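The hard-coded CASE labels work for a known set of buckets; for an open-ended comparison, a self-join on adjacent windows is more general. A sketch using the same MV (standard timestamp arithmetic):

```sql
-- Compare each 30-minute bucket with the previous one for the same page
SELECT cur.window_start,
       cur.page,
       cur.total_clicks,
       prev.total_clicks AS prev_clicks,
       cur.total_clicks - prev.total_clicks AS delta
FROM tmp_mv_click_hourly cur
LEFT JOIN tmp_mv_click_hourly prev
  ON cur.page = prev.page
 AND prev.window_start = cur.window_start - INTERVAL '30 minutes'
ORDER BY cur.window_start, cur.page;
```

The LEFT JOIN keeps pages that had no traffic in the prior bucket, with prev_clicks reported as NULL.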
Temporal Joins: Matching Events to State at Event Time
A temporal join enriches a stream of events with slowly changing reference data, looking up the value that was valid at the moment the event occurred. The classic example is applying the correct price to a purchase based on when the order was placed, not today's price.
```sql
CREATE TABLE tmp_price_history (
    product_id INT,
    price DOUBLE PRECISION,
    valid_from TIMESTAMP,
    valid_to TIMESTAMP
);

INSERT INTO tmp_price_history VALUES
    (1, 19.99, '2026-04-01 00:00:00', '2026-04-01 08:00:00'),
    (1, 24.99, '2026-04-01 08:00:00', '2026-04-01 12:00:00'),
    (1, 29.99, '2026-04-01 12:00:00', '9999-12-31 23:59:59'),
    (2, 49.99, '2026-04-01 00:00:00', '2026-04-01 10:00:00'),
    (2, 59.99, '2026-04-01 10:00:00', '9999-12-31 23:59:59');

CREATE TABLE tmp_purchase_events (
    purchase_id INT,
    product_id INT,
    quantity INT,
    event_time TIMESTAMP
);

INSERT INTO tmp_purchase_events VALUES
    (101, 1, 2, '2026-04-01 07:30:00'),  -- before price increase
    (102, 1, 1, '2026-04-01 09:00:00'),  -- after first price change
    (103, 2, 3, '2026-04-01 11:00:00'),  -- after second product's price change
    (104, 1, 5, '2026-04-01 13:00:00');  -- after second price change
```
The temporal join uses the event_time to select the correct price row:
```sql
SELECT
    p.purchase_id,
    p.product_id,
    p.quantity,
    p.event_time,
    ph.price AS unit_price,
    p.quantity * ph.price AS total_price
FROM tmp_purchase_events p
JOIN tmp_price_history ph
  ON p.product_id = ph.product_id
 AND p.event_time >= ph.valid_from
 AND p.event_time < ph.valid_to
ORDER BY p.purchase_id;
```

```
 purchase_id | product_id | quantity | event_time          | unit_price | total_price
-------------+------------+----------+---------------------+------------+-------------
         101 |          1 |        2 | 2026-04-01 07:30:00 |      19.99 |       39.98
         102 |          1 |        1 | 2026-04-01 09:00:00 |      24.99 |       24.99
         103 |          2 |        3 | 2026-04-01 11:00:00 |      59.99 |      179.97
         104 |          1 |        5 | 2026-04-01 13:00:00 |      29.99 |      149.95
(4 rows)
```
Purchase 101 correctly uses the 19.99 price that was active at 07:30, not the current 29.99. A plain equi-join on product_id alone could not do this: it would match every historical price row for the product. The range predicates on valid_from and valid_to select exactly the one row that was in effect at event time.
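When you only need the latest version of the reference row (today's price rather than the historical one), RisingWave also offers a process-time temporal join. The sketch below is hedged: the table names are hypothetical, and the requirements (an append-only left input, a primary-keyed right table) should be checked against the temporal join documentation for your version.

```sql
-- Hypothetical setup: an append-only purchase stream and a
-- current-price table keyed by product_id
CREATE TABLE tmp_purchases_ao (
    purchase_id INT,
    product_id INT,
    quantity INT,
    event_time TIMESTAMP
) APPEND ONLY;

CREATE TABLE tmp_current_prices (
    product_id INT PRIMARY KEY,
    price DOUBLE PRECISION
);

-- Process-time temporal join: each purchase picks up whatever price row
-- exists at the moment the join operator processes the event
CREATE MATERIALIZED VIEW tmp_mv_purchase_pricing AS
SELECT p.purchase_id,
       p.quantity * cp.price AS total_price
FROM tmp_purchases_ao p
JOIN tmp_current_prices FOR SYSTEM_TIME AS OF PROCTIME() AS cp
  ON p.product_id = cp.product_id;
```

This answers a different question than the event-time range join above: replayed or late purchases are priced at join-processing time, not at the time the order was placed.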
Processing Time with proctime()
For scenarios where you want to watermark and window on arrival time rather than event time, RisingWave provides the proctime() computed column. Unlike event_time, which you populate from your data, proctime() is assigned automatically by the database at ingestion.
```sql
CREATE TABLE tmp_proc_time_events (
    event_id INT,
    event_type VARCHAR,
    event_time TIMESTAMP,
    proc_time TIMESTAMPTZ AS proctime()
) APPEND ONLY;

INSERT INTO tmp_proc_time_events (event_id, event_type, event_time) VALUES
    (1, 'click', '2026-04-01 10:00:00'),
    (2, 'purchase', '2026-04-01 10:01:00'),
    (3, 'view', '2026-04-01 10:05:00');

SELECT event_id, event_type, event_time, proc_time
FROM tmp_proc_time_events
ORDER BY event_id;
```

```
 event_id | event_type | event_time          | proc_time
----------+------------+---------------------+-------------------------------
        1 | click      | 2026-04-01 10:00:00 | 2026-04-02 07:39:29.456+00:00
        2 | purchase   | 2026-04-01 10:01:00 | 2026-04-02 07:39:29.456+00:00
        3 | view       | 2026-04-01 10:05:00 | 2026-04-02 07:39:52.456+00:00
(3 rows)
```
Events 1 and 2 share the same proc_time because they were inserted in the same batch, while event 3 was inserted later. The proc_time column reflects wall-clock ingestion time, independent of the event_time values in your data.
Use proctime() for:
- Operational monitoring where you care about system throughput, not user activity timing
- Debugging pipeline latency by comparing proc_time to event_time
- Watermarking when your data sources do not carry reliable event timestamps
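The latency-debugging case can be sketched directly against the table above. The cast from TIMESTAMP to TIMESTAMPTZ is shown for illustration; how it resolves depends on your session timezone settings:

```sql
-- Pipeline latency per event: how long after it occurred did each
-- event reach RisingWave?
SELECT event_id,
       event_type,
       proc_time - event_time::TIMESTAMPTZ AS pipeline_latency
FROM tmp_proc_time_events
ORDER BY pipeline_latency DESC;
```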
Putting It All Together: A Decision Framework
| Question | Time Domain | Approach |
|---|---|---|
| What happened in a past time window? | Event time | Temporal filter on event_time column |
| What did my system process recently? | Processing time | Temporal filter on proc_time or ingest_time |
| What was the state at 3 PM yesterday? | Event time | Query MV with WHERE window_start = '...' |
| How much data did we ingest this hour? | Processing time | Temporal filter on proctime() column |
| How late is my data arriving? | Both | ingest_time - event_time comparison |
| What price applied when an order was placed? | Event time | Temporal join with valid_from/valid_to |
| Finalized aggregates for closed windows? | Event time | EMIT ON WINDOW CLOSE with watermark |
Related Topics
For a deeper look at the window functions referenced in this post, see Windowing in Stream Processing: Tumbling, Hopping, Sliding, and Session Windows.
If you are ingesting event streams from Kafka and want to understand how sources connect to temporal filters, Processing Kafka Streams Without Flink or Java walks through that end-to-end setup.
For operational use cases where temporal filters drive real-time alert conditions, Building a Real-Time Alerting System on Streaming Data shows how to compose temporal filters with threshold logic.
Frequently Asked Questions
What is the difference between event time and processing time in streaming SQL?
Event time is when an event actually occurred, recorded in the data itself (for example, a timestamp in a log line or a sensor reading). Processing time is when the streaming system received and processed the event. These diverge when data sources have delays, go offline, or batch-send records. Event-time semantics produce correct business results even with late arrivals; processing-time semantics reflect what the pipeline saw and when, which is useful for operational monitoring.
How does a watermark handle late data in RisingWave?
A watermark is a monotonically increasing timestamp that tracks event-time progress. You define it as an expression like event_time - INTERVAL '30 seconds', which creates a tolerance window. Events arriving within that tolerance are counted in the correct window. Events arriving after the watermark has advanced beyond their window boundary are considered late; RisingWave either drops them or, if EMIT ON WINDOW CLOSE is not used, may still include them in live queries against the base table. The watermark controls when closed windows are emitted as finalized results.
Can I query a materialized view for data at a specific point in the past?
Yes. When a materialized view groups by window boundaries (using TUMBLE, HOP, or similar functions), each closed window is stored as a separate row. Filtering on window_start reconstructs what the data looked like during that time interval. This is the time travel pattern in streaming SQL: the MV accumulates historical windows that you can query at any time with a standard WHERE clause, without maintaining a separate historical snapshot store.
When should I use EMIT ON WINDOW CLOSE instead of the default behavior?
Use EMIT ON WINDOW CLOSE when your downstream consumers expect finalized, append-only results rather than continuously updated values. This is common when writing to event stores, data lakehouses (Apache Iceberg, Delta Lake), or Kafka topics where retractions are not supported. It requires a watermark on the source so RisingWave knows when a window is complete. Without EMIT ON WINDOW CLOSE, the MV emits incremental updates with every new event, which is better for dashboards that need low-latency partial results.
Conclusion
Temporal reasoning is what separates a streaming database from a conventional one. The key ideas from this post:
- Event time vs. processing time: choose event time for business correctness, processing time for operational observability.
- Temporal filters (WHERE event_time > now() - INTERVAL '1 hour') restrict queries to specific time ranges using either event or processing timestamps.
- Watermarks define how much lateness you tolerate, allowing the system to close windows and produce finalized results without waiting forever.
- EMIT ON WINDOW CLOSE delivers append-only, finalized window results once the watermark advances past a window boundary.
- Time travel queries on windowed materialized views reconstruct past state by filtering on window_start, with no extra infrastructure.
- Temporal joins apply time-bounded reference data to events using overlapping validity periods.
All of these patterns run as plain SQL in RisingWave with no custom operators or Java code.
Ready to try temporal queries on your own data? Get started with RisingWave in 5 minutes via the quickstart guide.
Join the RisingWave Slack community to ask questions and share what you are building.