Jupyter Notebook is an intuitive platform for creating and sharing data-driven projects. It combines code, output, and visualizations in a readable format. You can easily import libraries like Pandas, Plotly Express, and TensorFlow for advanced data science and machine learning tasks.
While Jupyter Notebook typically works with static data (e.g., CSV files), handling data streams requires a different approach. Stream processing is crucial for scenarios like real-time financial transactions or live sensor data, where new information is constantly generated and consumed.
RisingWave is a distributed streaming database that supports real-time data processing, ingestion, and transformation. It allows you to:
- Create materialized views using SQL commands
- Store and incrementally update query results
- Provide up-to-date analysis results
- Use PostgreSQL-compatible drivers for easy integration
This blog demonstrates how to use Jupyter Notebook with RisingWave to transform and visualize streaming data. The process involves:
- Connecting to RisingWave using Psycopg2
- Ingesting data from a messaging queue with SQL commands
- Transforming and analyzing the data
- Creating periodically updated visualizations
Set up RisingWave
In this tutorial, we will use RisingWave Cloud. You can sign up for a free account to try it out. Alternatively, you can deploy RisingWave locally. See the Quick start for detailed instructions.
Create a RisingWave Cloud project
To get started with RisingWave Cloud, visit RisingWave Cloud and its corresponding Quick start guide for instructions on creating a project.
Obtain host address
Before connecting from Jupyter Notebook, find the host address of your RisingWave project. To do so, navigate to the project page and select the project containing the target database. Click on Connect and select Connection String under the startup mode option. Save this connection string for future use. The host address component of the connection string is highlighted below.
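If you would rather not pick the host out by hand, a small helper can split the saved connection string into its parts. The sketch below assumes the string is a PostgreSQL-style URI of the form postgresql://user:password@host:port/dbname. The function name parse_connection_string and the example value are placeholders, so check them against the string shown in your own project.

```python
from urllib.parse import urlparse

def parse_connection_string(conn_str):
    """Split a PostgreSQL-style URI into keyword arguments for psycopg2.connect."""
    parts = urlparse(conn_str)
    return {
        "host": parts.hostname,
        "port": parts.port or 4566,                 # fall back to RisingWave's default port
        "user": parts.username,
        "password": parts.password,                 # None when the URI carries no password
        "dbname": parts.path.lstrip("/") or "dev",  # fall back to RisingWave's default database
    }

# Placeholder value, not a real endpoint.
example = "postgresql://user:password@rwc-xyz.prod-aws-xyz-xyz-xyz.risingwave.cloud:4566/dev?sslmode=verify-full"
params = parse_connection_string(example)
print(params["host"])
```

The resulting dictionary uses the same keyword names as the connection call later in this post, so it could be passed along as psycopg2.connect(**params).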

With the RisingWave Cloud project configured, we can now establish a connection from Jupyter Notebook.
Jupyter Notebook
There are multiple ways to deploy a Jupyter Notebook project. For more information on how to get started, see Jupyter.
Connect to RisingWave with Psycopg2
To connect to the RisingWave database from a Jupyter Notebook project, we will use the Psycopg2 library, which is a PostgreSQL database adapter for Python applications. Additionally, the Pandas library will be used to format the data.
If these libraries are not already installed, use the following pip install commands to install them.
pip install psycopg2-binary
pip install pandas
import psycopg2
import pandas as pd
To establish a connection with the RisingWave database, use the psycopg2.connect method. In the connection parameters, the host string is extracted from the connection string previously mentioned. For RisingWave, the default database is dev and the default port is 4566. Fill in the appropriate user and password. The default user is root, which does not require a password.
conn = psycopg2.connect(
    host="rwc-xyz.prod-aws-xyz-xyz-xyz.risingwave.cloud",  # host address from the connection string
    port=4566,
    user="user",
    password="password",
    dbname="dev")
If there are issues when establishing this connection, see Connection errors for potential solutions.
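Because notebooks are often shared, you may prefer to keep credentials out of the code cell. The following is a minimal sketch that reads the connection parameters from environment variables. The variable names (RW_HOST, RW_PORT, RW_USER, RW_PASSWORD, RW_DBNAME) and the helper connection_params are hypothetical choices for illustration, not something RisingWave or Psycopg2 defines.

```python
import os

def connection_params(env=os.environ):
    """Build psycopg2.connect keyword arguments from environment variables."""
    return {
        "host": env["RW_HOST"],                   # required: raises KeyError if unset
        "port": int(env.get("RW_PORT", "4566")),  # RisingWave's default port
        "user": env.get("RW_USER", "root"),       # RisingWave's default user
        "password": env.get("RW_PASSWORD", ""),
        "dbname": env.get("RW_DBNAME", "dev"),    # RisingWave's default database
    }

# A plain dict stands in for the real environment in this demonstration.
params = connection_params({"RW_HOST": "localhost"})
print(params)
# With a reachable database: conn = psycopg2.connect(**params)
```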
Create a source
Once the connection is successfully established, we can use SQL queries to ingest and transform data. In a new code cell, run the following snippet of code to connect to a Kafka topic. The topic live_stream_metrics contains data describing the video quality of a video livestream.
with conn.cursor() as cur:
    # Declare a source that reads JSON records from the Kafka topic
    cur.execute("""
        CREATE SOURCE IF NOT EXISTS live_stream_metrics (
            client_ip VARCHAR,
            user_agent VARCHAR,
            user_id VARCHAR,
            room_id VARCHAR,
            video_bps BIGINT,
            video_fps BIGINT,
            video_rtt BIGINT,
            video_lost_pps BIGINT,
            video_total_freeze_duration BIGINT,
            report_timestamp TIMESTAMPTZ,
            country VARCHAR
        ) WITH (
            connector = 'kafka',
            topic = 'live_stream_metrics',
            properties.bootstrap.server = 'b0-kafka.demosrc.risingwave.internal:9092',
            scan.startup.mode = 'latest'
        ) ROW FORMAT JSON;""")
Here is an example of connecting to a RisingWave database and creating a Kafka source.
Create materialized views
To transform the streaming data, we'll create a materialized view using SQL. This materialized view calculates video quality metrics for each stream (identified by room_id) in 1-minute windows, including the total freeze duration, average number of packets lost per second, and average round-trip time.
Materialized views contain the results of a view expression and are updated incrementally, saving on computational resources. Whenever new data arrives, the results are instantly updated, ensuring that you always have access to the freshest data.
with conn.cursor() as cur:
    # Aggregate the source into 1-minute tumbling windows per room
    cur.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS live_video_qos_1min AS
        SELECT
            window_start AS report_ts,
            room_id,
            SUM(video_total_freeze_duration) AS video_total_freeze_duration,
            AVG(video_lost_pps) AS video_lost_pps,
            AVG(video_rtt) AS video_rtt
        FROM
            TUMBLE(
                live_stream_metrics,
                report_timestamp,
                INTERVAL '1' MINUTE
            )
        GROUP BY
            window_start,
            room_id;""")
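To make the windowing concrete, here is a plain-Python sketch of the aggregation the view describes: each record is assigned to the 1-minute window containing its timestamp, and the metrics are then summed or averaged per window and room. The function name tumble_1min and the sample records are made up for illustration. Unlike a materialized view, this sketch recomputes everything from scratch on each call rather than updating results incrementally.

```python
from collections import defaultdict
from datetime import datetime, timezone

def tumble_1min(rows):
    """Aggregate (report_timestamp, room_id, freeze, lost_pps, rtt) rows per 1-minute window and room."""
    groups = defaultdict(list)
    for ts, room_id, freeze, lost_pps, rtt in rows:
        window_start = ts.replace(second=0, microsecond=0)  # floor the timestamp to the minute
        groups[(window_start, room_id)].append((freeze, lost_pps, rtt))

    result = []
    for (window_start, room_id), values in sorted(groups.items()):
        n = len(values)
        result.append({
            "report_ts": window_start,
            "room_id": room_id,
            "video_total_freeze_duration": sum(v[0] for v in values),  # SUM
            "video_lost_pps": sum(v[1] for v in values) / n,           # AVG
            "video_rtt": sum(v[2] for v in values) / n,                # AVG
        })
    return result

# Made-up sample events: two fall in the 12:00 window, one in the 12:01 window.
utc = timezone.utc
sample = [
    (datetime(2024, 1, 1, 12, 0, 5, tzinfo=utc), "room-1", 100, 2, 40),
    (datetime(2024, 1, 1, 12, 0, 50, tzinfo=utc), "room-1", 300, 4, 60),
    (datetime(2024, 1, 1, 12, 1, 10, tzinfo=utc), "room-1", 50, 0, 30),
]
windows = tumble_1min(sample)
for row in windows:
    print(row)
```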
Then we can query from the materialized view to display the results. The query results are loaded into a Pandas DataFrame for better formatting and compatibility with visualization tools.
with conn.cursor() as cur:
    cur.execute("""SELECT * FROM live_video_qos_1min ORDER BY room_id, report_ts;""")
    data = cur.fetchall()

# Load the rows into a DataFrame with explicit column names
video_df = pd.DataFrame(data,
    columns=['report_ts', 'room_id', 'video_total_freeze_duration', 'video_lost_pps', 'video_rtt'])
video_df.tail()
Here is an example of what the process and results look like.
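The column names above are typed out by hand, which has to be kept in sync with the view definition. As an alternative sketch, DB-API cursors (Psycopg2 included) expose a description attribute after a query, and the first field of each entry is the column name. The example below uses Python's built-in sqlite3 module purely as a stand-in so it runs without a RisingWave connection. The helper name fetch_with_columns and the sample row are illustrative.

```python
import sqlite3

def fetch_with_columns(cur, query):
    """Run a query and return (column_names, rows) using DB-API cursor metadata."""
    cur.execute(query)
    columns = [desc[0] for desc in cur.description]  # first field of each entry is the column name
    return columns, cur.fetchall()

# sqlite3 stands in for RisingWave here so the sketch runs without a server.
demo = sqlite3.connect(":memory:")
cur = demo.cursor()
cur.execute("CREATE TABLE live_video_qos_1min (report_ts TEXT, room_id TEXT, video_rtt REAL)")
cur.execute("INSERT INTO live_video_qos_1min VALUES ('2024-01-01 12:00:00', 'room-1', 50.0)")

columns, rows = fetch_with_columns(cur, "SELECT * FROM live_video_qos_1min ORDER BY room_id, report_ts")
print(columns)
# With Pandas available: video_df = pd.DataFrame(rows, columns=columns)
```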
Create visualizations
Once we have the data, we can create visualizations by employing additional data visualization libraries. Working in Jupyter Notebook provides us access to all Python libraries, enabling us to easily create complex data visualizations.
Visualizations play a crucial role in understanding trends and finding anomalies in data. They also provide a quick overview of the current status and effectively convey a story to a wider audience.
In this demonstration, we will use the following libraries. If they are not installed, install them using the pip install command.
import plotly.express as px
from datetime import datetime
from dash import Dash, html, dcc
from dash.dependencies import Input, Output
import time
Additionally, the nbformat library must be installed to render the visualizations.
pip install nbformat
Now, we will create a visualization based on the materialized view live_video_qos_1min. Using Pandas, we store the query results in a DataFrame and group by report_ts to find the average freeze duration across all streams. To better showcase the visualization, we are only visualizing records from today’s date.
Using the Dash library, we will set this graph to refresh every minute.
app = Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='real-time-plot'),
    dcc.Interval(
        id='interval-component',
        interval=60*1000,  # refresh interval in milliseconds (one minute)
        n_intervals=0
    )
])

@app.callback(Output('real-time-plot', 'figure'),
              [Input('interval-component', 'n_intervals')])
def update_graph(n_intervals):
    # Re-query the materialized view on every refresh
    with conn.cursor() as cur:
        cur.execute("""SELECT * FROM live_video_qos_1min ORDER BY room_id, report_ts;""")
        data = cur.fetchall()
    new_video_df = pd.DataFrame(data,
        columns=['report_ts', 'room_id', 'video_total_freeze_duration', 'video_lost_pps', 'video_rtt'])
    # Average the freeze duration across all rooms for each window
    avg_freeze_duration = new_video_df.groupby('report_ts')['video_total_freeze_duration'].mean().reset_index()
    avg_freeze_duration['report_ts'] = pd.to_datetime(avg_freeze_duration['report_ts'])
    # Keep only records later than the start of today
    avg_freeze_duration = avg_freeze_duration[~(avg_freeze_duration['report_ts'] <= time.strftime("%Y-%m-%d"))]
    fig = px.line(avg_freeze_duration, x="report_ts", y="video_total_freeze_duration",
        title='Live Stream Performance: Average Stream Freeze Duration',
        labels={"report_ts": "Report Timestamp", "video_total_freeze_duration": "Average Freeze Duration"})
    return fig

if __name__ == '__main__':
    # Older Dash releases used app.run_server here instead of app.run
    app.run(debug=True)
The visualization may look like this. The video has been sped up to show the graph updating after a minute. We can see that the average video freeze is between 200 and 400 milliseconds.
This approach facilitates proactive monitoring and decision-making by allowing us to quickly discover when the platform is underperforming.
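As a small illustration of that kind of monitoring, the sketch below flags windows whose average freeze duration exceeds a chosen limit. The function name windows_over_threshold, the 500 ms threshold, and the sample values are all hypothetical. In practice the pairs would come from the report_ts and video_total_freeze_duration columns computed in the callback.

```python
def windows_over_threshold(points, threshold_ms):
    """Return the (report_ts, avg_freeze_ms) pairs whose average exceeds threshold_ms."""
    return [(ts, value) for ts, value in points if value > threshold_ms]

# Made-up values: two in the 200 to 400 ms range described above, plus one spike.
points = [("12:00", 250.0), ("12:01", 380.0), ("12:02", 910.0)]
alerts = windows_over_threshold(points, threshold_ms=500)
print(alerts)
```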
We explored the process of setting up and using RisingWave Cloud and Jupyter Notebook for efficient data analysis and real-time visualization.
This integration enhances your ability to work with streaming databases. By combining RisingWave with Jupyter Notebook, you can:
- Incrementally develop and test stream processing projects
- Use familiar external data science tools and libraries
- Easily share your projects with others
- Enable others to recreate your processes and visualizations
If you would like to learn more about the features that RisingWave offers, see the official documentation site. You can also sign up for RisingWave Cloud for free to test it out. If you need additional help with setting up this integration, join our active Slack community.