RisingWave is a streaming database. It can connect to event streaming platforms like Apache Kafka and can be queried with SQL. Unlike traditional relational databases, the primary use case for a streaming database like RisingWave is for analytical purposes. In that sense, RisingWave behaves much like other OLAP (online analytical processing) products like Amazon Redshift or Snowflake.
In modern applications, however, OLTP (online transaction processing), i.e., relational databases might not be the single source of truth. Think of event-based systems based on Apache Kafka or others. Here, RisingWave can process such streams in real-time and offer a Postgres-compatible interface to this data. Luckily, RisingWave can also persist data on its own, which is what we will use in this article.
Django’s ORM is designed for traditional relational databases. Data is expected to be normalized, constraints are enforced, and relations have an integrity guarantee.
So, why on earth would anyone use an Analytical database, that in its nature cannot enforce constraints, such as NOT NULL
, or UNIQUE
in a Django application?
The answer to this is simple: Dashboarding. The concept of RisingWave lies in providing (materialized) views on the data for analytical purposes. Given the power and flexibility of Django’s ORM, we could leverage the performance of analytical databases. In this blog post we are not going to use RisingWave as a primary datastore. Rather, we will use a RisingWave database to build a dashboard for the analytical part of our application.
Other products like Amazon Redshift or Snowflake already have connectors for Django. Since RisingWave doesn’t have one yet, we will try to implement our own and learn about the internals of Django ORM by the way. Luckily, RisingWave is mostly Postgres compatible, so we will start from the original Django Postgres driver. However, PostgreSQL compatibility refers to the DQL (Data Query Language) of SQL – or in simple terms: SELECT
statements. In this example we will only read from RisingWave, not write to it. We will also avoid migrations. This is because RisingWave’s concept of materialized views with different connectors (Apache Kafka being just one of them) cannot be created with the semantics of Django’s Model
classes. Don’t worry though: we have some sample data to play with.
By the end of this blog post, you will have
- set up a simple
docker-compose
setup with a Python webserver, a PostgreSQL instance and a RisingWave instance - built a Django ORM connector for RisingWave
- seen a sample dashboard inside of a Django application built on RisingWave
docker-compose
Setup
For our setup, we need three services: One for the Django application, one for Postgres (to store the transactional data like user accounts in Django), and, of course, one RisingWave service. Here is a simple docker-compose
setup that does the trick for us.
version: '3.8'
services:
app:
image: python
postgres:
image: postgres:latest
restart: unless-stopped
volumes:
- postgres-data:/var/lib/postgresql/data
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
risingwave:
image: risingwavelabs/risingwave:latest
restart: unless-stopped
volumes:
postgres-data:
To make things easy, we can leverage VSCode’s charming devcontainer feature that happens to support docker-compose
setups, too. Within the comfort of our editor, we can access all these services effortlessly. If you check out the GitHub repository and open it in VSCode, it will offer you to re-open the project in a container. If you do so, the whole setup will spin up in almost no time. Note that you need Docker and the VSCode Devcontainer extension installed.
In the setup above, we only persist the Postgres database, not RisingWave. You could add a volume mount for the risingwave
service as well, but I prefer to have a clean database on every start to explore its features.
Once you have started the three containers from the docker-compose.yaml
file, you can access the RisingWave database with the psql
command line tool. As for fetching data from the database, RisingWave is protocol-compatible with Postgres.
We use this connection to ingest the test data into RisingWave.
Here is a simple Python script that generates some random order data. We will use this just like in any other relational database to insert data in RisingWave. Note, however, that this is not the primary use case for RisingWave. It really shines when you connect external data sources like Kafka or S3. For the purposes of our blog post, this data is sufficient, however. If you want to learn more about the connectivity features of RisingWave, have a look at their excellent documentation.
Within our Devcontainer we can use the psql
command to connect to the RisingWave service:
psql -h risingwave -p 4566 -d dev -U root
Create table
CREATE TABLE orders (orderid int, itemid int, qty int, price float, dt date, state varchar);
Now, we run gen_data.py
and insert the generated fake data into RisingWave:
INSERT INTO orders (orderid, itemid, qty, price, dt, state) VALUES (1000001, 100001, 3, 993.99, '2020-06-25', 'FM');
INSERT INTO orders (orderid, itemid, qty, price, dt, state) VALUES (1000001, 100002, 3, 751.99, '2020-06-25', 'FM');
INSERT INTO orders (orderid, itemid, qty, price, dt, state) VALUES (1000001, 100003, 4, 433.99, '2020-06-25', 'FM');
…
Now, that we have the data ingested in our RisingWave instance, we should take care of retrieving this data from within a Django application. Django comes with built-in connectors to different relational databases, including PostgreSQL. The ORM (read: the Model
classes in your Django app) communicate with the connector which in turn communicates with a database drive (e.g., psycopg2
for PostgreSQL). The connector will generate vendor-specific SQL and provides special functionality not provided by other database products.
In the case of an analytical database, such like RisingWave, we would rather need to disable certain functionalities, such like constraint checking or even primary keys. RisingWave purposefully doesn’t provide such features (and neither do other streaming databases). The reason for this is that such analytical databases provide a read (for faster analytics) and write (for faster ingestion of data) optimized storage. The result usually is a denormalized schema in which constraints doesn’t need to be checked as the source of the data (read: the transactional databases or event streaming sources in your application) are considered satisfy integrity needs of the business logic.
We can just start with copying Django’s PostgreSQL connector to a new package called django-risingwave
as a start. Further down the line, we are going to use the new connector just for read operations (i.e., SELECT
s). However, at least in theory we want to implement at least part of some functioning management (for creating models) and write operations code in our module. Due to the very nature of the difference of scope between transactional and analytical DB engines, this might not work as the primary datastore for Django, but we will learn some of the internals of RisingWave while doing so.
As an ad-hoc test, we want at least the initial migrations to run through with the django-risingwave
connector. This is, however, already more than we will need – since our intention is to use the connector for unmanaged, and read-only models.
In base.py
we find a function that gets the column datatype for varchar
columns. RisingWave doesn’t support a constraint on the length, so we just get rid of the parameter:
def _get_varchar_column(data): return “varchar”
Also, we need to get rid of the data_types_suffix
attribute.
features.py
is one of the most important files for Django ORM connectors. Basically, it holds a configuration of the database capabilities. For any generated code that is not vendor specific, Django ORM will consult this file to turn on or off specific features. We have to disable quite a lot of them to make RisingWave work with Django. Here are some highlights, you’ll find the whole file in the GitHub Repo.
First, we need to set the minimum required database version down to 9.5
– that’s RisingWave’s version, not Postgres`.
minimum_database_version = (9,5)
Next, we disable some features that are mostly needed for constraint checking which RisingWave does not support:
enforces_foreign_key_constraints = False
enforces_unique_constraints = False
allows_multiple_constraints_on_same_fields = False
indexes_foreign_keys = False
supports_column_check_constraints = False
supports_expression_indexes = False
supports_ignore_conflicts = False
supports_indexes = False
supports_index_column_ordering = False
supports_partial_indexes = False
supports_tz_offsets = False
uses_savepoints = False
In schema.py
we need to override the _iter_column_sql
method which is not found in the Postgres backend but inherited from BaseDatabaseSchemaEditor
. In particular, we get rid of all the part that is put in place to check NOT NULL
constraints.
In introspection.py
we need to change the SQL generated by the get_table_list
method. We treat all views and tables as just tables for our demo purposes.
In operations.py
, we get rid of the DEFERRABLE
SQL part of our queries.
def deferrable_sql(self): return ""
Django supports the use of multiple database connections in one project out of the box. This way, we can have the transactional part of our database needs in Postgres, and the analytical part in RisingWave. Exactly what we want here!
DATABASES = {
"default": {
"NAME": "postgres",
"ENGINE": "django.db.backends.postgresql",
"USER": "postgres",
"PASSWORD": "postgres",
"HOST": "postgres"
},
"risingwave": {
"ENGINE": "django_risingwave",
"NAME": "dev",
"USER": "root",
"HOST": "risingwave",
"PORT": "4566",
},
}
Now, let’s create a model to represent the data in our RisingWave service. We want this model to be unmanaged, so that it doesn’t get picked up by Django’s migration framework. Also, we will use it for analytical purposes exclusively, so that we diable its ability to save data.
class Order(models.Model):
class Meta:
managed = False
orderid = models.IntegerField()
itemid = models.IntegerField()
qty = models.IntegerField()
price = models.FloatField()
date = models.DateField()
state = models.CharField(max_length=2)
def save(self, *args, **kwargs):
raise NotImplementedError
As an example for our dashboard, we create four Top-5-rankings:
- The Top5 states by total turnover
- The Top5 products with the highest average quantity per order
- The Top5 overall best-selling items by quantity
- The top5 overall best-selling items by turnover
Let’s take a moment to think about how the corresponding SQL queries would look like. These queries will be very simple as they contain a simple aggregation, such like SUM
or AVG
, a GROUP BY
clause and an ORDER BY
clause.
Here are the queries we come up with:
select sum(qty*price) as turnover, state from orders group by state order by turnover desc;
select avg(qty), itemid from orders group by itemid order by avg(itemid) desc;
select sum(qty), itemid from orders group by itemid order by sum(itemid) desc;
select sum(qty*price) as turnover, itemid from orders group by itemid order by turnover desc;
How would these queries translate to Django’s ORM?
Django does not have a group_by
function on its model but it will automatically add a GROUP BY
clause for the values in the values()
function. So, the above queries can be written with Django ORM as follows:
Order.objects.values('state').annotate(turnover=Sum(F('price')*F('qty'))).order_by('-turnover')[:5]
Order.objects.values('itemid').annotate(avg_qty=Avg('qty')).order_by('-avg_qty')[:5]
Order.objects.values('itemid').annotate(total_qty=Sum('qty')).order_by('-total_qty')[:5]
Order.objects.values('itemid').annotate(turnover=Sum(F('price')*F('qty'))).order_by('-turnover')[:5]
On top of that, we need to instruct Django to read these queries not from the default
connection but from the risingwave
service. We can do so by adding a call to the using
method like so:
Order.objects.using('risingwave').values('state').annotate(turnover=Sum(F('price')*F('qty'))).order_by('-turnover')[:5]
The data in our Order
model isn’t present in the Postgres database at all, so it would be nice if any query to this model would be routed through the RisingWave backend.
Django offers a solution to that problem. We can create a database routing class that needs to implement db_for_read
, db_for_write
, allow_relation
and allow_migrate
. We only need the db_for_read
method here. By not implementing, i.e., pass
ing, the other methods, we can do so. Since we could add multiple routers to the settings.py
file, using the database connection in the return
statement isn’t enforced but treated as a hint by Django’s ORM.
from .models import Order
class OlapOltpRouter():
def db_for_read(self, model, **hints):
if model == Order:
return 'risingwave'
def db_for_write(self, model, **hints):
pass
def allow_relation(self, obj1, obj2, **hints):
pass
def allow_migrate(self, db, app_label, model_name=None, **hints):
pass
To enable this router (which lives in dashboard/dbrouter.py
), we need to add it to the DATABASE_ROUTERS
setting in our settings.py
:
DATABASE_ROUTERS = ['dashboard.dbrouter.OlapOltpRouter']
- RisingWave is a powerful analytical database engine that can ingest streaming data sources and offers a performant way to query data > > > - Since RisingWave offers PostgreSQL compatibility over the wire, we can quickly hack a connector for Django’s ORM based upon the original Postgres connector > > > - Django’s ORM allows building analytical queries and can be used to create dashboards integrated in a Django application easily > > > - The connector is released in public and still needs a lot of polishing. Check out the GitHub repo. > > Originally published at Building A RisingWave Connector for Django ORM