MongoDB and the challenge of streaming joins
As an operational database, MongoDB excels at fast data manipulation and query performance. However, its built-in support for continuously maintained views and stream processing is limited. For example, MongoDB has no native way to join two collections and keep the result refreshed in real time, especially at a high frequency such as every second.
In MongoDB, joins are performed with the $lookup aggregation stage rather than traditional SQL-style joins:
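// For each user, attach an array named "products" containing the
// documents from the products collection whose _id equals the user's product_id
db.users.aggregate([
  {
    $lookup: {
      from: "products",
      localField: "product_id",
      foreignField: "_id",
      as: "products"
    }
  }
])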
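This approach works, but it becomes cumbersome when more than two collections are involved: each additional collection needs its own $lookup stage, often followed by $unwind to flatten the resulting array, and the pipeline quickly becomes hard to read and maintain.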
MongoDB encourages denormalization, which lets you avoid joins in many cases if your schema is well designed. Real-world data is messy, however, and joins are sometimes unavoidable. Rather than maintaining increasingly complex MongoDB aggregation pipelines, you can delegate this work to a dedicated stream processing system such as RisingWave.
The RisingWave solution
A practical solution involves propagating MongoDB change streams into RisingWave through Kafka, enabling flexible real-time joins.
RisingWave serves as a central hub for real-time data sources: data from MongoDB and Kafka can be loaded into RisingWave and joined together. Its state backend, Hummock, is built on cloud object storage, which gives it elasticity and ample capacity. This makes it practical to run large joins across many sources, including multi-way joins over 10 or more inputs.
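In this data stack, Debezium plays a key role: it captures changes from MongoDB (through change streams, which are built on the oplog) and publishes them to a Kafka topic, which RisingWave then consumes. In RisingWave, such a topic can be ingested as a table, for example: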
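CREATE TABLE source_name (
    _id jsonb PRIMARY KEY,  -- the MongoDB document's _id, used as the primary key
    payload jsonb           -- the MongoDB document itself, stored as JSONB
)
WITH (
    connector = 'kafka',
    topic = 'debezium_mongo_json_customers',  -- the topic Debezium publishes to
    properties.bootstrap.server = '172.10.1.1:9090,172.10.1.2:9090',
    scan.startup.mode = 'earliest'            -- consume the topic from the beginning
) FORMAT DEBEZIUM_MONGO ENCODE JSON;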
Once MongoDB collections are registered as tables, we can create materialized views that join them with each other and with other sources, and RisingWave keeps the results up to date in real time.
For detailed steps on this process, refer to the RisingWave documentation.
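As an illustration of such a view, the sketch below joins three collections in a single materialized view. It assumes three hypothetical tables, users, orders, and products, each created from its own Debezium topic in the same way as the table above; the view name and every field name inside payload (user_id, product_id, order_id, name, title, quantity) are also hypothetical. For simplicity, the join keys are plain string fields extracted from payload.

```sql
-- Hypothetical tables: users, orders, products, each defined as
-- (_id jsonb PRIMARY KEY, payload jsonb) with FORMAT DEBEZIUM_MONGO.
-- All view and field names below are illustrative assumptions.
CREATE MATERIALIZED VIEW order_details AS
SELECT
    o.payload->>'order_id'        AS order_id,
    u.payload->>'name'            AS user_name,
    p.payload->>'title'           AS product_title,
    (o.payload->>'quantity')::int AS quantity
FROM orders AS o
JOIN users AS u
    ON o.payload->>'user_id' = u.payload->>'user_id'
JOIN products AS p
    ON o.payload->>'product_id' = p.payload->>'product_id';

-- The view is maintained incrementally as change events arrive,
-- so it can be queried like a regular table.
SELECT * FROM order_details WHERE user_name = 'Bob';
```

Adding a fourth collection means adding one more JOIN clause, rather than another $lookup stage (and usually an $unwind) in an aggregation pipeline.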
Handling MongoDB’s JSON data
RisingWave can analyze MongoDB's JSON (more precisely, BSON) data through its PostgreSQL-compatible JSONB support. This makes it straightforward to ingest and query documents even when they do not follow a relational schema. For analytics, SQL is more productive than MongoDB's JavaScript API or other non-standard query languages. MongoDB's native API excels at data manipulation, especially when integrated into Node.js web applications, but analytical workloads call for different expertise, and SQL remains the most widely used language for data analysis. As of this writing, RisingWave provides more than 30 JSON functions and operators, including support for JSONPath. This eliminates the need for users to write custom UDFs when transforming MongoDB data.
Here are some examples of handling JSONB in RisingWave:
Finding users whose age is between 25 and 30:
SELECT *
FROM users
WHERE (payload->>'age')::int BETWEEN 25 AND 30;
Finding users by their name and email:
SELECT *
FROM users
WHERE payload @> '{"name": "Bob"}'
AND payload->>'email' = 'bob@example.com';
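The queries above use basic JSONB operators. The sketch below goes a step further, reaching into a nested sub-document and filtering on an array with a JSONPath expression. It assumes a hypothetical document shape (the address, city, orders, and total fields are not from any real schema) and assumes that the PostgreSQL-style jsonb_path_exists function is available in your RisingWave version.

```sql
-- Hypothetical payload shape in the users table:
--   {"name": "Bob", "address": {"city": "Berlin"},
--    "orders": [{"sku": "A1", "total": 40}, {"sku": "B2", "total": 120}]}

SELECT
    payload->>'name'            AS name,
    payload->'address'->>'city' AS city  -- chain -> and ->> to read a nested field
FROM users
-- JSONPath filter: keep users with at least one order whose total exceeds 100
WHERE jsonb_path_exists(payload, '$.orders[*] ? (@.total > 100)');
```

Because the JSONPath filter runs inside the database, arrays of embedded sub-documents, which are common in MongoDB schemas, can be filtered without first flattening them.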
In summary, RisingWave parses the MongoDB change streams captured by Debezium, and its cloud-native storage makes it practical to join multiple MongoDB collections into a single, continuously updated result that other services can consume. Combined with JSONB support for working with MongoDB documents directly, this makes RisingWave a practical solution for real-time processing of MongoDB data.