RisingWave allows users to write user-defined functions (UDFs) in Python and Java. These functions run in separate processes and communicate with RisingWave via RPC; we call this type of UDF an "external function". External functions are naturally isolated and flexible. Whatever happens inside the function does not affect RisingWave, and users can do anything within the process and deploy it anywhere. However, this approach also has drawbacks, such as high RPC latency, complex exception handling, and the extra burden of maintaining a separate service. In this article, we discuss how RisingWave implements external functions.


User interface


Let's start with the user interface. To create a Python external function in RisingWave, users first need to install the arrow-udf package, define and register the function in a file, and then run this file as a UDF server.

```python
# Import components from the arrow-udf module
from arrow_udf import udf, udtf, UdfServer

# Define a scalar function that returns a single value
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
    while y != 0:
        (x, y) = (y, x % y)
    return x

# Define a table function over a Python generator function
@udtf(input_types='INT', result_types='INT')
def series(n):
    for i in range(n):
        yield i

# Start a UDF server
if __name__ == '__main__':
    server = UdfServer(location="0.0.0.0:8815")
    server.add_function(gcd)
    server.add_function(series)
    server.serve()
```

Once the server is running, create each function in RisingWave with a link to the Python server:

```sql
CREATE FUNCTION gcd(int, int) RETURNS int
USING LINK 'http://localhost:8815' AS gcd;

CREATE FUNCTION series(int) RETURNS TABLE (x int)
USING LINK 'http://localhost:8815' AS series;
```
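When `CREATE FUNCTION` runs, RisingWave asks the server for the function's metadata (via GetFlightInfo, described in the next section) and checks that it matches the declared signature. The stdlib-only sketch below illustrates that check under stated assumptions: the `Signature` class, the `check_signature` helper, and the small alias table are all hypothetical, and `name` stands for the server-side name given after `AS`.

```python
from dataclasses import dataclass

# Hypothetical alias table: a few PostgreSQL-style spellings of the same type.
_ALIASES = {"INTEGER": "INT", "INT4": "INT", "INT8": "BIGINT"}


def normalize_type(name: str) -> str:
    """Map a SQL type name to one canonical, upper-case spelling."""
    upper = name.strip().upper()
    return _ALIASES.get(upper, upper)


@dataclass(frozen=True)
class Signature:
    name: str                       # server-side function name (the identifier after AS)
    arg_types: tuple[str, ...]
    result_types: tuple[str, ...]   # one entry for scalar functions, one per column for tables

    def normalized(self) -> "Signature":
        return Signature(
            self.name,
            tuple(normalize_type(t) for t in self.arg_types),
            tuple(normalize_type(t) for t in self.result_types),
        )


def check_signature(declared: Signature, reported: Signature) -> None:
    """Raise ValueError if the server's function does not match the declaration."""
    if declared.normalized() != reported.normalized():
        raise ValueError(f"signature mismatch: declared {declared}, server reports {reported}")


# CREATE FUNCTION gcd(int, int) RETURNS int ... AS gcd;
declared = Signature("gcd", ("int", "int"), ("int",))
# What a server might report for @udf(input_types=['INT', 'INT'], result_type='INT')
reported = Signature("gcd", ("INT", "INT"), ("INT",))
check_signature(declared, reported)  # passes silently
```

Normalizing both sides before comparing keeps the check insensitive to case and common type aliases, so only real mismatches are rejected.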

You can then call the Python functions like any built-in function, for example `SELECT gcd(25, 15);` or `SELECT * FROM series(10);`.


Internal implementation


Internally, we use the Arrow Flight RPC framework for communication between the RisingWave core and the UDF server, and we designed a simple UDF calling protocol on top of the Flight RPC interface. We chose Arrow Flight because it is an efficient, language-independent data exchange protocol with RPC library implementations for common languages. This let us build a prototype quickly without worrying about the details of the RPC implementation, and it makes it easy to support more languages in the future.

[Figure: Arrow Flight RPC]

Arrow Flight provides multiple data exchange interfaces. We primarily use two of them:

  1. GetFlightInfo: Retrieves metadata for a given resource, such as its schema. We use it to look up a function on the server and check that its signature matches the declaration when a function is created.
  2. DoExchange: A bidirectional streaming call that can send and receive Arrow data at the same time. We use it to invoke functions.
    1. For scalar functions, the client sends a batch of arguments and receives a batch of return values with the same number of rows.
    2. For table-valued functions, the client sends a batch of arguments and receives several batches of varying length that together form the table returned by the function.
    3. For aggregate functions, the client would continuously send the values to be aggregated and receive the aggregate result at the end (not yet implemented).
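To make the table-function case of DoExchange concrete, here is a stdlib-only sketch of how a server might turn one input batch into several output batches. Plain lists stand in for Arrow record batches, and two details are assumptions rather than a description of the real protocol: each output row carries the index of the input row that produced it (so the caller can match outputs to inputs), and output batches are capped at a hypothetical `max_rows_per_batch`.

```python
from typing import Callable, Iterator, List, Sequence, Tuple


def eval_table_batch(
    fn: Callable[..., Iterator],
    columns: Sequence[Sequence],
    max_rows_per_batch: int = 3,
) -> Iterator[List[Tuple[int, object]]]:
    """Call the generator `fn` on every input row and yield output in chunks.

    Each output row is (row_index, value), where row_index is the position of
    the input row that produced it. Chunks hold at most max_rows_per_batch rows,
    so the number and size of output batches are unrelated to the input size.
    """
    batch: List[Tuple[int, object]] = []
    for row_index, args in enumerate(zip(*columns)):
        for value in fn(*args):
            batch.append((row_index, value))
            if len(batch) == max_rows_per_batch:
                yield batch
                batch = []
    if batch:  # flush the final, possibly shorter, batch
        yield batch


def series(n):
    # Plain-Python version of the series table function from the example above
    for i in range(n):
        yield i


# One input batch with a single INT column holding three rows: 2, 0, 3
print(list(eval_table_batch(series, [[2, 0, 3]])))
# [[(0, 0), (0, 1), (2, 0)], [(2, 1), (2, 2)]]
```

Note that the input row `0` produces no output at all, which is exactly why a per-row index (or some equivalent) is needed to relate the variable-length output back to the input.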

To keep the client compatible with UDF servers built with older SDK versions, we bump the protocol version whenever this calling protocol changes. The server must expose its current protocol version so that the client can call it correctly, so we later added a DoAction call to transmit this version information.
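A minimal sketch of the client-side version check, assuming a DoAction-style request/response. Everything here is hypothetical: the action name, the supported-version set, and the rule that a server which predates the version action speaks version 1. The dictionary of handlers simulates the server's action endpoints.

```python
from typing import Callable, Dict

# Hypothetical: protocol versions this client knows how to speak.
CLIENT_SUPPORTED_VERSIONS = {1, 2}
# Hypothetical action name standing in for whatever DoAction type the server exposes.
VERSION_ACTION = "protocol_version"


def server_protocol_version(actions: Dict[str, Callable[[], bytes]]) -> int:
    """Ask the server for its protocol version via a DoAction-like call.

    `actions` simulates the server's action handlers. We assume that a server
    built before the version action existed has no handler and speaks version 1.
    """
    handler = actions.get(VERSION_ACTION)
    if handler is None:
        return 1
    return int(handler().decode())


def check_compatible(actions: Dict[str, Callable[[], bytes]]) -> int:
    """Return the negotiated version or fail before any function is called."""
    version = server_protocol_version(actions)
    if version not in CLIENT_SUPPORTED_VERSIONS:
        raise RuntimeError(
            f"UDF server speaks protocol v{version}; "
            f"client supports {sorted(CLIENT_SUPPORTED_VERSIONS)}"
        )
    return version


print(check_compatible({VERSION_ACTION: lambda: b"2"}))  # 2
```

Failing at connection time gives a clear error instead of a confusing decoding failure in the middle of a DoExchange call.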

Although Arrow is used internally for storage and communication, users write functions using their language's native types. The main job of the SDK we provide is therefore to hide Arrow's data format and batch processing: it converts each incoming batch into native values, calls the user's function, and converts the results back.
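The core of that conversion for a scalar function can be sketched without Arrow at all: transpose the argument columns into rows, call the native function once per row, and collect the results into one output column. In this illustrative sketch, plain lists stand in for Arrow arrays, `eval_scalar_batch` is a hypothetical name, and `None` values are passed to the user function unchanged.

```python
from typing import Any, Callable, List, Sequence


def eval_scalar_batch(fn: Callable[..., Any], columns: Sequence[Sequence[Any]], num_rows: int) -> List[Any]:
    """Apply a row-at-a-time function to a columnar batch.

    `columns` holds one list per argument (a stand-in for Arrow arrays).
    Rows are transposed out of the columns, passed to `fn` as native values,
    and the results are collected into a single output column with exactly
    one value per input row.
    """
    if any(len(col) != num_rows for col in columns):
        raise ValueError("all argument columns must have num_rows entries")
    # Zero-argument functions still need to be called once per row.
    rows = zip(*columns) if columns else [()] * num_rows
    return [fn(*row) for row in rows]


def gcd(x, y):
    while y != 0:
        (x, y) = (y, x % y)
    return x


print(eval_scalar_batch(gcd, [[12, 7, 0], [18, 21, 5]], num_rows=3))  # [6, 7, 5]
```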

Here, the characteristics of each language lead to slight differences in its interface and internal implementation.

For dynamically typed languages like Python, we require users to annotate each function's SQL input and output types:

```python
from arrow_udf import udf

@udf(input_types=['VARCHAR[]', 'INT'], result_type='VARCHAR')
def array_index(array, i):
    ...  # implementation omitted
```
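Such a decorator mainly has to remember the declared SQL types so the server can later describe the function. The toy `udf` below is a hypothetical stand-in, not the `arrow_udf.udf` implementation: it attaches the types and a signature string to a plain Python function and leaves the function itself callable.

```python
from typing import Callable, List, Union


def udf(input_types: Union[str, List[str]], result_type: str) -> Callable:
    """Toy decorator that records SQL types on a plain Python function.

    Hypothetical stand-in for arrow_udf.udf: it only attaches metadata and
    returns the original function unchanged.
    """
    if isinstance(input_types, str):
        input_types = [input_types]

    def decorate(fn: Callable) -> Callable:
        fn.sql_input_types = list(input_types)
        fn.sql_result_type = result_type
        args = ", ".join(t.lower() for t in input_types)
        fn.sql_signature = f"{fn.__name__}({args}) -> {result_type.lower()}"
        return fn

    return decorate


@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
    while y != 0:
        (x, y) = (y, x % y)
    return x


print(gcd.sql_signature)  # gcd(int, int) -> int
print(gcd(12, 18))        # 6
```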

For statically typed languages like Java, we can automatically infer SQL types from Java types using reflection:

```java
public class ArrayIndex implements ScalarFunction {
    public String eval(String[] array, int i) {
        // => array_index(varchar[], int) -> varchar
        // ...
    }
}
```
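Python has a rough analog of this reflection step: if a function carries type hints, `inspect` and `typing` can read them at runtime. The sketch below infers a signature in the same format as the Java comment above. It only illustrates reflection-based inference; the Python SDK described here uses explicit annotations instead, and the type mapping is a hypothetical subset.

```python
import inspect
import typing

# Hypothetical subset of a Python-type -> SQL-type mapping.
_SCALAR_TYPES = {int: "int", str: "varchar", float: "float8", bool: "boolean"}


def sql_type(py_type) -> str:
    """Translate a Python type hint into a SQL type name."""
    if typing.get_origin(py_type) is list:
        (elem,) = typing.get_args(py_type)
        return sql_type(elem) + "[]"
    try:
        return _SCALAR_TYPES[py_type]
    except KeyError:
        raise TypeError(f"no SQL type for {py_type!r}") from None


def infer_signature(fn) -> str:
    """Build 'name(arg types) -> result type' from a function's type hints."""
    hints = typing.get_type_hints(fn)
    params = [sql_type(hints[name]) for name in inspect.signature(fn).parameters]
    return f"{fn.__name__}({', '.join(params)}) -> {sql_type(hints['return'])}"


def array_index(array: list[str], i: int) -> str:
    ...  # body omitted; only the type hints matter here


print(infer_signature(array_index))  # array_index(varchar[], int) -> varchar
```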

Because Python is dynamically typed, pyarrow can build Arrow arrays directly from native lists. Java is statically typed, so with the Arrow Java library we have to enumerate the supported types by hand and implement array construction for each of them. A language with even less dynamic flexibility and no runtime reflection, such as Rust, can only solve this at compile time with procedural macros.
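The per-type construction that a statically typed SDK has to write by hand can be pictured as a dispatch table from SQL type to builder. The builders below produce a simplified Arrow-style layout: a validity list (real Arrow uses a bitmap), a fixed-width value buffer for numbers, and an offsets buffer plus a byte buffer for strings. The builder names and the set of types are assumptions for illustration; in Python, `pyarrow.array(values)` handles all of this automatically.

```python
from array import array
from typing import Callable, Dict, List, Optional


def _validity(values: List[Optional[object]]) -> List[bool]:
    # Simplification: one bool per row instead of Arrow's packed bitmap.
    return [v is not None for v in values]


def build_int32(values):
    # Fixed-width buffer; NULL slots hold a placeholder 0.
    return {"validity": _validity(values), "values": array("i", (0 if v is None else v for v in values))}


def build_float64(values):
    return {"validity": _validity(values), "values": array("d", (0.0 if v is None else v for v in values))}


def build_varchar(values):
    # offsets[k]..offsets[k+1] delimits row k inside the UTF-8 data buffer.
    offsets = array("i", [0])
    data = bytearray()
    for v in values:
        if v is not None:
            data += v.encode("utf-8")
        offsets.append(len(data))
    return {"validity": _validity(values), "offsets": offsets, "data": bytes(data)}


# Every supported SQL type needs its own entry, which is the manual enumeration.
BUILDERS: Dict[str, Callable] = {"INT": build_int32, "FLOAT8": build_float64, "VARCHAR": build_varchar}


def build_column(sql_type: str, values):
    try:
        builder = BUILDERS[sql_type]
    except KeyError:
        raise TypeError(f"unsupported SQL type: {sql_type}") from None
    return builder(values)


col = build_column("VARCHAR", ["ab", None, "c"])
print(list(col["offsets"]), col["data"], col["validity"])
# [0, 2, 2, 3] b'abc' [True, False, True]
```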


Discussion


Finally, let's discuss the issues we have encountered with external functions in practice.

Performance is a key concern. Python is known for its slow execution, so once the user logic becomes even slightly complex, each function call takes significantly longer. Moreover, because data is exchanged in Arrow batches, the server calls the function on each row of a batch in turn and returns the results only after every row is done, so a single RPC call can have high latency. A UDF might be just one part of a large streaming pipeline, yet it can stall the entire pipeline.

A simple way to prevent this is a timeout: if a function does not return in time, stop waiting and fill the result with NULL. This is acceptable in batch processing, but in stream processing it causes problems later on. The same input may be computed twice (for example, when a row is inserted and later deleted), once timing out and once succeeding, and the two inconsistent results corrupt everything downstream. The feasible approach is therefore to reduce the latency itself.
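The consistency problem with timeouts can be reproduced in a few lines. In this sketch, `call_with_timeout` and `slow_square` are hypothetical, a sleep simulates an overloaded server, and a running SUM (which, like SQL `SUM`, skips NULLs) stands in for a downstream aggregate. An insert whose UDF call times out contributes nothing, while the later delete of the same row recomputes the value successfully and subtracts it, so the aggregate no longer returns to zero.

```python
import concurrent.futures
import time


def call_with_timeout(pool, fn, args, timeout_s):
    """Return fn(*args), or None (SQL NULL) if it does not finish within timeout_s."""
    future = pool.submit(fn, *args)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        return None  # stop waiting; the worker thread still runs to completion


def slow_square(x, delay_s):
    time.sleep(delay_s)  # delay_s simulates how loaded the UDF server is
    return x * x


running_sum = 0  # downstream SUM over the UDF output, which skips NULLs

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    # INSERT x=3 arrives while the server is overloaded: the call times out.
    inserted = call_with_timeout(pool, slow_square, (3, 0.5), timeout_s=0.05)
    if inserted is not None:
        running_sum += inserted
    # DELETE x=3 later recomputes the same expression, this time in time.
    deleted = call_with_timeout(pool, slow_square, (3, 0.0), timeout_s=1.0)
    if deleted is not None:
        running_sum -= deleted

print(inserted, deleted, running_sum)  # None 9 -9  (should be 0 after insert + delete)
```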

To reduce latency under high load, we need more processing capacity. Unfortunately, Python's GIL prevents us from using multiple cores through multithreading, so the only option is to start several server processes manually and put a load-balancing proxy in front of them. Even then, the load can end up uneven if the proxy is not configured correctly: Arrow Flight is built on gRPC, which multiplexes many calls over a single long-lived HTTP/2 connection, so a proxy that balances connections rather than individual requests may send all calls to the same process. On the other hand, if the user function is not compute-intensive but makes IO-bound external calls, multithreading does help. Users can add an io_threads parameter to the @udf decorator of a specific function, which enables an internal thread pool that executes the rows of a batch concurrently and reduces the total RPC latency.
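The idea behind io_threads can be sketched with the standard library: run each row of one batch on a thread pool and keep the results in input order. This is an illustration only, not the SDK's code; `eval_batch_concurrently` and `fetch_price` are hypothetical, and a sleep stands in for an external call. Threads help here because sleeping (like blocking network IO) releases the GIL.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence, Tuple


def eval_batch_concurrently(fn: Callable[..., Any], rows: Sequence[Tuple], io_threads: int) -> List[Any]:
    """Run fn on every row of one batch using a pool of io_threads threads.

    Executor.map yields results in input order, so output row k still
    corresponds to input row k even when calls finish out of order.
    """
    with ThreadPoolExecutor(max_workers=io_threads) as pool:
        return list(pool.map(lambda row: fn(*row), rows))


def fetch_price(item_id):
    # Hypothetical IO-bound UDF: the sleep stands in for a call to an external service.
    time.sleep(0.1)
    return item_id * 10


rows = [(i,) for i in range(8)]
start = time.perf_counter()
result = eval_batch_concurrently(fetch_price, rows, io_threads=8)
elapsed = time.perf_counter() - start
print(result)  # [0, 10, 20, 30, 40, 50, 60, 70]
print(f"{elapsed:.2f}s")  # far less than the ~0.8 s a sequential loop would take
```

Preserving row order matters because, as with any scalar UDF, the output batch must line up row for row with the input batch.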

Although computation latency can be reduced through parallelism or concurrency, the overall RPC latency is still inevitably constrained by network communication. Network latency primarily depends on the deployment location of the UDF server and the core. Ideally, they should be deployed on the same node for the lowest latency, but this might lead to resource contention. A more common practice is to deploy them on different nodes within the same data center, allowing communication over the internal network while being relatively easy to manage. It is crucial to avoid deploying them in different data centers, which would introduce very high network latency (potentially up to hundreds of milliseconds).
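A back-of-the-envelope calculation shows why the round-trip time matters so much. Assume, hypothetically, that the client keeps only one call in flight at a time, each batch holds 1024 rows, and the UDF needs 2 ms of compute per batch; the RTT values are illustrative, not measurements.

```python
def rows_per_second(batch_rows: int, compute_ms: float, rtt_ms: float) -> float:
    """Throughput with one call in flight: each batch costs one RTT plus compute time."""
    return batch_rows / ((compute_ms + rtt_ms) / 1000.0)


BATCH_ROWS = 1024  # hypothetical batch size
COMPUTE_MS = 2.0   # hypothetical per-batch compute time

for label, rtt_ms in [("same data center", 0.5), ("cross data center", 100.0)]:
    print(f"{label:>18}: {rows_per_second(BATCH_ROWS, COMPUTE_MS, rtt_ms):,.0f} rows/s")
```

Under these assumptions the same UDF drops from about 409,600 rows/s to about 10,039 rows/s, roughly a 40x difference caused by network distance alone.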

Conclusion

RisingWave supports external UDFs written in Python and Java, and uses Arrow Flight RPC internally for communication. The flexibility of external functions lets users do almost anything in a UDF, such as accessing local storage or external services, but it comes with the inherent cost of high communication latency. If you only need pure computation, the Rust UDFs introduced in the previous article might be a better choice.


Runji Wang

Software Engineer
