A streaming data warehouse built on RisingWave and Apache Iceberg combines the freshness of a real-time pipeline with the analytical depth of a traditional data warehouse—without the cost of either a dedicated streaming engine or a commercial warehouse. RisingWave continuously transforms data from Kafka and databases, materializes it into star schema or OBT structures, and sinks the results to Iceberg on S3 where any SQL engine can query them.
Why Build a Streaming Data Warehouse?
Traditional data warehouses process data in daily or hourly batch windows. Business decisions increasingly need data that is minutes or seconds old, not hours. But low-latency operational stores like Redis or Cassandra lack the SQL expressiveness that analysts need.
The streaming data warehouse fills this gap. It looks like a data warehouse to analysts (SQL, schemas, tables, joins) but operates like a stream processor internally (continuous ingestion, incremental computation, no batch jobs).
RisingWave is the streaming computation layer. Apache Iceberg is the durable storage layer. Together they deliver a warehouse that is always fresh, infinitely scalable in storage, and queryable from any modern analytics tool.
System Architecture
The streaming data warehouse has five layers:
- Sources: Operational databases (Postgres, MySQL), event streams (Kafka), and external APIs
- Ingestion: RisingWave CDC sources and Kafka connectors
- Transformation: RisingWave materialized views implementing business logic
- Storage: Apache Iceberg tables on S3 (raw, ODS, DWD, DWS, ADS layers)
- Serving: Trino, Spark, DuckDB, or RisingWave v2.8 for ad hoc analytics
Building the Data Warehouse Layers
ODS Layer: Operational Data Store
The ODS layer captures raw changes from source systems with minimal transformation.
-- Capture raw orders from MySQL.
-- CDC ingestion requires an explicit schema and a primary key.
CREATE TABLE raw_orders_mysql (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT,
    product_id VARCHAR,
    quantity INT,
    unit_price DECIMAL(10,2),
    discount_pct DECIMAL(5,2),
    order_status VARCHAR,
    region VARCHAR,
    created_at TIMESTAMPTZ
)
WITH (
    connector = 'mysql-cdc',
    hostname = 'orders-db.prod',
    port = '3306',
    username = 'cdc_reader',
    password = 'secret',
    database.name = 'ecommerce',
    table.name = 'orders'
);
-- Capture raw products from the Kafka enrichment stream.
-- Declaring a primary key makes this a table that temporal joins can
-- look up, with newer catalog messages overwriting older ones per key.
CREATE TABLE raw_products_kafka (
    product_id VARCHAR PRIMARY KEY,
    name VARCHAR,
    category VARCHAR,
    price DECIMAL(10,2),
    brand VARCHAR,
    updated_at TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'catalog.products',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
)
FORMAT UPSERT ENCODE JSON;
DWD Layer: Data Warehouse Detail
The DWD layer standardizes data types, applies business rules, and joins related entities.
CREATE MATERIALIZED VIEW dwd_order_details AS
SELECT
    o.order_id,
    o.customer_id,
    o.product_id,
    p.name AS product_name,
    p.category AS product_category,
    p.brand AS product_brand,
    o.quantity,
    o.unit_price,
    o.quantity * o.unit_price AS line_total,
    o.discount_pct,
    o.quantity * o.unit_price
        * (1 - COALESCE(o.discount_pct, 0) / 100) AS net_revenue,
    o.order_status,
    o.region,
    o.created_at AS order_time,
    DATE_TRUNC('day', o.created_at) AS order_date
FROM raw_orders_mysql AS o
LEFT JOIN raw_products_kafka FOR SYSTEM_TIME AS OF PROCTIME() AS p
    ON o.product_id = p.product_id;
The temporal join (FOR SYSTEM_TIME AS OF PROCTIME()) looks up the product row as it exists when each order event is processed, so later catalog updates do not rewrite already-materialized order rows. RisingWave supports process-time temporal joins, so the match reflects the catalog at ingestion time rather than today's version.
DWS Layer: Data Warehouse Summary
CREATE MATERIALIZED VIEW dws_daily_sales AS
SELECT
    order_date,
    region,
    product_category,
    product_brand,
    COUNT(DISTINCT order_id) AS order_count,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(quantity) AS units_sold,
    SUM(net_revenue) AS total_revenue,
    -- Revenue per distinct order; AVG(net_revenue) would average line items
    SUM(net_revenue) / COUNT(DISTINCT order_id) AS avg_order_value,
    SUM(CASE WHEN discount_pct > 0 THEN 1 ELSE 0 END) AS discounted_orders
FROM dwd_order_details
GROUP BY order_date, region, product_category, product_brand;
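The architecture also lists an ADS (application data service) layer, which follows the same pattern one level up. As an illustrative sketch (the view name and ranking logic are hypothetical, not from a real deployment), an ADS view backing a dashboard or API might narrow the DWS summary to a trailing window using RisingWave's temporal-filter support:

```sql
-- Hypothetical ADS view: trailing-30-day revenue per region and category,
-- built on the DWS summary so consumers read a small pre-aggregated table
CREATE MATERIALIZED VIEW ads_top_categories AS
SELECT
    region,
    product_category,
    SUM(total_revenue) AS revenue_30d
FROM dws_daily_sales
WHERE order_date > NOW() - INTERVAL '30 days'
GROUP BY region, product_category;
```

The NOW()-based temporal filter keeps the view's state bounded: rows older than the window are retired incrementally rather than accumulating forever.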
Sinking Each Layer to Iceberg
-- ODS sink (raw CDC data, append-only)
CREATE SINK ods_orders_sink AS
SELECT * FROM raw_orders_mysql
WITH (
    connector = 'iceberg',
    type = 'append-only',
    -- CDC streams carry updates and deletes; force them into
    -- appends so the raw layer is an immutable change log
    force_append_only = 'true',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://data-warehouse/iceberg',
    s3.region = 'us-east-1',
    database.name = 'ods',
    table.name = 'orders'
);
-- DWS sink (aggregated, upsert)
CREATE SINK dws_daily_sales_sink AS
SELECT * FROM dws_daily_sales
WITH (
    connector = 'iceberg',
    type = 'upsert',
    -- Upsert sinks need an explicit key; use the GROUP BY grain
    primary_key = 'order_date,region,product_category,product_brand',
    catalog.type = 'rest',
    catalog.uri = 'http://iceberg-catalog:8181',
    warehouse.path = 's3://data-warehouse/iceberg',
    s3.region = 'us-east-1',
    database.name = 'dws',
    table.name = 'daily_sales'
);
Layer Comparison
| Layer | Update Frequency | Iceberg Sink Type | Primary Consumers | Retention |
| --- | --- | --- | --- | --- |
| ODS (raw) | Per CDC event | append-only | Data engineers, auditors | 90 days |
| DWD (detail) | Per CDC event | upsert | Exploratory analytics | 365 days |
| DWS (summary) | Per window close | upsert | BI dashboards | 3 years |
| ADS (application) | Per window close | upsert | Applications, APIs | 1 year |
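The retention column implies periodic snapshot expiration on the Iceberg side, which is typically run by an external engine rather than the streaming layer. One common approach, sketched here with Spark's built-in Iceberg procedure (the catalog name `warehouse` and the cutoff timestamp are illustrative; point Spark at the same REST catalog the sinks use):

```sql
-- Expire ODS snapshots older than the 90-day retention window
CALL warehouse.system.expire_snapshots(
    table => 'ods.orders',
    older_than => TIMESTAMP '2025-01-01 00:00:00'
);
```

In practice the cutoff would be computed by a scheduled job (e.g., current date minus the layer's retention period) rather than hard-coded.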
Connecting BI Tools to Your Streaming Warehouse
Once data lands in Iceberg, any SQL engine can serve as the query layer:
Trino: Best for complex ad hoc SQL across large datasets. Configure the Iceberg connector with your REST catalog URI.
Apache Spark: Best for machine learning feature pipelines and large-scale historical analysis.
DuckDB: Best for single-analyst workloads, local data exploration, and lightweight dashboards.
RisingWave v2.8: Best for queries that mix live streaming data (materialized views) with historical Iceberg data in a single query.
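As one concrete sketch of the serving layer, DuckDB's iceberg extension can read the summary table straight from object storage. The table path below is illustrative (iceberg_scan expects a table location or metadata file, and S3 credentials must be configured separately):

```sql
INSTALL iceberg;
LOAD iceberg;

-- Ad hoc rollup over the DWS table written by the sink above
SELECT region, SUM(total_revenue) AS revenue
FROM iceberg_scan('s3://data-warehouse/iceberg/dws/daily_sales')
GROUP BY region
ORDER BY revenue DESC;
```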
Performance Optimization for Warehouse Queries
Two patterns dramatically improve query performance on Iceberg-backed warehouses:
Sorted compaction: Sort data within Parquet files by the most common filter column (e.g., region, product_category). Iceberg's min/max statistics per file enable entire-file skipping, reducing effective scan size by 5–20x.
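A sort order can be declared once on the Iceberg table so that compaction and rewrites cluster files by the filter column. With Spark and the Iceberg SQL extensions enabled, this is a one-line DDL (table name matches the DWS sink above; requires IcebergSparkSessionExtensions):

```sql
-- Cluster data files by region so per-file min/max statistics
-- let query engines skip whole files on region filters
ALTER TABLE dws.daily_sales WRITE ORDERED BY region;
```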
Materialized view precomputation: Keep the DWS layer in RisingWave's in-memory state for sub-second dashboard queries. The Iceberg sink provides durability and historical access; RisingWave serves the hot path.
FAQ
Q: How does this architecture compare to dbt + a batch warehouse?
A: dbt + Snowflake/BigQuery processes data in batch windows (typically hourly). The RisingWave + Iceberg architecture updates continuously. For teams that need data fresher than one hour, RisingWave is the better choice. For teams primarily running historical reports, traditional warehouses may be simpler.
Q: Can I use existing dbt models with RisingWave?
A: RisingWave is PostgreSQL-compatible, so many dbt models work without modification. The key difference is that CREATE MATERIALIZED VIEW in RisingWave updates incrementally, not on a dbt schedule run.
Q: How do I implement slowly changing dimensions (SCD) in this architecture?
A: Use RisingWave's temporal join for SCD Type 2 lookups. For SCD Type 1 (overwrite), use an upsert Iceberg sink. For SCD Type 6 (hybrid), combine upsert for current state with append-only for history.
Q: What is the data freshness SLA with this architecture?
A: End-to-end latency from source change to Iceberg table is typically 60–120 seconds (one to two RisingWave checkpoint intervals). For sub-second serving, query RisingWave materialized views directly rather than Iceberg.
Q: How do I handle source schema changes without pipeline downtime?
A: RisingWave's CDC sources automatically propagate schema changes for ADD COLUMN operations. For DROP COLUMN or renames, update the affected materialized views and recreate the sinks. Iceberg handles the underlying table schema migration.
Get Started
Build your streaming data warehouse today with the RisingWave quickstart guide. Connect with data engineers building production lakehouses in the RisingWave Slack community.

