Retrieval-Augmented Generation (RAG) has become the default pattern for enterprise AI applications that need to answer questions about proprietary data — internal wikis, technical documentation, support ticket histories, product catalogs. The concept is straightforward: instead of hoping your LLM memorized the right facts during training, you fetch relevant context at query time and give it to the model alongside the question.

In practice, building a RAG pipeline that actually works in production involves a surprising number of decisions: how to chunk documents, which embedding model to use, how to structure your vector index, how to tune retrieval, and how to keep the whole thing from hallucinating when retrieval comes up empty. This post walks through a complete, production-oriented RAG pipeline using LangChain, AWS Bedrock for both embeddings and generation, and Amazon OpenSearch Serverless as the vector store.

Architecture Overview

The pipeline has two distinct phases that run on different schedules: an offline ingestion phase that loads, chunks, embeds, and indexes documents, and an online query phase that embeds the user's question, retrieves the most similar chunks, and generates a grounded answer.

Keeping these phases separate is important for cost and reliability. The ingestion pipeline can be a Step Functions state machine or a simple Lambda triggered by S3 events. The query path needs to be fast — sub-second retrieval is achievable with OpenSearch, and Bedrock's Claude Haiku can generate a response in under two seconds for most queries.

Setting Up AWS Bedrock Access

Before writing any code, you need to enable model access in the Bedrock console. This is a one-time step per AWS account:

  1. Navigate to Amazon Bedrock → Model access in the AWS Console.
  2. Request access for amazon.titan-embed-text-v2:0 (embeddings) and anthropic.claude-3-haiku-20240307-v1:0 (generation). Both are usually approved instantly.
  3. Create an IAM role with bedrock:InvokeModel and bedrock:InvokeModelWithResponseStream permissions.
# Verify model access from CLI
aws bedrock list-foundation-models \
  --region us-east-1 \
  --query 'modelSummaries[?contains(modelId, `titan-embed`) || contains(modelId, `claude-3-haiku`)].{id:modelId,status:modelLifecycle.status}'
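Once access is granted, it helps to know the exact request shape Titan Embed v2 expects before wiring up LangChain. The sketch below builds the InvokeModel payload (the `titan_embed_request` helper is illustrative, not part of any SDK); the `inputText`, `dimensions`, and `normalize` body fields are the model's documented parameters:

```python
import json

def titan_embed_request(text: str, dimensions: int = 1024, normalize: bool = True) -> dict:
    """Build the kwargs for bedrock_runtime.invoke_model() against Titan Embed v2."""
    return {
        "modelId": "amazon.titan-embed-text-v2:0",
        "contentType": "application/json",
        "accept": "application/json",
        # Titan v2 request body: input text plus output dimension and normalization flags
        "body": json.dumps({
            "inputText": text,
            "dimensions": dimensions,
            "normalize": normalize,
        }),
    }

# usage (assumes a boto3 bedrock-runtime client):
#   resp = bedrock_runtime.invoke_model(**titan_embed_request("hello world"))
#   vector = json.loads(resp["body"].read())["embedding"]
```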

Creating the OpenSearch Serverless Collection

OpenSearch Serverless with the vectorsearch type is the easiest managed vector store on AWS. You pay per OCU (OpenSearch Compute Unit) per hour with no cluster management overhead. For a typical RAG workload serving a few dozen users, you'll stay well within the 2-OCU minimum.

# Create the required encryption policy first — collection creation fails without it
aws opensearchserverless create-security-policy \
  --name rag-kb-enc \
  --type encryption \
  --policy '{"Rules":[{"ResourceType":"collection","Resource":["collection/rag-knowledge-base"]}],"AWSOwnedKey":true}'

# Create the network policy (public here; use VPC endpoints for production)
aws opensearchserverless create-security-policy \
  --name rag-kb-net \
  --type network \
  --policy '[{"Rules":[{"ResourceType":"collection","Resource":["collection/rag-knowledge-base"]},{"ResourceType":"dashboard","Resource":["collection/rag-knowledge-base"]}],"AllowFromPublic":true}]'

# Now create the vector search collection
aws opensearchserverless create-collection \
  --name rag-knowledge-base \
  --type VECTORSEARCH \
  --description "RAG pipeline vector store"

One more prerequisite: a data access policy granting your IAM principal index permissions (aoss:CreateIndex, aoss:ReadDocument, aoss:WriteDocument) on the collection. Without it, every request below fails with a 403.

After the collection is active (2–5 minutes), create the index with the correct knn_vector field. Titan Embed v2 produces 1,024-dimensional vectors by default:

import boto3, json, requests
from requests_aws4auth import AWS4Auth

region = "us-east-1"
service = "aoss"
credentials = boto3.Session().get_credentials()
auth = AWS4Auth(credentials.access_key, credentials.secret_key,
                region, service, session_token=credentials.token)

collection_endpoint = "https://<your-collection-id>.us-east-1.aoss.amazonaws.com"

index_body = {
    "settings": {"index.knn": True},
    "mappings": {
        "properties": {
            "embedding": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "engine": "nmslib"
                }
            },
            "content": {"type": "text"},
            "source": {"type": "keyword"},
            "chunk_id": {"type": "keyword"}
        }
    }
}

resp = requests.put(
    f"{collection_endpoint}/documents",
    auth=auth,
    json=index_body,
    headers={"Content-Type": "application/json"}
)
print(resp.json())

The Ingestion Pipeline

Document loading and chunking

Chunking strategy has an outsized impact on retrieval quality. The most common mistake is using a fixed character count without respecting document structure. Instead, chunk at semantic boundaries — paragraphs, sections, or sentences — and include a small overlap between chunks to avoid cutting context mid-thought.

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import S3DirectoryLoader

# Load documents from S3
loader = S3DirectoryLoader(
    bucket="your-docs-bucket",
    prefix="knowledge-base/",
    region_name="us-east-1"
)
raw_docs = loader.load()

# Chunk with overlap — 1,500 characters ≈ 300–400 tokens, 10% overlap
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1500,       # characters, not tokens
    chunk_overlap=150,     # 10% of chunk_size
    separators=["\n\n", "\n", ". ", " ", ""],
    keep_separator=True
)
chunks = splitter.split_documents(raw_docs)
print(f"Split {len(raw_docs)} documents into {len(chunks)} chunks")

Embedding with Titan Embed v2

LangChain has a BedrockEmbeddings class that wraps the Bedrock API cleanly. Titan Embed v2 accepts two useful parameters beyond the input text: dimensions (256, 512, or 1,024) and normalize. Unlike embedding models that distinguish document and query inputs (Cohere's Embed v3 on Bedrock, for example, takes an input_type of search_document or search_query), Titan uses the same call for both sides of an asymmetric search like Q&A:

from langchain_aws import BedrockEmbeddings
import boto3

bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")

embeddings = BedrockEmbeddings(
    client=bedrock_client,
    model_id="amazon.titan-embed-text-v2:0",
    model_kwargs={"dimensions": 1024, "normalize": True}
)

# Embed and upsert in batches of 50 to respect Bedrock rate limits
BATCH_SIZE = 50
for i in range(0, len(chunks), BATCH_SIZE):
    batch = chunks[i:i + BATCH_SIZE]
    texts = [c.page_content for c in batch]
    vectors = embeddings.embed_documents(texts)

    # Build OpenSearch bulk payload (NDJSON: action line, then document line)
    bulk_body = []
    for j, (chunk, vector) in enumerate(zip(batch, vectors)):
        bulk_body.append({"index": {"_index": "documents"}})
        bulk_body.append({
            "embedding": vector,
            "content": chunk.page_content,
            "source": chunk.metadata.get("source", ""),
            "chunk_id": f"{chunk.metadata.get('source', '')}-{i + j}"
        })

    # POST to OpenSearch — use data=, not json=, or requests will JSON-encode
    # the whole NDJSON string and the bulk API will reject it
    resp = requests.post(
        f"{collection_endpoint}/_bulk",
        auth=auth,
        data="\n".join(json.dumps(d) for d in bulk_body) + "\n",
        headers={"Content-Type": "application/x-ndjson"}
    )
    print(f"Batch {i // BATCH_SIZE + 1}: {resp.status_code}")

Key insight: Always use "normalize": True when embedding with Titan v2 and using cosine similarity in OpenSearch. With unit-length vectors, cosine similarity and dot product agree, so scores remain comparable regardless of how the engine computes them. Crucially, use the same normalization setting at ingestion time and at query time — mixing normalized documents with unnormalized queries (or vice versa) makes retrieval scores inconsistent for ranking.
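To see concretely why normalization matters, note that for unit-length vectors the dot product equals cosine similarity. A tiny self-contained check (pure Python, no AWS calls):

```python
import math

def normalize(v):
    """Scale a vector to unit length."""
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

a, b = [3.0, 4.0], [6.0, 8.0]          # same direction, different magnitudes
an, bn = normalize(a), normalize(b)

# After normalization, dot product and cosine similarity coincide...
assert abs(dot(an, bn) - cosine(a, b)) < 1e-9
# ...but the raw dot product depends on magnitude (here 50.0 vs 1.0)
assert abs(dot(a, b) - cosine(a, b)) > 1.0
```

If one side of the comparison is normalized and the other is not, any similarity metric that depends on magnitude silently produces incomparable scores.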

The Query Pipeline

The query path ties together retrieval and generation. LangChain's RetrievalQA chain handles the orchestration, but for production use you typically want more control over the prompt and how citations are formatted.

from langchain_aws import ChatBedrock
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_community.vectorstores import OpenSearchVectorSearch
from opensearchpy import RequestsHttpConnection

# Connect to the existing OpenSearch index
vector_store = OpenSearchVectorSearch(
    opensearch_url=collection_endpoint,
    index_name="documents",
    embedding_function=embeddings,
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)

# Claude on Bedrock for generation
llm = ChatBedrock(
    client=bedrock_client,
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    model_kwargs={
        "max_tokens": 1024,
        "temperature": 0.1    # low temp for factual Q&A
    }
)

# Custom prompt that discourages hallucination
RAG_PROMPT = PromptTemplate(
    input_variables=["context", "question"],
    template="""You are a helpful assistant. Answer the question using ONLY the information in the context below. If the context does not contain enough information to answer the question, say "I don't have enough information to answer that question" — do not guess.

Context:
{context}

Question: {question}

Answer:"""
)

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={
            "k": 5,
            "vector_field": "embedding",  # match the index mapping —
            "text_field": "content"       # the defaults are vector_field/text
        }
    ),
    chain_type_kwargs={"prompt": RAG_PROMPT},
    return_source_documents=True
)

def ask(question: str) -> dict:
    result = qa_chain.invoke({"query": question})
    return {
        "answer": result["result"],
        "sources": list({doc.metadata["source"] for doc in result["source_documents"]})
    }

Improving Retrieval Quality

Hybrid search: dense + sparse

Pure vector similarity works well for semantic questions but struggles with exact matches — product SKUs, error codes, proper nouns. OpenSearch supports hybrid search, combining vector similarity (dense retrieval) with BM25 keyword scoring (sparse retrieval) in a single hybrid query. Because this pipeline embeds queries client-side with Bedrock, the dense clause is a knn sub-query over a pre-computed vector rather than a neural sub-query (which would require a model hosted inside OpenSearch):

query_vector = embeddings.embed_query(user_query)

hybrid_query = {
    "query": {
        "hybrid": {
            "queries": [
                {
                    "knn": {
                        "embedding": {
                            "vector": query_vector,
                            "k": 10
                        }
                    }
                },
                {
                    "match": {
                        "content": {"query": user_query}
                    }
                }
            ]
        }
    }
}
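One wrinkle: knn and BM25 scores live on entirely different scales, so OpenSearch combines hybrid sub-query results through a search pipeline with a normalization-processor, and per-clause weights are set there rather than with boost. A sketch of the pipeline body you'd PUT to /_search/pipeline/<name> — the min_max/arithmetic_mean choices and 0.7/0.3 weights are illustrative, not tuned values:

```python
def hybrid_search_pipeline(dense_weight: float = 0.7, sparse_weight: float = 0.3) -> dict:
    """Body for PUT /_search/pipeline/<name>. Weights map to sub-queries in order:
    here the first (knn) clause gets dense_weight, the second (match) gets sparse_weight."""
    return {
        "phase_results_processors": [{
            "normalization-processor": {
                # Rescale each clause's scores to a comparable range...
                "normalization": {"technique": "min_max"},
                # ...then blend them with a weighted mean
                "combination": {
                    "technique": "arithmetic_mean",
                    "parameters": {"weights": [dense_weight, sparse_weight]},
                },
            }
        }]
    }
```

Queries then reference the pipeline by name (e.g. `?search_pipeline=<name>` on the search request); tune the weights against your evaluation set rather than guessing.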

Re-ranking with a cross-encoder

After initial retrieval, a re-ranker can dramatically improve the quality of the top-K results passed to the LLM. The pattern: retrieve a larger candidate set (top-20), re-rank with a cross-encoder model like cross-encoder/ms-marco-MiniLM-L-6-v2 (which runs cheaply on a small Lambda or ECS task), then pass only the top-5 to the LLM prompt. This combination consistently outperforms either approach alone.
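The retrieve-then-re-rank pattern itself is a few lines. In this sketch, score_pairs stands in for whatever scoring callable you deploy — in production it might be CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2").predict from the sentence-transformers library, but any function that maps (query, candidate) pairs to relevance scores fits:

```python
def rerank(query: str, candidates: list[str], score_pairs, top_k: int = 5) -> list[str]:
    """Score each (query, candidate) pair, then keep the top_k candidates.

    score_pairs: callable taking a list of (query, text) pairs and returning
    a list of relevance scores, one per pair (e.g. a cross-encoder's predict).
    """
    scores = score_pairs([(query, c) for c in candidates])
    # Sort candidates by score, highest first (stable sort preserves retrieval
    # order among ties), and truncate to the context budget
    ranked = sorted(zip(candidates, scores), key=lambda pair: pair[1], reverse=True)
    return [c for c, _ in ranked[:top_k]]
```

The shape of the call — retrieve ~20 candidates cheaply, score them with the expensive pairwise model, pass only the survivors to the LLM — is what delivers the quality gain at acceptable latency.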

Cost Management at Scale

One of the main advantages of building on Bedrock is the pay-per-token pricing with no idle infrastructure costs. At scale, a few practices keep costs predictable: use a small model like Haiku for routine queries and reserve larger models for escalations, cap the context stuffed into each prompt (five re-ranked chunks is usually plenty), and cache answers to repeated questions so they don't trigger a fresh retrieval-plus-generation cycle each time.
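Caching deserves a concrete sketch, since repeated questions are common in support settings. This minimal in-memory version wraps a RAG ask() function with an LRU cache keyed on the whitespace-normalized, lowercased question (make_cached_ask is an illustrative helper, not a library API; a production deployment would likely use Redis or DynamoDB instead):

```python
import functools

def make_cached_ask(ask_fn, maxsize: int = 1024):
    """Wrap a RAG ask() function so repeated questions skip retrieval + generation."""

    @functools.lru_cache(maxsize=maxsize)
    def _cached(normalized_question: str):
        return ask_fn(normalized_question)

    def cached_ask(question: str):
        # Collapse whitespace and case so trivial variants hit the same entry
        return _cached(" ".join(question.lower().split()))

    cached_ask.cache_info = _cached.cache_info  # expose hit/miss stats for monitoring
    return cached_ask
```

Exact-string caching only catches verbatim repeats; a semantic cache (matching on embedding similarity) catches paraphrases too, at the cost of an extra embedding call per query.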

Testing and Evaluation

Before shipping a RAG pipeline to users, you need a way to measure whether it actually answers questions correctly. The minimal viable evaluation harness: a golden dataset of 50–100 question/expected-answer pairs representative of real user queries, and a simple LLM-as-judge scoring loop that rates each answer on a 1–5 scale for accuracy and groundedness.

import json

def evaluate_rag(golden_dataset_path: str) -> dict:
    with open(golden_dataset_path) as f:
        examples = json.load(f)

    scores = []
    for ex in examples:
        result = ask(ex["question"])
        # Use Claude itself as judge (fast and cheap with Haiku)
        judge_prompt = f"""Rate this RAG answer 1-5 for accuracy.
Expected: {ex["expected_answer"]}
Actual: {result["answer"]}
Output only a JSON object: {{"score": N, "reason": "..."}}"""

        judge_response = llm.invoke(judge_prompt)
        score_data = json.loads(judge_response.content)
        scores.append(score_data["score"])

    return {
        "mean_score": sum(scores) / len(scores),
        "below_3": sum(1 for s in scores if s < 3),
        "total": len(scores)
    }

Aim for a mean score above 3.5 before going to production. Lower than that usually indicates a chunking or retrieval issue — the right information exists in your corpus but isn't being surfaced reliably — rather than a generation issue.
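A quick way to separate those two failure modes is to measure retrieval in isolation: for each golden question, does the expected source document appear in the top-k at all? The sketch below assumes each golden example carries a hypothetical expected_source field and that retrieve returns source identifiers for a question — neither is part of the eval harness above, so adapt the field names to your dataset:

```python
def retrieval_hit_rate(examples: list[dict], retrieve, k: int = 5) -> float:
    """Fraction of golden questions whose expected source shows up in the top-k.

    retrieve: callable (question, k) -> list of source identifiers.
    A low hit rate means the problem is chunking/retrieval, not generation.
    """
    hits = sum(
        1 for ex in examples
        if ex["expected_source"] in retrieve(ex["question"], k)
    )
    return hits / len(examples)
```

If the hit rate is high but the LLM-as-judge score is low, the problem has shifted from retrieval to prompting or generation, and that's where to spend tuning effort.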

What Comes Next

This pipeline covers the essential RAG stack. From here, the most impactful extensions are typically: adding a query rewriter to handle multi-turn conversation context, implementing metadata filters so users can scope retrieval to specific document categories or date ranges, and setting up a feedback loop that logs low-confidence answers for human review. Each of these adds meaningfully to answer quality without requiring a fundamental architectural change.

The complete code for this pipeline — including the CDK stack that provisions the OpenSearch collection, IAM roles, and Lambda functions — is available in our open-source repository. If you'd rather get a working deployment faster, that's exactly the kind of engagement we run as a two-week sprint.