A real-time NFT market analytics platform tracks floor prices, collection volume, rarity scores, and marketplace events across OpenSea, Blur, and other venues as they happen — not 10 minutes later when the market has moved. With RisingWave, a PostgreSQL-compatible streaming database, you can build continuously updated materialized views over NFT marketplace event streams using standard SQL, powering dashboards and alert systems that respond to market movements instantly.
Why Real-Time NFT Analytics Matters
The NFT market moves in minutes, not hours. A collection can see its floor price collapse by 30% in the time it takes a batch analytics job to run. During high-activity periods — a major mint, a celebrity endorsement, or a sudden liquidity crisis — stale data is useless data.
Critical metrics that require real-time tracking:
- Floor price: The cheapest listed item in a collection — changes continuously as listings are created, updated, and filled
- Collection volume: Total trading value in a rolling window — a leading indicator of market interest and manipulation
- Rarity-adjusted pricing: Whether rare trait combinations are trading at appropriate premiums relative to floor
- Wash trading signals: Coordinated buy/sell patterns that artificially inflate volume metrics
- Royalty revenue: Creator earnings on secondary sales — trackable per collection in real time
For traders, late floor price data means buying into a falling market. For creators and projects, delayed volume data makes it impossible to distinguish organic growth from manufactured hype. For analytics platforms competing on data quality, batch latency is a product liability.
How Streaming SQL Solves This
Marketplace event indexers publish sale, listing, offer, and cancel events to Kafka. RisingWave consumes these events and maintains materialized views: per-collection floor price trackers, volume aggregators, rarity-weighted price views, and wash trading detection. All views update incrementally as new events arrive — there's no scheduled reprocessing.
Building It Step by Step
Step 1: Connect the Data Source
CREATE SOURCE nft_marketplace_events (
event_id VARCHAR,
event_type VARCHAR, -- 'sale', 'listing', 'offer', 'cancel', 'transfer'
marketplace VARCHAR, -- 'opensea', 'blur', 'looksrare', 'x2y2'
contract_address VARCHAR, -- NFT collection contract
collection_name VARCHAR,
token_id VARCHAR,
seller_address VARCHAR,
buyer_address VARCHAR,
price_eth NUMERIC,
price_usd NUMERIC,
rarity_score NUMERIC, -- pre-computed rarity rank (lower = rarer)
rarity_rank INTEGER,
royalty_eth NUMERIC,
tx_hash VARCHAR,
block_number BIGINT,
event_time TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'nft.marketplace.events',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
Step 2: Build the Core Materialized View
-- Real-time floor price: cheapest active listing per collection
CREATE MATERIALIZED VIEW collection_floor AS
SELECT DISTINCT ON (contract_address)
contract_address,
collection_name,
marketplace,
token_id AS floor_token_id,
price_eth AS floor_price_eth,
price_usd AS floor_price_usd,
event_time AS listed_at
FROM nft_marketplace_events
WHERE event_type = 'listing'
ORDER BY contract_address, price_eth ASC, event_time DESC;
-- Collection volume and sales stats (rolling 24 hours)
CREATE MATERIALIZED VIEW collection_volume_24h AS
SELECT
contract_address,
collection_name,
window_start,
window_end,
COUNT(*) AS sale_count,
SUM(price_eth) AS volume_eth,
SUM(price_usd) AS volume_usd,
AVG(price_eth) AS avg_sale_price_eth,
MIN(price_eth) AS min_sale_price_eth,
MAX(price_eth) AS max_sale_price_eth,
SUM(royalty_eth) AS royalties_earned_eth,
COUNT(DISTINCT buyer_address) AS unique_buyers,
COUNT(DISTINCT seller_address) AS unique_sellers
FROM TUMBLE(nft_marketplace_events, event_time, INTERVAL '24 HOURS')
WHERE event_type = 'sale'
GROUP BY contract_address, collection_name, window_start, window_end;
-- Rarity-adjusted price analysis: sales by rarity tier
CREATE MATERIALIZED VIEW rarity_price_analysis AS
SELECT
contract_address,
collection_name,
window_start,
window_end,
CASE
WHEN rarity_rank <= 100 THEN 'top_1pct'
WHEN rarity_rank <= 500 THEN 'top_5pct'
WHEN rarity_rank <= 1000 THEN 'top_10pct'
ELSE 'common'
END AS rarity_tier,
COUNT(*) AS sale_count,
AVG(price_eth) AS avg_price_eth,
AVG(price_eth / NULLIF(rarity_rank, 0)) AS price_per_rarity_point
FROM TUMBLE(nft_marketplace_events, event_time, INTERVAL '1 HOUR')
WHERE event_type = 'sale'
AND rarity_rank IS NOT NULL
GROUP BY contract_address, collection_name, window_start, window_end,
CASE
WHEN rarity_rank <= 100 THEN 'top_1pct'
WHEN rarity_rank <= 500 THEN 'top_5pct'
WHEN rarity_rank <= 1000 THEN 'top_10pct'
ELSE 'common'
END;
Step 3: Add Alerts and Detection Logic
-- Floor price crash alert: floor drops more than 20% in 1 hour
CREATE MATERIALIZED VIEW floor_price_alerts AS
SELECT
curr.contract_address,
curr.collection_name,
curr.floor_price_eth AS current_floor,
prev_hour.avg_floor_eth AS previous_floor,
(curr.floor_price_eth - prev_hour.avg_floor_eth)
/ NULLIF(prev_hour.avg_floor_eth, 0) * 100
AS floor_change_pct,
curr.event_time AS alert_time
FROM collection_floor curr
JOIN (
SELECT
contract_address,
window_start,
AVG(price_eth) AS avg_floor_eth
FROM TUMBLE(
(SELECT * FROM nft_marketplace_events WHERE event_type = 'listing'),
event_time,
INTERVAL '1 HOUR'
)
GROUP BY contract_address, window_start
) prev_hour
ON curr.contract_address = prev_hour.contract_address
WHERE (curr.floor_price_eth - prev_hour.avg_floor_eth)
/ NULLIF(prev_hour.avg_floor_eth, 0) < -0.20;
-- NFT wash trading detection: same buyer/seller pair trading the same token
CREATE MATERIALIZED VIEW nft_wash_trading_flags AS
SELECT
s1.contract_address,
s1.collection_name,
s1.token_id,
s1.seller_address,
s1.buyer_address,
s1.price_eth AS sale_1_price,
s2.price_eth AS sale_2_price,
s1.event_time AS sale_1_time,
s2.event_time AS sale_2_time
FROM nft_marketplace_events s1
JOIN nft_marketplace_events s2
ON s1.contract_address = s2.contract_address
AND s1.token_id = s2.token_id
AND s1.buyer_address = s2.seller_address
AND s1.seller_address = s2.buyer_address
AND s2.event_time > s1.event_time
AND s2.event_time <= s1.event_time + INTERVAL '24 HOURS'
WHERE s1.event_type = 'sale'
AND s2.event_type = 'sale';
-- Sink floor alerts and wash trading flags to Kafka
CREATE SINK nft_alert_sink AS
SELECT
contract_address, collection_name, 'FLOOR_CRASH' AS alert_type,
current_floor AS value, alert_time
FROM floor_price_alerts
UNION ALL
SELECT
contract_address, collection_name, 'WASH_TRADING' AS alert_type,
sale_1_price AS value, sale_1_time AS alert_time
FROM nft_wash_trading_flags
WITH (
connector = 'kafka',
topic = 'nft.alerts',
properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
Step 4: Query for Real-Time Insights
-- Top collections by 24-hour volume with current floor
SELECT
v.collection_name,
ROUND(f.floor_price_eth, 4) AS floor_eth,
ROUND(v.volume_eth, 2) AS volume_24h_eth,
v.sale_count,
v.unique_buyers,
ROUND(v.royalties_earned_eth, 4) AS royalties_eth
FROM collection_volume_24h v
JOIN collection_floor f
ON v.contract_address = f.contract_address
WHERE v.window_end = (SELECT MAX(window_end) FROM collection_volume_24h)
ORDER BY v.volume_eth DESC
LIMIT 10;
-- Rarity premium analysis: do rare items trade at expected multiples?
SELECT
collection_name,
rarity_tier,
sale_count,
ROUND(avg_price_eth, 4) AS avg_price_eth
FROM rarity_price_analysis
WHERE collection_name = 'Bored Ape Yacht Club'
AND window_end = (SELECT MAX(window_end) FROM rarity_price_analysis)
ORDER BY avg_price_eth DESC;
Comparison: Batch vs Streaming
| Aspect | Batch ETL | Streaming SQL (RisingWave) |
| Latency | Minutes | Sub-second |
| Floor Price Accuracy | Stale by minutes | Per-listing update |
| Volume Metrics | Historical only | Rolling real-time |
| Wash Trading Detection | Post-hoc analysis | Real-time flagging |
| Rarity Analytics | Scheduled reports | Continuously maintained |
| Alert Speed | Next batch | Milliseconds |
FAQ
Q: How do you handle listing cancellations and floor price computation?
When a listing is canceled, the marketplace indexer emits a cancel event. The collection_floor materialized view should filter for active listings by joining against a set of open listings — tracking creations and cancellations as a delta. An alternative simpler approach: query the materialized view of recent listing events excluding tokens that subsequently have a matching cancel or sale event, using the DISTINCT ON pattern with event_type ordering. In practice, most floor price analytics platforms re-derive the floor from a snapshot of active listings rather than pure event-driven tracking.
Q: Can RisingWave power a real-time rarity sniper — alerting when a rare item lists below fair value?
Yes. Build a view that joins new listing events with the rarity tier average price from rarity_price_analysis. When a listing price for a rare token is below the average for its tier, emit an alert. This is a standard stream-to-view join: new listings are the stream, rarity-adjusted price averages are the continuously maintained view, and the filter condition triggers the alert.
Q: How do you track royalty revenue accurately when different marketplaces handle royalties differently?
Royalty enforcement varies: OpenSea enforces creator royalties, Blur made them optional. The most accurate approach is to track royalty payments directly from on-chain event data (ERC-2981 royalty payment events or protocol-specific settlement events) rather than relying on marketplace-reported fields. Include royalty transfer events in the Kafka topic and join them to sale events by tx_hash to get actual royalty paid per transaction.
Key Takeaways
- A Kafka source captures sale, listing, offer, and cancel events from all NFT marketplaces in a unified stream
- The
collection_floormaterialized view continuously tracks the cheapest active listing per collection - Rolling 24-hour volume views update with every new sale, giving real-time collection momentum data
- Wash trading detection uses a self-join pattern matching token round-trips between the same address pair
- Floor crash alerts and wash trading flags sink to Kafka within milliseconds of detection
Ready to try this? Get started with RisingWave. Join our Slack community.

