RisingWave user-defined functions: Rust x WebAssembly
Rust UDFs are ideal for computationally intensive logic. In this article, we will introduce the design and implementation of RisingWave Rust UDFs.
RisingWave supports creating user-defined functions (UDFs) using Rust. These Rust UDFs are compiled into WebAssembly, a virtual machine assembly language originally designed for Web browsers. They are then executed within a WebAssembly virtual machine embedded in RisingWave using Just-In-Time (JIT) compilation, offering high efficiency with minimal performance loss compared to native instructions and avoiding the high latency of remote communication. This makes Rust UDFs ideal for computationally intensive logic. In this article, we will introduce the design and implementation of RisingWave Rust UDFs.
Initially, RisingWave supported Python UDF servers due to Python's large user base and high development efficiency. However, as more users started using Python UDFs, several issues became apparent:
Most users just want to use UDFs to implement logic not supported by built-in functions, which does not justify the added maintenance burden and performance risk. Therefore, a high-performance, in-process UDF solution is needed.
To achieve high performance, we first need to select a suitable programming language. Options include C, C++, Rust, and Go. We chose Rust because it is the programming language used by RisingWave, and it also has a mature ecosystem in both Arrow and WebAssembly.
Next comes the execution environment. In-process execution also requires strong isolation. Users can write arbitrary code, but it must not affect the operation of RisingWave itself. This requires a sandbox environment to restrict the CPU and memory usage of untrusted code and its access to external resources. Therefore, we cannot run the instructions compiled from a UDF natively. Instead, we need to compile them into a virtual machine instruction set and execute them via JIT (Just-In-Time) compilation. WebAssembly is an ideal choice for this. Many WebAssembly interpreters are written in Rust, making them easy to embed into RisingWave.
Thus, we chose Rust as the programming language and WebAssembly as the execution environment.
To create a Rust UDF in RisingWave, use the create function command to define the function name and data types, then embed a Rust function with the same signature.
create function gcd(int, int) returns int language rust as $$
fn gcd(mut a: i32, mut b: i32) -> i32 {
while b != 0 {
(a, b) = (b, a % b);
}
a
}
$$;
For table functions, since Rust's generators are not stable, the Rust function should return an iterator, yielding one row at a time:
create function range(n int) returns table (x int) language rust as $$
fn range(n: i32) -> impl Iterator<Item = i32> {
(0..n).into_iter()
}
$$;
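As a further illustration of the iterator-returning shape, here is a minimal sketch in plain Rust of a stateful sequence written without generators. The function name fibonacci is hypothetical and the SQL wrapper is omitted; whether a given signature is accepted by create function depends on the types RisingWave supports.

```rust
/// Yields the first `n` Fibonacci numbers, one row at a time.
/// The state (a, b) lives inside the iterator, so no generator syntax is needed.
fn fibonacci(n: i32) -> impl Iterator<Item = i32> {
    std::iter::successors(Some((0i32, 1i32)), |&(a, b)| {
        // Stop early instead of overflowing i32.
        Some((b, a.checked_add(b)?))
    })
    .map(|(a, _)| a)
    .take(n.max(0) as usize) // a negative `n` yields no rows
}
```

Collecting fibonacci(6) gives 0, 1, 1, 2, 3, 5. Because the iterator is lazy, rows are only computed as the caller pulls them.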
This Rust code is compiled into WebAssembly modules on the frontend and executed in the WebAssembly runtime on the backend.
For more complex functions requiring third-party libraries or multiple files, users can create a standalone Rust project and compile it into a WebAssembly module using our provided framework. The WebAssembly module can then be uploaded to RisingWave for execution.
For example, to parse protobuf using the prost library:
// lib.rs
use arrow_udf::{function, types::StructType};
use prost::{DecodeError, Message};
// Import Rust code generated from .proto files
pub mod proto {
include!(concat!(env!("OUT_DIR"), "/proto.rs"));
}
// Define the returned struct
#[derive(StructType)]
struct DataKey {
stream: String,
pan: String,
}
// Define the parsing function
#[function("decode_proto(bytea) -> struct DataKey")]
fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {
let data_key = proto::DataKey::decode(data)?;
Ok(DataKey {
stream: data_key.stream,
pan: data_key.pan,
})
}
Finally, import the compiled WebAssembly module into RisingWave using base64 encoding.
\set wasm_binary `base64 -i target/release/decode.wasm`
create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
language wasm using base64 :'wasm_binary';
Note that the language here is now wasm instead of rust, as we are dealing with WebAssembly instructions. In the future, we might support more languages that compile to WebAssembly.
After discussing the user interface, let's look at the internal implementation of RisingWave.
When receiving a create function ... language rust ... statement, the RisingWave frontend generates a temporary Rust project locally. The embedded Rust code snippet is extracted and supplemented with necessary auxiliary code.
For example, for the following function:
create function gcd(int, int) returns int language rust as $$
fn gcd(mut a: i32, mut b: i32) -> i32 {
while b != 0 {
(a, b) = (b, a % b);
}
a
}
$$;
The frontend adds use statements and SQL signatures, generating a lib.rs file like this:
use arrow_udf::{function, types::StructType}; // import prelude
#[function("gcd(int, int) -> int")]
fn gcd(mut a: i32, mut b: i32) -> i32 {
while b != 0 {
(a, b) = (b, a % b);
}
a
}
The frontend then compiles the code using cargo in release mode, producing a .wasm file, and attempts to reduce the binary size using wasm-strip (usually down to around 1MB).
The remaining steps are the same as in the language wasm scenario. The wasm file is compressed and stored as part of the function definition in the catalog metadata, to be read and decompressed during execution.
So how is this function compiled into a WebAssembly module that can be executed?
If you have read our previous article on the built-in function framework, you will notice a very similar design. Both are Rust implementations of SQL functions, with no fundamental differences. Therefore, we reused the same procedural macro-based code generation framework, ultimately wrapping the user function to generate a columnar evaluation function based on Apache Arrow:
fn gcd_eval(input: &RecordBatch) -> Result<RecordBatch> {...}
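To make the idea of a columnar wrapper concrete, here is a minimal sketch that uses only the standard library. It is not the code the macro generates: Vec<Option<i32>> stands in for an Arrow Int32Array, the name gcd_eval_columns is hypothetical, and the rule that a NULL input produces a NULL output is an assumption made for illustration.

```rust
/// The scalar function, exactly as a user would write it.
fn gcd(mut a: i32, mut b: i32) -> i32 {
    while b != 0 {
        (a, b) = (b, a % b);
    }
    a
}

/// Hypothetical columnar wrapper: applies `gcd` row by row over two columns.
/// `None` models SQL NULL; a NULL in either input yields a NULL output (assumed).
fn gcd_eval_columns(
    a: &[Option<i32>],
    b: &[Option<i32>],
) -> Result<Vec<Option<i32>>, String> {
    if a.len() != b.len() {
        return Err(format!("column length mismatch: {} vs {}", a.len(), b.len()));
    }
    let out = a
        .iter()
        .zip(b)
        .map(|(x, y)| match (x, y) {
            (Some(x), Some(y)) => Some(gcd(*x, *y)),
            _ => None,
        })
        .collect();
    Ok(out)
}
```

The user only writes the scalar function; everything about batches, NULL handling, and error reporting lives in the wrapper, which is why it can be generated mechanically.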
At this stage, if it were a built-in function, it could already be called and executed by Rust. However, since we need to compile it to WebAssembly, and WebAssembly interpreters can only call functions through the standard C ABI (as Rust does not yet have a stable ABI), we need to do some additional work to wrap it into a C function that can be called via FFI. The final generated code looks like this:
#[export_name = "arrowudf_Base64EncodedSignature"]
unsafe extern "C" fn gcd_int4_int4_int4_ffi(
ptr: *const u8,
len: usize,
out: *mut arrow_udf::ffi::CSlice,
) -> i32 {
// decode input RecordBatch from the buffer specified by `ptr` and `len`
let input: RecordBatch = ...;
// call Rust function
let result = gcd_eval(&input);
// encode output RecordBatch or error message to `out`
match result {
Ok(o) => { ...; 0 }
Err(e) => { ...; -1 }
}
}
Here, ptr and len describe an input buffer containing the encoded input RecordBatch. Correspondingly, out is an output parameter that receives a buffer with the encoded output RecordBatch. This buffer needs to be manually reclaimed by the caller after reading. Thus, the WebAssembly module must expose its own alloc and dealloc functions. The function's actual return value is an error code: 0 for success, -1 for an error. If an error occurs, the buffer pointed to by out contains the error message instead.
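The calling convention described above can be sketched natively with the standard library alone. This is an illustration under stated assumptions, not the arrow-udf implementation: the CSlice layout, the names gcd_ffi, free_cslice, and call, and the 8-byte little-endian payload (standing in for an encoded RecordBatch) are all hypothetical, and the host and the "module" share one address space here, which is not the case with real WebAssembly.

```rust
use std::slice;

/// Hypothetical stand-in for `arrow_udf::ffi::CSlice`: a pointer/length pair.
#[repr(C)]
pub struct CSlice {
    pub ptr: *mut u8,
    pub len: usize,
}

fn gcd(mut a: i32, mut b: i32) -> i32 {
    while b != 0 {
        (a, b) = (b, a % b);
    }
    a
}

/// Toy input encoding: two little-endian i32 values.
fn decode_args(input: &[u8]) -> Result<(i32, i32), String> {
    if input.len() != 8 {
        return Err(format!("expected 8 input bytes, got {}", input.len()));
    }
    let a = i32::from_le_bytes([input[0], input[1], input[2], input[3]]);
    let b = i32::from_le_bytes([input[4], input[5], input[6], input[7]]);
    Ok((a, b))
}

/// Hands ownership of a buffer to the caller, who must free it later.
fn into_cslice(bytes: Vec<u8>) -> CSlice {
    let boxed = bytes.into_boxed_slice();
    let len = boxed.len();
    CSlice { ptr: Box::into_raw(boxed) as *mut u8, len }
}

/// Plays the role of the module's dealloc: reclaims a buffer made by `into_cslice`.
pub unsafe extern "C" fn free_cslice(s: CSlice) {
    if !s.ptr.is_null() {
        drop(unsafe { Box::from_raw(std::ptr::slice_from_raw_parts_mut(s.ptr, s.len)) });
    }
}

/// C-ABI entry point: returns 0 and writes the result to `out`,
/// or returns -1 and writes the error message to `out`.
pub unsafe extern "C" fn gcd_ffi(ptr: *const u8, len: usize, out: *mut CSlice) -> i32 {
    let input = unsafe { slice::from_raw_parts(ptr, len) };
    let (code, payload) = match decode_args(input) {
        Ok((a, b)) => (0, gcd(a, b).to_le_bytes().to_vec()),
        Err(msg) => (-1, msg.into_bytes()),
    };
    unsafe { out.write(into_cslice(payload)) };
    code
}

/// Host-side view of one call: invoke, copy `out`, then release it.
fn call(input: &[u8]) -> Result<Vec<u8>, String> {
    let mut out = CSlice { ptr: std::ptr::null_mut(), len: 0 };
    let code = unsafe { gcd_ffi(input.as_ptr(), input.len(), &mut out) };
    let bytes = unsafe { slice::from_raw_parts(out.ptr, out.len) }.to_vec();
    unsafe { free_cslice(out) };
    if code == 0 {
        Ok(bytes)
    } else {
        Err(String::from_utf8_lossy(&bytes).into_owned())
    }
}
```

The same out buffer carries either the result or the error message, so the caller follows one read-then-free path regardless of the return code.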
An interesting design is the function's symbol name, indicated by the string in #[export_name]. When the UDF loader gets the .wasm file, it first needs to obtain the metadata of all UDFs from somewhere: which functions the module contains, what their signatures are, and where to call them. The first and third questions are simple because every extern "C" fn appears in the module's export symbol table. As for the second question, we can encode the function signature (e.g., "gcd(int4, int4) -> int4") as a string in the symbol name. However, since the symbol name cannot contain characters like parentheses and spaces, we need to re-encode the string into legal characters using base64 and add a fixed prefix arrowudf_. The UDF loader only needs to filter the symbols with this prefix in the symbol table and decode the base64-encoded signature. (Note: the base64 used here is not standard; see code for reasons).
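A small round trip shows how a signature could travel inside a symbol name. This is a sketch only: the alphabet below (standard base64 with + and / swapped for $ and _, no padding) is an assumption chosen for illustration and may differ from the variant the real code uses, and the helper names are hypothetical.

```rust
// Assumed alphabet: identifier-friendly characters only, no '=' padding.
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789$_";
const PREFIX: &str = "arrowudf_";

/// Encodes bytes as unpadded base64 using `ALPHABET`.
fn encode(data: &[u8]) -> String {
    let mut out = String::new();
    for chunk in data.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit group.
        let mut group = 0u32;
        for (i, &byte) in chunk.iter().enumerate() {
            group |= (byte as u32) << (16 - 8 * i);
        }
        // n input bytes produce n + 1 output characters.
        for i in 0..=chunk.len() {
            let index = (group >> (18 - 6 * i)) & 0x3f;
            out.push(ALPHABET[index as usize] as char);
        }
    }
    out
}

/// Decodes text produced by `encode`; returns None on a foreign character.
fn decode(text: &str) -> Option<Vec<u8>> {
    let mut out = Vec::new();
    let (mut acc, mut bits) = (0u32, 0u32);
    for c in text.bytes() {
        let value = ALPHABET.iter().position(|&a| a == c)? as u32;
        acc = (acc << 6) | value;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            out.push((acc >> bits) as u8);
            acc &= (1 << bits) - 1; // keep only the bits not yet emitted
        }
    }
    Some(out)
}

/// Builds the export name for a function signature.
fn symbol_name(signature: &str) -> String {
    format!("{}{}", PREFIX, encode(signature.as_bytes()))
}

/// What a loader would do for each export: skip non-UDF symbols, decode the rest.
fn signature_from_symbol(symbol: &str) -> Option<String> {
    let encoded = symbol.strip_prefix(PREFIX)?;
    String::from_utf8(decode(encoded)?).ok()
}
```

A loader built this way needs no side channel for metadata: iterating over the export table and calling signature_from_symbol on each name is enough to list the functions a module provides.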
If the user function returns a struct type, the struct definition is encoded in a separate symbol. For example:
#[derive(StructType)]
struct KeyValue<'a> {
key: &'a str,
value: &'a str,
}
#[function("key_value(string) -> struct KeyValue")]
fn key_value(kv: &str) -> Option<KeyValue<'_>> {
let (key, value) = kv.split_once('=')?;
Some(KeyValue { key, value })
}
The procedural macro exports two symbols (with parts in square brackets base64-encoded):
arrowudf_[key_value(string)->struct KeyValue]
arrowudt_[KeyValue=key:string,value:string]
The symbol defining the struct is prefixed with arrowudt_. This symbol does not point to any entity; it merely encodes its schema through its name. Once the loader finds all the function and type symbols, it can reconstruct the complete function signature.
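The reconstruction step can be sketched as follows, working on the already-decoded strings shown above. The type StructSchema, the helper names, and the output spelling struct<key string, value string> (modeled on the create function example earlier) are assumptions for illustration; the naive comma split would also not handle nested struct fields.

```rust
/// Hypothetical in-memory form of a decoded `arrowudt_` symbol.
#[derive(Debug, PartialEq)]
struct StructSchema {
    name: String,
    fields: Vec<(String, String)>, // (field name, type name)
}

/// Parses a decoded type symbol such as "KeyValue=key:string,value:string".
fn parse_struct_schema(decoded: &str) -> Option<StructSchema> {
    let (name, body) = decoded.split_once('=')?;
    let fields = body
        .split(',')
        .map(|field| {
            let (field_name, type_name) = field.split_once(':')?;
            Some((field_name.trim().to_string(), type_name.trim().to_string()))
        })
        .collect::<Option<Vec<_>>>()?;
    Some(StructSchema { name: name.trim().to_string(), fields })
}

/// Replaces a `struct Name` return type with its full field list.
/// Non-struct return types are passed through unchanged.
fn expand_return_type(signature: &str, schemas: &[StructSchema]) -> Option<String> {
    let (_, ret) = signature.split_once("->")?;
    let ret = ret.trim();
    match ret.strip_prefix("struct ") {
        Some(name) => {
            let schema = schemas.iter().find(|s| s.name == name.trim())?;
            let fields: Vec<String> = schema
                .fields
                .iter()
                .map(|(field_name, type_name)| format!("{} {}", field_name, type_name))
                .collect();
            Some(format!("struct<{}>", fields.join(", ")))
        }
        None => Some(ret.to_string()),
    }
}
```

Keeping the struct definition in its own symbol means several functions in one module can refer to the same type by name without repeating its fields.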
If you have compiled a WebAssembly UDF module, you can view its export symbols with the following command:
wasm-objdump -x udf.wasm | grep arrowud
Finally, the database loads the WebAssembly module and executes its functions. We use wasmtime as the runtime. It first loads the decompressed WebAssembly module, then scans all functions and types according to the protocol. When a user calls a function, it locates the entry point of the function by its name. The input data is copied into memory allocated inside the module via its alloc function. Then, the function is called, and the output is obtained. Finally, the output is parsed, and the memory is released in reverse order.
Since each WebAssembly instance can only run single-threaded, we maintain an instance pool for multithreaded parallel execution. Each thread takes an idle instance from the pool, creates a new one if none is available, and returns it after use. This ensures on-demand instantiation, avoiding excessive memory usage (each instance consumes at least a few MB of memory).
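The take, create-if-empty, and return cycle can be sketched with a mutex-protected stack. This is a generic illustration, not RisingWave's pool: the type Pool and its methods are hypothetical, and T stands in for a WebAssembly instance.

```rust
use std::sync::Mutex;

/// A minimal on-demand pool: instances are created only when none is idle.
pub struct Pool<T> {
    idle: Mutex<Vec<T>>,
    create: Box<dyn Fn() -> T + Send + Sync>,
}

impl<T> Pool<T> {
    pub fn new(create: impl Fn() -> T + Send + Sync + 'static) -> Self {
        Pool { idle: Mutex::new(Vec::new()), create: Box::new(create) }
    }

    /// Runs `f` with exclusive access to one instance, then returns it to the pool.
    pub fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        // The lock is held only while popping, never while `f` runs.
        let taken = self.idle.lock().unwrap().pop();
        let mut instance = taken.unwrap_or_else(|| (self.create)());
        let result = f(&mut instance);
        self.idle.lock().unwrap().push(instance);
        result
    }

    /// Number of instances currently waiting to be reused.
    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}
```

With this shape the pool never grows beyond the peak number of concurrent callers. If f panics, the instance is dropped rather than returned, which in this sketch also discards any state the failed call left behind.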
Finally, let's discuss some technical issues in the WebAssembly-based Rust UDF solution that may concern you.
Can Rust UDFs achieve high performance? If no WebAssembly layer is used for isolation, theoretically, by loading dynamic libraries and running native instructions directly, UDFs would perform identically to built-in functions. If users have complete control over RisingWave and pursue extreme performance, this may be the best solution. However, the downside is that UDFs might cause RisingWave to block or even crash. Users must bear the risk themselves.
Using WebAssembly JIT execution, after warm-up, the execution time of pure functions is about 1.5-2x that of native execution. However, considering the additional data transmission overhead, the actual end-to-end execution time may be longer. According to our benchmarks, for simple calculations like gcd, WebAssembly execution time is about 10x that of native execution.
I believe there is room for optimization in data transmission. In the future, we can attempt to map host memory directly into the wasm instance's memory, transmitting Arrow arrays with zero-copy. This could avoid a lot of encoding/decoding and memory copying overhead.
Currently, WebAssembly modules only support pure computation, with almost no external access, including network and file system access. This is because wasmtime defaults to sandbox mode. However, to support using std, we must enable WASI (which can be understood as system calls for wasm). This way, even if a UDF panics, error messages can be printed through stderr. However, we still limit the buffer size of stdout and stderr to prevent attackers from exhausting host memory.
Theoretically, we can gradually open various interfaces, allowing modules to access the network under control. This way, we could even use UDFs to implement custom sources (refer to the attempt from a community user). However, these have not been implemented yet and require further exploration in the future.
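One way to cap captured output is a writer that stops storing bytes once a limit is reached. The sketch below is an assumption about how such a cap could work, not RisingWave's code: the type BoundedBuffer is hypothetical, and silently dropping the excess is one possible policy (returning an error to the guest would be another).

```rust
use std::io::{self, Write};

/// Captures at most `limit` bytes; anything beyond that is discarded.
pub struct BoundedBuffer {
    data: Vec<u8>,
    limit: usize,
    truncated: bool,
}

impl BoundedBuffer {
    pub fn new(limit: usize) -> Self {
        BoundedBuffer { data: Vec::new(), limit, truncated: false }
    }

    pub fn contents(&self) -> &[u8] {
        &self.data
    }

    pub fn is_truncated(&self) -> bool {
        self.truncated
    }
}

impl Write for BoundedBuffer {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let room = self.limit - self.data.len();
        let kept = room.min(buf.len());
        self.data.extend_from_slice(&buf[..kept]);
        if kept < buf.len() {
            self.truncated = true;
        }
        // Report the whole input as consumed so the writer neither retries nor fails.
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Host memory use is then bounded by limit per stream, no matter how much a misbehaving UDF prints.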
Conclusion
This article discusses the design and implementation of Rust UDFs in RisingWave. Rust UDFs achieve isolation by compiling to WebAssembly while maintaining high performance. We reused the #[function] procedural macro from the internal function framework to provide a concise interface for users. Functions are wrapped at compile time into C interfaces, exchanging data with callers via Arrow RecordBatch. Function signatures and type definitions are encoded in symbols. The loader then finds all functions and types by scanning the symbol table. Finally, functions are JIT executed by wasmtime.
It is worth mentioning that this work is not strongly tied to RisingWave. Any data processing system written in Rust can execute Rust UDFs by importing the arrow_udf_wasm library.
Runji Wang
Software Engineer