User-defined functions (UDFs) are a common feature in data processing systems. They allow users to define their own functions in various languages, supplementing built-in functions to achieve customized execution logic. With UDFs, we can combine multiple existing functions into new ones, simplifying query logic; or use languages like Python to leverage other language ecosystems, filling the gaps in SQL and built-in functions' expressiveness. Besides computation, we can also call external system APIs, integrating external services into a unified data processing pipeline. UDFs greatly enhance the flexibility and extensibility of data processing systems.

In previous articles, we introduced the development framework for internal functions in RisingWave, which is aimed mainly at internal developers. Next, we will introduce the design and implementation of user-facing UDF features in a series of articles. Technically, UDFs follow the same principles but demand higher usability. As the first article in this series, this post gives an overview of the types, user interfaces, and usage scenarios of UDFs.


Categories of UDFs


UDFs can be categorized along three dimensions.

The first dimension is the function's input and output. In SQL, we have the following common functions:

  • Scalar function: Takes one row as input and outputs one row. For example, abs(-1) -> 1.
  • Table function: Takes one row as input and outputs multiple rows. For example, generate_series(1,3) -> {1,2,3}.
  • Aggregate function: Takes multiple rows as input and outputs one row. For example, sum({1,2,3}) -> 6.
  • Window function: Takes multiple rows as input and outputs multiple rows. For example, lag({1,2,3}) -> {NULL,1,2}.

Each type can support user-defined functions. In the context of UDFs, they are usually abbreviated as UDF, UDTF, UDAF, and UDWF. RisingWave currently supports UDF and UDTF, covering most practical needs. We are also developing UDAF to support more scenarios.
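The four shapes can be mimicked in a few lines of plain Python. This is only an illustrative sketch: the function names are made up for this example, and rows are modeled as ordinary Python values rather than anything RisingWave-specific.

```python
from typing import Iterable, Iterator, List, Optional


def scalar_abs(x: int) -> int:
    """Scalar: one input row -> one output row."""
    return abs(x)


def table_generate_series(start: int, stop: int) -> Iterator[int]:
    """Table: one input row -> multiple output rows (bounds inclusive)."""
    yield from range(start, stop + 1)


def aggregate_sum(rows: Iterable[int]) -> int:
    """Aggregate: multiple input rows -> one output row."""
    return sum(rows)


def window_lag(rows: List[int]) -> List[Optional[int]]:
    """Window: multiple input rows -> one output row per input row."""
    # Shift every value down by one position; the first row has no predecessor.
    return ([None] + rows)[: len(rows)]


print(scalar_abs(-1))                      # 1
print(list(table_generate_series(1, 3)))   # [1, 2, 3]
print(aggregate_sum([1, 2, 3]))            # 6
print(window_lag([1, 2, 3]))               # [None, 1, 2]
```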

The second dimension is the language used to write the function. RisingWave currently supports writing UDFs in SQL, Python, Java, JavaScript, and Rust. They target different users and scenarios:

  • SQL is the native language of RisingWave. SQL UDFs do not extend beyond SQL's expressiveness; they simply combine existing functions for code reuse and query simplification.
  • Python is popular in data science and AI. It’s easy to write, and it has a rich ecosystem with many users, making it the first UDF language we supported. However, being an interpreted language, it's slow and unsuitable for heavy computations.
  • Java is mainstream in big data systems like Hadoop, Spark, Flink, and Kafka. So our Java UDFs mainly target users migrating from these systems. Java offers high performance but is complex to write and deploy.
  • JavaScript is mainstream for web development. Its ecosystem is rapidly growing, almost rivaling Python. Our JavaScript UDFs mainly target frontend and backend developers.
  • Rust is a high-performance systems programming language and is also used to develop RisingWave. Rust UDFs are suitable for heavy computation tasks, compiled into WebAssembly to run in RisingWave's built-in container.

Our goal is to let users write functions in any preferred and suitable language, removing language as a barrier to logic expression.

The third dimension is the function’s execution method, which affects performance and capabilities. We currently have three execution methods:

  • Inline execution: Specific to SQL UDFs, expanding directly into expressions at the frontend, with nearly no performance difference from manually calling multiple functions.
  • Embedded execution: Functions run in the language runtime embedded in RisingWave computation nodes. With almost no additional communication overhead, it’s fast but limited to pure computation without network access for safety. Currently, Python, JavaScript, and Rust support embedded execution.
  • External function: Functions run in a separate process and provide services to RisingWave via RPC. We currently support this type of function for Python and Java. This method offers maximum flexibility, allowing users to do anything in the process. Because the function runs outside RisingWave, RisingWave is also isolated from its resource contention and interference. However, RPC introduces significant latency, potentially blocking data flow (especially if the function is deployed in a different data center). It also complicates deployment and maintenance.
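Inline execution can be pictured as parameter substitution. The sketch below is a toy illustration in Python, not RisingWave's frontend code: `inline_sql_udf` is a hypothetical helper that works on plain strings, whereas a real planner would rewrite an expression tree and would never touch string literals.

```python
import re
from typing import List


def inline_sql_udf(body: str, params: List[str], args: List[str]) -> str:
    """Replace each parameter name in `body` with the caller's argument expression."""
    if len(params) != len(args):
        raise ValueError("argument count mismatch")
    if not params:
        return body
    # Parenthesize arguments so operator precedence in the body is preserved.
    mapping = {p: f"({a})" for p, a in zip(params, args)}
    # Match whole identifiers only, in a single pass, so that substituted
    # text is never substituted again.
    pattern = re.compile(r"\b(" + "|".join(map(re.escape, params)) + r")\b")
    return pattern.sub(lambda m: mapping[m.group(1)], body)


body = "case when score >= 90 then 'A' else 'F' end"
print(inline_sql_udf(body, ["score"], ["t.points + 5"]))
# case when (t.points + 5) >= 90 then 'A' else 'F' end
```

After such an expansion, the query contains only built-in expressions, which is why the cost matches writing them out by hand.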

We first implemented Python external functions, which are the UDF type our users use most. To address their performance and usability issues, we then developed SQL UDFs and embedded UDFs. Future articles will delve into their design and implementation, exploring related issues and solutions.


RisingWave UDF user interface


To give a more concrete idea of the UDFs mentioned above, let's see how users define UDFs in RisingWave with some classic scenarios. For more details, please refer to the RisingWave documentation.


Example 1: Simplify queries with SQL UDFs


If we want to convert student scores from percentage to grades, it can be done using the case when statement in SQL, but it's cumbersome and less readable. We can create a SQL UDF for this process. In RisingWave, use the create function statement to define a function:

create function grade(score int) returns varchar language sql as $$
    select case when score >= 100 then 'A+'
        when score >= 90 then 'A'
        when score >= 80 then 'B'
        when score >= 70 then 'C'
        when score >= 60 then 'D'
        else 'F'
    end;
$$;

Then, we can call this UDF like any built-in function:

select score, grade(score) from generate_series(50, 100, 10) t(score);
 score | grade
-------+-------
    50 | F
    60 | D
    70 | C
    80 | B
    90 | A
   100 | A+
(6 rows)


Example 2: Mathematical calculations with Rust UDFs


To perform mathematical operations not supported by built-in functions, like finding the greatest common divisor (GCD), use a Rust UDF. Similar to the SQL UDF, we embed Rust code in the create function statement.

create function gcd(int, int) returns int language rust as $$
    fn gcd(mut a: i32, mut b: i32) -> i32 {
        while b != 0 {
            (a, b) = (b, a % b);
        }
        a
    }
$$;

This code is compiled to WebAssembly in the RisingWave frontend and achieves near-native performance when run on computation nodes using JIT (Just-in-time) compilation. Scenarios needing high-performance custom computation, like extracting quantitative factors, suit Rust UDFs.


Example 3: Parse protobuf data with Rust UDFs


Parsing Protobuf data can also be done with a Rust UDF. However, in this case, we first need to use a third-party library (such as prost) to generate Rust code from the .proto file, and then extract the data. The whole process requires creating a complete cargo project, which cannot be accomplished by merely embedding code in a statement.

// lib.rs
use arrow_udf::{function, types::StructType};
use prost::{DecodeError, Message};

// Import Rust code generated from .proto
pub mod proto {
    include!(concat!(env!("OUT_DIR"), "/proto.rs"));
}

// Define return structure
#[derive(StructType)]
struct DataKey {
    stream: String,
    pan: String,
}

// Define parsing function
#[function("decode_proto(bytea) -> struct DataKey")]
fn decode_proto(data: &[u8]) -> Result<DataKey, DecodeError> {
    let data_key = proto::DataKey::decode(data)?;
    Ok(DataKey {
        stream: data_key.stream,
        pan: data_key.pan,
    })
}

In such scenarios, we allow users to create their own Rust projects and compile them into WebAssembly modules. Then, using the create function statement, the WASM module can be directly imported into RisingWave in BASE64 encoding.

\set wasm_binary `base64 -i target/release/decode.wasm`
create function decode_proto(bytea) returns struct<stream varchar, pan varchar>
language wasm using base64 :'wasm_binary';

For detailed instructions, please refer to the RisingWave documentation.


Example 4: Using JavaScript UDTF to process JSON data


Suppose we want to get the list of contributors to the RisingWave project. We access the GitHub REST API to get a JSON list and want to extract the desired information. We can define a UDTF in JavaScript that takes the JSON returned by the API as input and outputs each developer's nickname and number of contributions, one row per developer.

create function contributors(response jsonb)
returns table (name varchar, contributions int)
language javascript as $$
    for (let user of response) {
        yield { name: user.login, contributions: user.contributions };
    }
$$;
select * from contributors('<response>') limit 5;
      name       | contributions
-----------------+---------------
 BugenZhao       |           616
 skyzh           |           482
 TennyZhuang     |           465
 xxchan          |           435
 kwannoel        |           410
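The one-row-in, many-rows-out shape maps naturally onto a generator. As a rough Python counterpart of the JavaScript body above, the sketch below yields one tuple per contributor; the sample payload is made up for illustration and only mimics the two fields the example reads (`login` and `contributions`).

```python
import json
from typing import Any, Iterator, Tuple


def contributors(response: Any) -> Iterator[Tuple[str, int]]:
    """Yield one (name, contributions) row per element of the JSON array."""
    for user in response:
        yield user["login"], user["contributions"]


# Made-up sample data standing in for the API response.
sample = json.loads(
    '[{"login": "alice", "contributions": 3},'
    ' {"login": "bob", "contributions": 1}]'
)

for name, count in contributors(sample):
    print(name, count)
```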


Example 5: Using Python external functions to call LLM services


If you want to call large language model services in stream processing, you can use Python to define external functions. The script below creates a UDF server with a function that calls the OpenAI API to generate text:

from risingwave.udf import udf, UdfServer
from openai import OpenAI

client = OpenAI()

@udf(input_types=["VARCHAR"], result_type="VARCHAR", io_threads=8)
def ask_ai(content):
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": content},
        ],
    )
    return response.choices[0].message.content

if __name__ == "__main__":
    server = UdfServer(location="localhost:8815")
    server.add_function(ask_ai)
    server.serve()

Once the UDF server is running, we can create a function in RisingWave to connect to the Python server.

create function ask_ai(varchar) returns varchar
as ask_ai using link 'http://localhost:8815';


Summary


Users can create custom functions in RisingWave using the create function statement. The statement specifies the parameter and return types, and then either embeds the implementation code or gives the address of an external function. Depending on the use case, you can choose the appropriate language to implement UDFs.

 Use case                                 | Suitable UDF
------------------------------------------+----------------------------
 Combining functions, simplifying queries | SQL
 High-performance computing               | Rust
 Data format conversion                   | Rust / JavaScript / Python
 Calling external services                | Python external function
 Migrating from systems like Flink        | Java external function


Implementation


RisingWave UDFs use Apache Arrow as the data interface format. This is because UDFs involve exchanging data across different languages and processes, and Arrow has become the widely accepted standard for this purpose. Using Arrow allows us to leverage its existing ecosystem. For instance, we use Arrow Flight RPC, pyarrow, and the Arrow Java API to implement the external function frameworks for Python and Java. Additionally, our UDF development framework can be used by other projects based on Arrow.

Recently, we extracted all UDF implementation code from RisingWave and released it as an independent project arrow-udf to the community. Databend quickly used this framework to implement Python, JavaScript, and WASM UDF support for their system. A community developer also added Deno backend support for JavaScript UDFs. Currently, this UDF ecosystem is taking shape and is expected to develop further with the community's help. If you are developing a data processing system in Rust, you are welcome to try integrating arrow-udf! It can be used for both UDFs and built-in functions.

Figure: Module structure of various UDFs in RisingWave

After discussing the interface, the implementation work is relatively trivial. For embedded UDFs, including Rust (WebAssembly), JavaScript, and Python, we embed wasmtime, quickjs/deno, and CPython as the language runtime, respectively. For external functions, we use Arrow Flight RPC to send data to the remote UDF process, which uses our provided Python SDK and Java SDK for development. In either form, there is glue code responsible for converting data from Arrow to the language's native types, ultimately calling the user-defined function. Essentially, this is an FFI (Foreign Function Interface) task.
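The role of the glue code can be sketched without any Arrow dependency. In the illustration below, plain Python lists stand in for Arrow columns, and `call_udf_on_batch` is a hypothetical helper, not part of any RisingWave SDK. Returning NULL whenever an argument is NULL is an assumption made for this sketch, not a statement about RisingWave's behavior.

```python
import math
from typing import Callable, List, Optional, Sequence

# A column is a sequence of values where None represents SQL NULL.
Column = Sequence[Optional[object]]


def call_udf_on_batch(func: Callable[..., object],
                      columns: List[Column]) -> List[Optional[object]]:
    """Apply a row-wise user function to a columnar batch."""
    if not columns:
        return []
    num_rows = len(columns[0])
    if any(len(col) != num_rows for col in columns):
        raise ValueError("all columns must have the same length")
    results: List[Optional[object]] = []
    # zip(*columns) turns the columnar layout into per-row argument tuples.
    for row in zip(*columns):
        if any(value is None for value in row):
            results.append(None)  # assumed NULL-in, NULL-out behavior
        else:
            results.append(func(*row))
    return results


print(call_udf_on_batch(math.gcd, [[12, 7, None], [18, 5, 4]]))
# [6, 1, None]
```

A real implementation would perform the same loop over typed Arrow arrays and build an Arrow array for the results instead of a list.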

We will introduce the specific implementation of Python external functions and Rust UDFs based on WebAssembly in the next two articles. Stay tuned!

Runji Wang

Software Engineer
