Real-time data distribution and streaming applications have been extremely popular and in demand for the last few years. Firms across industries have realized the value of processing data in real-time as opposed to what they had been doing for decades - batch processing.
As more and more data is generated, firms have started investing in appropriate tools to help them derive insight from that data in real-time. Airlines process real-time data to notify their passengers promptly and provide them with a pleasant onboarding experience, and retail stores update their inventory data in real-time so that it is always up-to-date when customers are shopping in their stores. Similarly, capital markets firms leverage streaming technologies to take advantage of the real-time data available to them and trade as efficiently as possible.
However, capital markets differ from other industries in many ways, especially in their performance requirements, the abundance of available real-time data, their reliability and redundancy requirements, and the security and governance oversight imposed by regulators. Any tool used to distribute and process real-time data in capital markets needs to be highly performant, secure, reliable, and redundant.
Some popular use cases for real-time data in capital markets include:
distributing large volumes of market data from vendors such as Refinitiv or Bloomberg,
analyzing market data in real-time and calculating important metrics such as HLOC (high, low, open, close), VWAP (volume weighted average price), TWAP (time weighted average price) etc.
distributing trade orders about to be executed at exchanges
distributing and capturing trade order updates once the trades have been executed
performing transaction cost analysis (TCA) on trades to evaluate their execution
distributing live data (market data, trade orders) to Trader UIs
With so much data required to be distributed globally across trading hubs such as New York, London, and Singapore, companies rely on robust tools such as Solace PubSub+ Platform to distribute data efficiently across datacenters and cloud providers. This data is then analyzed by powerful real-time analytics tools such as RisingWave to derive meaningful insights in real-time.
Overview of Solace
Solace PubSub+ Platform is a complete event-driven management and streaming platform for the real-time enterprise. The platform helps enterprises design, deploy, integrate, and manage event-driven architectures (EDAs) across hybrid cloud, multi-cloud, and IoT environments, so enterprises can be more integrated and event-driven.
While Solace PubSub+ Platform is deployed across numerous industries ranging from retail to aviation, it has been heavily used by capital markets firms for several years to power their mission-critical flows relating to market data and trade order distribution.
Top-tier investment banks and hedge funds leverage Solace’s PubSub+ brokers to distribute data across several applications such as market data, tick data, OMS (order management system), EMS (execution management system), PNL, and Trader UIs. Solace’s native support for open protocols such as AMQP, MQTT, REST, and WebSockets allows a variety of applications in an organization to easily publish and consume data from Solace without having to worry about protocol translation. Backend market data feed handlers can publish market data to Solace using Solace’s proprietary protocol, SMF, and the same data can be consumed by a Trader UI using WebSockets and by a PNL application using AMQP.
Overview of RisingWave
RisingWave is a SQL data platform that is fully compatible with PostgreSQL, designed to meet the needs of modern data-driven applications. It is tailored for building event-driven applications, real-time ETL pipelines, continuous analytics services, and feature stores for AI applications. It excels at extracting fresh and consistent insights from diverse data sources, including real-time event streams, database change data capture (CDC), and time series data. With its high-performance architecture, RisingWave enables insights to be derived within sub-seconds, making it ideal for scenarios where speed is critical. By unifying streaming and batch processing, RisingWave empowers users to seamlessly ingest, join, and analyze both live and historical data. Additionally, it is built to operate effortlessly at cloud scale, ensuring reliability and scalability for even the most demanding workloads.
RisingWave in capital markets
RisingWave transforms capital markets, enabling real-time portfolio monitoring and risk management with millisecond-level precision. It ensures that financial institutions can dynamically assess portfolio exposures, respond to regulatory limits, and mitigate systemic risks in fast-paced trading environments. By seamlessly integrating live trade flows, market prices, and historical data, RisingWave empowers portfolio managers to stay ahead of market shifts. Its materialized views automatically refresh exposure metrics and risk thresholds, ensuring timely alerts and enabling automated adjustments, which are critical for compliance and operational efficiency.
Additionally, RisingWave excels in handling the massive data volumes inherent in modern trading systems with unmatched scalability and reliability. Its native integration with data streaming platforms like Kafka and Solace, and its support for sliding window analytics streamline complex workflows, such as recalculating portfolio valuations and tracking rolling trade metrics. By reducing latency and simplifying real-time data processing with SQL-based queries, RisingWave eliminates the need for error-prone batch pipelines, helping financial professionals focus on delivering actionable insights in milliseconds where it matters most.
The technical stack
In this blog post, we would like to demonstrate how both Solace PubSub+ Platform and RisingWave can be used to capture real-time market data and analyze it.
Set up Solace
To get started with Solace PubSub+ event broker, you can either choose the free Software version using Docker or Solace PubSub+ Cloud. We recommend using PubSub+ Cloud’s free trial to quickly spin up a broker service.
Data generation
We will use a sample market data simulator, written by Himanshu Gupta from Solace, which simulates a market data feed handler that publishes combined L1 quotes and trades data to a Solace PubSub+ broker. In practice, there would be separate feeds for L1 quotes and L1 trades, but for simulation purposes the two feeds have been combined. These feeds can either be direct feeds from exchanges such as NYSE or NASDAQ, or consolidated feeds from market data vendors such as Refinitiv, Bloomberg, ICE, and FactSet.
Code and instructions to run the simulator can be found on GitHub.
Publishing data to Solace topic
The simulator will publish sample records in JSON format to a well-defined, hierarchical, and dynamic Solace topic: <assetClass>/MARKETDATA/v1/<country>/<exchange>/<ticker>
For example, the following sample message for AAPL would be published to the topic:
EQ/MARKETDATA/v1/US/NASDAQ/AAPL
{
"symbol":"AAPL",
"askPrice":250.3121,
"bidSize":630,
"tradeSize":180,
"exchange":"NASDAQ",
"currency":"USD",
"tradePrice":249.9996,
"askSize":140,
"bidPrice":249.6871,
"date":"2020-03-23",
"time":"09:32:10.610764-04:00"
}
A different message can be published to:
EQ/MARKETDATA/v1/UK/LSE/VOD
This specific topic hierarchy is used to take full advantage of Solace PubSub+'s rich hierarchical topics, which provide strong wildcard support and advanced filtering logic. In comparison to the flat topics used by log-based tools such as Kafka, Solace’s hierarchical topics allow consumers to request filtered streams from the broker using topic wildcards. This puts the responsibility for filtering on the broker, where it is done once, instead of requiring each consumer to implement its own filtering logic.
Here are some examples of how consuming applications can use wildcards. They can subscribe to:
EQ/> - to subscribe to all equities
EQ/*/*/*/NYSE/> - to subscribe to all NYSE securities
EQ/MARKETDATA/v1/US/> - to subscribe to all US equities
You can learn more about Solace's wildcards here.
This is extremely useful in capital markets, where each market data record needs to fan out to multiple downstream consumers such as PNL, Tick Data, TCA, OMS/EMS, Risk, and Trader UIs. It would be extremely wasteful for the broker to push the raw data feed to each of these applications and have the applications implement filtering logic themselves. Instead, these applications just request the subset of the market data that they are interested in and let the broker do the heavy lifting. This ability to filter messages at the source broker becomes even more important with hybrid/multi-cloud deployments, where ingress/egress costs and compute costs can easily get out of hand. Instead of wasting bandwidth on sending the raw data feed from on-prem to cloud and having application teams waste compute resources on filtering data, it’s more cost-effective and secure to simply send a filtered stream to the cloud application.
What’s more, consumers can subscribe to one or more topics and have ordering guaranteed across topics. With other brokers, ordering is only maintained within a single topic partition, which is simply not acceptable in capital markets use cases.
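The wildcard subscriptions described above can be modeled with a few lines of Python. This is only a simplified sketch of the matching rules, not Solace's implementation: it assumes `*` matches exactly one level (optionally after a literal prefix within that level) and `>` is valid only as the last level, where it matches one or more remaining levels. The function name `topic_matches` is hypothetical.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Return True if a topic matches a Solace-style wildcard subscription.

    Simplified model (an assumption, not the broker's code):
      '*'  matches one whole level, or the rest of a level after a prefix
      '>'  is only valid as the last level and matches one or more levels
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for index, sub in enumerate(sub_levels):
        if sub == ">" and index == len(sub_levels) - 1:
            # '>' needs at least one more topic level to match
            return len(topic_levels) > index
        if index >= len(topic_levels):
            return False
        if sub.endswith("*"):
            # 'abc*' matches any level starting with 'abc'; '*' matches any level
            if not topic_levels[index].startswith(sub[:-1]):
                return False
        elif sub != topic_levels[index]:
            return False
    # Without a trailing '>', both sides must have the same number of levels
    return len(sub_levels) == len(topic_levels)


aapl = "EQ/MARKETDATA/v1/US/NASDAQ/AAPL"
vod = "EQ/MARKETDATA/v1/UK/LSE/VOD"
for pattern in ("EQ/>", "EQ/MARKETDATA/v1/US/>", "EQ/MARKETDATA/v1/*/NASDAQ/>"):
    print(pattern, topic_matches(pattern, aapl), topic_matches(pattern, vod))
```

In a real deployment this evaluation happens inside the broker, which is the point of the design: each consumer only states its subscription and never sees the messages that do not match.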
Data ingestion process
We’ll use RisingWave to ingest and analyze the events from PubSub+ broker. In this case, we use RisingWave Cloud, which provides a user-friendly experience and simplifies the operational aspects of managing and utilizing RisingWave for our real-time capital markets data analysis.
To set up a RisingWave cluster in RisingWave Cloud for this use case, refer to this documentation guide.
Create a source table
Once you have deployed the RisingWave cluster, create a source table in the Workspace using the SQL query below.
This query creates a market_data table in RisingWave, configured to ingest real-time market data from a PubSub+ broker. It connects to the Solace topic EQ/MARKETDATA/v1/# hosted at the specified broker URL using the provided demo credentials. Messages are consumed with a QoS level of at_least_once and are expected in JSON format.
CREATE TABLE market_data (
date DATE,
symbol VARCHAR,
askPrice NUMERIC,
bidSize INT,
tradeSize INT,
exchange VARCHAR,
currency VARCHAR,
time TIMESTAMP,
tradePrice NUMERIC,
askSize INT,
bidPrice NUMERIC
)
WITH (
connector = 'mqtt',
topic = 'EQ/MARKETDATA/v1/#',
url = 'tcp://mr-connection-xrkpmgiz510.messaging.solace.cloud:1883',
username='demo',
password='demo',
qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON;
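Because this table connects over MQTT, the topic filter uses MQTT wildcard syntax (`#` for multiple levels, `+` for a single level) rather than the Solace `>` and `*` characters shown earlier. The sketch below illustrates the correspondence under stated assumptions: the helper name `solace_to_mqtt_filter` is hypothetical, and Solace prefix wildcards such as `AA*` are rejected because MQTT has no equivalent. One difference worth keeping in mind is that MQTT's `#` also matches the parent level itself, while `>` requires at least one further level.

```python
def solace_to_mqtt_filter(subscription: str) -> str:
    """Translate a Solace-style subscription into an MQTT topic filter.

    Illustrative only: whole-level '*' becomes '+', a trailing '>' becomes '#'.
    """
    levels = subscription.split("/")
    translated = []
    for index, level in enumerate(levels):
        if level == ">":
            if index != len(levels) - 1:
                raise ValueError("'>' is only valid as the last level")
            translated.append("#")
        elif level == "*":
            translated.append("+")
        elif "*" in level:
            # e.g. 'AA*' matches a prefix within a level; MQTT cannot express this
            raise ValueError(f"prefix wildcard {level!r} has no MQTT equivalent")
        else:
            translated.append(level)
    return "/".join(translated)


print(solace_to_mqtt_filter("EQ/MARKETDATA/v1/>"))  # prints EQ/MARKETDATA/v1/#
```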
Materialized Views: Overview and Benefits
Materialized views (MV) store pre-computed results of complex queries, enabling faster data retrieval and reducing computational overhead compared to regular views. Unlike regular views, which dynamically fetch and process data each time they're queried, materialized views maintain a static "snapshot" of the results, updated incrementally. This trade-off between up-to-date data and performance makes materialized views ideal for infrequent updates, resource-intensive queries, or frequently accessed datasets. Their benefits include improved query performance, cost efficiency, streamlined data management, and faster analysis, particularly in large-scale or time-sensitive event-driven applications.
Adoption and RisingWave's Innovations
Materialized views are supported across many databases, with implementation varying by system. PostgreSQL, Redshift, and Oracle offer manual or automated refresh options, while RisingWave specializes in real-time, incrementally updated materialized views tailored for streaming workloads. RisingWave ensures consistency, high availability, and concurrency, integrating advanced stream-processing semantics like time windows and watermarks. Its architecture supports resource isolation, making it a robust choice for demanding applications. These capabilities simplify stream processing and enhance the efficiency of modern data pipelines, showcasing materialized views as a cornerstone of optimized streaming database performance.
Creating materialized views for real-time data analysis
This query creates a materialized view enriched_market_data in RisingWave, transforming and enriching the market_data table by combining the date and time fields into a single event_timestamp. It selects and renames key fields such as prices, sizes, exchange, and currency for simplified and structured access to real-time market data.
CREATE MATERIALIZED VIEW enriched_market_data AS
SELECT
("date" || ' ' || TO_CHAR("time", 'HH24:MI:SS.MS' ))::timestamptz AS event_timestamp,
symbol AS symbol,
askPrice AS ask_price,
bidPrice AS bid_price,
askSize AS ask_size,
bidSize AS bid_size,
tradePrice AS trade_price,
tradeSize AS trade_size,
exchange AS exchange,
currency AS currency
FROM
market_data;
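For readers who want to see the shape of this transformation outside SQL, here is a rough Python analogue applied to the sample AAPL message shown earlier. It is a sketch, not what RisingWave executes: the `enrich` function and `FIELD_RENAMES` mapping are hypothetical names, and unlike the SQL expression (which formats the time to millisecond precision before casting), this version keeps the full microseconds and UTC offset from the raw message.

```python
import json
from datetime import datetime

# Sample record from the simulator, as shown earlier in this post
RAW_MESSAGE = """{
  "symbol": "AAPL", "askPrice": 250.3121, "bidSize": 630, "tradeSize": 180,
  "exchange": "NASDAQ", "currency": "USD", "tradePrice": 249.9996,
  "askSize": 140, "bidPrice": 249.6871,
  "date": "2020-03-23", "time": "09:32:10.610764-04:00"
}"""

# camelCase source fields mapped to the snake_case names used by the view
FIELD_RENAMES = {
    "symbol": "symbol",
    "askPrice": "ask_price",
    "bidPrice": "bid_price",
    "askSize": "ask_size",
    "bidSize": "bid_size",
    "tradePrice": "trade_price",
    "tradeSize": "trade_size",
    "exchange": "exchange",
    "currency": "currency",
}


def enrich(raw: str) -> dict:
    """Combine date and time into one timestamp and rename the other fields."""
    record = json.loads(raw)
    enriched = {
        "event_timestamp": datetime.fromisoformat(f"{record['date']}T{record['time']}")
    }
    for source_name, target_name in FIELD_RENAMES.items():
        enriched[target_name] = record[source_name]
    return enriched


row = enrich(RAW_MESSAGE)
print(row["event_timestamp"].isoformat(), row["symbol"], row["trade_price"])
```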
The materialized view can be queried to retrieve the latest data:
SELECT * FROM enriched_market_data LIMIT 5;
The result of the above query is similar to the following:
event_timestamp | symbol | ask_price | bid_price | ask_size | bid_size | trade_price | trade_size | exchange | currency
---------------------+--------+------------+------------+----------+----------+-------------+------------+----------+---------
2024-11-22T08:42:56Z| FB | 94.50678 | 92.17329 | 780 | 40 | 93.340034 | 470 | NASDAQ | USD
2024-11-22T08:42:56Z| INTC | 10.377003 | 10.351093 | 630 | 50 | 10.364048 | 10 | NASDAQ | USD
2024-11-22T08:42:56Z| UOB | 97.710526 | 96.25578 | 410 | 520 | 96.983154 | 40 | SGX | SGD
2024-11-22T08:42:56Z| BARC | 1746.387 | 1703.2664 | 770 | 50 | 1724.8267 | 70 | LSE | GBP
2024-11-22T08:42:56Z| XOM | 36.714695 | 35.987675 | 770 | 310 | 36.351185 | 390 | NYSE | USD
(5 rows)
1. Minutely Market Statistics Aggregation
This MV, named minutely_market_stats_mv, calculates key trading metrics for each symbol within 1-minute time windows. It computes the price range (max - min), average price (avg), price volatility (standard deviation), volume-weighted average price (VWAP), and the high and low prices. The results are grouped by symbol and time window.
CREATE MATERIALIZED VIEW minutely_market_stats_mv AS
SELECT
symbol,
MAX(trade_price) - MIN(trade_price) AS price_range,
AVG(trade_price) AS avg_price,
STDDEV_POP(trade_price) AS std_price,
SUM(trade_price * trade_size) / SUM(trade_size) AS vwap,
MAX(trade_price) AS high_price,
MIN(trade_price) AS low_price,
window_start,
window_end
FROM TUMBLE (enriched_market_data, event_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end,symbol;
The materialized view can be queried to retrieve the latest data:
SELECT * FROM minutely_market_stats_mv LIMIT 5;
The result of the above query is similar to the following:
symbol | price_range | avg_price | std_price | vwap | high_price | low_price | window_start | window_end
-------+-------------+-----------+-----------+----------+------------+-----------+---------------------+---------------------
AAPL | 82.7294 | 321.8184 | 16.2368 | 322.1932 | 351.6677 | 268.9384 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
AAPL | 191.7060 | 209.1672 | 40.1470 | 209.2698 | 342.3187 | 150.6127 | 2024-11-22T08:43:00Z | 2024-11-22T08:44:00Z
AAPL | 133.0454 | 196.9737 | 29.4262 | 196.5627 | 279.4089 | 146.3635 | 2024-11-22T08:44:00Z | 2024-11-22T08:45:00Z
AAPL | 298.3134 | 271.2218 | 82.0130 | 270.2820 | 438.7182 | 140.4048 | 2024-11-22T08:45:00Z | 2024-11-22T08:46:00Z
AAPL | 549.7584 | 429.0530 | 118.2820 | 428.3447 | 805.4681 | 255.7098 | 2024-11-22T08:46:00Z | 2024-11-22T08:47:00Z
(5 rows)
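To make the arithmetic behind these columns concrete, the following Python sketch computes the same metrics over 1-minute tumbling windows for a handful of made-up trades. It is a batch illustration under stated assumptions, not RisingWave's incremental engine: `window_start` and `minutely_stats` are hypothetical helper names, timestamps are naive datetimes, and the input prices and sizes are invented for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import pstdev


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Align a timestamp to the start of its tumbling window."""
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % size


def minutely_stats(trades, size=timedelta(minutes=1)):
    """trades: iterable of (timestamp, symbol, trade_price, trade_size)."""
    buckets = defaultdict(list)
    for ts, symbol, price, qty in trades:
        buckets[(symbol, window_start(ts, size))].append((price, qty))

    stats = {}
    for (symbol, start), rows in buckets.items():
        prices = [price for price, _ in rows]
        volume = sum(qty for _, qty in rows)
        stats[(symbol, start)] = {
            "price_range": max(prices) - min(prices),
            "avg_price": sum(prices) / len(prices),
            "std_price": pstdev(prices),  # population std dev, like STDDEV_POP
            "vwap": sum(price * qty for price, qty in rows) / volume,
            "high_price": max(prices),
            "low_price": min(prices),
            "window_end": start + size,
        }
    return stats


# Invented trades: two fall in the 08:42 window, one in the 08:43 window
trades = [
    (datetime(2024, 11, 22, 8, 42, 10), "AAPL", 100.0, 10),
    (datetime(2024, 11, 22, 8, 42, 50), "AAPL", 110.0, 30),
    (datetime(2024, 11, 22, 8, 43, 5), "AAPL", 120.0, 5),
]
for key, value in sorted(minutely_stats(trades).items()):
    print(key, value)
```

For the first window the VWAP is (100 × 10 + 110 × 30) / 40 = 107.5, which sits above the simple average of 105 because the larger trade carries more weight.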
2. Spot abnormally large trades
This MV detects real-time trade size outliers by identifying trades where trade_size exceeds the average by more than two standard deviations within the last 5 minutes. It uses a 5-minute rolling window to calculate the moving average and standard deviation of trade_size for each symbol and identifies trades exceeding this threshold within 1-minute tumbling windows.
CREATE MATERIALIZED VIEW outlier_detection_mv AS
WITH outlier_detection_data AS (
SELECT
symbol,
trade_size,
AVG(trade_size) OVER (PARTITION BY symbol ORDER BY event_timestamp RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW) AS avg_trade_size,
STDDEV_POP(trade_size) OVER (PARTITION BY symbol ORDER BY event_timestamp RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW) AS stddev_trade_size,
window_start,
window_end
FROM TUMBLE(enriched_market_data, event_timestamp, INTERVAL '1' MINUTE)
)
SELECT *
FROM outlier_detection_data
WHERE trade_size > avg_trade_size + 2 * stddev_trade_size;
The materialized view can be queried to retrieve the latest data:
SELECT * FROM outlier_detection_mv LIMIT 5;
The result of the above query is similar to the following:
symbol | trade_size | avg_trade_size | stddev_trade_size | window_start | window_end
-------+------------+----------------+-------------------+---------------------+---------------------
UOB | 500 | 174.5833 | 154.9457 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
DBS | 300 | 120.4250 | 98.7654 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
BARC | 700 | 252.6732 | 189.2345 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
BAC | 450 | 200.3456 | 134.5678 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
AAPL | 600 | 312.7890 | 212.3456 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
(5 rows)
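The rule used by this view (flag a trade when its size exceeds the rolling mean plus two standard deviations) can be sketched in Python as follows. This is a simplified illustration and not the query's execution plan: `find_outliers` is a hypothetical name, the input is assumed to be sorted by timestamp, and the trade sizes are invented. As in the SQL, the rolling window includes the current trade.

```python
from collections import deque
from datetime import datetime, timedelta
from statistics import mean, pstdev


def find_outliers(trades, lookback=timedelta(minutes=5), k=2.0):
    """trades: iterable of (timestamp, symbol, trade_size), sorted by timestamp."""
    history = {}  # symbol -> deque of (timestamp, trade_size) inside the lookback
    outliers = []
    for ts, symbol, size in trades:
        window = history.setdefault(symbol, deque())
        window.append((ts, size))
        # Evict trades older than the lookback; the current trade always stays
        while window[0][0] < ts - lookback:
            window.popleft()
        sizes = [s for _, s in window]
        avg, std = mean(sizes), pstdev(sizes)
        if size > avg + k * std:
            outliers.append((ts, symbol, size, avg, std))
    return outliers


start = datetime(2024, 11, 22, 8, 42)
# Six ordinary trades 30 seconds apart, followed by one unusually large trade
trades = [(start + timedelta(seconds=30 * i), "UOB", 100) for i in range(6)]
trades.append((start + timedelta(minutes=3), "UOB", 1000))

for ts, symbol, size, avg, std in find_outliers(trades):
    print(ts, symbol, size, round(avg, 2), round(std, 2))
```

Because the current trade is part of its own window, a single large trade inflates both the mean and the standard deviation, so with only a few trades in the window even an extreme size may not cross the threshold.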
3. Top performing stocks based on VWAP over a period
This MV calculates the volume-weighted average price (VWAP) of each stock within 1-minute tumbling windows, which can then be used to rank stocks. VWAP is calculated as the average of trade_price weighted by trade_size. Results are grouped by symbol and time window.
CREATE MATERIALIZED VIEW vwap_minutely_mv AS
SELECT
symbol,
SUM(trade_price * trade_size) / SUM(trade_size) AS vwap,
window_start,
window_end
FROM TUMBLE (enriched_market_data, event_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end, symbol;
The materialized view can be queried to retrieve the latest data:
SELECT * FROM vwap_minutely_mv LIMIT 5;
The result of the above query is similar to the following:
symbol | vwap | window_start | window_end
-------+-----------+---------------------+---------------------
AAPL | 322.1932 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
BAC | 103.1632 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
BARC | 1718.1215 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
DBS | 17.4540 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
FB | 90.0187 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
(5 rows)
4. Exchange-level statistics
This creates an MV that calculates the high, low, and average trade prices, along with total trade volume, for each exchange within 1-minute tumbling windows. Results are grouped by exchange and time window.
CREATE MATERIALIZED VIEW exchange_stats_minutely_mv AS
SELECT
exchange,
MAX(trade_price) AS high_price,
MIN(trade_price) AS low_price,
AVG(trade_price) AS avg_price,
SUM(trade_size) AS total_volume,
window_start,
window_end
FROM TUMBLE (enriched_market_data, event_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end, exchange;
The materialized view can be queried to retrieve the latest data:
SELECT * FROM exchange_stats_minutely_mv LIMIT 5;
The result of the above query is similar to the following:
exchange | high_price | low_price | avg_price | total_volume | window_start | window_end
---------+------------+-----------+------------+--------------+---------------------+---------------------
LSE | 1855.0760 | 39.7162 | 1288.9153 | 107180 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
NASDAQ | 351.6677 | 9.4449 | 140.6360 | 239870 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
NYSE | 156.7350 | 30.1253 | 108.4764 | 187570 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
SGX | 103.5962 | 1.6752 | 37.7726 | 241230 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
LSE | 1841.8691 | 36.1376 | 1184.8491 | 240230 | 2024-11-22T08:43:00Z | 2024-11-22T08:44:00Z
(5 rows)
Create sinks to send data from RisingWave to Solace
In RisingWave, a sink is an external target where processed data can be sent. You can use the CREATE SINK command to define a sink, which can be created from either a table or a materialized view, enabling efficient data integration with external systems. To learn more about sinks in RisingWave, refer to the documentation.
RisingWave also supports subscriptions to pull data change records from a specific table or materialized view (MV). A subscription captures both the existing data at creation and incremental changes afterward. You can use a subscription cursor to retrieve the full dataset or changes from a specific point.
This creates a sink named market_stats_mqtt_sink, which streams data from the materialized view minutely_market_stats_mv to an MQTT topic (market_stats/<symbol>). It connects to the Solace broker using the specified URL, username, and password, ensuring the data is published with at-least-once delivery (qos = 'at_least_once') and retained in the topic (retain = 'true'). The sink outputs JSON-encoded data in an append-only format.
CREATE SINK market_stats_mqtt_sink
FROM minutely_market_stats_mv
WITH (
connector = 'MQTT',
topic = 'market_stats/symbol',
url = 'tcp://mr-connection-xrkpmgiz510.messaging.solace.cloud:1883',
username ='demo',
password ='demo',
type = 'append-only',
retain = 'true',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);
This creates a sink named outlier_detection_mqtt_sink, which streams data from the materialized view outlier_detection_mv to an MQTT topic (outliers_detection/<symbol>). It connects to the Solace broker using the specified URL, username, and password, ensuring at-least-once delivery (qos = 'at_least_once') and retaining messages in the topic (retain = 'true'). The sink outputs JSON-encoded data in an append-only format.
CREATE SINK outlier_detection_mqtt_sink
FROM outlier_detection_mv
WITH (
connector = 'MQTT',
topic = 'outliers_detection/symbol',
url = 'tcp://mr-connection-xrkpmgiz510.messaging.solace.cloud:1883',
username ='demo',
password ='demo',
type = 'append-only',
retain = 'true',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);
This creates a sink named vwap_minutely_mqtt_sink, which streams data from the materialized view vwap_minutely_mv to an MQTT topic (financial_vwap/<symbol>). It connects to the Solace broker using the specified URL, username, and password, ensuring at-least-once delivery (qos = 'at_least_once') and retaining messages in the topic (retain = 'true'). The sink outputs JSON-encoded data in an append-only format.
CREATE SINK vwap_minutely_mqtt_sink
FROM vwap_minutely_mv
WITH (
connector = 'MQTT',
topic = 'financial_vwap/symbol',
url = 'tcp://mr-connection-xrkpmgiz510.messaging.solace.cloud:1883',
username ='demo',
password ='demo',
type = 'append-only',
retain = 'true',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);
This SQL statement creates a sink named exchange_stats_minutely_mqtt_sink, which streams data from the materialized view exchange_stats_minutely_mv to an MQTT topic (exchange_stats/<exchange>). It connects to the Solace broker using the specified URL, username, and password, ensuring at-least-once delivery (qos = 'at_least_once') and retaining messages in the topic (retain = 'true'). The sink outputs JSON-encoded data in an append-only format.
CREATE SINK exchange_stats_minutely_mqtt_sink
FROM exchange_stats_minutely_mv
WITH (
connector = 'MQTT',
topic = 'exchange_stats/exchange',
url = 'tcp://mr-connection-xrkpmgiz510.messaging.solace.cloud:1883',
username ='demo',
password ='demo',
type = 'append-only',
retain = 'true',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);
Efficient Message Routing with Solace Using Dynamic and Hierarchical Topics
Solace supports dynamic and hierarchical topics, enabling efficient and flexible message routing in real-time messaging scenarios. By leveraging these features, messages can be precisely categorized and routed to the appropriate systems or teams, streamlining processes and improving operational efficiency.
For example:
The sink market_stats_mqtt_sink publishes messages to the topic market_stats/<symbol>, where each symbol can represent a specific financial instrument, allowing downstream systems to subscribe selectively based on the symbol of interest.
Similarly, the exchange_stats_minutely_mqtt_sink targets the topic exchange_stats/<exchange>, where messages can be filtered based on individual exchanges.
By organizing messages into hierarchical topics such as market_stats/<symbol> or exchange_stats/<exchange>, Solace ensures that subscribers can efficiently access only the data they need, reducing unnecessary processing and improving the scalability of message-driven systems. This approach highlights Solace's ability to handle complex routing requirements dynamically in high-performance, real-time environments.
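As a conceptual illustration of per-key routing, the sketch below derives a topic from one field of each output row and pairs it with a JSON payload. It is not how the sinks above are configured and it does not talk to a broker: `to_mqtt_message` is a hypothetical helper, and the row values are copied from the sample query results shown earlier.

```python
import json


def to_mqtt_message(topic_prefix: str, key_field: str, row: dict):
    """Build a (topic, payload) pair whose last topic level comes from the row."""
    topic = f"{topic_prefix}/{row[key_field]}"
    payload = json.dumps(row, sort_keys=True)
    return topic, payload


rows = [
    {"symbol": "AAPL", "vwap": 322.1932},
    {"symbol": "BARC", "vwap": 1718.1215},
]
for row in rows:
    topic, payload = to_mqtt_message("market_stats", "symbol", row)
    print(topic, payload)
```

With topics shaped this way, a consumer interested only in AAPL would subscribe to market_stats/AAPL, while one that wants every symbol would use a wildcard on the last level.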
Conclusion
In this post, we covered some popular use cases for real-time data distribution in capital markets and how firms can leverage Solace PubSub+ brokers for efficiently distributing data and RisingWave to analyze that data in real-time.
Solace’s support for multiple open protocols and APIs, hierarchical topics that enable dynamic routing and filtering, robust and resilient architecture and, most importantly, existing large deployments by investment banks and hedge funds make Solace a strong choice for real-time data distribution in capital markets. Similarly, RisingWave’s support for a variety of sources and sinks, its ability to handle millisecond precision, and its ability to compute complex queries in real-time make it a powerful platform for analytics in capital markets.
This post proves how powerful the two platforms can be when used together!