Dynamic SQL processing with Apache Flink
In this blog post, I would like to cover the hidden possibilities of dynamic SQL processing using the current Flink implementation. I will showcase a…
Read moreIn the ever-evolving world of data analytics, businesses are continuously seeking innovative methods to unlock hidden value from their data. A significant portion of the data housed in many data warehouses is unstructured, with natural language text constituting a large percentage. Traditionally, data warehouses have relied heavily on static rules and regular expressions (regexes) developed by data analysts to parse and process natural language text. However, with the introduction of Large Language Models (LLMs), things have changed. LLMs open up a world of possibilities for understanding, handling and drawing value from text data (and not only), taking our ability to enrich the data to a whole new level.
In this blog post, we aim to provide a comprehensive guide on how you can leverage LLMs and BigQuery to bring additional value to your data and boost your analytical capabilities. We'll cover use-cases, architectural considerations, implementation steps, and evaluation methods to ensure you're fully equipped to use this powerful combination in your own data processes.
The full implementation of the use-case described in this article is available here: https://github.com/pilis/dbt-bigquery-llm-enrichment-example
Imagine that you are responsible for moderating and maintaining the quality of comments published on your social platform like Hacker News. Your task is to assess and remove comments that violate service rules.
Currently, this function is performed by a moderator who reviews all violations reported by users once a day and manually decides whether to remove a comment or not, based on their knowledge of the service rules.
As the number of users increases, and with it the number of reported violations, it seems useful to optimise the process so that the moderator's work is more effective. Also, you would want the moderator to only deal with hard, non-obvious cases.
The company owns a simple data warehouse based on BigQuery, which is used for current reporting. Due to its small scale, the company does not have dedicated data scientists and a large budget for automating such a process, hence solutions that can be implemented quickly and at a low cost will be gratefully received.
Let's assume that in BigQuery we have a table called "comments", which contains the history of all comments published by users on the social media platform.
Our solution will come down to extending the existing “comments” table with a column whose content will be generated by the LLM model, based on other data from the same record.
Below is a simple diagram of the solution architecture:
The key components of this integration are:
Many LLM models have limitations on the number of queries and tokens per minute. For example, OpenAI's GPT-4 has limitations of 40 RPM (requests per minute) and 40k TPM (tokens per minute). This is a serious constraint that prevents the processing of a large table. A solution to this problem could be to use incremental models in such a way that we process fewer records at a time, than the assumed limits. If we want to process more data, we just need to run the model more times, until all the data is processed. Additionally, during the development of the dbt model, we can limit the number of processed records to a minimum.
Depending on the requirements, the aforementioned architecture can be easily adapted to higher data security demands. This can be achieved by using the PaLM model hosted on Vertex AI, or the custom self-deployed model on Vertex AI, which provides a greater degree of data privacy compared to OpenAI, where requests are transported over public internet. “Generative AI support on Vertex AI” is also HIPAA certified.
For demo purposes, we’ll simulate the described “example scenario” by using comments from the public Hacker News dataset published on BigQuery marketplace.
Link to a public dataset on GCP Marketplace:
https://console.cloud.google.com/marketplace/details/y-combinator/hacker-news
The starting point in implementing a use case based on a Large Language Model (LLM) should be designing the prompt and interaction with the model. Given the limitations of LLM models such as context length, task complexity and hallucinations, it's valuable to assess in the first step whether our tasks can be satisfactorily implemented.
Below is a screenshot from the OpenAI playground, where different versions of prompts and model parameters can be easily tested. A similar solution is available in Generative AI Studio for those wanting to test Google's PaLM model:
In the system prompt, you outline the functionality of your "worker" by:
In the user prompt, you specify the particular instance of the task that the 'worker' is expected to perform. In this scenario, the prompt will be generated for each row from the BigQuery table, allowing you to include values from various columns and provide additional instructions on the expected response, supplementing the guidelines stated in the system prompt.
`call_llm_model`
Cloud Function's main responsibility is to act as an interface between BigQuery Remote Functions and LLM API. Within this function, we can create an environment that integrates with various LLM models, logs requests or monitors and controls the number of requests, so as not to avoid a cost explosion by processing a large amount of data.
In the current version, we have implemented integration with OpenAI GPT-4 as a state-of-the-art solution, as well as integration with Google's PaLM model available through Vertex AI. Depending on the application, we can choose between a more powerful model, but one that communicates via the public internet, or PaLM, available within Google's infrastructure, thereby providing greater security.
The Cloud Function is configured as follows: we're using the Gen2 runtime, which effectively uses Cloud Run under the hood and permits concurrent requests on a single instance, unlike the Gen1 runtime. The timeout is set to the maximum value of 300 seconds, which should be increased proportionally when expecting longer response times. We've capped the maximum instances at 10 and the concurrency at 20, enabling the handling of roughly 200 simultaneous requests—a feature that requires further tuning. Additionally, we have utilised secrets/environment variables to supply API keys, such as for OpenAI.
Link to GitHub with full code and deployment instructions: https://github.com/pilis/dbt-bigquery-llm-enrichment-example/tree/main/remote_functions/call_llm_model
BigQuery Remote Function allows calling an external service (Cloud Function or Cloud Run) for each record in a BigQuery table.
When creating a Remote Function, we can specify the endpoint of the Cloud Function. Thanks to the max_batching_rows parameter, we can determine how many records we want to process within a single request. In the user_defined_context, we can pass a configuration independent of function calls, such as: the LLM model we will use, or the prompt system.
Here’s the DDL statement to create Remote Function in BigQuery:
CREATE OR REPLACE FUNCTION `gcp_project.bq_dataset`.call_llm_model(prompt STRING) RETURNS STRING
REMOTE WITH CONNECTION `gcp_project.us.cloud_resources_connection`
OPTIONS (
endpoint = 'YOUR_CLOUD_FUNCTION_ENDPOINT_URL',
max_batching_rows = 10,
user_defined_context = [
("system_prompt", "{{ system_prompt_sanitized }}"),
("model", "openai-gpt-4")
]
)
dbt (data build tool) is an open-source command-line tool that enables data analysts and engineers to transform raw data into analysis-friendly structures using SQL. It helps manage complex data transformations, test data quality and document datasets, making it a core component in modern data analytics stacks.
Let's assume that we already have a "hacker_news_comments
" dbt model and we want to extend it with the columns: moderation_decision
and moderation_explanation
. These columns will be generated by the LLM.
Due to the limitations on the number of requests to the LLM API, we are not able to process an arbitrary number of records in a single run of the dbt model. One solution is to use incremental materialisation, where we can process successive segments of a larger table, always starting from the oldest unprocessed data. Additionally, to quickly and frequently test changes, we can limit the number of processed records to the last few, which will help reduce costs.
The presented example is a complete solution that includes system and user prompt definitions, the definition of the connection with Cloud Function through Remote Functions, incremental materialisation and parsing responses into separate columns:
{{
config(
materialized = 'incremental',
unique_key = 'id',
partition_by = {
'field': 'created_at',
'data_type': 'timestamp',
'granularity': 'day'
}
)
}}
{% set start_date = "2022-11-15" %}
{% set max_rows_batch = 10 %}
{% set max_rows_total = 20 %}
{% set user_prompt = 'PUT_YOUR_USER_PROMPT_HERE' %}
{% set system_prompt %}
PUT_YOUR_SYSTEM_PROMPT_HERE
{% endset %}
{% set system_prompt_sanitized = system_prompt|escape|replace('\n', '') %}
-- Prepare the DDL statement to create the Remote Function
{% set create_function_ddl %}
create or replace function `{{ target.project }}.{{ target.dataset }}`.call_llm_model(prompt string) returns string
remote with connection `{{ target.project }}.us.cloud_resources_connection`
options (
endpoint = "{{ var('call_llm_model_cloud_function_url') }}",
max_batching_rows = {{ max_rows_batch }},
user_defined_context = [
("system_prompt", "{{ system_prompt_sanitized }}"),
("model", "openai-gpt-4")
]
)
{% endset %}
-- Execute the DDL statement to create the Remote Function
{% call statement(name, fetch_result=False) %}
{{ create_function_ddl }}
{% endcall %}
with
-- source_table is a table that contains the data that we want to use to make predictions (features)
comments_batched as (
select *
from {{ ref('hacker_news_comments') }}
where
created_at >= '{{ start_date }}'
{% if target.name == 'production' %}
{% if is_incremental() %}
and created_at >= coalesce((select max(created_at) from {{ this }}), '1900-01-01')
{% endif %}
{% endif %}
order by created_at
{% if target.name == 'production' %}
limit {{ max_rows_total }}
{% else %}
limit 1 -- For testing purposes limit the number of rows to 1
{% endif %}
),
user_prompt_added as (
select
*,
replace('{{ user_prompt }}', "TO_REPLACE", content) as prompt
from comments_batched
),
enriched as (
select
*,
`{{ target.project }}.{{ target.dataset }}`.call_llm_model(prompt) as response
from user_prompt_added
),
final as (
select
* except (response),
json_extract_scalar(response, "$.decision") as moderation_decision,
json_extract_scalar(response, "$.explanation") as moderation_explanation
from enriched
)
select * from final
Thanks to our integration, we can easily and repeatedly generate responses for a large number of examples (records in the table). The simplest way to assess the results is through manual inspection by a domain expert, for instance, in the form of a dashboard presenting the latest predictions.
An example of a prediction made by the PaLM model showing two classes: REJECT and APPROVE
An example of a prediction made by the OpenAI GPT-4 model showing two classes: REJECT and APPROVE
Another idea is to have a domain expert label a portion of the dataset and compare it with the predictions. In the case of predictions where the values belong to a closed set, such as the APPROVE and REJECT classes as in the presented example, we can calculate evaluation metrics like accuracy, precision, recall, F1 score, and the confusion matrix. The approach to evaluating the model depends on the task at hand and requires an individual approach.
An interesting idea would be the integration with a model registry (e.g., MLflow), tracking the model version along with its metrics. Then, by changing the prompt or LLM parameters, we could assess whether we are moving in the right direction by comparing with previous configurations.
An alternative solution is to use the ML.GENERATE_TEXT function in BigQuery, which allows direct access to the text-bison model. This is a decidedly simpler solution to use, but limited to one model. The solution described in this blog post, however, allows for integration with any LLM model and greater customization.
This post detailed the integration of Large Language Models (LLMs) with Google's BigQuery for data enrichment. By leveraging Cloud Functions and BigQuery Remote Functions, we easily interfaced BigQuery with LLM APIs. We demonstrated how dbt can help with data transformations, and addressed limitations and security concerns of LLMs. In essence, LLMs with BigQuery offer an easy to deploy and cost-effective solution for enhancing data analysis capabilities.
Would you like to know more about integration between LLM and BigQuery? If you want to discuss this or any other topics, feel free to sign up for a 30 minutes free consultation with our experts.
In this blog post, I would like to cover the hidden possibilities of dynamic SQL processing using the current Flink implementation. I will showcase a…
Read moreFeature Stores are becoming increasingly popular tools in the machine learning environment, serving to manage and share the features needed to build…
Read moreWe are proud to present you our first e-book, created by GetInData specialists. Apache NiFi: A Complete Guide is the result of long and fruitful work…
Read moreFlink is an open-source stream processing framework that supports both batch processing and data streaming programs. Streaming happens as data flows…
Read moreIt's been a year since the announcement of the dbt-flink-adapter, and the concept of enabling real-time analytics with dbt and Flink SQL is simply…
Read moreTrend 4. Larger clouds over the Big Data landscape A decade ago, only a few companies ran their Big Data infrastructure and pipelines in the public…
Read moreTogether, we will select the best Big Data solutions for your organization and build a project that will have a real impact on your organization.
What did you find most impressive about GetInData?