Real-Time Anomaly Detection with SQL and Machine Learning
Continuous ML training pipelines use streaming data to keep models fresh — retraining or fine-tuning as new data arrives rather than waiting for scheduled batch jobs. Streaming databases prepare and deliver training data in real time, ensuring models learn from the latest patterns.
Streaming Training Pipeline
Event Streams → RisingWave (feature computation) → Iceberg (training data) → ML Training
                        │
                        └─→ Real-time features (served via the PostgreSQL protocol)
Feature Computation for Training
-- Compute training features continuously.
-- Note: grouping by user_id (not by event timestamp) is what makes the
-- time-windowed aggregates meaningful; grouping by ts would yield one
-- row per event and defeat the windows.
CREATE MATERIALIZED VIEW training_features AS
SELECT
    user_id,
    COUNT(*)    FILTER (WHERE ts > NOW() - INTERVAL '1 hour')  AS activity_1h,
    AVG(amount) FILTER (WHERE ts > NOW() - INTERVAL '30 days') AS avg_spend_30d,
    label  -- from labeled data source
FROM user_events
GROUP BY user_id, label;
-- Sink to Iceberg for training
CREATE SINK features_to_lake AS SELECT * FROM training_features
WITH (connector='iceberg', type='append-only', ...);
ML frameworks (PyTorch, TensorFlow) read training data from Iceberg, so every training run sees the freshest features.
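On the training side, the rows fetched from Iceberg (or queried over the PostgreSQL protocol) have to become feature and label arrays before a framework can consume them. A minimal pure-Python sketch, assuming each row is a dict whose keys mirror the columns of the `training_features` view above:

```python
def rows_to_dataset(rows):
    """Convert fetched feature rows into parallel feature/label lists.

    Assumes each row carries the training_features columns:
    activity_1h, avg_spend_30d, label. NULL aggregates (arriving
    as None) default to 0.0 so the feature matrix stays dense.
    """
    features, labels = [], []
    for row in rows:
        features.append([
            float(row.get("activity_1h") or 0.0),
            float(row.get("avg_spend_30d") or 0.0),
        ])
        labels.append(int(row["label"]))
    return features, labels
```

The resulting lists can be wrapped directly in a `torch.utils.data.TensorDataset` or fed to any estimator that accepts array-likes; the defaulting of NULLs to 0.0 is an illustrative choice, not a recommendation for every feature.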
Frequently Asked Questions
How often should I retrain models with streaming data?
Depends on concept drift. Financial models may need daily retraining. Recommendation models weekly. Monitor model accuracy over time and retrain when performance degrades.
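One way to operationalize "retrain when performance degrades" is a rolling-accuracy monitor that compares live accuracy against the accuracy measured at training time. The class below is an illustrative sketch; the window size and tolerance are assumptions to tune per model, not anything RisingWave provides:

```python
from collections import deque

class DriftMonitor:
    """Track rolling prediction accuracy and flag a retrain when it
    drops more than `tolerance` below the training-time baseline."""

    def __init__(self, baseline_accuracy, window=500, tolerance=0.02):
        self.baseline = baseline_accuracy
        self.tolerance = tolerance
        self.outcomes = deque(maxlen=window)  # True = correct prediction

    def record(self, correct):
        self.outcomes.append(bool(correct))

    def should_retrain(self):
        # Wait for a full window before judging, to avoid noisy triggers
        # on a handful of early predictions.
        if len(self.outcomes) < self.outcomes.maxlen:
            return False
        rolling = sum(self.outcomes) / len(self.outcomes)
        return self.baseline - rolling > self.tolerance
```

Labeled outcomes usually arrive with a delay (e.g. a chargeback confirming fraud), so `record` would be driven by the labeling stream, not the prediction stream.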
Can RisingWave train ML models?
No. RisingWave computes features and delivers training data. Training happens in ML frameworks (PyTorch, Spark MLlib) reading from Iceberg or querying RisingWave via PostgreSQL.