Building Trustful AI — Bedrock Agent — Improving Data Pipeline

ML-Guy
12 min read · Jun 6, 2024


It is easy to create a simple demo of a chat interface, based on generative AI, that queries data in natural language. However, applying it to larger, more complex data “in the wild” is much harder. In this post, we will cover several options to improve the solution, including getting the data into S3, creating a folder structure that supports scale, enriching the data catalog metadata, adjusting the Lambda function that backs the Action Group of the Bedrock agent, and adding information to the agent instructions. These improvements should help you implement trustful AI services in your organization on real-life internal and external data, as steps toward benefiting from generative AI technologies by providing real business value.

This is part of an online course: “Building AI with Bedrock Agent” on O’Reilly and edX.

Happy Path

One of the main complaints about the new generative AI technology is that it is easy to create an impressive demo but hard to turn it into a successful business application. A natural language chat interface over a data set is a good example. Building an interface that can answer business questions without any SQL experience is surprisingly easy; the following interface, for example, can be built in 15 minutes.

Chat interface based on generative AI to answer natural language questions

You can try it yourself by following the instructions in this GitHub repository: https://github.com/guyernest/bedrock-agent

The solution is based on the following architecture:

Chat interface for natural language query solution architecture on AWS

AWS Glue crawls the data files uploaded to S3 to discover their schema. The Lambda function serves as the generative AI agent’s action: it retrieves the schema and executes the SQL queries that the LLM generates for the questions sent from the chat UI hosted on App Runner. Using AWS CDK as the deployment tool, you can build this solution in your cloud environment in less than 15 minutes.
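For orientation, the chat UI ultimately calls the agent through the Bedrock Agents runtime API. Here is a minimal sketch of such a call; the agent ID, alias ID, and session ID are placeholders rather than values from the repository:

import boto3

# Placeholder IDs: replace with the values created by the CDK deployment
AGENT_ID = "YOUR_AGENT_ID"
AGENT_ALIAS_ID = "YOUR_AGENT_ALIAS_ID"

client = boto3.client("bedrock-agent-runtime")

def ask_agent(question: str, session_id: str = "demo-session") -> str:
    # Send a natural language question to the agent and collect the streamed reply
    response = client.invoke_agent(
        agentId=AGENT_ID,
        agentAliasId=AGENT_ALIAS_ID,
        sessionId=session_id,
        inputText=question,
    )
    # The completion is returned as an event stream of chunks
    return "".join(
        event["chunk"]["bytes"].decode("utf-8")
        for event in response["completion"]
        if "chunk" in event
    )

print(ask_agent("Which team scored the most points in 2015?"))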

The solution works well because the selected dataset is small and well-defined: the AWS Glue crawler can analyze it easily, and the LLM used by the Bedrock agent can query it successfully within a couple of attempts. However, things become harder when we apply the solution to a larger and more complex dataset. The agent performs poorly, trying multiple queries and mostly failing to find the answer.

Failure of the AI agent to answer the question with multiple trials

“In the Wild”

We don’t live in a simple world. Let’s see what happens when we try to use this solution with a different dataset, and solve the problems we face in the various parts of the data pipeline. Overall, we will improve the way we (1) get the dataset into S3, (2) organize the data in S3, (3) create the data schema and metadata, (4) tell the LLM about the schema, and finally (5) instruct the LLM how to understand and query the data.

How to improve data pipeline for generative AI SQL expert agent

(1) Handling Large Dataset

Let’s try to apply the solution to NBA play-by-play data, which can be found here. This dataset is more interesting as it is much larger (~1.5GB), wider (more than 30 columns), and “dirtier” (missing header, different event types, etc.).

NBA Basketball Dataset website

For a small set of files, it is not a problem to upload them through the management console or with the aws s3 cp AWS CLI command. However, this is not always practical for a larger data set with more, and larger, files. For security reasons, it is not always possible to download the files to the local machine, and the local network can also be a bottleneck when uploading large files.

A good solution to that problem is AWS CloudShell, a browser-based shell. Instead of downloading the files from the source above to the local machine, we can use the local directory of the CloudShell instance and the better connectivity within the cloud infrastructure to upload the files to S3.

You can open the CloudShell from the footer of the AWS management console and use the following script to copy all the data files from the website to your S3 bucket:

export base_url="https://sports-statistics.com/database/basketball-data/nba"
for year in {2000..2019}; do
  next_year=$((year + 1));
  season="${year}-${next_year:2:2}_pbp.csv";
  url="${base_url}/${season}";
  wget -P . "$url";
  aws s3 cp "${season}" "s3://raw-data-<REGION>-<ACCOUNT>/data/play-by-play/year=${year}/";
  rm "${season}";
done
CloudShell with copy script of data from a website to S3

(2) Using Data Partitions

We don’t need to think much about organizing the files for a small data set. However, when the data set grows, it is important to consider the best way to create and store it. We want multiple files because we use a distributed SQL engine, Amazon Athena. The NBA play-by-play dataset is already split into 20 yearly files, so we don’t have to split it further. Athena is also a serverless service whose pricing is based on the amount of data scanned. It is therefore recommended to partition the data based on how you plan to query it, usually over time. Such a time-based partition also makes it easy to add newer data over time.

The partition process has two steps: the first is to create the folder structure in S3 to allow the Athena engine to filter the files it needs to scan, and the second is to define the partition in the table schema in the Glue data catalog. The Glue crawler can pick up the partition information from the S3 folder structure. In the script above, we create the folder structure and naming in line 7 (note the year=${year}/ in the folder name in S3):

aws s3 cp "${season}" "s3://raw-data-<REGION>-<ACCOUNT>/data/play-by-play/year=${year}/";
S3 folders with partition names for the data folders

We will see the partition definition in the schema below. With such a partition, a query that filters on year = '2004' in its WHERE clause will scan only one of the 20 yearly partitions, about 5% of the data, making it roughly 95% cheaper.
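As a quick illustration, here is a sketch of how such a partition-pruned query could be issued from a notebook with the same awswrangler library used later in the Lambda function; the database name is an assumption based on the solution defaults:

import awswrangler as wr

# Only the year=2004 partition folder is scanned, roughly 1/20 of the data
df = wr.athena.read_sql_query(
    sql="""
        SELECT PLAYER1_NAME, COUNT(*) AS events
        FROM play_by_play
        WHERE year = '2004'
        GROUP BY PLAYER1_NAME
        ORDER BY events DESC
        LIMIT 10
    """,
    database="bedrock_agent",  # assumed database name
)
print(df.head())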

(3) Enriching Data Schema

In the happy path, the AWS Glue crawler discovered the data structure and created a useful schema with column names and types. However, for the more complex NBA play-by-play data set, the crawler was less successful, and the table schema couldn’t provide the generative AI model with the metadata it needs to generate correct SQL queries against the data.

Useless column names in the default Glue crawler output

Until AWS decides to upgrade its crawler to use LLMs and other AI models, it is up to us to fix the schema. A simple way to do that is to paste a sample of the data into one of the LLM interfaces in Bedrock or ChatGPT and ask for the CREATE EXTERNAL TABLE statement:

CREATE EXTERNAL TABLE statement from ChatGPT
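If you prefer to script this step instead of pasting into a chat window, the sketch below shows the idea with the Bedrock converse API; the model ID and prompt wording are assumptions, and the generated DDL still needs the manual review described next:

import boto3

bedrock = boto3.client("bedrock-runtime")

# Read a small sample of the raw CSV to ground the model
with open("2000-01_pbp.csv") as f:
    sample = "".join(f.readlines()[:20])

prompt = (
    "Here is a sample of a CSV file with NBA play-by-play data:\n"
    f"{sample}\n"
    "Write an Athena CREATE EXTERNAL TABLE statement for it, "
    "using OpenCSVSerde and adding a COMMENT for every column."
)

response = bedrock.converse(
    modelId="anthropic.claude-3-sonnet-20240229-v1:0",  # assumed model ID
    messages=[{"role": "user", "content": [{"text": prompt}]}],
)
print(response["output"]["message"]["content"][0]["text"])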

Please note that we should not take the output of the LLM and use it without changes. For example, the AI model ignored the first index column because it didn’t have a header, and it couldn’t see the year partition column, which is not visible in the data itself. It is a good example of the collaboration between artificial intelligence, which can create the correct SQL structure, and human intelligence, which can add information about the context.

You can also ask the LLM to add comments for each one of the columns, and you will get something similar to the following statement:

CREATE EXTERNAL TABLE `play_by_play`(
`idx` bigint,
`EVENTMSGACTIONTYPE` int COMMENT 'Specific type of action within the event message type.',
`EVENTMSGTYPE` int COMMENT 'General type of event (e.g., shot, foul, turnover).',
`EVENTNUM` int COMMENT 'Sequential event number within the game.',
`GAME_ID` bigint COMMENT 'Unique identifier for the game.',
`HOMEDESCRIPTION` string COMMENT 'Description of the play involving the home team.',
`NEUTRALDESCRIPTION` string COMMENT 'Neutral description of the play.',
`PCTIMESTRING` string COMMENT 'Time remaining in the period.',
`PERIOD` string COMMENT 'Period of the game (quarters, overtime).',
`PERSON1TYPE` string COMMENT 'Role type of the first person involved in the event.',
`PERSON2TYPE` string COMMENT 'Role type of the second person involved in the event.',
`PERSON3TYPE` string COMMENT 'Role type of the third person involved in the event.',
`PLAYER1_ID` string COMMENT 'ID of the first player involved.',
`PLAYER1_NAME` string COMMENT 'Name of the first player involved.',
`PLAYER1_TEAM_ABBREVIATION` string COMMENT 'Team abbreviation for the first player.',
`PLAYER1_TEAM_CITY` string COMMENT 'City of the first player’s team.',
`PLAYER1_TEAM_ID` string COMMENT 'Team ID for the first player.',
`PLAYER1_TEAM_NICKNAME` string COMMENT 'Nickname of the first player’s team.',
`PLAYER2_ID` string COMMENT 'ID of the second player involved.',
`PLAYER2_NAME` string COMMENT 'Name of the second player involved.',
`PLAYER2_TEAM_ABBREVIATION` string COMMENT 'Team abbreviation for the second player.',
`PLAYER2_TEAM_CITY` string COMMENT 'City of the second player’s team.',
`PLAYER2_TEAM_ID` string COMMENT 'Team ID for the second player.',
`PLAYER2_TEAM_NICKNAME` string COMMENT 'Nickname of the second player’s team.',
`PLAYER3_ID` string COMMENT 'ID of the third player involved.',
`PLAYER3_NAME` string COMMENT 'Name of the third player involved.',
`PLAYER3_TEAM_ABBREVIATION` string COMMENT 'Team abbreviation for the third player.',
`PLAYER3_TEAM_CITY` string COMMENT 'City of the third player’s team.',
`PLAYER3_TEAM_ID` string COMMENT 'Team ID for the third player.',
`PLAYER3_TEAM_NICKNAME` string COMMENT 'Nickname of the third player’s team.',
`SCORE` string COMMENT 'Current score after the event.',
`SCOREMARGIN` string COMMENT 'Score margin after the event. The format is home - visitor, for example 4 - 6',
`VISITORDESCRIPTION` string COMMENT 'Description of the play involving the visiting team.',
`WCTIMESTRING` string COMMENT 'Wall clock time when the event occurred.')
PARTITIONED BY (`year` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'quoteChar'='\"',
'separatorChar'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://raw-data-<REGION>-<ACCOUNT>/data/play-by-play/'
TBLPROPERTIES (
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'skip.header.line.count'='1',
'typeOfData'='file')

We can use the Athena Query Editor to make these changes by executing DROP TABLE play_by_play and then the CREATE statement above. Remember that DROP TABLE doesn’t delete the data in S3; the Glue data catalog only maps the data in S3 for the Athena SQL engine. Also, don’t forget to load the partitions with MSCK REPAIR TABLE `play_by_play`;.
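If you prefer to script these catalog changes rather than run them in the query editor, a sketch with awswrangler could look like the following; the database name is assumed, and the DDL string stands for the CREATE statement above:

import awswrangler as wr

DATABASE = "bedrock_agent"   # assumed database name
CREATE_TABLE_DDL = "..."     # the CREATE EXTERNAL TABLE statement above

# Drop the old table definition; the data files in S3 are untouched
wr.catalog.delete_table_if_exists(database=DATABASE, table="play_by_play")

# Recreate the table with the enriched schema
wr.athena.start_query_execution(sql=CREATE_TABLE_DDL, database=DATABASE, wait=True)

# Load the year=... partitions discovered in the S3 folder structure
wr.athena.repair_table(table="play_by_play", database=DATABASE)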

(4) Enriching Lambda Action Reply

The Lambda function action serves the AI agent as the way to query the database schema and data, and it is designed to be agnostic to the dataset. It has two main functionalities: the first is get-schema, and the second is execute-query. The original get-schema function in the solution was minimalist, returning only the tables’ and columns’ names.

import awswrangler as wr
from typing import Dict, List

# Glue database created by the solution
database_name = "bedrock_agent"

def get_schema() -> List:
    tables = (
        wr
        .catalog
        .tables(database=database_name)
        [["Table", "Description", "Columns"]]
        .to_dict('records')
    )
    return tables

We can extend the functionality to include the column comments and types we added above, giving the agent more information about them.

def get_schema() -> Dict:
    # Initialize the JSON structure
    database_info = {"database": "bedrock_agent", "tables": []}

    # Fetch tables and descriptions
    tables_info = (
        wr
        .catalog
        .tables(database='bedrock_agent')
        [["Table", "Description"]]
    )

    # Iterate over each table to fetch column details
    for _, row in tables_info.iterrows():
        table_name = row["Table"]
        table_description = row["Description"]

        # Fetch column details
        columns_info = (
            wr
            .catalog
            .table(database="bedrock_agent", table=table_name)
            [["Column Name", "Type", "Comment"]]
        )

        # Add table and column details to the JSON structure
        table_info = {
            "Table": table_name,
            "Description": table_description,
            "Columns": []
        }

        for _, col_row in columns_info.iterrows():
            column_info = {
                "Column Name": col_row["Column Name"],
                "Type": col_row["Type"],
                "Comment": col_row["Comment"]
            }
            table_info["Columns"].append(column_info)

        database_info["tables"].append(table_info)

    return database_info
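
The companion execute-query functionality can stay thin. The repository’s exact implementation may differ; the following is a minimal sketch of the idea using the same awswrangler library:

def execute_query(sql: str) -> str:
    # Run the SQL generated by the agent and return the result as a compact string
    df = wr.athena.read_sql_query(sql=sql, database=database_name)
    # Serialize into a string the LLM can read back, capping very large results
    return df.head(100).to_csv(index=False)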

Please note that such changes to the Lambda function highlight some of the power of LLM-based applications: they are more resilient to changes in the API reply, as long as it is serialized into a string the LLM can parse and understand.
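For reference, the dictionary returned by get_schema is ultimately serialized into the Lambda reply to the agent. The exact envelope depends on how the action group is defined (OpenAPI schema or function details); the sketch below assumes an OpenAPI-based action group and is not copied from the repository:

import json

def build_agent_response(event: dict, body: dict) -> dict:
    # Wrap an action result in the reply format a Bedrock agent action group expects
    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event["actionGroup"],
            "apiPath": event["apiPath"],
            "httpMethod": event["httpMethod"],
            "httpStatusCode": 200,
            "responseBody": {
                # The schema dictionary becomes a plain string the LLM parses
                "application/json": {"body": json.dumps(body)}
            },
        },
    }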

(5) Enriching Agent Instruction Prompt

The NBA play-by-play dataset includes various events during a basketball game, such as shots, fouls, timeouts, steals, or substitutions. Understanding the meaning of the event types is not possible from the schema alone; it requires analyzing the data and enriching the agent instruction prompt with the results of that analysis.

There are various ways to perform that analysis, with an LLM or manually. The manual analysis should not take too long since only 13 event types exist. For example, you can generate a word cloud of the description columns for each value of the EVENTMSGTYPE column:

from wordcloud import WordCloud
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS

# `descriptions` is a list of play description strings for a single event type
# (see the query sketch below for one way to load it)
# Convert to a single string
text = ' '.join(descriptions)

# Function to tokenize and clean text
def preprocess_text(text):
    # Split text into words and convert to lowercase
    words = text.lower().split()
    # Remove common stop words
    words = [word for word in words if word not in ENGLISH_STOP_WORDS]
    return ' '.join(words)

# Preprocess the combined text
processed_text = preprocess_text(text)

# Generate the word cloud
wordcloud = WordCloud(width=800, height=400, background_color='white').generate(processed_text)

# Display the word cloud using matplotlib
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.show()

This will generate an image such as the following for event type 6:

Word Cloud for event type 6 of the NBA play-by-play data

You can find the full analysis notebook in the GitHub repository here.
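The descriptions variable used in the word-cloud code above can be pulled per event type with the same awswrangler library; here is a minimal sketch, with the database name assumed as before:

import awswrangler as wr

def load_descriptions(event_type: int) -> list:
    # Fetch the non-empty play descriptions for one EVENTMSGTYPE value
    df = wr.athena.read_sql_query(
        sql=f"""
            SELECT COALESCE(HOMEDESCRIPTION, VISITORDESCRIPTION, NEUTRALDESCRIPTION) AS description
            FROM play_by_play
            WHERE EVENTMSGTYPE = {event_type}
        """,
        database="bedrock_agent",  # assumed database name
    )
    return df["description"].dropna().tolist()

descriptions = load_descriptions(6)  # fouls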

After analyzing the 13 event types, we can add the following paragraph to the agent instruction:

Note that the play_by_play table includes the column EVENTMSGTYPE that has the following values and their meanings:
1 - Points scored.
2 - Shot missed.
3 - Free throw score and miss.
4 - Rebound.
5 - Turnover and steal.
6 - Foul.
7 - Violation.
8 - Substitution.
9 - Timeout.
10 - Jump ball.
11 - Ejection.
12 - Game/Period start.
13 - Game/Period end.

You can repeat the analysis and the instruction enrichment for other columns as needed.

Updated Instructions for the Agent to include the event type meanings

The decision to include the analysis results in the instruction prompt is a matter of personal preference. A different data engineer might prefer to create a set of views, one for each event type, and provide the information needed to generate correct SQL queries through the schema instead. For example, here is the SQL statement that creates the view for fouls (note that some columns are omitted because they are not relevant for fouls):

CREATE VIEW fouls_view AS
SELECT
EVENTNUM,
GAME_ID,
HOMEDESCRIPTION,
NEUTRALDESCRIPTION,
PCTIMESTRING,
`PERIOD`,
PERSON1TYPE,
PERSON2TYPE,
PLAYER1_ID,
PLAYER1_NAME,
PLAYER1_TEAM_ABBREVIATION,
PLAYER1_TEAM_CITY,
PLAYER1_TEAM_ID,
PLAYER1_TEAM_NICKNAME,
PLAYER2_ID,
PLAYER2_NAME,
PLAYER2_TEAM_ABBREVIATION,
PLAYER2_TEAM_CITY,
PLAYER2_TEAM_ID,
PLAYER2_TEAM_NICKNAME,
SCORE,
SCOREMARGIN,
VISITORDESCRIPTION,
WCTIMESTRING
FROM play_by_play
WHERE EVENTMSGTYPE = 6;

Furthermore, you can mix the two approaches and create views only for the event types that benefit from them most; those views can also parse the textual description fields. For example, for free throws the description looks like Harris Free Throw 1 of 2 (12 PTS), and the view can parse it into three fields to capture the 1, 2, and 12 values.
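As a sketch of that mixed approach, the statement below creates a hypothetical free-throw view that parses the description with regexp_extract; it is issued here through awswrangler, and the view and column names are illustrative rather than taken from the repository:

import awswrangler as wr

FREE_THROW_VIEW_DDL = """
CREATE OR REPLACE VIEW free_throws_view AS
SELECT
    GAME_ID,
    PLAYER1_NAME,
    CAST(regexp_extract(COALESCE(HOMEDESCRIPTION, VISITORDESCRIPTION),
        'Free Throw (\\d+) of (\\d+)', 1) AS integer) AS attempt_number,
    CAST(regexp_extract(COALESCE(HOMEDESCRIPTION, VISITORDESCRIPTION),
        'Free Throw (\\d+) of (\\d+)', 2) AS integer) AS attempts_total,
    CAST(regexp_extract(COALESCE(HOMEDESCRIPTION, VISITORDESCRIPTION),
        '\\((\\d+) PTS\\)', 1) AS integer) AS player_points
FROM play_by_play
WHERE EVENTMSGTYPE = 3
"""

wr.athena.start_query_execution(sql=FREE_THROW_VIEW_DDL, database="bedrock_agent", wait=True)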

Testing the agent

After we perform the changes across the data and agent pipeline, we can test the agent in the AWS management console:

Test Bedrock Agent in the AWS management console.

The multiple steps in the trace on the right show that the agent still had some problems querying the database. Nevertheless, it could understand the natural language question, generate the correct SQL query, extract the needed data, and reply in natural language.

Summary

Building trustful AI on real-life data “in the wild” requires fixes to different parts of the system, from data ingestion to agent instructions and all the steps in between. In this post, we described examples of these changes so you can evaluate the stability of your end-to-end pipeline and find the best places to make improvements that increase the performance of a generative AI system.

Successfully building trustful AI systems requires skills in data engineering, ML engineering, MLOps, and similar new job definitions. It also requires a good understanding of the cloud services and tools, the strengths and weaknesses of LLMs, and the ability to architect a solution that allows replacing some of the components as the solution matures.

Both AI and we need a guiding hand to take our first steps in the real world with a young and emerging technology such as LLMs, and AI in general. The tools and services available are often too technical to turn into a useful and trustful solution with significant usage and business impact on their own. I hope this post serves as such a hand, and that we can continue our journey of building trustful AI systems.


ML-Guy

Guy Ernest is the co-founder and CTO of @aiOla, a promising AI startup that closes the loop between knowledge, people & systems. He is also an AWS ML Hero.