Calling External APIs from Streaming SQL: Introducing fetch and Batching in RisingWave JS UDFs

Stream processing empowers us to react to data in real time, transforming raw events into valuable insights. RisingWave excels at this with its powerful SQL interface. But what happens when your streaming logic needs to interact with the outside world: enriching data from an external CRM, getting predictions from an AI model, or checking inventory against a live database?

Traditionally, calling external services efficiently from within a streaming SQL query has been a significant challenge. Each call adds latency, and on high-throughput streams, per-row calls can bottleneck the entire system or become prohibitively expensive when APIs charge per call or enforce rate limits.

Bridging the Gap: fetch API and Batching in RisingWave v2.3 JS UDFs!

We're excited to announce a major enhancement to JavaScript User-Defined Functions (UDFs) in RisingWave v2.3: direct support for the fetch API and batched execution, specifically designed to make external API calls from your streaming SQL queries powerful, efficient, and cost-effective.

1. fetch API Support: Seamlessly Call External Services

You can now use the standard fetch API directly within your JavaScript UDFs to make network requests to any HTTP endpoint. This unlocks a vast range of possibilities for real-time data enrichment and integration.

  • How it Works (The Role of async):

    To use fetch, declare your JavaScript function as async, await your fetch calls, and add WITH (async = true) to the CREATE FUNCTION statement. For example:

      -- Example name and endpoint; substitute your own
      CREATE FUNCTION fetch_data(q varchar)
      RETURNS jsonb
      LANGUAGE javascript
      AS $$
      export async function fetch_data(q) {
          const response = await fetch('https://api.example.com/data?q=' + encodeURIComponent(q));
          // ... process the response as needed, e.g. parse it as JSON
          return await response.json();
      }
      $$
      WITH (async = true);
    

Crucially, this async integration means that when your UDF awaits a fetch response, RisingWave's underlying Rust engine can manage the pending I/O efficiently. The row being processed by that call waits for the fetch to complete, but RisingWave's executor can continue with other tasks or other rows in parallel, so many concurrent network requests do not stall the entire system.
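The overlap described above is ordinary JavaScript async behavior, which the following minimal sketch illustrates outside RisingWave. It assumes a hypothetical `fakeFetch` that resolves after a timer instead of making a real network call, and a hypothetical `enrichRow` standing in for an async UDF body. It does not model RisingWave's executor; it only shows that several awaited calls can be in flight at the same time rather than one after another.

```javascript
// Hypothetical stand-in for fetch: resolves after `ms` milliseconds and
// records how many requests are in flight at the same moment.
let inFlight = 0;
let peakInFlight = 0;

function fakeFetch(url, ms = 50) {
  inFlight += 1;
  peakInFlight = Math.max(peakInFlight, inFlight);
  return new Promise((resolve) => {
    setTimeout(() => {
      inFlight -= 1;
      resolve({ ok: true, json: async () => ({ url }) });
    }, ms);
  });
}

// Hypothetical async UDF body: awaits one request per input row.
async function enrichRow(id) {
  const response = await fakeFetch("https://api.example.com/items/" + id);
  return await response.json();
}

// Start every row before awaiting any of them, so their waits overlap.
async function enrichAll(ids) {
  return Promise.all(ids.map((id) => enrichRow(id)));
}

enrichAll([1, 2, 3]).then((results) => {
  console.log(results.map((r) => r.url));
  console.log("peak in-flight requests:", peakInFlight);
});
```

If `enrichRow` were instead awaited one row at a time in a plain `for` loop, the peak would stay at 1 and total wait time would grow linearly with the number of rows.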

2. Batch Execution: Optimize Your API Calls (Especially for Cost and Throughput)

While fetch allows you to call external APIs, many modern services (like OpenAI, geolocation services, or internal microservices) are far more efficient or cost-effective when you send them data in batches rather than one item at a time.

  • The Motivation: Imagine needing to get embeddings for thousands of text snippets per second using an AI service. Making one API call per snippet would be slow and expensive. Batching allows you to send, say, 100 snippets in a single API request.

  • How it Works:

    You can define your JS UDF to accept an array of inputs and return an array of results. In your CREATE FUNCTION statement, you'll add WITH (batch = true).

      -- JS UDF designed for batching (example name; substitute your own)
      CREATE FUNCTION to_upper_batch(s varchar)
      RETURNS varchar
      LANGUAGE javascript
      AS $$
      export function to_upper_batch(inputs) {
          // 'inputs' is an array holding one value per row in the batch
          // Return exactly one result per input, in the same order
          return inputs.map(s => s === null ? null : s.toUpperCase());
      }
      $$
      WITH (batch = true);
    

RisingWave will then automatically bundle multiple input rows into an array before calling your UDF and expect an array of results back.
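To make the batch contract concrete, here is a minimal sketch, in plain JavaScript outside RisingWave, of what "bundle rows, call once, expect one result per row" implies. The driver `runInBatches`, the batch function `toUpperBatch`, and the batch size of 3 are hypothetical; RisingWave chooses its own batch boundaries, and this is not its implementation. The constraint it checks is the important part: a batch function must return exactly one result per input, in input order, or results can no longer be matched back to rows.

```javascript
// Hypothetical batch function with the shape of a `batch = true` UDF body:
// it receives an array of values and returns an array of the same length.
function toUpperBatch(inputs) {
  return inputs.map((s) => (s === null ? null : s.toUpperCase()));
}

// Hypothetical driver that mimics "bundle rows, call once per bundle".
function runInBatches(rows, batchSize, batchFn) {
  const out = [];
  for (let start = 0; start < rows.length; start += batchSize) {
    const batch = rows.slice(start, start + batchSize);
    const results = batchFn(batch);
    // One result per input row is what keeps outputs aligned with rows.
    if (!Array.isArray(results) || results.length !== batch.length) {
      throw new Error(
        `batch function returned ${results?.length} results for ${batch.length} inputs`
      );
    }
    out.push(...results);
  }
  return out;
}

const rows = ["a", "b", null, "d", "e"];
console.log(runInBatches(rows, 3, toUpperBatch));
```

Null inputs are passed through as null so that every position in the batch still gets a result.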

3. Combining fetch with Batch for Maximum Efficiency

The real power emerges when you combine fetch (enabled by async) with batch execution. This is ideal for interacting with external APIs that support batch requests.

CREATE FUNCTION get_embeddings_batch(input_text TEXT) -- declared per row; with batch = true the JS function receives an array
RETURNS JSONB -- one JSON value (e.g., an embedding) per row
LANGUAGE javascript
AS $$
  export async function get_embeddings_batch(texts_batch) {
    // Example: Call an external embedding service that accepts a batch of texts
    const response = await fetch('https://api.ai-service.com/v1/embeddings_batch', {
      method: 'POST',
      headers: {'Content-Type': 'application/json', 'Authorization': 'Bearer YOUR_API_KEY'},
      body: JSON.stringify({ inputs: texts_batch })
    });
    const data = await response.json();
    // Assume data.embeddings is an array of embeddings corresponding to texts_batch
    return data.embeddings;
  }
$$
WITH (
  async = true, -- Necessary for 'await fetch'
  batch = true  -- To process texts and call the API in batches
);
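In practice, two details of a batched fetch UDF deserve care: the external service may cap how many inputs one request can carry, and its response must line up one-to-one with the inputs. The following is a minimal sketch of both, assuming a hypothetical limit `MAX_INPUTS_PER_REQUEST` and the same hypothetical `{ embeddings: [...] }` response shape assumed above. `fetchFn` is injected so the logic can run without a network; `fakeFetch` is an invented stand-in service.

```javascript
// Hypothetical per-request limit imposed by the external service.
const MAX_INPUTS_PER_REQUEST = 2;

// Split one UDF batch into API-sized chunks, call the API once per chunk,
// and concatenate the results so they stay aligned with `texts`.
async function embedBatch(texts, fetchFn) {
  const results = [];
  for (let i = 0; i < texts.length; i += MAX_INPUTS_PER_REQUEST) {
    const chunk = texts.slice(i, i + MAX_INPUTS_PER_REQUEST);
    const response = await fetchFn("https://api.ai-service.com/v1/embeddings_batch", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ inputs: chunk }),
    });
    if (!response.ok) {
      throw new Error(`embedding API returned HTTP ${response.status}`);
    }
    const data = await response.json();
    // Refuse to guess if the service returns the wrong number of results.
    if (!Array.isArray(data.embeddings) || data.embeddings.length !== chunk.length) {
      throw new Error("embedding API returned a mismatched number of results");
    }
    results.push(...data.embeddings);
  }
  return results;
}

// Hypothetical fake service: "embeds" each text as [its length].
async function fakeFetch(url, options) {
  const { inputs } = JSON.parse(options.body);
  return {
    ok: true,
    status: 200,
    json: async () => ({ embeddings: inputs.map((t) => [t.length]) }),
  };
}

embedBatch(["a", "bb", "ccc"], fakeFetch).then((r) => console.log(r));
```

Chunks are sent sequentially here, which keeps the request rate predictable; sending them concurrently with `Promise.all` would lower latency at the cost of burstier traffic against the API's rate limit.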

Illustrative Examples: fetch and Batch UDFs in Action

Having covered how to enable fetch (via async = true) and batch processing (via batch = true) in your JavaScript UDFs, let's walk through two practical examples. These demonstrate how you'd structure UDFs for common external API interaction patterns:

Example 1: Calling an External API for Individual Items (using fetch)

Here, your JavaScript function uses async and await fetch to make an individual network request for each input row.

CREATE FUNCTION fetch_user_profile(user_id int)
RETURNS jsonb -- e.g., a JSON object with user details
LANGUAGE javascript
AS $$
  export async function fetch_user_profile(id) {
      const response = await fetch('https://api.mycrm.com/users/' + id + '?apikey=SECRET');
      if (!response.ok) {
          // Handle errors, perhaps return null or a default structure
          console.error("API Error:", response.status, await response.text()); // Log error details
          return null; // Or throw an error if appropriate for your stream
      }
      return await response.json();
  }
$$
WITH (
  async = true -- Enable 'await fetch'
);

When you use fetch_user_profile(some_user_id_column) in your SQL query, RisingWave will invoke this UDF for each user ID. The async nature ensures that while one call awaits an API response, RisingWave can efficiently process other rows or tasks.
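Because the UDF body is ordinary JavaScript, its error-handling path can be exercised outside RisingWave before deployment. The sketch below is a hypothetical refactoring of the `fetch_user_profile` logic that takes the fetch function as a parameter (`fetchFn`) so a fake can be substituted. The fake CRM `fakeCrmFetch` and its responses are invented for illustration; inside the real UDF you would keep calling the global `fetch` directly.

```javascript
// Same logic as the fetch_user_profile UDF body, with fetch injected for testing.
async function fetchUserProfile(id, fetchFn) {
  const url =
    "https://api.mycrm.com/users/" + encodeURIComponent(String(id)) + "?apikey=SECRET";
  const response = await fetchFn(url);
  if (!response.ok) {
    console.error("API Error:", response.status, await response.text());
    return null; // the UDF above also returns null for failed lookups
  }
  return await response.json();
}

// Hypothetical fake CRM: user 42 exists, every other id returns 404.
async function fakeCrmFetch(url) {
  if (url.startsWith("https://api.mycrm.com/users/42?")) {
    return {
      ok: true,
      status: 200,
      json: async () => ({ id: 42, plan: "pro" }),
      text: async () => "",
    };
  }
  return { ok: false, status: 404, json: async () => ({}), text: async () => "not found" };
}

(async () => {
  console.log(await fetchUserProfile(42, fakeCrmFetch));
  console.log(await fetchUserProfile(7, fakeCrmFetch));
})();
```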

Example 2: Calling a Batch-Optimized External API (using fetch and batch)

In this scenario, your JavaScript function is designed to accept an array of inputs and return an array of outputs. It uses async/await fetch to make a single API call with the entire batch.

CREATE FUNCTION translate_texts_batch(text_to_translate varchar) -- declared per row; with batch = true the JS function receives an array
RETURNS varchar
LANGUAGE javascript
AS $$
  export async function translate_texts_batch(texts) {
      // Hypothetical batch translation API
      const response = await fetch('https://api.translate.com/batch_translate', {
          method: 'POST',
          body: JSON.stringify({ texts: texts, target_lang: 'es' }),
          headers: { 'Content-Type': 'application/json', 'Authorization': 'Bearer YOUR_TRANSLATION_API_KEY' }
      });
      if (!response.ok) {
          console.error("Batch API Error:", response.status, await response.text());
          // For batch, you might return an array of nulls or throw an error
          // that affects the whole batch, depending on desired behavior.
          // Returning an array of nulls of the same length as input:
          return texts.map(() => null); 
      }
      const translations = await response.json(); // Assumes a body like { results: ["hola", "mundo"] }
      // Ensure the API returns results in the same order as inputs,
      // or handle mapping if the order is not guaranteed.
      return translations.results; // One translated string per input, in input order
  }
$$
WITH (
  async = true, -- For 'await fetch'
  batch = true  -- To bundle inputs and call the API in batches
);

Calling translate_texts_batch(some_text_column) in a query lets RisingWave pass many texts to your UDF at once; you don't need GROUP BY or a window function to form the batches, because RisingWave handles UDF batching itself. The UDF then makes a single, more efficient call to the translation API, which suits services optimized for batch requests.
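The comments in the translation example note that results must come back in input order. If a service does not guarantee that, one option is to tag each input with its position and reassemble the output by that tag. Here is a minimal sketch, assuming a hypothetical API that accepts `{ id, text }` items and returns `{ id, translation }` items in arbitrary order; that request/response shape and the fake service `fakeTranslateFetch` are invented for illustration.

```javascript
// Send each text with its index, then place each translation back at that index.
async function translateBatchUnordered(texts, fetchFn) {
  const items = texts.map((text, id) => ({ id, text }));
  const response = await fetchFn("https://api.translate.com/batch_translate", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ items, target_lang: "es" }),
  });
  if (!response.ok) {
    return texts.map(() => null); // same fallback as the UDF above
  }
  const { results } = await response.json();
  const out = texts.map(() => null); // rows the API skipped stay null
  for (const { id, translation } of results) {
    if (Number.isInteger(id) && id >= 0 && id < out.length) {
      out[id] = translation;
    }
  }
  return out;
}

// Hypothetical fake service that answers in reverse order.
const dictionary = { hello: "hola", world: "mundo" };
async function fakeTranslateFetch(url, options) {
  const { items } = JSON.parse(options.body);
  const results = items
    .map(({ id, text }) => ({ id, translation: dictionary[text] ?? null }))
    .reverse();
  return { ok: true, json: async () => ({ results }) };
}

translateBatchUnordered(["hello", "world"], fakeTranslateFetch).then(console.log);
```

Pre-filling the output with nulls keeps the one-result-per-input contract even when the service silently drops an item.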

Why This Matters: Powerful Use Cases Unlocked

This combination of fetch and batching for JS UDFs opens up a world of possibilities for real-time stream processing:

  1. Real-time Data Enrichment via External APIs:

    • Geo-IP Lookup: Enrich streaming IP addresses with location by calling a Geo-IP service (potentially in batches if the service supports it).

    • User Profile Augmentation: Fetch user details (segment, plan) from an external database or microservice based on user IDs in the event stream.

    • AI-Powered Insights: Get sentiment analysis, entity extraction, or embeddings for text data by calling services like OpenAI, Cohere, or custom model endpoints, leveraging batching for cost and throughput.

  2. External System Integration:

    • Alerting/Notifications: Trigger external alerts (PagerDuty, Slack) based on stream conditions by calling their APIs.

    • Live Feature Flags: Check feature flag status from a service like LaunchDarkly for incoming events.

    • Database Lookups/Writes: Interact with external databases for supplementary data or to write results out.

  3. Cost and Performance Optimization for API-Heavy Workloads:

    • Reduced API Call Costs: Significantly lower costs for APIs that charge per call or offer better pricing for batch requests.

    • Higher Throughput: Process more events per second when interacting with external services by amortizing per-request overhead and latency across many rows.

    • Respecting Rate Limits: Batching can help stay within API rate limits by making fewer, larger requests.
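The rate-limit point above comes down to simple arithmetic: with one request per row, request volume equals row volume, while batching divides it by the batch size (rounded up). A small sketch with illustrative numbers, not measurements; the 5,000 rows/s stream and the 100 requests/s limit are assumptions, and real batch sizes are chosen by RisingWave and may be smaller than the ideal.

```javascript
// Requests per second needed to keep up with a stream, given a batch size.
// Rounds up because a partially filled batch still costs a full request.
function requestsPerSecond(rowsPerSecond, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  return Math.ceil(rowsPerSecond / batchSize);
}

// Illustrative numbers only: 5,000 rows/s against an API limited to 100 requests/s.
const rowsPerSecond = 5000;
const rateLimit = 100;
for (const batchSize of [1, 10, 100]) {
  const rps = requestsPerSecond(rowsPerSecond, batchSize);
  console.log(`batch size ${batchSize}: ${rps} requests/s, within limit: ${rps <= rateLimit}`);
}
```

The same division applies to cost under per-call pricing: fewer requests for the same rows means a proportionally smaller per-call bill.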

Get Started Today!

The fetch API support and batch execution for JavaScript UDFs are available now in RisingWave v2.3. This significantly enhances your ability to build sophisticated, efficient, and well-integrated real-time applications.

Ready to dive deeper?

We're excited to see how you'll use these new UDF capabilities to connect RisingWave to the wider world of data and services!
