With the widespread adoption of machine learning, engineers began facing data challenges that go beyond algorithms. For example, due to ineffective management, data previously partitioned by other teams for training had to be re-partitioned; to extract and transform the features embedded in the data, complex feature pipelines had to be rebuilt from scratch; after models were trained and deployed online, prediction results lagged behind because feature data was not fresh enough; and the lack of real-time monitoring of online data meant predictions were made with abnormal features, reducing accuracy. To better solve these data problems, people started using feature stores to manage their features.
So, what is a feature store? And how does RisingWave fit into one?
Start from an example
Let's dive into the workings of a simple machine learning model using a classic example: predicting the total fare for a New York taxi ride based on the pickup and drop-off locations. You can read more details about this dataset in the introduction offered by Microsoft.
The first step for machine learning is data processing. In this dataset, there are many columns. They can generally be divided into three categories:
- Data related to user inputs and the desired results. In this example, the inputs are the pick-up and drop-off locations of the taxi (the pick-up location is `pu_location_id`, the drop-off location is `do_location_id`; below we use `location_id` to refer to them together). The result is the total fare amount (`fare_amount`). We need to save these data untouched for training.
- Data used to extract features. Features refer to the user attributes that are relevant to the prediction. For example, if we need to predict the total fare, then the average tip for rides starting from a certain location would be an important feature. We need to extract features from these data and save the features.
- Data unrelated to prediction, or flags identifying whether data is valid. We need to filter out these data to prevent feature pollution.
After processing the data as described above, we can train a machine learning model using the features and the corresponding total fare data. We can also feed the features into the trained model for prediction.
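The three-way split described above can be sketched in a few lines of Python. The category assignments below are illustrative, not the dataset's official schema; only a handful of the dataset's columns are shown.

```python
# Sketch: partition raw dataset columns into the three categories described above.
RAW_COLUMNS = [
    "pu_location_id", "do_location_id", "fare_amount",   # inputs + target
    "trip_distance", "tip_amount", "passenger_count",    # feature sources
    "store_and_fwd_flag", "ehail_fee",                   # irrelevant / validity flags
]

INPUTS_AND_TARGET = {"pu_location_id", "do_location_id", "fare_amount"}
IRRELEVANT = {"store_and_fwd_flag", "ehail_fee"}

def split_columns(columns):
    """Partition raw columns into: kept as-is, feature sources, filtered out."""
    kept = [c for c in columns if c in INPUTS_AND_TARGET]
    feature_sources = [c for c in columns
                       if c not in INPUTS_AND_TARGET and c not in IRRELEVANT]
    dropped = [c for c in columns if c in IRRELEVANT]
    return kept, feature_sources, dropped

kept, feature_sources, dropped = split_columns(RAW_COLUMNS)
print(kept)             # ['pu_location_id', 'do_location_id', 'fare_amount']
print(feature_sources)  # ['trip_distance', 'tip_amount', 'passenger_count']
print(dropped)          # ['store_and_fwd_flag', 'ehail_fee']
```

In a real pipeline this split is expressed in SQL or a feature-engineering framework rather than by hand, but the principle is the same: save inputs and targets untouched, transform feature sources, and drop the rest.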
What is a feature store?
As shown in the example above, data processing in machine learning essentially involves the transformation, storage, and retrieval of features. This is where a feature store helps us. The main function of a feature store is to transform and store various complex features, and then provide specific feature retrieval based on demand, serving as a data service.
Specifically, a feature store needs to provide two types of services for different scenarios: offline feature services and online feature services. The main differences between them are as follows:
- Offline features are generally used in the training process of machine learning, which often requires a large amount of data. This data only needs to include the specific required features. Since training is conducted offline, there are no strict requirements on the execution time of feature queries.
- Online features mainly serve the prediction process of machine learning. They typically return all or a subset of features for a few records. The data volume for online features is much smaller than that of offline features, but there is a higher requirement for query speed and real-time data.
Here, let's consider a simple example of a well-functioning feature store, as shown in the diagram below.
This feature store has accumulated many features after running for a long time, denoted as `f1`, `f2`, ..., `fn`. It is connected to upstream business processes or other data sources, which continuously inject real-world data into the feature store. Once injected, the data undergoes predefined transformation logic to extract and save features.
Now an engineer wants to train a new model with this feature store, predicting the total fare of a trip based on the pickup and drop-off locations, in other words, based on the `location_id`. He does not need access to all the features, only the data under columns `f1` and `f3`, along with the corresponding total fare data, for training purposes. The service offered here by the feature store is the offline feature service.
When the model is trained and deployed online, users will want to predict their total fare based on their `location_id`. At this point, the user's `location_id` is sent to the feature store, which retrieves the `f1` and `f3` feature data corresponding to that `location_id`. These feature data are then fed into the machine learning model for prediction. Here the feature store provides the online feature service.
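The offline/online split above can be illustrated with a minimal in-memory sketch. The `ToyFeatureStore` class below is hypothetical (it is not RisingWave's API or any real feature-store library); the feature names `f1`, `f3` and the `location_id` key follow the example.

```python
from collections import defaultdict

class ToyFeatureStore:
    """Hypothetical in-memory sketch of the two service types described above."""

    def __init__(self):
        # history: location_id -> list of feature dicts, oldest first
        self._history = defaultdict(list)

    def ingest(self, location_id, features):
        """Upstream data arrives and is stored per key."""
        self._history[location_id].append(features)

    def offline_features(self, names):
        """Offline service: bulk export of selected feature columns for training."""
        rows = []
        for loc, versions in self._history.items():
            for feats in versions:
                rows.append({"location_id": loc, **{n: feats[n] for n in names}})
        return rows

    def online_features(self, location_id, names):
        """Online service: low-latency lookup of the latest values for one key."""
        latest = self._history[location_id][-1]
        return {n: latest[n] for n in names}

store = ToyFeatureStore()
store.ingest(138, {"f1": 0.5, "f2": 7, "f3": 24.8})
store.ingest(138, {"f1": 0.6, "f2": 9, "f3": 25.1})
print(store.offline_features(["f1", "f3"]))       # every historical row, for training
print(store.online_features(138, ["f1", "f3"]))   # {'f1': 0.6, 'f3': 25.1}
```

A real feature store adds transformation pipelines, persistence, and freshness guarantees on top of this shape; the rest of this article shows how RisingWave provides the online half.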
How to use RisingWave
As readers of our previous articles will know, RisingWave is a distributed streaming database designed for high-throughput, real-time stream processing and storage. This aligns seamlessly with the requirements for online features in a feature store. Therefore, RisingWave can serve as the component of a feature store that handles online features.
So, how do we achieve this? In general, we use RisingWave to handle the extraction, transformation, storage, and monitoring of online features. The specific details are shown in the diagram below. Engineers first create the corresponding stream processing pipeline and materialized views (MVs) in RisingWave using SQL statements, based on the feature requirements of the model. Different MVs can be created to meet different needs.
When the system starts running, user behaviors generate continuous streams of data, which are sent downstream to RisingWave via Kafka. Based on the predefined stream processing pipeline, RisingWave extracts and transforms features in real time, storing them internally as MVs. When the algorithm model needs the related features for online prediction, it can interact directly with the server and obtain real-time features by querying the MVs.
As for offline features, we can choose different approaches based on the requirements:
- We can use RisingWave's built-in batch processing system, which allows batch data to be queried using SQL statements.
- If higher batch processing performance is required, we can use a sink to connect RisingWave to a data warehouse or a system that is more suitable for batch processing.
Here, we will describe in detail the feature processing process in RisingWave using the first approach. The diagram below shows the RisingWave feature processing process for the New York taxi example mentioned in this article.
As data is continuously injected into RisingWave, it first goes through a filter operation to remove invalid data and saves the cleaned data to the corresponding filter MV. Then, two types of MVs are established downstream of the filter MV:
- User MV, which stores the user inputs and desired results, i.e., the pickup and drop-off locations of the taxi (`location_id`) as well as the total fare amount (`fare_amount`).
- Feature MV, which stores the features extracted from the valid data. Here, the pickup and drop-off locations of the taxi are used as primary keys for easy access to features during training and prediction.
The above MVs are updated in real-time as data is injected, ensuring the real-time nature of online features.
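The filter MV → user MV / feature MV flow described above can be sketched in plain Python. This is a hedged, batch-style stand-in for what RisingWave maintains incrementally; the field names follow the taxi dataset, the validity rule (`payment_type`) matches the demo's filter, and the sample rows are made up for illustration.

```python
from collections import defaultdict

rides = [
    {"pu_location_id": 41, "do_location_id": 138, "fare_amount": 25.0,
     "tip_amount": 5.0, "payment_type": 1},
    {"pu_location_id": 41, "do_location_id": 138, "fare_amount": 8.0,
     "tip_amount": 1.0, "payment_type": 2},
    {"pu_location_id": 7,  "do_location_id": 63,  "fare_amount": 57.5,
     "tip_amount": 0.0, "payment_type": 9},   # invalid -> filtered out
]

# "Filter MV": keep only valid rows.
valid = [r for r in rides if r["payment_type"] in (1, 2, 4)]

# "User MV": untouched inputs and target, for training.
user_mv = [(r["pu_location_id"], r["do_location_id"], r["fare_amount"])
           for r in valid]

# "Feature MV": an aggregate feature (average tip) keyed by drop-off location.
sums = defaultdict(lambda: [0.0, 0])
for r in valid:
    s = sums[r["do_location_id"]]
    s[0] += r["tip_amount"]
    s[1] += 1
feature_mv = {loc: total / n for loc, (total, n) in sums.items()}
print(feature_mv)  # {138: 3.0}
```

The key difference in RisingWave is that these "views" are not recomputed per batch: they are materialized views updated incrementally as each new record arrives, which is what keeps the online features fresh.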
During training, we pass in the required feature names, and then query the corresponding offline features in the feature MV and the corresponding total fare amount in the user MV. These features are used together to train the model. During prediction, we pass in the `location_id` and query the corresponding online features in the feature MV. These features are then used to make predictions with the trained model.
How to quickly build a feature store using RisingWave
Based on the above architecture, we have developed a simple demo to showcase the feature store implemented in RisingWave.
Here are the deployment steps:
- Get the source code.

```shell
git clone https://github.com/risingwavelabs/risingwave.git
cd integration_tests/feature-store
```

- Deploy Kafka, RisingWave, and the feature store using Docker.

```shell
docker compose up --build
```
Here, we start RisingWave, the feature store, and Kafka in a Docker cluster. RisingWave mainly consists of frontend nodes, compute nodes, metadata nodes, and MinIO. In this demo cluster, the data for materialized views will be stored in a MinIO instance. The SQL statements below show how the data is processed. The feature store includes a server and a simulator. After creating the Kafka nodes, topics are automatically created.
The feature store system performs the following actions sequentially:
- Writes simulated offline data to Kafka → RisingWave, extracting behavior tables and feature tables.
- Joins the behavior table and the feature table to obtain the corresponding offline training data and performs model training.
- Writes simulated online feature data to Kafka → RisingWave.
- Uses `do_location_id` and `pu_location_id` to query the latest online features in RisingWave and makes predictions using the trained model.
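The four steps above can be condensed into a toy end-to-end sketch. Everything here is a stand-in: an in-memory list replaces Kafka → RisingWave, and a deliberately trivial "model" (the per-location mean fare) replaces the real trained model from the demo.

```python
from collections import defaultdict

# Step 1: simulated offline data, already joined into (do_location_id, fare_amount)
# pairs, standing in for the behavior-table / feature-table join.
offline = [
    (138, 25.0), (138, 24.0), (243, 8.0), (243, 7.5),
]

# Step 2: "train" a trivial model: the average fare per drop-off location.
sums = defaultdict(lambda: [0.0, 0])
for loc, fare in offline:
    sums[loc][0] += fare
    sums[loc][1] += 1
model = {loc: total / n for loc, (total, n) in sums.items()}

# Steps 3-4: online feature lookups arrive by location id; predict with the model.
for loc in (138, 243):
    print(f"DOLocationID {loc}: predicted fare {model[loc]:.2f}")
```

The real demo replaces each piece with its production counterpart: Kafka topics carry the data, RisingWave MVs hold the joined features, and an actual model is trained on the offline export.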
```sql
-- Create a source to consume data from Kafka
create source if not exists taxiallfeature (
    vendor_id int,
    lpep_pickup_datetime timestamp,
    lpep_dropoff_datetime timestamp,
    store_and_fwd_flag boolean,
    ratecode_id float,
    pu_location_id int,
    do_location_id int,
    passenger_count float,
    trip_distance float,
    fare_amount float,
    extra float,
    mta_tax float,
    tip_amount float,
    tolls_amount float,
    ehail_fee float,
    improvement_surcharge float,
    total_amount float,
    payment_type float,
    trip_type float,
    congestion_surcharge float
) with (
    connector = 'kafka',
    topic = 'taxi',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
```
```sql
-- Create a materialized view to filter out invalid data and features irrelevant
-- to prediction, and create a window identifier (window_start)
create materialized view useful_filter as
select
    window_start,
    lpep_pickup_datetime, lpep_dropoff_datetime, do_location_id,
    pu_location_id, passenger_count, trip_distance, fare_amount,
    extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge,
    total_amount, congestion_surcharge
from (
    select * from tumble(taxiallfeature, lpep_pickup_datetime, INTERVAL '5' hour)
) where payment_type in (1, 2, 4);

-- Create a behavior table to store the true result corresponding to each
-- behavior for later training, with window_start for easy feature querying
create materialized view location_id_store as
select do_location_id, pu_location_id, window_start, fare_amount
from useful_filter;

-- Create a feature table to extract relevant features from predefined windows.
-- Here, the window size is 5 hours, and we calculate various averages for the
-- same do_location_id
create materialized view converted_features_with_do_location as
select
    do_location_id,
    window_start,
    -- Average trip duration: epoch seconds between pickup and dropoff, scaled down by 10
    avg(EXTRACT(EPOCH FROM lpep_dropoff_datetime - lpep_pickup_datetime)::INT) / 10 as latency,
    avg(passenger_count) as passenger_count,
    avg(trip_distance) as trip_distance,
    avg(extra) as extra,
    avg(mta_tax) as mta_tax,
    avg(tip_amount) as tip_amount,
    avg(tolls_amount) as tolls_amount,
    avg(improvement_surcharge) as improvement_surcharge,
    avg(total_amount) as total_amount,
    avg(congestion_surcharge) as congestion_surcharge,
    -- Flag long-distance orders: an average distance greater than 30 counts as long-distance
    avg(trip_distance) > 30 as long_distance
from useful_filter group by do_location_id, window_start;

-- Same feature extraction, this time grouped by pu_location_id
create materialized view converted_features_with_pu_location as
select
    pu_location_id,
    window_start,
    avg(EXTRACT(EPOCH FROM lpep_dropoff_datetime - lpep_pickup_datetime)::INT) / 10 as latency,
    avg(passenger_count) as passenger_count,
    avg(trip_distance) as trip_distance,
    avg(extra) as extra,
    avg(mta_tax) as mta_tax,
    avg(tip_amount) as tip_amount,
    avg(tolls_amount) as tolls_amount,
    avg(improvement_surcharge) as improvement_surcharge,
    avg(total_amount) as total_amount,
    avg(congestion_surcharge) as congestion_surcharge,
    avg(trip_distance) > 30 as long_distance
from useful_filter group by pu_location_id, window_start;
```
- Then you can use the following command to retrieve the system logs.

```shell
cat .log/simulator_log
```
The specific logs are as follows. Please note that this demo only showcases a feature store built on RisingWave: the model parameters have not been optimized and the data volume is small, so the prediction accuracy may not meet production requirements. You can expand on it if needed.
```
// Connect to the client for training and prediction
This is the recwave actor!
Connected to server
// Write simulated training data to Kafka->RisingWave
Write training data len is 9000
// Start training using existing data in RisingWave. Offline features are obtained through the join of the behavior table and the feature table.
Start training
Offline features have been written to Kafka
// Training completed. Simulate writing online features
write online feature, DOLocationID is 138
write online feature, DOLocationID is 243
write online feature, DOLocationID is 63
write online feature, DOLocationID is 37
write online feature, DOLocationID is 160
// Simulate real-time prediction using existing online features from RisingWave. Here, the corresponding features from the latest window in RisingWave are queried and imported into the trained model for prediction. The predicted results are then compared with the actual results and printed.
DOLocationID is 138 fare amount: predicted results 24.845151901245117 , real results 25.0
DOLocationID is 243 fare amount: predicted results 7.779667854309082 , real results 8.0
DOLocationID is 63 fare amount: predicted results 57.45588684082031 , real results 57.5
DOLocationID is 37 fare amount: predicted results 18.309465408325195 , real results 12.0
DOLocationID is 160 fare amount: predicted results 10.065141677856445 , real results 10.0
```
At this point, the online features stored in RisingWave for `converted_features_with_do_location` are as follows. You can see that the online data has been extracted and saved in RisingWave based on the relevant windows.
```
do_location_id | avg_amount | window_start        | latency | passenger_count | trip_distance      | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | long_distance
---------------+------------+---------------------+---------+-----------------+--------------------+-------+---------+------------+--------------+-----------------------+--------------+----------------------+--------------
           243 |          8 | 2022-04-01 03:00:00 |   50.40 |               1 | 1.7399999999999998 |   0.5 |     0.5 |          0 |            0 |                   0.3 |          9.3 |                    0 | f
            37 |      18.25 | 2022-03-31 22:00:00 |     221 |               1 |              3.825 |  0.25 |    0.25 |      4.455 |            0 |   0.29999999999999993 |       23.505 |                    0 | f
           160 |         10 | 2022-04-01 03:00:00 |   75.20 |               1 |               2.14 |   0.5 |     0.5 |          0 |            0 |                   0.3 |         11.3 |                    0 | f
           138 |         25 | 2022-04-01 03:00:00 |      82 |               1 |               5.03 |     0 |       0 |          0 |            0 |                   0.3 |         25.3 |                    0 | f
            63 |       57.5 | 2022-04-01 03:00:00 |  523.70 |               2 |              12.65 |   0.5 |     0.5 |          0 |            0 |                   0.3 |         58.8 |                    0 | f
```
This article introduced the workings of machine learning through a classic example, explained the concept of a feature store, and showed how RisingWave can serve as the online feature processing component of a feature store. Finally, a simple demo demonstrated a feature store implemented with RisingWave.