Join our Streaming Lakehouse Tour!
Register Now.->
Fresh Data, Faster: Stream Directly from RisingWave to Snowflake

Fresh Data, Faster: Stream Directly from RisingWave to Snowflake

Getting fresh, analysis-ready data into your Snowflake warehouse can often feel like a race against time. Traditional batch ETL processes, while reliable, can introduce delays, meaning your insights are always a few hours (or more) behind reality. What if you could shrink that latency window significantly, and have data arrive in Snowflake already transformed and prepared for your BI tools and queries?

That's exactly what RisingWave's new Snowflake sink connector enables.

For those new to us, RisingWave is a unified real-time data platform designed to ingest, process, and transform event stream data as it arrives. It's not just about where the data goes; RisingWave is also incredibly flexible about where it comes from. It can readily ingest data from popular streaming platforms like Apache Kafka or Apache Pulsar, tap into change data capture (CDC) streams from your transactional databases (like PostgreSQL or MySQL), or consume event streams generated by interactions between your applications and microservices. Think of it as a powerful engine that can perform complex SQL-based computations (joins, aggregations, windowing) on all this diverse data in real-time.

Now, with our Snowflake sink connector, you can build a continuous bridge from RisingWave directly into your Snowflake tables. So, what does this mean for your data in Snowflake and how you work with it?

  • Get Data Sooner, Act Faster: The most immediate impact is on data freshness. Instead of waiting for batch jobs to periodically collect, transform, and load, RisingWave streams processed data continuously. This means the data landing in Snowflake is always fresh, reflecting the latest events, and making your dashboards and reports truly current.

  • Simplify Your Data Prep: A lot of the heavy lifting happens before your data even reaches Snowflake. RisingWave excels at handling complex data transformations, enrichments, and aggregations in-flight. Imagine joining multiple streams, calculating running totals, or filtering events based on intricate logic – all this is done within RisingWave. The result? Data arriving in Snowflake is clean, structured, and immediately usable. This can significantly simplify your transformation logic within Snowflake and potentially reduce your compute costs there.

How Does It Work Under the Hood?

It's a smooth hand-off. RisingWave ingests data from your chosen sources, performs the real-time transformations you've defined, and then writes the processed data in JSON format to an S3 bucket you manage. From there, Snowflake's Snowpipe service automatically and continuously loads this data into your target Snowflake tables. We've designed it to integrate seamlessly with Snowpipe's existing automation capabilities for S3.

Why Use Risingwave if Snowpipe Is Involved?

That's a fair question: if the data ultimately lands in S3 and Snowpipe takes over for loading into Snowflake, why add RisingWave to the mix instead of just piping raw data to S3 and using Snowpipe directly?

Snowpipe excels at efficiently and automatically loading files from a stage (like S3) into Snowflake tables. It's fantastic for getting raw or semi-structured data into the warehouse.

The crucial difference, and the power RisingWave brings, is what happens before the data even hits that S3 bucket for Snowpipe:

  1. Real-time, Complex Transformations: RisingWave processes and transforms data as it streams. This means you can perform SQL-based joins across different event streams (e.g., joining user clickstreams from Kafka with user dimension data from a CDC stream), run complex aggregations, apply intricate filtering logic, or enrich data before it's written out. Snowpipe, by itself, loads data; transformations in Snowflake typically happen after loading, adding latency and requiring separate compute.

  2. Stateful Processing and Materialized Views: RisingWave can maintain materialized views that are incrementally and efficiently updated with low latency as new data arrives. This allows you to pre-compute complex analytical views on your streaming data (like sessionization or real-time leaderboards). These refined, ready-to-query views are then what get sunk to Snowflake, not just raw events.

  3. Direct Source Integration & Initial Processing: RisingWave directly connects to diverse sources like Kafka, Pulsar, or CDC streams from databases. It handles the ingestion protocols, deserialization, and can perform initial filtering or schema enforcement before any complex transformations. To use Snowpipe directly for these sources, you'd often need another system or custom code to first extract the data from these systems and land it as files in S3.

  4. Reduced Data Prep and Latency in Snowflake: Because data arrives in Snowflake already transformed, aggregated, and in a more analytics-ready state, you can simplify or reduce the transformation steps needed within Snowflake itself. This means insights are available faster, and potentially with less Snowflake compute.

In essence, RisingWave acts as a powerful real-time ingestion, pre-processing, and transformation engine. It ensures the data arriving in Snowflake via Snowpipe isn't just raw data, but valuable, refined, and timely insights ready for immediate use.

Getting Started

Setting it up involves a few key steps. You'll need your S3 bucket details and to ensure Snowpipe is configured to watch that S3 location (Snowflake has great guides on Automating Snowpipe for Amazon S3).

Then, within RisingWave, creating the sink is straightforward. Let's say you have a materialized view ss_mv in RisingWave that's already crunching your streaming data:

CREATE SINK snowflake_sink
FROM ss_mv -- Your materialized view or source
WITH (
    connector = 'snowflake',
    type = 'append-only', -- Or handle upserts! More on that below.
    s3.bucket_name = 'your-s3-bucket-name',
    s3.credentials.access = 'your-aws-access-key',
    s3.credentials.secret = 'your-aws-secret-key',
    s3.region_name = 'your-s3-bucket-region',
    s3.path = 'path/to/data/', -- Optional: a specific path within your bucket
    force_append_only = 'true' -- If you only need appends
);

RisingWave will then start populating your S3 bucket with data, and Snowpipe will take it from there into Snowflake.

What About Updates and Deletes (Upserts)?

This is where things get really interesting. Snowflake tables don't natively support direct "upsert" operations via Snowpipe in the same way a transactional database might. But RisingWave has a clever solution.

When you define a materialized view in RisingWave using AS CHANGELOG, RisingWave keeps track of not just new rows, but also updates and deletes. It provides special columns like __op (indicating insert, delete, update_before, and update_after) and __row_id (for ordering changes).

CREATE SINK snowflake_sink as WITH sub AS changelog FROM user_behaviors
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp,
changelog_op AS __op,
_changelog_row_id::bigint AS __row_id
FROM
sub WITH (
connector = 'snowflake',
type = 'append-only',
s3.bucket_name = 'EXAMPLE_S3_BUCKET',
s3.credentials.access = 'EXAMPLE_AWS_ACCESS',
s3.credentials.secret = 'EXAMPLE_AWS_SECRET',
s3.region_name = 'EXAMPLE_REGION',
s3.path = 'EXAMPLE_S3_PATH',
);

Our Snowflake sink sends these detailed change logs to S3. Then, in Snowflake, you can use its powerful Dynamic Tables feature to automatically merge these incremental changes and present the latest state of your data. It’s a neat way to maintain an up-to-date, queryable view of your data, even as it changes frequently upstream.

For example, with the Snowflake sink created in RisingWave, you might create a Dynamic Table in Snowflake like this:

-- In Snowflake
CREATE OR REPLACE DYNAMIC TABLE current_user_behaviors
TARGET_LAG = '1 minute' -- How fresh you want it
WAREHOUSE = your_snowflake_warehouse
AS
SELECT *
FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY primary_key_column ORDER BY __row_id DESC) as rn FROM your_staging_table_fed_by_snowpipe -- This table gets data from RisingWave
)
WHERE rn = 1 AND (__op = 1 OR __op = 3); -- __op=1 (Insert), __op=3 (Update_after);

This dynamic table intelligently picks the latest version of each row based on the changelog data provided by RisingWave.

Availability

The Snowflake sink connector is a premium feature available in RisingWave. It's included out-of-the-box with RisingWave Cloud. For self-hosted deployments leveraging more than 4 cores, a license key is required.

Ready to Accelerate Your Data-To-Insight Pipeline?

Stop waiting for stale data. With RisingWave and the new Snowflake sink connector, you can power your Snowflake analytics with fresh, continuously updated, and pre-processed data.

We're excited to see how you'll use this to unlock new possibilities with your real-time data in Snowflake!

The Modern Backbone for Your
Event-Driven Infrastructure
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.