RisingWave user-defined functions: Rust x WebAssembly

RisingWave user-defined functions: Rust x WebAssembly

Why choose Rust for UDFs?

Initially, RisingWave supported Python UDF servers due to Python's large user base and high development efficiency. However, as more users started using Python UDFs, several issues became apparent:

  • Performance issues: Python's dynamic nature and interpreted execution make it one of the slowest mainstream programming languages, causing even moderately complex logic to run for extended periods.
  • High latency: Data is transmitted between the database and the UDF server via remote procedure calls (RPC), introducing significant latency (usually in milliseconds), slowing down the entire data flow response.
  • Maintenance overhead: The UDF Server is flexible, but users need to deploy and maintain the service themselves. If the load is high, they also need to manage load balancing. Additionally, it is not managed by RisingWave, introducing many uncontrollable factors into the system.

Most users just want to use UDFs to implement logic not supported by built-in functions, which does not justify the added maintenance burden and performance risk. Therefore, a high-performance, in-process UDF solution is needed.

To achieve high performance, we first need to select a suitable programming language. Options include C, C++, Rust, and Go. We chose Rust because it is the programming language used by RisingWave, and it also has a mature ecosystem in both Arrow and WebAssembly.

Then is the execution environment. In-process execution also requires strong isolation. Users can write arbitrary code, but it must not affect the operation of RisingWave itself. This requires a sandbox environment to restrict the CPU and memory usage of untrusted code and its access to external resources. Therefore, we cannot run the instructions compiled from UDF natively. Instead, we need to compile them into a virtual machine instruction set and execute them via JIT (Just-In-Time) compilation. WebAssembly is an ideal choice for this. Many WebAssembly interpreters are written in Rust, making them easy to embed into RisingWave.

Thus, we chose Rust as the programming language and WebAssembly as the execution environment.

User interface

Embedding Rust code

To create a Rust UDF in RisingWave, use the create function command to define the function name and data types, then embed a Rust function with the same signature.

create function gcd(int, int) returns int language rust as $$
    fn gcd(mut a: i32, mut b: i32) -> i32 {
        while b != 0 {
            (a, b) = (b, a % b);
        }
        a
    }
$$;

For table functions, since Rust's generators are not stable, the Rust function should return an iterator, yielding one row at a time:

create function range(n int) returns table (x int) language rust as $$
    fn range(n: i32) -> impl Iterator<Item = i32> {
        (0..n).into_iter()
    }
$$;

These Rust code are compiled into WebAssembly modules on the frontend and executed in the WebAssembly runtime on the backend.

Uploading WebAssembly modules

For more complex functions requiring third-party libraries or multiple files, users can create a standalone Rust project and compile it into a WebAssembly module using our provided framework. The WebAssembly module can then be uploaded to RisingWave for execution.

For example, to parse protobuf using the prost library:

// lib.rs
use arrow_udf::{function, types::StructType};
use prost::{DecodeError, Message};

// Import Rust code generated from .proto files
pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/proto.rs"));
}

// Define the returned struct
#[derive(StructType)]
struct DataKey {
    stream: String,
    pan: String,
}

// Define the parsing function
#[function("decode_proto(bytea) -> struct DataKey")]
fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {
    let data_key = proto::DataKey::decode(data)?;
    Ok(DataKey {
        stream: data_key.stream,
        pan: data_key.pan,
    })
}

Finally, import the compiled WebAssembly module into RisingWave using base64 encoding.

set wasm_binary `base64 -i target/release/decode.wasm`
create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
language wasm using base64 :'wasm_binary';

Note that the language here is now wasm instead of rust, as we are dealing with WebAssembly instructions. In the future, we might support more languages that compile to WebAssembly.

Internal implementation

After discussing the user interface, let's look at the internal implementation of RisingWave.

Compilation on frontend

When receiving a create function ... language rust ... statement, the RisingWave frontend generates a temporary Rust project locally. The embedded Rust code snippet is extracted and supplemented with necessary auxiliary code.

For example, for the following function:

create function gcd(int, int) returns int language rust as $$
    fn gcd(mut a: i32, mut b: i32) -> i32 {
        while b != 0 {
            (a, b) = (b, a % b);
        }
        a
    }
$$;

The frontend adds use statements and SQL signatures, generating a lib.rs file like this:

use arrow_udf::{function, types::StructType}; // import prelude

#[function("gcd(int, int) -> int")]
fn gcd(mut a: i32, mut b: i32) -> i32 {
    while b != 0 {
        (a, b) = (b, a % b);
    }
    a
}

The frontend then compiles the code using cargo in release mode, producing a .wasm file and attempts to reduce the binary size using wasm-strip (usually down to around 1MB).

The remaining steps are the same as handling the language wasm scenario. The wasm file is compressed and stored as part of the function definition in the catalog metadata, to be read and decompressed during execution.

Functions compilation

So how is this function compiled into a WebAssembly module that can be executed?

If you have read our previous article on the built-in function framework, you will notice a very similar design. Both are Rust implementations of SQL functions, with no fundamental differences. Therefore, we reused the same procedural macro-based code generation framework, ultimately wrapping the user function to generate a columnar evaluation function based on Apache Arrow:

fn gcd_eval(input: &RecordBatch) -> Result<RecordBatch> {...}

At this stage, if it were a built-in function, it could already be called and executed by Rust. However, since we need to compile it to WebAssembly, and WebAssembly interpreters can only call functions through the standard C ABI (as Rust does not yet have a stable ABI), we need to do some additional work to wrap it into a C function that can be called via FFI. The final generated code looks like this:

#[export_name = "arrowudf_Base64EncodedSignature"]
unsafe extern "C" fn gcd_int4_int4_int4_ffi(
    ptr: *const u8,
    len: usize,
    out: *mut arrow_udf::ffi::CSlice,
) -> i32 {
    // decode input RecordBatch from the buffer specified by `ptr` and `len`
    let input: RecordBatch = ...;

    // call Rust function
    let result = gcd_eval(&input);

    // encode output RecordBatch or error message to `out`
    match result {
        Ok(o) => { ...; 0 }
        Err(e) => { ...; -1 }
    }
}

Here, the ptr and len represent an input buffer containing the encoded input RecordBatch. Correspondingly, out is the function's return value, indicating a buffer with the encoded output RecordBatch. This buffer needs to be manually reclaimed by the caller after reading. Thus, the WebAssembly module must expose its own alloc and dealloc functions. The function's return value is an error code: 0 for success, -1 for an error. If an error occurs, the buffer pointed to by out contains the error message for reading.

An interesting design is the function's symbol name, indicated by the string in #[export_name]. When the UDF loader gets the .wasm file, it first reads the metadata of all UDFs from somewhere. The metadata tells it which functions the module contains, their signatures, and where to call them. The first and third questions are simple because every extern "C" fn appears in the module's export symbol table. As for the second question, we can encode the function signature (e.g., "gcd(int4, int4) -> int4") as a string in the symbol name. However, since the symbol name cannot contain characters like parentheses and spaces, we need to re-encode the string into legal characters using base64 and add a fixed prefix arrowudf_. The UDF loader only needs to filter the symbols with this prefix in the symbol table and decode the base64-encoded signature. (Note: the base64 used here is not standard; see code for reasons).

If the user function returns a struct type, the struct definition is encoded in a separate symbol. For example:

#[derive(StructType)]
struct KeyValue<'a> {
    key: &'a str,
    value: &'a str,
}
#[function("key_value(string) -> struct KeyValue")]
fn key_value(kv: &str) -> Option<KeyValue<'_>> {
    let (key, value) = kv.split_once('=')?;
    Some(KeyValue { key, value })
}

The procedural macro exports two symbols (with parts in square brackets base64-encoded):

arrowudf_[key_value(string)->struct KeyValue]
arrowudt_[KeyValue=key:string,value:string]

The symbol defining the struct is prefixed with arrowudt_. This symbol does not point to any entity; it merely encodes its schema through its name. Once the loader finds all the function and type symbols, it can reconstruct the complete function signature.

If you have compiled a WebAssembly UDF module, you can view its export symbols with the following command:

wasm-objdump -x udf.wasm | grep arrowud

Loading and execution

Finally, the database loads the WebAssembly module and executes its functions. We use wasmtime as the runtime. It first loads the decompressed WebAssembly module, then scans all functions and types according to the protocol. When a user calls a function, it locates the entry point of the function by its name. The input data is stored in a dynamically allocated memory within the module using its alloc function. Then, the function is called, and the output is obtained. Finally, the output is parsed, and the memory is released in reverse steps.

Since each WebAssembly instance can only run single-threaded, we maintain an instance pool for multithreaded parallel execution. Each thread takes an idle instance from the pool, creates a new one if none is available, and returns it after use. This ensures on-demand instantiation, avoiding excessive memory usage (each instance consumes at least a few MB of memory).

Discussions

Finally, let's discuss some technical issues in the WebAssembly-based Rust UDF solution that you may concern.

Performance

Can Rust UDFs achieve high performance? If no WebAssembly layer is used for isolation, theoretically, by loading dynamic libraries and running native instructions directly, UDFs would perform identically to built-in functions. If users have complete control over RisingWave and pursue extreme performance, this may be the best solution. However, the downside is that UDFs might cause RisingWave to block or even crash. Users must bear the risk themselves.

Using WebAssembly JIT execution, after warm-up, the execution time of pure functions is about 1.5-2x that of native execution. However, considering the additional data transmission overhead, the actual end-to-end execution time may be longer. According to our benchmarks, for simple calculations like gcd, WebAssembly execution time is about 10x that of native execution.

I believe there is room for optimization in data transmission. In the future, we can attempt to map host memory to the wasm internal directly, transmitting Arrow arrays with zero-copy. This could avoid a lot of encoding/decoding and memory copying overhead.

External access

Currently, WebAssembly modules only support pure computation, with almost no external access, including network and file system access. This is because wasmtime defaults to sandbox mode. However, to support using std, we must enable WASI (can be understood as system calls for wasm). This way, even if UDF panics, error messages can be printed through stderr. However, we still limit the buffer size of stdout and stderr to prevent attackers from exhausting host memory.

Theoretically, we can gradually open various interfaces, allowing modules to access the network under control. This way, we could even use UDF to implement custom sources (refer to the attempt from a community user). However, these have not been implemented yet and require further exploration in the future.

>

This article discusses the design and implementation of Rust UDFs in RisingWave. Rust UDFs achieve isolation by compiling to WebAssembly while maintaining high performance. We reused the #[function] procedural macro from the internal function framework to provide a concise interface for users. Functions are wrapped at compile time into C interfaces, exchanging data with callers via Arrow RecordBatch. Function signatures and type definitions are encoded in symbols. The loader then finds all functions and types by scanning the symbol table. Finally, functions are JIT executed by wasmtime. > >

>

It is worth mentioning that this work is not strongly tied to RisingWave. Any data processing system written in Rust can execute Rust UDFs by importing the arrow_udf_wasm library. > >

The Modern Backbone for Your
Event-Driven Infrastructure
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.