Cryptocurrency markets are known for their volatility, and unfortunately, this can be exploited through "pump and dump" schemes. These manipulations, where a token's price is artificially inflated ("pumped") before being sold off ("dumped") en masse, can happen in minutes, leaving unsuspecting traders with significant losses.
Detecting these events in real-time is a classic data-intensive challenge. You need to process a massive stream of trade data, compute complex analytics on the fly, and trigger alerts instantly.
In this post, we'll walk through how to build a powerful, real-time pump-and-dump detection system using only SQL in RisingWave. We will build a system that can:
Ingest a live stream of crypto trades.
Aggregate data into standardized minute-by-minute bars.
Compute key anomaly signals: rapid price changes, unusual volume spikes, and one-sided buy/sell pressure.
Combine these signals into a clear rule to trigger alerts.
Deliver these alerts to downstream systems like Kafka or a webhook, all with a latency of seconds.
Let's get started.
Step 1: Laying the Foundation - Ingesting and Structuring the Data
Before we can perform any analysis, we need to get data into our system and give it a structure. Our raw data is a stream of individual trades for various market pairs.
Creating the Trade Source
First, we define a connection to our data source—in this case, a Kafka topic named trades. The CREATE SOURCE statement declares the schema of the incoming trade data.
A crucial element here is the WATERMARK. Trade data, like any real-world data stream, can arrive slightly out of order. The WATERMARK FOR ts AS ts - INTERVAL '5 seconds' clause tells RisingWave to expect events to be at most 5 seconds late. This is fundamental for ensuring correct and timely calculations in a streaming environment.
CREATE SOURCE trades_src (
pair_id BIGINT,
symbol VARCHAR,
ts TIMESTAMP, -- event time
side VARCHAR, -- 'BUY'/'SELL'
price DOUBLE PRECISION,
qty DOUBLE PRECISION,
-- allow mild disorder; tune to your P95 latency
WATERMARK FOR ts AS ts - INTERVAL '5 seconds'
) WITH (
connector = 'kafka',
topic = 'trades',
properties.bootstrap.server = 'localhost:9092',
scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
Aggregating into Minute Bars
Analyzing every single trade is inefficient. The standard approach in financial analysis is to aggregate trades into time-based "bars" or "candles" (e.g., Open, High, Low, Close, Volume). We'll create a 1-minute bar using a materialized view.
A materialized view in RisingWave is a query whose results are stored and kept up-to-date automatically and incrementally as new data arrives. This is the foundation of our analysis.
CREATE MATERIALIZED VIEW bar_1m AS
SELECT
t.pair_id,
window_start AS bucket_start,
FIRST_VALUE(price ORDER BY ts) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST_VALUE(price ORDER BY ts) AS close,
SUM(qty) AS vol,
SUM(CASE WHEN UPPER(side)='BUY' THEN qty ELSE 0 END) AS buy_vol,
SUM(CASE WHEN UPPER(side)='SELL' THEN qty ELSE 0 END) AS sell_vol,
COUNT(*) AS trades
FROM TUMBLE(trades_src, ts, INTERVAL '1 minute') t
GROUP BY t.pair_id, window_start;
Here, the TUMBLE function groups trades into fixed, non-overlapping 1-minute windows. We then calculate the open, high, low, and close prices, as well as the total, buy, and sell volumes for that minute.
Tip: To save computational resources, you can pre-filter for active markets. This view ensures you only perform calculations for pairs that have traded in the last 24 hours.
CREATE MATERIALIZED VIEW active_pairs_24h AS
SELECT pair_id
FROM (
SELECT pair_id, MAX(ts) AS last_ts
FROM trades_src
GROUP BY pair_id
) t
WHERE last_ts >= NOW() - INTERVAL '24 hours';
You would then simply JOIN the bar_1m view with active_pairs_24h.
Step 2: Developing the Signals - The Core Detection Features
With our 1-minute bars ready, we can now build the "senses" of our detector. A classic pump-and-dump has three key signatures: a rapid price increase, a surge in volume, and heavy buy-side pressure.
Signal #1: Rapid Price Changes (Returns)
We need to measure how much the price has changed recently. We can calculate the 1-minute and 5-minute returns using the LAG window function, which lets us look at the close price from previous rows.
CREATE MATERIALIZED VIEW bar_1m_with_returns AS
SELECT
pair_id,
bucket_start,
open, high, low, close, vol, buy_vol, sell_vol, trades,
(close / NULLIF(LAG(close, 1) OVER (PARTITION BY pair_id ORDER BY bucket_start), 0) - 1) AS ret_1m,
(close / NULLIF(LAG(close, 5) OVER (PARTITION BY pair_id ORDER BY bucket_start), 0) - 1) AS ret_5m
FROM bar_1m;
Signal #2: Unusual Volume (Volume Spikes)
A price move is only significant if it's backed by volume. But what counts as "high volume"? This is relative to the market's recent activity. We can quantify this "unusualness" using a Z-score, which measures how many standard deviations the current volume is from the recent average.
First, we calculate a 30-minute rolling baseline of the average volume and standard deviation.
CREATE MATERIALIZED VIEW vol_baseline_30m AS
SELECT
pair_id,
bucket_start,
AVG(vol) OVER (PARTITION BY pair_id ORDER BY bucket_start ROWS BETWEEN 30 PRECEDING AND 1 PRECEDING) AS vol_mean_30m,
STDDEV_POP(vol) OVER (PARTITION BY pair_id ORDER BY bucket_start ROWS BETWEEN 30 PRECEDING AND 1 PRECEDING) AS vol_std_30m
FROM bar_1m;
Signal #3: One-Sided Pressure (Buy/Sell Ratio)
A pump is driven by aggressive buying, while a dump is driven by selling. The buy_ratio (buy volume / total volume) gives us a clear indicator of this pressure. A ratio near 1.0 suggests a buying frenzy, while a ratio near 0 suggests a sell-off.
Combining the Features
Finally, we join these signals into a single, comprehensive feature view. This view computes the final Z-score for volume and the buy ratio, giving us all the necessary ingredients for our rule engine.
CREATE MATERIALIZED VIEW flow_features AS
SELECT
b.pair_id,
b.bucket_start,
b.ret_1m, b.ret_5m,
b.vol, b.buy_vol, b.sell_vol,
CASE WHEN (b.buy_vol + b.sell_vol) > 0
THEN b.buy_vol / (b.buy_vol + b.sell_vol) ELSE NULL END AS buy_ratio,
v.vol_mean_30m, v.vol_std_30m,
CASE
WHEN v.vol_std_30m IS NULL OR v.vol_std_30m = 0 THEN NULL
ELSE (b.vol - v.vol_mean_30m) / v.vol_std_30m
END AS z_vol
FROM bar_1m_with_returns b
LEFT JOIN vol_baseline_30m v
ON v.pair_id = b.pair_id AND v.bucket_start = b.bucket_start;
Step 3: Making the Call - Scoring and Triggering Alerts
Now that we have our features, we can define a simple and explainable rule to flag suspicious activity.
The Pump/Dump Rule
Our rule is straightforward and transparent:
A Pump is likely if: the 1-minute return is ≥ 2% AND the volume Z-score is ≥ 3 AND the buy ratio is ≥ 0.65.
A Dump is likely if: the 1-minute return is ≤ -2% AND the volume Z-score is ≥ 3 AND the buy ratio is ≤ 0.35.
These thresholds (2%, 3, 0.65) are tunable parameters that you can adjust based on market conditions. We can implement this logic with a CASE WHEN statement.
CREATE MATERIALIZED VIEW pump_dump_signals AS
SELECT
pair_id,
bucket_start,
ret_1m, ret_5m, vol, z_vol, buy_ratio,
CASE WHEN ret_1m IS NOT NULL AND z_vol IS NOT NULL AND buy_ratio IS NOT NULL
AND ret_1m >= 0.02 AND z_vol >= 3 AND buy_ratio >= 0.65
THEN 1 ELSE 0 END AS is_pump,
CASE WHEN ret_1m IS NOT NULL AND z_vol IS NOT NULL AND buy_ratio IS NOT NULL
AND ret_1m <= -0.02 AND z_vol >= 3 AND buy_ratio <= 0.35
THEN 1 ELSE 0 END AS is_dump
FROM flow_features;
Preventing Alert Fatigue (The Cooldown)
A single pump event might trigger our rule for several consecutive minutes. To avoid flooding a user or system with duplicate alerts, we must implement a "cooldown" or "debouncing" mechanism. We will only issue an alert for a given market if it hasn't already had one in the last 15 minutes.
This query is more advanced, but it elegantly handles the stateful logic of checking for the last alert time for each pair.
CREATE MATERIALIZED VIEW pump_dump_alerts AS
WITH raw AS (
SELECT * FROM pump_dump_signals
WHERE is_pump = 1 OR is_dump = 1
),
ranked AS (
SELECT
pair_id,
bucket_start,
is_pump, is_dump,
ROW_NUMBER() OVER (PARTITION BY pair_id ORDER BY bucket_start DESC) AS rn
FROM raw
)
SELECT r.*
FROM ranked r
LEFT JOIN LATERAL (
-- last alert time for this pair
SELECT MAX(bucket_start) AS last_ts
FROM raw r2
WHERE r2.pair_id = r.pair_id AND r2.bucket_start < r.bucket_start
) prev ON TRUE
WHERE r.rn = 1 AND (prev.last_ts IS NULL OR r.bucket_start >= prev.last_ts + INTERVAL '15 minutes');
Step 4: Taking Action - Delivering the Final Alerts
Once a potential pump or dump event is confirmed and debounced by our pump_dump_alerts view, the final step is to deliver this information to a downstream system where it can be acted upon. RisingWave provides two powerful mechanisms for this: a direct push model with Subscriptions and a data integration model with Sinks.
Use Subscriptions for tightly-coupled, low-latency, event-driven services where you want to react to alerts instantly.
Use a Sink for broader data integration, when you need to reliably deliver alerts to multiple systems, or when you want the durability and replayability of a message queue like Kafka.
Option 1: Direct Push with RisingWave Subscriptions
For the lowest latency and simplest architecture, you can use a Subscription. This feature allows your downstream applications (like a notification service, a Telegram bot, or a live dashboard) to "subscribe" directly to changes in a materialized view. When a new alert is generated in our pump_dump_alerts view, RisingWave pushes the change directly to your connected application.
This approach is ideal for building event-driven services because it eliminates the need for an intermediary message queue, reducing both latency and operational overhead. Your application would use a standard PostgreSQL driver to connect to RisingWave and listen for new alert rows.
To enable this, you would first create a subscription on your final alerts view:
CREATE SUBSCRIPTION alert_sub FROM alerts_payload;
Your application could then connect and fetch new alerts as they happen.
Option 2: Sinking Data to a Message Queue
The more traditional approach is to use a Sink, which pushes data changes from RisingWave into an external system like Apache Kafka. This method is excellent for decoupling systems, providing persistent storage in the message queue, and fanning out alerts to multiple, independent consumer applications.
First, we create a clean, enriched payload for our alert.
CREATE MATERIALIZED VIEW alerts_payload AS
SELECT
a.pair_id, a.bucket_start,
a.is_pump, a.is_dump,
f.ret_1m, f.ret_5m, f.vol, f.z_vol, f.buy_ratio
FROM pump_dump_alerts a
JOIN flow_features f
ON f.pair_id = a.pair_id AND f.bucket_start = a.bucket_start;
Next, we create the sink to send the data to a Kafka topic. This could just as easily be configured for a webhook, an object store, or another database.
CREATE SINK pump_dump_alerts_sink
FROM alerts_payload
WITH (
connector = 'kafka',
topic = 'alerts.pump_dump',
properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON (
force_append_only = 'true'
);
From Code to System: Production-Ready Considerations
While this provides a complete and working system, here are a few points to consider for a production environment:
Handling Data Imperfections: Watermarks are essential for managing late-arriving data and ensuring correctness. Tune the watermark interval based on your source's typical latency.
Tuning and Backtesting: The thresholds used in the scoring rule are not universal. They should be tuned based on historical data. You can sink all the flow_features data into an analytical database to find the optimal parameters that balance precision and recall for different markets.
Sparse Markets: For illiquid pairs, volume and return metrics can be very noisy. Consider using the active_pairs filter we discussed, or use more robust statistics like median and Mean Absolute Deviation (MAD) instead of mean and standard deviation for the Z-score calculation.
Conclusion & Next Steps
In just a few declarative SQL queries, we've built a sophisticated, real-time event detection system. We ingested a high-volume data stream, performed stateful, time-windowed calculations, and pushed actionable alerts to an external system. This showcases the power of a streaming database like RisingWave to handle complex, real-time analytical tasks that were once the domain of complex, bespoke code.
From here, this system can be extended even further:
Incorporate Order Book Data: Analyze features like bid-ask spread expansion, market depth, and price slippage for more robust signals.
Integrate External Signals: Combine price/volume action with social media sentiment or news announcement streams.
Apply Machine Learning: Use the calculated features to train a lightweight classification model (like logistic regression or a gradient-boosted tree) to produce a more nuanced pump-and-dump score.
Get Started with RisingWave
Try RisingWave Today:
Download the open-sourced version of RisingWave to deploy on your own infrastructure.
Get started quickly with RisingWave Cloud for a fully managed experience.
Talk to Our Experts: Have a complex use case or want to see a personalized demo? Contact us to discuss how RisingWave can address your specific challenges.
Join Our Community: Connect with fellow developers, ask questions, and share your experiences in our vibrant Slack community.
If you’d like to see a personalized demo or discuss how this could work for your use case, please contact our sales team.

