Your Airflow DAGs run every hour. Your stakeholders want data in five seconds. That gap is not a tuning problem. It is an architecture problem.
Batch ETL served data teams well for a decade. Apache Airflow became the standard orchestrator: write a Python DAG, schedule it on a cron, retry on failure, move on. But modern data products - real-time dashboards, fraud detection, personalized recommendations - cannot wait for the next scheduled run. Every hour of latency is an hour of stale decisions.
Streaming ETL pipelines solve this by processing data continuously as it arrives. Instead of scheduling extract-transform-load cycles, you declare what the pipeline should do and the system keeps results up to date automatically. In this article, you will build a complete streaming ETL pipeline using SQL in RisingWave, replacing a typical Airflow batch workflow with continuous CDC ingestion, transformation via materialized views, and delivery to a downstream data warehouse.
The Batch ETL Problem: Why Airflow Falls Short for Real-Time
Apache Airflow is a workflow orchestrator, not a data processor. It excels at coordinating tasks - triggering a Spark job, calling an API, moving files between systems. But when teams use Airflow to build ETL pipelines, they inherit several structural limitations.
Latency is baked into the architecture
Airflow DAGs run on schedules. The tightest practical interval is about one minute, and most production DAGs run every 15 minutes to an hour. That means your downstream tables can lag the source by up to a full scheduling interval, plus however long the run itself takes. For use cases like inventory tracking, dynamic pricing, or user-facing analytics, this delay is unacceptable.
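As a rough illustration of that latency floor, the sketch below computes the best-case and worst-case staleness of a table refreshed on a fixed schedule. The function name and the 10-minute run duration are assumptions made for the example, not measurements from a real deployment.

```python
from datetime import timedelta


def staleness_bounds(interval, run_duration):
    """Return (best, worst) staleness for a table refreshed on a fixed schedule.

    Best case: a row changes just before extraction starts, so it becomes
    visible as soon as the run finishes.
    Worst case: a row changes just after extraction, so it waits a full
    interval and then the duration of the next run.
    """
    return run_duration, interval + run_duration


# Hypothetical numbers: an hourly DAG whose run takes 10 minutes.
best, worst = staleness_bounds(timedelta(hours=1), timedelta(minutes=10))
print(f"best case: {best}, worst case: {worst}")
```

Shortening the interval shrinks the worst case, but it can never drop below the run duration, which is why this is a question of architecture and not of tuning.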
DAGs grow complex fast
A typical batch ETL DAG includes tasks for extraction, staging, transformation, quality checks, and loading. Each task needs error handling, retries, and dependency management. A pipeline that starts as 50 lines of Python grows to 500 as edge cases accumulate. When the business adds a new data source, you write another DAG. When a schema changes upstream, you debug task failures across multiple DAG runs.
Resource efficiency drops at scale
Batch pipelines reprocess entire datasets (or large partitions) on every run. If your orders table has 10 million rows and 500 changed since the last run, a batch pipeline still scans and loads the full partition. CDC-based streaming pipelines process only the 500 changes, using a fraction of the compute and putting less load on the source database.
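The arithmetic behind that comparison can be worked through directly. The row counts come from the example above; the hourly schedule and the assumption that each batch run touches the whole table are illustrative simplifications.

```python
ROWS_IN_TABLE = 10_000_000  # from the example above
CHANGED_PER_RUN = 500       # from the example above
RUNS_PER_DAY = 24           # assumption: an hourly schedule

# Batch: every run scans the full table (or partition), changed or not.
batch_rows_per_day = ROWS_IN_TABLE * RUNS_PER_DAY

# CDC: only the changed rows are read from the transaction log.
cdc_rows_per_day = CHANGED_PER_RUN * RUNS_PER_DAY

ratio = batch_rows_per_day // cdc_rows_per_day
print(f"batch: {batch_rows_per_day:,} rows/day")
print(f"cdc:   {cdc_rows_per_day:,} rows/day")
print(f"batch processes {ratio:,}x more rows")
```

Under these assumptions the batch pipeline handles 240 million rows per day to pick up 12,000 actual changes.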
Operational overhead compounds
Airflow requires its own infrastructure: a web server, a scheduler, a metadata database, workers (Celery or Kubernetes). Teams spend significant engineering time maintaining Airflow itself, not just the pipelines running on it. Debugging a failed DAG run means checking Airflow logs, task instance states, and often the underlying compute engine's logs too.
Streaming ETL Architecture: How It Works
A streaming ETL pipeline replaces scheduled batch runs with a continuous data flow. The architecture has three stages, each running concurrently:
graph LR
A[PostgreSQL / MySQL] -->|CDC| B[RisingWave]
B -->|Materialized Views| C[Transform & Enrich]
C -->|Sink| D[Snowflake / Iceberg / ClickHouse]
Stage 1: Continuous CDC ingestion
Change Data Capture reads the transaction log of your source database (PostgreSQL WAL, MySQL binlog) and streams every INSERT, UPDATE, and DELETE to RisingWave in real time. Unlike batch extraction, CDC captures only the changes, with sub-second latency and minimal impact on the source.
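Conceptually, a CDC consumer keeps a keyed copy of the source table and applies each change event in log order. The sketch below is a simplified model of that idea; the `ChangeEvent` shape is hypothetical and does not reflect the actual wire format used by PostgreSQL, MySQL, or RisingWave.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChangeEvent:
    op: str                     # "insert", "update", or "delete"
    key: int                    # primary key of the affected row
    row: Optional[dict] = None  # new row image; None for deletes


def apply_change(replica, event):
    """Apply one change event to an in-memory replica keyed by primary key."""
    if event.op in ("insert", "update"):
        replica[event.key] = event.row
    elif event.op == "delete":
        replica.pop(event.key, None)
    else:
        raise ValueError(f"unknown op: {event.op}")


# Hypothetical event stream, in transaction-log order.
events = [
    ChangeEvent("insert", 1, {"order_id": 1, "order_status": "pending"}),
    ChangeEvent("insert", 2, {"order_id": 2, "order_status": "pending"}),
    ChangeEvent("update", 1, {"order_id": 1, "order_status": "shipped"}),
    ChangeEvent("delete", 2),
]

replica = {}
for event in events:
    apply_change(replica, event)

print(replica)
```

Four events are processed and the replica ends up with one row, without the source table ever being scanned.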
Stage 2: Transformation via materialized views
In RisingWave, you define transformations as SQL materialized views. These are not the static materialized views you know from PostgreSQL. RisingWave materialized views are incrementally maintained: when a new CDC event arrives, only the affected rows in the materialized view are recomputed. You can chain multiple materialized views together to build multi-step transformation logic, from cleaning and filtering to joins and aggregations.
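To make "incrementally maintained" concrete, here is a toy model of a grouped COUNT/SUM/AVG kept up to date from signed deltas. It sketches the general technique only and is not RisingWave's internal implementation; the class name and the amounts (in cents) are invented for the example.

```python
from collections import defaultdict


class IncrementalSummary:
    """Per-group COUNT and SUM maintained from +1 / -1 row deltas."""

    def __init__(self):
        self.count = defaultdict(int)
        self.total = defaultdict(int)

    def apply(self, group, amount, sign):
        """Apply one delta: sign=+1 for an added row, -1 for a retracted row."""
        self.count[group] += sign
        self.total[group] += sign * amount
        if self.count[group] == 0:
            # The group no longer has any rows, so drop it from the result.
            del self.count[group]
            del self.total[group]

    def row(self, group):
        """Return (count, sum, avg) for a group, or None if it has no rows."""
        n = self.count.get(group)
        if n is None:
            return None
        return n, self.total[group], self.total[group] / n


summary = IncrementalSummary()
summary.apply("us-east", 10000, +1)  # INSERT
summary.apply("us-east", 30000, +1)  # INSERT
summary.apply("eu-west", 5000, +1)   # INSERT

# An UPDATE is modelled as a retraction of the old row plus the new row.
summary.apply("us-east", 10000, -1)
summary.apply("us-east", 20000, +1)

summary.apply("eu-west", 5000, -1)   # DELETE

print(summary.row("us-east"), summary.row("eu-west"))
```

Each event touches only its own group, so the cost of an update depends on the size of the change and not on the size of the table.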
Stage 3: Sinking to the data warehouse
RisingWave sink connectors push transformed results to downstream systems continuously. Supported targets include Snowflake, Apache Iceberg, ClickHouse, BigQuery, PostgreSQL, Kafka, and over a dozen more. Data arrives in your warehouse seconds after it changes in the source, not hours.
Side-by-Side Comparison: Airflow DAG vs. RisingWave SQL
To make the difference concrete, here is the same ETL pipeline implemented both ways. The pipeline ingests orders and customer data from PostgreSQL, joins and cleans them, computes regional revenue summaries, and delivers results to a data warehouse.
The Airflow approach
# airflow_etl_dag.py - ~120 lines for a basic pipeline
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'order_summary_etl',
default_args=default_args,
schedule_interval='@hourly', # <-- Latency floor: 1 hour
start_date=datetime(2026, 1, 1),
catchup=False,
)
# Task 1: Extract orders from source PostgreSQL
extract_orders = PostgresOperator(
task_id='extract_orders',
postgres_conn_id='source_postgres',
sql="""
COPY (
SELECT * FROM orders
WHERE updated_at >= '{{ data_interval_start }}'
AND updated_at < '{{ data_interval_end }}'
) TO '/tmp/orders_{{ ds }}.csv' WITH CSV HEADER;
""",
dag=dag,
)
# Task 2: Extract customers
extract_customers = PostgresOperator(
task_id='extract_customers',
postgres_conn_id='source_postgres',
sql="COPY (SELECT * FROM customers) TO '/tmp/customers.csv' ...",
dag=dag,
)
# Task 3: Load to staging
load_staging = SnowflakeOperator(
task_id='load_staging',
snowflake_conn_id='snowflake_wh',
sql="COPY INTO staging.raw_orders FROM @stage/orders_{{ ds }}.csv ...",
dag=dag,
)
# Task 4: Transform - join, filter, aggregate
transform = SnowflakeOperator(
task_id='transform_orders',
snowflake_conn_id='snowflake_wh',
sql="""
INSERT INTO analytics.order_summary
SELECT region, customer_tier,
COUNT(*) AS total_orders,
SUM(quantity * unit_price) AS total_revenue,
AVG(quantity * unit_price) AS avg_order_value
FROM staging.raw_orders o
JOIN staging.raw_customers c ON o.customer_id = c.customer_id
WHERE o.order_status != 'cancelled'
GROUP BY region, customer_tier;
""",
dag=dag,
)
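# Task 5: Data quality check
# Placeholder so the DAG file parses; the real checks are not shown in this excerpt.
def run_quality_checks(**context):
    raise NotImplementedError("add row-count and null checks here")

quality_check = PythonOperator(
task_id='quality_check',
python_callable=run_quality_checks,
dag=dag,
)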
# Define dependencies
extract_orders >> load_staging >> transform >> quality_check
extract_customers >> load_staging
This DAG requires: an Airflow installation (scheduler, webserver, metadata DB, workers), connection management for PostgreSQL and Snowflake, file staging infrastructure, incremental extraction logic with watermarks, error handling and alerting for each task, and someone to maintain it when schemas change.
The RisingWave approach
The same pipeline in RisingWave is pure SQL, with no Python, no scheduler, and no staging files to manage by hand:
-- Step 1: Connect to the source database via CDC
CREATE SOURCE pg_source WITH (
connector = 'postgres-cdc',
hostname = 'source-db.example.com',
port = '5432',
username = 'cdc_reader',
password = 'secure_password',
database.name = 'production',
schema.name = 'public'
);
-- Step 2: Ingest the orders and customers tables
CREATE TABLE raw_orders (
order_id INT PRIMARY KEY,
customer_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL,
order_status VARCHAR,
order_timestamp TIMESTAMPTZ
) FROM pg_source TABLE 'public.orders';
CREATE TABLE raw_customers (
customer_id INT PRIMARY KEY,
customer_name VARCHAR,
customer_email VARCHAR,
customer_tier VARCHAR,
region VARCHAR
) FROM pg_source TABLE 'public.customers';
Once these tables exist, RisingWave continuously ingests every change from the source PostgreSQL database. No extraction scripts. No scheduling.
-- Step 3: Clean and join (runs continuously)
CREATE MATERIALIZED VIEW cleaned_orders AS
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.customer_tier,
c.region,
o.product_id,
o.quantity,
o.unit_price,
o.quantity * o.unit_price AS total_amount,
o.order_status,
o.order_timestamp
FROM raw_orders o
JOIN raw_customers c ON o.customer_id = c.customer_id
WHERE o.order_status != 'cancelled';
-- Step 4: Aggregate (runs continuously)
CREATE MATERIALIZED VIEW order_summary_mv AS
SELECT
region,
customer_tier,
COUNT(*) AS total_orders,
SUM(total_amount) AS total_revenue,
AVG(total_amount) AS avg_order_value
FROM cleaned_orders
GROUP BY region, customer_tier;
The materialized views above were verified against RisingWave 2.8.0. Here is the actual output after inserting sample data:
region | customer_tier | total_orders | total_revenue | avg_order_value
---------+---------------+--------------+---------------+-----------------
us-east | enterprise | 3 | 899.72 | 299.91
eu-west | enterprise | 2 | 699.79 | 349.90
us-west | mid-market | 2 | 409.87 | 204.94
(3 rows)
When a new order is inserted in the source PostgreSQL, the order_summary_mv updates within seconds, with no manual trigger.
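If you want to sanity-check the join and aggregation logic without a RisingWave cluster, the same query can be run as a one-off batch query in SQLite. This is only a sketch: the rows below are hypothetical (they are not the sample data behind the output above), prices are stored as integers for simplicity, and SQLite recomputes the result from scratch where RisingWave would maintain it incrementally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE raw_orders (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    quantity INTEGER,
    unit_price INTEGER,
    order_status TEXT
);
CREATE TABLE raw_customers (
    customer_id INTEGER PRIMARY KEY,
    customer_tier TEXT,
    region TEXT
);
-- Hypothetical sample rows
INSERT INTO raw_customers VALUES
    (1, 'enterprise', 'us-east'),
    (2, 'mid-market', 'us-west');
INSERT INTO raw_orders VALUES
    (101, 1, 2, 50, 'completed'),
    (102, 1, 1, 200, 'completed'),
    (103, 2, 3, 40, 'completed'),
    (104, 2, 1, 999, 'cancelled');
""")

# cleaned_orders and order_summary_mv collapsed into a single batch query.
SUMMARY_SQL = """
SELECT c.region,
       c.customer_tier,
       COUNT(*) AS total_orders,
       SUM(o.quantity * o.unit_price) AS total_revenue,
       AVG(o.quantity * o.unit_price) AS avg_order_value
FROM raw_orders o
JOIN raw_customers c ON o.customer_id = c.customer_id
WHERE o.order_status != 'cancelled'
GROUP BY c.region, c.customer_tier
ORDER BY c.region
"""

rows = conn.execute(SUMMARY_SQL).fetchall()
for row in rows:
    print(row)
```

The cancelled order 104 is excluded by the filter, so us-west reports a single order.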
-- Step 5: Deliver results to Snowflake continuously
CREATE SINK order_summary_to_snowflake FROM order_summary_mv WITH (
connector = 'snowflake_v2',
type = 'upsert',
table.name = 'order_summary',
warehouse = 'ANALYTICS_WH',
database = 'PRODUCTION',
schema = 'ANALYTICS',
with_s3 = true,
s3.bucket_name = 'your-etl-bucket',
s3.region_name = 'us-east-1',
s3.path = '/risingwave-sink/',
s3.credentials.access = '<AWS_ACCESS_KEY>',
s3.credentials.secret = '<AWS_SECRET_KEY>',
jdbc.url = 'jdbc:snowflake://your-account.snowflakecomputing.com/',
username = 'rw_service_account',
password = '<SNOWFLAKE_PASSWORD>'
);
That is the entire pipeline. Six SQL statements, grouped into five steps, replace the five-task Airflow DAG above along with its scheduler and hand-managed staging files.
Feature Comparison: Batch Airflow vs. Streaming RisingWave
| Dimension | Airflow Batch ETL | RisingWave Streaming ETL |
| --- | --- | --- |
| Latency | Minutes to hours (schedule-bound) | Seconds (continuous) |
| Language | Python DAGs + SQL | SQL only |
| Extraction | Full table scans or watermark queries | CDC (transaction log, changes only) |
| Transformation | Runs in external engine (Spark, Snowflake) | Incrementally maintained materialized views |
| Scheduling | Cron-based, requires scheduler infrastructure | None - runs continuously |
| Infrastructure | Airflow cluster + compute engine + staging | RisingWave instance |
| Error handling | Per-task retries, alerting, backfill logic | Built-in fault tolerance, automatic recovery |
| Schema changes | Manual DAG updates, potential pipeline breaks | In-place changes to streaming jobs via ALTER commands |
| Incremental logic | Developer-managed watermarks and merge logic | Automatic incremental computation |
| Source load | High (repeated full/partition scans) | Low (reads only transaction log) |
Building the Pipeline Step by Step
This section walks through each component in detail so you can adapt the pattern to your own data.
Setting up CDC ingestion
RisingWave uses built-in CDC connectors to read directly from PostgreSQL or MySQL transaction logs. The shared source model means you create one connection per source database and then define multiple tables from it:
-- One connection, multiple tables
CREATE SOURCE ecommerce_cdc WITH (
connector = 'postgres-cdc',
hostname = 'db.prod.internal',
port = '5432',
username = 'cdc_user',
password = 'cdc_password',
database.name = 'ecommerce'
);
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT,
product_id INT,
quantity INT,
unit_price DECIMAL,
order_status VARCHAR,
order_timestamp TIMESTAMPTZ
) FROM ecommerce_cdc TABLE 'public.orders';
CREATE TABLE products (
product_id INT PRIMARY KEY,
product_name VARCHAR,
category VARCHAR,
supplier_id INT
) FROM ecommerce_cdc TABLE 'public.products';
CREATE TABLE customers (
customer_id INT PRIMARY KEY,
customer_name VARCHAR,
customer_email VARCHAR,
customer_tier VARCHAR,
region VARCHAR
) FROM ecommerce_cdc TABLE 'public.customers';
RisingWave first takes an initial snapshot of each table, then switches to streaming mode to capture ongoing changes. The snapshot parameter (default true) controls this behavior.
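The snapshot-then-stream handoff can be pictured with a small model: load the snapshot, then replay only the log entries that are newer than the snapshot position. This is a deliberately simplified illustration and not RisingWave's actual backfill algorithm; the function name, the integer log positions, and the row shapes are all assumptions.

```python
def bootstrap(snapshot_rows, snapshot_pos, log):
    """Build a replica from a snapshot, then replay only later log entries.

    log is an iterable of (position, op, row) tuples in log order.
    """
    replica = {row["id"]: row for row in snapshot_rows}
    for pos, op, row in log:
        if pos <= snapshot_pos:
            continue  # already reflected in the snapshot
        if op == "delete":
            replica.pop(row["id"], None)
        else:  # "insert" or "update"
            replica[row["id"]] = row
    return replica


# Hypothetical state: the snapshot was taken at log position 2.
snapshot = [{"id": 1, "qty": 5}, {"id": 2, "qty": 1}]
log = [
    (1, "insert", {"id": 2, "qty": 1}),
    (2, "update", {"id": 1, "qty": 5}),
    (3, "insert", {"id": 3, "qty": 7}),
    (4, "delete", {"id": 2}),
]

replica = bootstrap(snapshot, snapshot_pos=2, log=log)
print(replica)
```

Skipping entries at or before the snapshot position is what keeps rows from being applied twice during the handoff.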
Chaining materialized views for multi-step transforms
One of the most powerful patterns in RisingWave is chaining materialized views. Each view is an incremental transformation step, and they compose together into a pipeline:
-- Layer 1: Clean and enrich
CREATE MATERIALIZED VIEW enriched_orders AS
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.customer_tier,
c.region,
p.product_name,
p.category,
o.quantity,
o.unit_price,
o.quantity * o.unit_price AS line_total,
o.order_status,
o.order_timestamp
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON o.product_id = p.product_id
WHERE o.order_status != 'cancelled';
-- Layer 2: Aggregate by region and category
CREATE MATERIALIZED VIEW revenue_by_region_category AS
SELECT
region,
category,
COUNT(*) AS order_count,
SUM(line_total) AS total_revenue,
AVG(line_total) AS avg_order_value
FROM enriched_orders
GROUP BY region, category;
-- Layer 3: Rank top categories per region
CREATE MATERIALIZED VIEW top_categories_by_region AS
SELECT
region,
category,
total_revenue,
order_count,
RANK() OVER (PARTITION BY region ORDER BY total_revenue DESC) AS revenue_rank
FROM revenue_by_region_category;
Each layer processes only the delta of changes from the layer below it. When a single order arrives, only the affected rows propagate through the chain. This is fundamentally different from batch ETL, where every layer reprocesses its entire input on each run.
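The propagation of deltas through a chain can be sketched with two toy layers: a filter that turns a source row change into signed deltas, and an aggregate that consumes them. This models the idea only, under the assumption that an update is represented as a retraction plus an insertion; the function names and row fields are invented for the example.

```python
from collections import defaultdict


def layer1_deltas(old, new):
    """Filter layer: translate a source row change into (sign, row) deltas.

    Cancelled orders are excluded, so a status change to 'cancelled'
    produces only a retraction of the old row.
    """
    deltas = []
    if old is not None and old["status"] != "cancelled":
        deltas.append((-1, old))
    if new is not None and new["status"] != "cancelled":
        deltas.append((+1, new))
    return deltas


revenue = defaultdict(int)  # layer 2 state: revenue per region


def layer2_apply(deltas):
    """Aggregate layer: apply deltas and report which regions were touched."""
    touched = set()
    for sign, row in deltas:
        revenue[row["region"]] += sign * row["amount"]
        touched.add(row["region"])
    return touched


order_a = {"order_id": "A", "region": "us-east", "amount": 100, "status": "paid"}
order_b = {"order_id": "B", "region": "eu-west", "amount": 40, "status": "paid"}

layer2_apply(layer1_deltas(None, order_a))  # INSERT A
layer2_apply(layer1_deltas(None, order_b))  # INSERT B

# UPDATE A: status changes to 'cancelled'.
cancelled_a = dict(order_a, status="cancelled")
touched = layer2_apply(layer1_deltas(order_a, cancelled_a))

print(dict(revenue), touched)
```

Cancelling order A touches only the us-east group; the eu-west total is never recomputed.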
Sinking to multiple destinations
RisingWave can deliver results to multiple downstream systems simultaneously. For example, you might want real-time data in both your data warehouse and a lakehouse:
-- Sink aggregated data to Snowflake for BI dashboards
CREATE SINK revenue_to_snowflake FROM revenue_by_region_category WITH (
connector = 'snowflake_v2',
type = 'upsert',
table.name = 'revenue_by_region_category',
warehouse = 'ANALYTICS_WH',
database = 'PRODUCTION',
schema = 'ANALYTICS',
with_s3 = true,
s3.bucket_name = 'etl-staging',
s3.region_name = 'us-east-1',
s3.path = '/snowflake-sink/',
s3.credentials.access = '<AWS_ACCESS_KEY>',
s3.credentials.secret = '<AWS_SECRET_KEY>',
jdbc.url = 'jdbc:snowflake://account.snowflakecomputing.com/',
username = 'rw_sink_user',
password = '<PASSWORD>'
);
-- Sink detailed records to Iceberg for long-term analytics
CREATE SINK orders_to_iceberg FROM enriched_orders WITH (
connector = 'iceberg',
type = 'upsert',
primary_key = 'order_id',
catalog.type = 'rest',
catalog.uri = 'http://iceberg-rest:8181',
warehouse.path = 's3://lakehouse/warehouse',
database.name = 'ecommerce',
table.name = 'enriched_orders',
s3.endpoint = 'https://s3.amazonaws.com',
s3.region = 'us-east-1',
s3.access.key = '<AWS_ACCESS_KEY>',
s3.secret.key = '<AWS_SECRET_KEY>'
);
Both sinks receive updates continuously from their upstream materialized views. Each sink runs independently, so there is no delivery order to schedule between them, though the two targets may briefly reflect slightly different points in time.
When to Keep Airflow (and When to Replace It)
Streaming ETL is not the right answer for every workload. Here is a practical decision framework:
Replace Airflow with streaming ETL when:
- Your pipeline is a straightforward extract-transform-load from databases to a warehouse
- Stakeholders need data freshness measured in seconds, not hours
- You are spending engineering time maintaining watermark logic and incremental extraction
- Your source databases support CDC (PostgreSQL, MySQL, MongoDB)
Keep Airflow when:
- Your pipeline involves non-database sources or tasks (APIs, file uploads, ML model training)
- You need complex workflow orchestration with conditional branching and human-in-the-loop steps
- Your transformations require Python, R, or other non-SQL logic
- Your data processing is inherently bounded (monthly reports, quarterly audits)
Use both together when:
- Airflow orchestrates the broader workflow (triggering ML training, managing deployments)
- RisingWave handles the real-time data movement and transformation
- Airflow triggers periodic quality checks on the streaming pipeline's output
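For the last pattern, the periodic check could be a small Python function that validates rows read from the pipeline's output. The sketch below assumes the rows have already been fetched as dictionaries using the column names of order_summary_mv; the function name, the tolerance, and the sample rows are hypothetical, and the fetch step is left out.

```python
def check_summary_rows(rows, tolerance=0.01):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    problems = []
    for r in rows:
        key = f"{r['region']}/{r['customer_tier']}"
        if r["total_orders"] <= 0:
            problems.append(f"{key}: non-positive total_orders")
            continue  # the average check below would divide by zero
        if r["total_revenue"] < 0:
            problems.append(f"{key}: negative total_revenue")
        expected_avg = r["total_revenue"] / r["total_orders"]
        if abs(r["avg_order_value"] - expected_avg) > tolerance:
            problems.append(f"{key}: avg_order_value inconsistent with revenue / orders")
    return problems


# Hypothetical rows: the second one has an average that does not match.
sample = [
    {"region": "us-east", "customer_tier": "enterprise",
     "total_orders": 2, "total_revenue": 300.0, "avg_order_value": 150.0},
    {"region": "us-west", "customer_tier": "mid-market",
     "total_orders": 2, "total_revenue": 300.0, "avg_order_value": 100.0},
]

problems = check_summary_rows(sample)
print(problems)
```

In Airflow, a function like this could serve as the python_callable of a PythonOperator that raises when the returned list is not empty, so a failed check surfaces through the normal alerting path.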
What is a streaming ETL pipeline?
A streaming ETL pipeline is a data integration architecture that continuously extracts data from source systems, transforms it in flight, and loads it into target systems without batch scheduling. Unlike traditional ETL that runs on a cron schedule (every hour, every day), a streaming ETL pipeline processes each data change as it happens. In RisingWave, you build streaming ETL pipelines using SQL: CDC sources handle extraction, materialized views handle transformation, and sink connectors handle loading.
How does streaming ETL compare to Apache Airflow for data pipelines?
Streaming ETL and Apache Airflow solve different problems. Airflow is a workflow orchestrator that coordinates batch tasks on a schedule. It is not a data processing engine itself, and it relies on external systems (Spark, Snowflake, dbt) to run transformations. Streaming ETL with RisingWave is a self-contained pipeline: CDC ingests changes continuously, materialized views transform data incrementally, and sinks deliver results in real time. The key tradeoff is latency versus flexibility. Airflow handles diverse workflows (APIs, ML, file processing). Streaming ETL delivers lower latency for database-to-warehouse pipelines with simpler operations.
Can RisingWave replace Airflow entirely?
RisingWave replaces the ETL pipeline portion of Airflow, not all of Airflow's functionality. If your Airflow DAGs primarily move data from databases to warehouses with SQL transformations, RisingWave can replace them entirely with continuous SQL pipelines. However, Airflow also handles workflow orchestration (conditional logic, external API calls, ML model training, human approvals) that falls outside the scope of a streaming database. Many teams run RisingWave for real-time data movement alongside Airflow for broader workflow coordination.
What databases and sinks does RisingWave support for streaming ETL?
RisingWave supports CDC ingestion from PostgreSQL, MySQL, MongoDB, SQL Server, and several other databases. For sinks, it delivers data to over 15 downstream systems including Snowflake, Apache Iceberg, ClickHouse, Google BigQuery, Apache Kafka, PostgreSQL, Delta Lake, Elasticsearch, and more. Both sources and sinks use the same SQL syntax (CREATE SOURCE, CREATE TABLE, CREATE SINK), so adding a new ingestion or delivery target is a single SQL statement.
Conclusion
Batch ETL pipelines built on Airflow served the industry well, but they carry inherent latency, complexity, and operational overhead that streaming architectures eliminate. Here are the key takeaways:
- CDC replaces batch extraction. Reading the transaction log is more efficient and lower-latency than repeated table scans. Your source database stays healthy, and your pipeline gets every change.
- Materialized views replace scheduled transforms. Incremental computation means only changed data is processed, automatically, without watermark logic or partition management.
- SQL replaces Python DAGs. A complete streaming ETL pipeline is six SQL statements, not a hundred-line Python DAG with infrastructure dependencies.
- Continuous sinks replace scheduled loads. Data arrives in your warehouse seconds after it changes in the source. Dashboards and models always have fresh data.
- Operational simplicity compounds. No scheduler to maintain, no task dependencies to debug, no file-staging infrastructure to manage.
The shift from batch to streaming ETL is not about adopting new technology for its own sake. It is about removing accidental complexity that batch architectures force onto data teams, and giving stakeholders the fresh data they actually need.
Ready to try this yourself? Get started with RisingWave in 5 minutes using the Quickstart.
Join our Slack community to ask questions and connect with other stream processing developers.

