Retrieval Augmented Generation (RAG) enhances AI responses by combining the generative AI model’s capabilities with information from external data sources, rather than relying solely on the model’s built-in knowledge. In this post, we showcase the custom data connector capability in Amazon Bedrock Knowledge Bases that makes it straightforward to build RAG workflows with custom input data. Through this capability, Amazon Bedrock Knowledge Bases supports the ingestion of streaming data, which means developers can add, update, or delete data in their knowledge base through direct API calls.
Consider clickstream data, credit card swipes, Internet of Things (IoT) sensor data, log analysis, and commodity prices, where both current data and historical trends are needed to make an informed decision. Previously, to feed such critical data into a knowledge base, you had to first stage it in a supported data source and then either initiate or schedule a data sync job. Depending on the quality and quantity of the data, the time to complete this process varied. With custom data connectors, you can quickly ingest specific documents from custom data sources without requiring a full sync and ingest streaming data without the need for intermediary storage. By avoiding time-consuming full syncs and storage steps, you gain faster access to data, reduced latency, and improved application performance.
With streaming ingestion through custom connectors, Amazon Bedrock Knowledge Bases processes streaming data without an intermediary data source, making it available almost immediately. The feature chunks the input data, converts it into embeddings using your chosen Amazon Bedrock model, and stores everything in the backend vector database. This automation applies to both newly created and existing databases, streamlining your workflow so you can focus on building AI applications without orchestrating data chunking, embeddings generation, or vector store provisioning and indexing. Additionally, the feature lets you ingest specific documents from custom data sources while reducing latency and the operational cost of intermediary storage.
Amazon Bedrock
Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as Anthropic, Cohere, Meta, Stability AI, and Amazon through a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI. Using Amazon Bedrock, you can experiment with and evaluate top FMs for your use case, privately customize them with your data using techniques such as fine-tuning and RAG, and build agents that execute tasks using your enterprise systems and data sources.
Amazon Bedrock Knowledge Bases
Amazon Bedrock Knowledge Bases allows organizations to build fully managed RAG pipelines that augment responses with contextual information from private data sources, delivering more relevant, accurate, and customized answers. With Amazon Bedrock Knowledge Bases, you can build applications that are enriched by the context received from querying a knowledge base. It also enables a faster time to product release by abstracting the heavy lifting of building pipelines and providing an out-of-the-box RAG solution, reducing the build time for your application.
Amazon Bedrock Knowledge Bases custom connector
Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, which means you can add, update, or delete data in your knowledge base through direct API calls.
Solution overview: Build a generative AI stock price analyzer with RAG
For this post, we implement a RAG architecture with Amazon Bedrock Knowledge Bases using a custom connector and topics built with Amazon Managed Streaming for Apache Kafka (Amazon MSK) for a user who wants to understand stock price trends. Amazon MSK is a streaming data service that manages Apache Kafka infrastructure and operations, making it straightforward to run Apache Kafka applications on Amazon Web Services (AWS). The solution enables near real-time analysis of stock price data through vector embeddings and large language models (LLMs).
The following architecture diagram has two components:
Preprocessing streaming data workflow, noted in letters at the top of the diagram:
- To mimic streaming input, upload a .csv file with stock price data to the MSK topic
- The new records automatically trigger the consumer AWS Lambda function
- The Lambda function ingests the consumed data into the knowledge base
- The knowledge base converts the data into a vector index using the chosen embeddings model
- The knowledge base stores the vector index in the vector database
Runtime execution during user queries, noted in numerals at the bottom of the diagram:
- The user queries for stock prices
- The foundation model uses the knowledge base to search for an answer
- The knowledge base returns the relevant documents
- The user receives a relevant answer
Implementation design
The implementation follows these high-level steps:
- Data source setup – Configure an MSK topic that streams input stock prices
- Amazon Bedrock Knowledge Bases setup – Create a knowledge base in Amazon Bedrock using the quick create a new vector store option, which automatically provisions and sets up the vector store
- Data consumption and ingestion – As and when data lands in the MSK topic, trigger a Lambda function that extracts stock indices, prices, and timestamp information and feeds into the custom connector for Amazon Bedrock Knowledge Bases
- Test the knowledge base – Evaluate the stock price analysis by querying the knowledge base
Solution walkthrough
To build a generative AI stock analysis tool with Amazon Bedrock Knowledge Bases custom connector, use instructions in the following sections.
Configure the architecture
To try this architecture, deploy the AWS CloudFormation template from this GitHub repository in your AWS account. This template deploys the following components:
- Functional virtual private clouds (VPCs), subnets, security groups, and AWS Identity and Access Management (IAM) roles
- An MSK cluster hosting Apache Kafka input topic
- A Lambda function to consume Apache Kafka topic data
- An Amazon SageMaker Studio notebook for granular setup and enablement
Create an Apache Kafka topic
The MSK cluster created by the CloudFormation template already has the required brokers deployed and ready for use. The next step is to use a SageMaker Studio terminal instance to connect to the MSK cluster and create the test stream topic. For this step, follow the detailed instructions at Create a topic in the Amazon MSK cluster. The following are the general steps involved; a Python-based sketch of the topic creation follows the list:
- Download and install the latest Apache Kafka client
- Connect to the MSK cluster broker instance
- Create the test stream topic on the broker instance
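The linked instructions use the Apache Kafka CLI tools for these steps. As an alternative from the same SageMaker Studio terminal, the topic can be created with a short Python script. The following is a minimal sketch that assumes the kafka-python package, a topic named streamtopic, and a placeholder bootstrap broker string; a cluster configured for TLS or IAM authentication needs additional connection settings not shown here.

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Placeholder bootstrap broker string; copy the real value from the MSK console
BOOTSTRAP_SERVERS = "b-1.examplecluster.kafka.us-east-1.amazonaws.com:9092"

admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, client_id="kb-topic-setup")

# Create the input topic that the consumer Lambda function will read from
admin.create_topics(
    new_topics=[NewTopic(name="streamtopic", num_partitions=1, replication_factor=2)]
)
admin.close()
```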
Create a knowledge base in Amazon Bedrock
To create a knowledge base in Amazon Bedrock, follow these steps:
- On the Amazon Bedrock console, in the left navigation page under Builder tools, choose Knowledge Bases.
- To initiate knowledge base creation, on the Create dropdown menu, choose Knowledge Base with vector store, as shown in the following screenshot.
- In the Provide Knowledge Base details pane, enter BedrockStreamIngestKnowledgeBase as the Knowledge Base name.
- Under IAM permissions, choose the default option, Create and use a new service role, and (optional) provide a Service role name, as shown in the following screenshot.
- On the Choose data source pane, select Custom as the data source where your dataset is stored.
- Choose Next, as shown in the following screenshot.
- On the Configure data source pane, enter BedrockStreamIngestKBCustomDS as the Data source name.
- Under Parsing strategy, select Amazon Bedrock default parser and for Chunking strategy, choose Default chunking. Choose Next, as shown in the following screenshot.
- On the Select embeddings model and configure vector store pane, for Embeddings model, choose Titan Text Embeddings v2. For Embeddings type, choose Floating-point vector embeddings. For Vector dimensions, select 1024, as shown in the following screenshot. Make sure you have requested and received access to the chosen FM in Amazon Bedrock. To learn more, refer to Add or remove access to Amazon Bedrock foundation models.
- On the Vector database pane, select Quick create a new vector store and choose the new Amazon OpenSearch Serverless option as the vector store.
- On the next screen, review your selections. To finalize the setup, choose Create.
- Within a few minutes, the console will display your newly created knowledge base.
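Later steps need the IDs of the knowledge base and its custom data source. The following is a minimal boto3 sketch that looks them up by the names entered above; it is illustrative and not part of the sample notebook.

```python
import boto3

bedrock_agent = boto3.client("bedrock-agent")

# Find the knowledge base created in the console by its name
kb_name = "BedrockStreamIngestKnowledgeBase"
kb_id = next(
    kb["knowledgeBaseId"]
    for kb in bedrock_agent.list_knowledge_bases()["knowledgeBaseSummaries"]
    if kb["name"] == kb_name
)

# Find the custom data source attached to that knowledge base
ds_name = "BedrockStreamIngestKBCustomDS"
ds_id = next(
    ds["dataSourceId"]
    for ds in bedrock_agent.list_data_sources(knowledgeBaseId=kb_id)["dataSourceSummaries"]
    if ds["name"] == ds_name
)

print(f"Knowledge base: {kb_id}, data source: {ds_id}")
```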
Configure AWS Lambda Apache Kafka consumer
Now, using API calls, you configure the consumer Lambda function so it gets triggered as soon as the input Apache Kafka topic receives data.
- Configure the knowledge base ID and the custom data source ID you created earlier as environment variables within the Lambda function. When you use the sample notebook, the referenced function names and IDs are filled in automatically.
- When that's done, tie the Lambda consumer function to the source Apache Kafka topic so it listens for new events:
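The sample notebook handles this wiring for you. For reference, the following is a minimal boto3 sketch of the same two calls, using a hypothetical function name, placeholder IDs and cluster ARN, assumed environment variable names (KB_ID and DS_ID), and the streamtopic topic.

```python
import boto3

lambda_client = boto3.client("lambda")

# Hypothetical function name and placeholder values; substitute your own
FUNCTION_NAME = "BedrockStreamIngestConsumer"
MSK_CLUSTER_ARN = "arn:aws:kafka:us-east-1:111122223333:cluster/example-cluster/abcd1234"
KB_ID = "YOUR_KB_ID"
DS_ID = "YOUR_DS_ID"

# Pass the knowledge base ID and custom data source ID to the function as environment variables
lambda_client.update_function_configuration(
    FunctionName=FUNCTION_NAME,
    Environment={"Variables": {"KB_ID": KB_ID, "DS_ID": DS_ID}},
)

# Invoke the consumer whenever new records land on the MSK topic
lambda_client.create_event_source_mapping(
    EventSourceArn=MSK_CLUSTER_ARN,
    FunctionName=FUNCTION_NAME,
    Topics=["streamtopic"],
    StartingPosition="LATEST",
    BatchSize=100,
    Enabled=True,
)
```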
Review AWS Lambda Apache Kafka consumer
The Apache Kafka consumer Lambda function reads data from the Apache Kafka topic, decodes it, extracts stock price information, and ingests it into the Amazon Bedrock knowledge base using the custom connector. It performs the following steps; a consolidated sketch of the handler follows the list.
- Extract the knowledge base ID and the data source ID:
- Define a Python function to decode input events:
- Decode and parse required data on the input event received from the Apache Kafka topic. Using them, create a payload to be ingested into the knowledge base:
- Ingest the payload into Amazon Bedrock Knowledge Bases using the custom connector:
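The following is a minimal, consolidated sketch of such a handler covering all four steps. It assumes records arrive as JSON objects with ticker and price fields, that the knowledge base and data source IDs come from the KB_ID and DS_ID environment variables set earlier, and that ingestion goes through the bedrock-agent IngestKnowledgeBaseDocuments API for custom data sources; the function deployed by the CloudFormation template may differ in detail.

```python
import base64
import json
import os

import boto3

# Knowledge base and custom data source IDs supplied as Lambda environment variables
KB_ID = os.environ["KB_ID"]
DS_ID = os.environ["DS_ID"]

bedrock_agent = boto3.client("bedrock-agent")


def decode_record(record):
    """Base64-decode a single Kafka record value and parse it as JSON."""
    return json.loads(base64.b64decode(record["value"]).decode("utf-8"))


def lambda_handler(event, context):
    documents = []
    # MSK events group records under "topic-partition" keys
    for records in event.get("records", {}).values():
        for record in records:
            data = decode_record(record)
            ticker, price = data.get("ticker"), data.get("price")
            timestamp = record.get("timestamp")
            text = f"At {timestamp}, the price of stock {ticker} was {price}."
            # One knowledge base document per Kafka record, supplied in-line
            documents.append({
                "content": {
                    "dataSourceType": "CUSTOM",
                    "custom": {
                        "customDocumentIdentifier": {"id": f"{ticker}-{timestamp}"},
                        "sourceType": "IN_LINE",
                        "inlineContent": {
                            "type": "TEXT",
                            "textContent": {"data": text},
                        },
                    },
                }
            })

    if documents:
        # Ingest directly into the knowledge base through the custom connector
        bedrock_agent.ingest_knowledge_base_documents(
            knowledgeBaseId=KB_ID,
            dataSourceId=DS_ID,
            documents=documents,
        )

    return {"statusCode": 200, "ingested": len(documents)}
```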
Testing
Now that the required setup is done, you trigger the workflow by ingesting test data into your Apache Kafka topic hosted on the MSK cluster. For best results, repeat this section with different .csv input files that show stock price increases or decreases.
- Prepare the test data. In my case, I had the following data input as a .csv file with a header.
| ticker | price |
| --- | --- |
| OOOO | $44.50 |
| ZVZZT | $3,413.23 |
| ZNTRX | $22.34 |
| ZNRXX | $208.76 |
| NTEST | $0.45 |
| ZBZX | $36.23 |
| ZEXIT | $942.34 |
| ZIEXT | $870.23 |
| ZTEST | $23.75 |
| ZVV | $2,802.86 |
| ZXIET | $63.00 |
| ZAZZT | $18.86 |
| ZBZZT | $998.26 |
| ZCZZT | $72.34 |
| ZVZZC | $90.32 |
| ZWZZT | $698.24 |
| ZXZZT | $932.32 |
- Define a Python function to put data on the topic, using the pykafka client to produce the records (a combined sketch for this and the next step follows the list).
- Read the .csv file and push the records to the topic:
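The following is a minimal sketch covering both of the preceding steps. It assumes a placeholder bootstrap broker string, the streamtopic topic name, and an input file named stock_prices.csv whose header matches the table above (ticker, price).

```python
import csv
import json

from pykafka import KafkaClient

# Placeholder bootstrap broker string; copy the real value from the MSK console
client = KafkaClient(hosts="b-1.examplecluster.kafka.us-east-1.amazonaws.com:9092")
topic = client.topics[b"streamtopic"]


def put_record(producer, record):
    """Serialize one stock price record as JSON and publish it to the topic."""
    producer.produce(json.dumps(record).encode("utf-8"))


# Read the .csv file (header: ticker, price) and push each row to the topic
with topic.get_sync_producer() as producer:
    with open("stock_prices.csv", newline="") as f:
        for row in csv.DictReader(f):
            put_record(producer, {"ticker": row["ticker"], "price": row["price"]})
```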
Verification
If the data ingestion and subsequent processing are successful, navigate to the Amazon Bedrock Knowledge Bases data source page to check the uploaded information.
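You can also check programmatically. The following sketch assumes the bedrock-agent ListKnowledgeBaseDocuments operation for custom data sources and its documentDetails response field, with placeholder IDs; treat the response handling as an assumption rather than a confirmed shape.

```python
import boto3

bedrock_agent = boto3.client("bedrock-agent")

# Placeholder IDs; use the knowledge base and custom data source IDs from earlier
response = bedrock_agent.list_knowledge_base_documents(
    knowledgeBaseId="YOUR_KB_ID",
    dataSourceId="YOUR_DS_ID",
)

# Each entry reports the document identifier and its ingestion status
for doc in response["documentDetails"]:
    print(doc["identifier"], doc["status"])
```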
Querying the knowledge base
Within the Amazon Bedrock Knowledge Bases console, you can query the ingested data immediately, as shown in the following screenshot.
To do that, select an Amazon Bedrock FM that you have access to. In my case, I chose Amazon Nova Lite 1.0, as shown in the following screenshot.
With that done, the question "How is ZVZZT trending?" yields results based on the ingested data. Note how Amazon Bedrock Knowledge Bases shows how it derived the answer, even pointing to the granular data element from its source.
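The same question can be asked outside the console with the bedrock-agent-runtime RetrieveAndGenerate API. The following is a minimal sketch with a placeholder knowledge base ID and an assumed model ARN for Amazon Nova Lite; depending on your Region and model access, you might need a different model ARN or an inference profile.

```python
import boto3

bedrock_agent_runtime = boto3.client("bedrock-agent-runtime")

response = bedrock_agent_runtime.retrieve_and_generate(
    input={"text": "How is ZVZZT trending?"},
    retrieveAndGenerateConfiguration={
        "type": "KNOWLEDGE_BASE",
        "knowledgeBaseConfiguration": {
            "knowledgeBaseId": "YOUR_KB_ID",
            "modelArn": "arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-lite-v1:0",
        },
    },
)

# The generated answer, followed by the source chunks it was derived from
print(response["output"]["text"])
for citation in response.get("citations", []):
    for ref in citation.get("retrievedReferences", []):
        print(ref["content"]["text"])
```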
Cleanup
To avoid incurring charges for resources you no longer need, clean up the resources you created:
- Delete the Amazon Bedrock knowledge base.
- Delete the automatically created Amazon OpenSearch Serverless cluster.
- Delete the automatically created Amazon Elastic File System (Amazon EFS) shares backing the SageMaker Studio environment.
- Delete the automatically created security groups associated with the Amazon EFS share. You might need to remove the inbound and outbound rules before they can be deleted.
- Delete the automatically created elastic network interfaces attached to the Amazon MSK security group for Lambda traffic.
- Delete the automatically created Amazon Bedrock Knowledge Bases execution IAM role.
- Stop the kernel instances in Amazon SageMaker Studio.
- Delete the CloudFormation stack.
Conclusion
In this post, we showed you how Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, so developers can add, update, or delete data in their knowledge base through direct API calls. Amazon Bedrock Knowledge Bases offers fully managed, end-to-end RAG workflows to create highly accurate, low-latency, secure, and custom generative AI applications by incorporating contextual information from your company's data sources. With this capability, you can quickly ingest specific documents from custom data sources without requiring a full sync, and ingest streaming data without the need for intermediary storage.
Send feedback to AWS re:Post for Amazon Bedrock or through your usual AWS contacts, and engage with the generative AI builder community at community.aws.
About the Author
Prabhakar Chandrasekaran is a Senior Technical Account Manager with AWS Enterprise Support. Prabhakar enjoys helping customers build cutting-edge AI/ML solutions on the cloud. He also works with enterprise customers providing proactive guidance and operational assistance, helping them improve the value of their solutions when using AWS. Prabhakar holds eight AWS and seven other professional certifications. With over 22 years of professional experience, Prabhakar was a data engineer and a program leader in the financial services space prior to joining AWS.