How to Use dbt with RisingWave for Streaming Transformations

TL;DR

RisingWave has a dedicated dbt adapter called dbt-risingwave. Install it with pip install dbt-risingwave, point your profiles.yml at RisingWave's PostgreSQL-compatible endpoint, and use the materialized_view materialization. After a single dbt run, those models update continuously in real time, with no cron jobs or scheduled batch runs.

Why dbt + RisingWave?

Data engineers love dbt because it brings software engineering practices to SQL: version control, modular models, automated testing, and generated documentation. But dbt was designed for batch databases. You schedule a run, it transforms yesterday's data, and the results sit in a table until the next run.

RisingWave is a PostgreSQL-compatible streaming database. It processes data as it arrives and maintains materialized views that are always current. The catch is that raw SQL at this scale gets complex fast. You end up with hundreds of interdependent views and no clear way to test them or track lineage.

dbt + RisingWave solves both problems at once. dbt handles organization, testing, and documentation. RisingWave handles continuous execution. The result is a streaming pipeline that is always fresh and still maintainable by a typical data team.

How It Works: dbt Meets Streaming SQL

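When you run dbt run against a traditional database, dbt issues CREATE TABLE AS SELECT or CREATE VIEW AS SELECT statements. In a batch database, a table built this way is a static snapshot until the next run, and a view simply re-executes its query from scratch every time it is read.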

In RisingWave, the same statements behave differently:

  • CREATE VIEW creates a logical view (not materialized, not maintained).
  • CREATE MATERIALIZED VIEW creates a continuously-updated view that RisingWave keeps synchronized with the underlying source data in real time.
  • CREATE TABLE AS SELECT creates a static snapshot like in PostgreSQL.
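You can see the difference between the two non-streaming statements in any PostgreSQL-like engine. The minimal sketch below uses Python's built-in sqlite3 module as a stand-in batch database; the table and view names are illustrative. After a new insert, the CREATE TABLE AS SELECT result stays frozen, while the plain view recomputes when it is read. Neither one stores a result that is kept up to date, and that is the gap CREATE MATERIALIZED VIEW fills in RisingWave.

```python
import sqlite3

# In-memory SQLite database standing in for a generic batch database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_orders (order_id INTEGER, customer_id INTEGER, amount REAL)")
conn.execute("INSERT INTO raw_orders VALUES (1, 10, 100.0)")

# Static snapshot: the aggregate is computed once, at creation time.
conn.execute(
    "CREATE TABLE order_totals_snapshot AS "
    "SELECT customer_id, SUM(amount) AS total FROM raw_orders GROUP BY customer_id"
)
# Logical view: nothing is stored; the query runs again on every read.
conn.execute(
    "CREATE VIEW order_totals_view AS "
    "SELECT customer_id, SUM(amount) AS total FROM raw_orders GROUP BY customer_id"
)

# New data arrives after both objects were created.
conn.execute("INSERT INTO raw_orders VALUES (2, 10, 50.0)")

snapshot = conn.execute("SELECT total FROM order_totals_snapshot").fetchone()[0]
view = conn.execute("SELECT total FROM order_totals_view").fetchone()[0]
print(snapshot, view)  # 100.0 150.0
```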

The dbt-risingwave adapter introduces a materialized_view materialization that maps dbt models directly to CREATE MATERIALIZED VIEW statements. Your dbt model becomes a live, incrementally-maintained query that RisingWave updates automatically whenever new data arrives upstream.

This is a fundamental shift from batch. There are no scheduled runs to maintain your transformed data. The transformation is always running.

Setting Up dbt with RisingWave

Install the adapter

pip install dbt-risingwave

Starting with dbt 1.8, installing an adapter no longer automatically installs dbt-core. If you are starting from scratch:

pip install dbt-core dbt-risingwave

Verify the installation:

dbt --version
# Should list risingwave as an installed adapter
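In CI you may want a check that fails fast without parsing the output of dbt --version. Below is a minimal sketch built on Python's standard importlib.metadata. It assumes the PyPI distribution names dbt-core and dbt-risingwave from the pip commands above. The helper name is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is missing."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions(["dbt-core", "dbt-risingwave"]).items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```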

Configure profiles.yml

Add a RisingWave profile to ~/.dbt/profiles.yml:

my_streaming_project:
  target: dev
  outputs:
    dev:
      type: risingwave
      host: localhost
      port: 4566
      user: root
      pass: ""
      dbname: dev
      schema: public

The default connection uses port 4566, which is RisingWave's PostgreSQL-compatible endpoint. For a cloud deployment, swap in the appropriate host, port, and credentials.
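The endpoint speaks the PostgreSQL wire protocol, so the same profile values also work with ordinary Postgres clients. The minimal sketch below turns a profile output into a standard PostgreSQL connection URI. The helper name is hypothetical, and the sketch assumes the field names used in the profile above.

```python
from urllib.parse import quote


def profile_to_dsn(output: dict) -> str:
    """Build a PostgreSQL connection URI from one dbt profile output (hypothetical helper)."""
    user = quote(output["user"], safe="")
    password = output.get("pass", "")
    # Only include ":password" when one is set; the local default profile has none.
    auth = f"{user}:{quote(password, safe='')}" if password else user
    dbname = quote(output["dbname"], safe="")
    return f"postgresql://{auth}@{output['host']}:{output['port']}/{dbname}"


dev = {"type": "risingwave", "host": "localhost", "port": 4566,
       "user": "root", "pass": "", "dbname": "dev", "schema": "public"}
print(profile_to_dsn(dev))  # postgresql://root@localhost:4566/dev
```

With the local profile, you can pass the resulting URI to psql to confirm the endpoint is reachable before you run dbt debug.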

Test the connection:

dbt debug

Initialize a project

dbt init my_streaming_project

Select risingwave when prompted for the database type.

Your First Streaming dbt Model

Source data

First, create a source table in RisingWave. In a production setup this would typically be a source connected to Kafka or a CDC stream. For this walkthrough, use a regular table:

CREATE TABLE raw_orders (
    order_id    BIGINT,
    customer_id BIGINT,
    status      VARCHAR,
    amount      DECIMAL,
    created_at  TIMESTAMPTZ
);

Define the dbt source

In models/sources.yml:

version: 2

sources:
  - name: public
    tables:
      - name: raw_orders
        description: "Incoming orders from the ingestion layer"

Write a streaming model

Create models/customer_order_stats.sql:

{{ config(materialized='materialized_view') }}

SELECT
    customer_id,
    COUNT(*)                                       AS total_orders,
    SUM(amount)                                    AS lifetime_value,
    COUNT(*) FILTER (WHERE status = 'completed')   AS completed_orders,
    MAX(created_at)                                AS last_order_at
FROM {{ source('public', 'raw_orders') }}
GROUP BY customer_id

Run it:

dbt run --select customer_order_stats

dbt compiles this into roughly the following statement. The exact, fully qualified relation names depend on your profile.

CREATE MATERIALIZED VIEW customer_order_stats AS
SELECT
    customer_id,
    COUNT(*)                                       AS total_orders,
    SUM(amount)                                    AS lifetime_value,
    COUNT(*) FILTER (WHERE status = 'completed')   AS completed_orders,
    MAX(created_at)                                AS last_order_at
FROM public.raw_orders
GROUP BY customer_id;

From this point forward, customer_order_stats stays current automatically. Insert a new order into raw_orders and the stats update within seconds -- no dbt run required.
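Staying current does not mean re-running the whole GROUP BY. Each new row only needs to update the one customer group it belongs to. The toy, insert-only model below is a conceptual illustration of that idea, not RisingWave's actual engine, and the class names are hypothetical. Each aggregate in customer_order_stats.sql maps to one field of per-group state.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict, Optional


@dataclass
class CustomerStats:
    total_orders: int = 0
    lifetime_value: Decimal = Decimal("0")
    completed_orders: int = 0
    last_order_at: Optional[datetime] = None


class CustomerOrderStatsModel:
    """Insert-only toy model of the customer_order_stats view (hypothetical)."""

    def __init__(self) -> None:
        self.rows: Dict[int, CustomerStats] = {}

    def on_insert(self, customer_id: int, status: str,
                  amount: Decimal, created_at: datetime) -> CustomerStats:
        # Only the new row's group is touched; other customers are left alone.
        stats = self.rows.setdefault(customer_id, CustomerStats())
        stats.total_orders += 1          # COUNT(*)
        stats.lifetime_value += amount   # SUM(amount), assuming amount is never NULL
        if status == "completed":        # COUNT(*) FILTER (WHERE status = 'completed')
            stats.completed_orders += 1
        if stats.last_order_at is None or created_at > stats.last_order_at:
            stats.last_order_at = created_at  # MAX(created_at)
        return stats


view = CustomerOrderStatsModel()
t0 = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
view.on_insert(42, "completed", Decimal("300"), t0)
s = view.on_insert(42, "pending", Decimal("250"), t0.replace(hour=13))
print(s.total_orders, s.lifetime_value, s.completed_orders)  # 2 550 1
```

Updates and deletes are harder. COUNT and SUM can subtract a retracted row, but MAX cannot be undone from a single stored value. This is one reason real streaming engines keep more state per group than this sketch does.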

Build a layered pipeline

dbt's modular model system works naturally with RisingWave's materialized views. You can chain models just like in a batch pipeline:

models/completed_orders.sql:

{{ config(materialized='materialized_view') }}

SELECT *
FROM {{ source('public', 'raw_orders') }}
WHERE status = 'completed'

models/vip_customers.sql:

{{ config(materialized='materialized_view') }}

SELECT
    customer_id,
    lifetime_value,
    last_order_at
FROM {{ ref('customer_order_stats') }}
WHERE lifetime_value > 500

Each layer is a separate materialized view in RisingWave. Updates propagate through the chain automatically as upstream data changes. RisingWave's incremental computation engine ensures that only changed rows are recomputed, not the full dataset.
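A common way to describe this propagation is as a changelog. Each upstream change arrives as an old version and a new version of a row, and a downstream filter only has to decide whether to retract the old row and whether to emit the new one. The minimal sketch below models vip_customers that way. It is a conceptual illustration under that changelog assumption, not RisingWave internals, and the class name is hypothetical.

```python
from decimal import Decimal
from typing import Dict, List, Optional, Tuple

THRESHOLD = Decimal("500")  # mirrors WHERE lifetime_value > 500 in vip_customers.sql

Change = Tuple[str, int, Decimal]  # ("-" retract | "+" insert, customer_id, lifetime_value)


class VipCustomersView:
    """Toy changelog consumer for the vip_customers model (hypothetical)."""

    def __init__(self) -> None:
        self.rows: Dict[int, Decimal] = {}

    def apply(self, customer_id: int, old_ltv: Optional[Decimal],
              new_ltv: Optional[Decimal]) -> List[Change]:
        """Apply one upstream change (old -> new) and return the downstream changes."""
        changes: List[Change] = []
        # Retract the old version if it was visible downstream.
        if old_ltv is not None and old_ltv > THRESHOLD:
            self.rows.pop(customer_id, None)
            changes.append(("-", customer_id, old_ltv))
        # Emit the new version if it passes the filter.
        if new_ltv is not None and new_ltv > THRESHOLD:
            self.rows[customer_id] = new_ltv
            changes.append(("+", customer_id, new_ltv))
        return changes


vip = VipCustomersView()
vip.apply(1, None, Decimal("120"))            # below threshold: no downstream change
vip.apply(1, Decimal("120"), Decimal("620"))  # crosses threshold: one insert
print(vip.rows)
```

Each call does a constant amount of work, however many rows raw_orders holds. That property is what makes chaining views cheap.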

Testing and Documentation

One of dbt's biggest strengths carries over completely to streaming pipelines.

Schema tests

In models/schema.yml:

version: 2

models:
  - name: customer_order_stats
    description: "Per-customer order aggregations, updated in real time"
    columns:
      - name: customer_id
        description: "Unique customer identifier"
        tests:
          - not_null
          - unique
      - name: total_orders
        tests:
          - not_null
      - name: lifetime_value
        tests:
          - not_null

Run tests:

dbt test --select customer_order_stats

dbt issues SELECT queries against the live materialized views to validate constraints. Because the views reflect current data rather than a stale snapshot, a failing test describes the state of the pipeline at the moment you run dbt test.
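dbt's generic tests compile to SELECT statements that return the failing rows, and a test passes when its query returns nothing. The minimal sketch below runs roughly equivalent not_null and unique queries. An in-memory SQLite table stands in for the live customer_order_stats view, and the helper names and sample rows are hypothetical. Table and column names are interpolated directly, which is acceptable only because they come from your own schema.yml and never from user input.

```python
import sqlite3


def not_null_failures(conn, table: str, column: str):
    # Roughly what the not_null test selects: rows where the column is NULL.
    return conn.execute(f"SELECT {column} FROM {table} WHERE {column} IS NULL").fetchall()


def unique_failures(conn, table: str, column: str):
    # Roughly what the unique test selects: non-NULL values that appear more than once.
    return conn.execute(
        f"SELECT {column}, COUNT(*) FROM {table} "
        f"WHERE {column} IS NOT NULL GROUP BY {column} HAVING COUNT(*) > 1"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customer_order_stats "
    "(customer_id INTEGER, total_orders INTEGER, lifetime_value REAL)"
)
conn.executemany(
    "INSERT INTO customer_order_stats VALUES (?, ?, ?)",
    [(1, 2, 620.0), (2, 1, 80.0)],
)

for name, check in [("not_null", not_null_failures), ("unique", unique_failures)]:
    failing = check(conn, "customer_order_stats", "customer_id")
    print(name, "PASS" if not failing else f"FAIL {failing}")
```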

Generate documentation

dbt docs generate
dbt docs serve

dbt's lineage graph shows exactly which tables feed which materialized views. For a team maintaining dozens of streaming models, this is essential for understanding blast radius when a source schema changes.
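The same lineage is available programmatically. dbt writes target/manifest.json, and its child_map maps each node ID to its direct children. Walking that map answers the blast-radius question directly. The minimal sketch below uses hypothetical helper names and a hand-written child_map that mirrors this article's models. A real manifest also lists test nodes as children.

```python
import json
from collections import deque
from typing import Dict, List, Set


def load_child_map(path: str = "target/manifest.json") -> Dict[str, List[str]]:
    """Read the child_map section of the manifest dbt writes."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)["child_map"]


def downstream_of(child_map: Dict[str, List[str]], node_id: str) -> Set[str]:
    """Breadth-first walk: every node that depends, directly or transitively, on node_id."""
    seen: Set[str] = set()
    queue = deque(child_map.get(node_id, []))
    while queue:
        node = queue.popleft()
        if node not in seen:
            seen.add(node)
            queue.extend(child_map.get(node, []))
    return seen


# Hand-written stand-in for this project's manifest; real node IDs follow this pattern.
child_map = {
    "source.my_streaming_project.public.raw_orders": [
        "model.my_streaming_project.customer_order_stats",
        "model.my_streaming_project.completed_orders",
    ],
    "model.my_streaming_project.customer_order_stats": ["model.my_streaming_project.vip_customers"],
    "model.my_streaming_project.completed_orders": [],
    "model.my_streaming_project.vip_customers": [],
}

for node in sorted(downstream_of(child_map, "source.my_streaming_project.public.raw_orders")):
    print(node)
```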

Supported Materializations

The dbt-risingwave adapter ships eight materializations. Each is listed with the RisingWave object it creates and what it is used for:

  • materialized_view (CREATE MATERIALIZED VIEW): streaming transformations, the primary use case
  • table (CREATE TABLE AS SELECT): static snapshots
  • view (CREATE VIEW): logical views that are not maintained
  • ephemeral (CTE): intermediate logic that is not persisted
  • source (CREATE SOURCE): defining streaming sources such as Kafka or CDC
  • table_with_connector (CREATE TABLE ... WITH (...)): connector-backed tables that store the ingested data in RisingWave
  • sink (CREATE SINK): writing results to external systems
  • incremental (deprecated): use materialized_view instead

For most streaming transformation work, materialized_view is the right choice. The source and sink materializations are valuable when you want dbt to manage the full pipeline from ingestion to output.

Limitations and Workarounds

Understanding where dbt-risingwave diverges from the standard dbt experience helps you plan ahead.

No MERGE or incremental updates

Standard dbt incremental models use INSERT, UPDATE, or MERGE to update rows. RisingWave's materialized views do not support these operations directly -- they are maintained by the streaming engine, not by explicit write statements. Use materialized_view instead of incremental.

No unique indexes

RisingWave does not support the unique index type from the PostgreSQL adapter. The dbt-risingwave adapter skips index creation silently. If your downstream BI tools require primary key semantics, enforce uniqueness at the application layer or in the sink.
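At the application layer, enforcing uniqueness usually means collapsing duplicates by key before the data reaches a consumer that expects primary-key semantics. The minimal sketch below keeps the most recent row per key. The function name, the key column, and the ordering column are assumptions to adapt to your schema.

```python
from typing import Any, Dict, Iterable, List


def dedupe_latest(rows: Iterable[Dict[str, Any]], key: str, order_by: str) -> List[Dict[str, Any]]:
    """Keep one row per key value: the one with the greatest order_by value (first wins on ties)."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())


rows = [
    {"customer_id": 1, "updated_at": 1, "tier": "silver"},
    {"customer_id": 1, "updated_at": 3, "tier": "gold"},
    {"customer_id": 2, "updated_at": 2, "tier": "silver"},
]
print(dedupe_latest(rows, key="customer_id", order_by="updated_at"))
```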

No user-defined function updates

The adapter cannot replace an existing function body. If you update a UDF definition, you need to drop and recreate it manually in RisingWave before running dbt.

Schema changes require dropping views

In RisingWave, changing a materialized view's definition requires dropping it and all downstream views that depend on it, then recreating them. This constraint comes from RisingWave itself, not from the adapter. To limit the impact in production, the adapter supports zero_downtime rebuilds via:

{{ config(materialized='materialized_view', zero_downtime={'enabled': true}) }}

This creates a replacement view, validates it, then swaps the names -- reducing downtime for model updates in production.
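dbt run already builds models in dependency order. If you drop and recreate views by hand, order still matters: dependents must be dropped before the view they read from and recreated after it. The minimal sketch below derives both orders from a parent map, using Python's standard graphlib (3.9+). The parent map has the same shape as the parent_map in dbt's target/manifest.json, with short model names standing in for full node IDs. The function name is hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set, Tuple


def rebuild_plan(parents: Dict[str, List[str]], changed: str) -> Tuple[List[str], List[str]]:
    """Return (drop_order, create_order) for `changed` and every view downstream of it."""
    # Invert the parent map so we can walk downstream from the changed view.
    children: Dict[str, Set[str]] = {}
    for node, deps in parents.items():
        for dep in deps:
            children.setdefault(dep, set()).add(node)

    affected: Set[str] = set()
    stack = [changed]
    while stack:
        node = stack.pop()
        if node not in affected:
            affected.add(node)
            stack.extend(children.get(node, ()))

    # Restrict the graph to affected nodes; static_order() yields parents before children.
    graph = {n: [p for p in parents.get(n, []) if p in affected] for n in affected}
    create_order = list(TopologicalSorter(graph).static_order())
    return list(reversed(create_order)), create_order


parents = {
    "customer_order_stats": ["raw_orders"],
    "completed_orders": ["raw_orders"],
    "vip_customers": ["customer_order_stats"],
}
drop_order, create_order = rebuild_plan(parents, "customer_order_stats")
print("drop:", drop_order)
print("create:", create_order)
```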

dbt Cloud vs. dbt Core

The dbt-risingwave adapter works with dbt Core. dbt Cloud support depends on dbt Labs supporting the adapter in its hosted environment. For production use today, the standard approach is to run dbt Core from a scheduled job or a CI/CD pipeline that deploys model changes.

Key Takeaways

  • RisingWave has a dedicated dbt-risingwave adapter, not just PostgreSQL adapter compatibility.
  • The materialized_view materialization is the primary tool. It maps a dbt model to a continuously-updated view in RisingWave.
  • dbt testing, documentation, and lineage tracking work without modification against streaming materialized views.
  • Schema changes require dropping and recreating dependent views. Use the zero_downtime option for production deployments.
  • The source and sink materializations let dbt manage the full pipeline from Kafka ingestion through to output.

For teams already using dbt, adding RisingWave is a path to real-time pipelines without rewriting models or adopting a new transformation framework. The same SQL, the same testing discipline, the same documentation habits -- running continuously instead of on a schedule.

FAQ

Does dbt-risingwave use the PostgreSQL adapter under the hood?

dbt-risingwave is its own adapter: you install it as dbt-risingwave and configure it with type: risingwave rather than type: postgres. It introduces RisingWave-specific materializations like materialized_view, source, and sink that have no equivalent in the PostgreSQL adapter.

Can I use dbt seeds and snapshots with RisingWave?

dbt seeds (loading CSV files as tables) work with RisingWave because they use standard INSERT statements. dbt snapshots rely on MERGE or update logic that RisingWave does not support in the same way as a transactional database. For slowly-changing dimension tracking in a streaming context, a materialized view over a CDC source is the recommended approach.

How do I connect a Kafka topic as a dbt source?

Use the source materialization to define a RisingWave source connected to Kafka. Because the source is itself a dbt model, downstream models reference it with {{ ref('kafka_orders_source') }} rather than {{ source(...) }}:

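-- models/kafka_orders_source.sql
{{ config(materialized='source') }}

CREATE SOURCE {{ this }} (
    order_id    BIGINT,
    customer_id BIGINT,
    status      VARCHAR,
    amount      DECIMAL,
    created_at  TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'localhost:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;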

Downstream models reference this source exactly like any other dbt model, while RisingWave handles continuous ingestion from Kafka.
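Producers must write messages whose top-level JSON keys match the source's column names. The minimal sketch below encodes one order event. It assumes the source declares the same columns as raw_orders, and the function name is hypothetical. Sending the bytes is left to whichever Kafka client you use. The number and timestamp encodings are assumptions to verify against the JSON parsing rules of your RisingWave version.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal


def encode_order(order_id: int, customer_id: int, status: str,
                 amount: Decimal, created_at: datetime) -> bytes:
    """Serialize one order as a JSON message body (hypothetical helper)."""
    if created_at.tzinfo is None:
        raise ValueError("created_at must be timezone-aware for a TIMESTAMPTZ column")
    payload = {
        "order_id": order_id,
        "customer_id": customer_id,
        "status": status,
        # Assumption: a JSON number is accepted for DECIMAL; float() can lose precision.
        "amount": float(amount),
        # Assumption: an ISO 8601 string with an offset is accepted for TIMESTAMPTZ.
        "created_at": created_at.isoformat(),
    }
    return json.dumps(payload).encode("utf-8")


message = encode_order(1001, 42, "completed", Decimal("19.99"),
                       datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc))
print(message.decode("utf-8"))
```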


RisingWave is an open-source streaming database (Apache 2.0). You can run it locally with Docker or connect to RisingWave Cloud for a managed deployment. The dbt-risingwave adapter is available on PyPI and GitHub.
