Kiến trúc RAG và Chuẩn bị Dữ liệu Vector
Bước đầu tiên là thiết lập cơ sở dữ liệu chứa dữ liệu văn bản đã được chuyển đổi sang vector (embedding). Chúng ta sẽ sử dụng Databend để lưu trữ cặp dữ liệu: nội dung văn bản gốc và vector tương ứng.
Mục đích: Tạo bảng lưu trữ tài liệu (documents) và vector embedding để phục vụ cho việc tìm kiếm tương tự (similarity search).
Kết quả mong đợi: Bảng rag_documents được tạo với cột vector và hàm index HNSW tối ưu cho tìm kiếm.
Trước khi tạo bảng, hãy đảm bảo bạn đã kết nối vào Databend CLI hoặc MySQL client từ phần 4.
CREATE TABLE IF NOT EXISTS rag_documents (
id String,
content String,
embedding Vector(1536),
metadata String,
created_at DateTime DEFAULT now()
);
Thực thi lệnh trên sẽ tạo bảng với cột embedding có kích thước 1536 (thường dùng cho model text-embedding-ada-002 hoặc similar).
Bây giờ ta tạo index HNSW để tăng tốc độ tìm kiếm vector. Index này là bắt buộc để Databend thực hiện similarity search hiệu quả.
CREATE INDEX rag_embedding_idx ON rag_documents (embedding) USING HNSW WITH ('metric' = 'COSINE');
Kết quả mong đợi: Lệnh chạy thành công không lỗi, index được tạo và sẵn sàng cho các truy vấn vector.
Để kiểm tra cấu trúc bảng và index đã được tạo chưa, hãy chạy lệnh mô tả bảng.
DESCRIBE rag_documents;
Verify: Kiểm tra output, đảm bảo cột embedding có kiểu Vector(1536) và index HNSW xuất hiện trong phần Index của bảng.
Đưa dữ liệu mẫu vào Databend
Chúng ta cần chèn một số tài liệu mẫu đã được vector hóa vào bảng. Trong thực tế, bước tính vector thường làm ở Python/Go, ở đây ta giả định đã có sẵn vector.
Mục đích: Làm giàu dữ liệu để test khả năng tìm kiếm.
Kết quả mong đợi: Bảng có ít nhất 3-5 dòng dữ liệu với vector khác nhau.
INSERT INTO rag_documents (id, content, embedding, metadata) VALUES
('doc_001', 'Databend là một Data Cloud Platform native.', '[0.1, 0.2, 0.3, ..., 0.95]', '{"source": 'internal'}'),
('doc_002', 'Retrieval-Augmented Generation giúp LLM trả lời chính xác hơn.', '[0.4, 0.5, 0.6, ..., 0.96]', '{"source": 'research'}'),
('doc_003', 'Ubuntu 24.04 mang lại kernel mới và hiệu năng cao.', '[0.8, 0.9, 1.0, ..., 0.98]', '{"source": 'blog'}');
-- Lưu ý: Vector ở đây là ví dụ rút gọn, thực tế phải là mảng đầy đủ 1536 số float
Chú ý: Trong môi trường production, bạn sẽ không chèn vector thủ công mà dùng Python script để compute embedding rồi insert.
Verify: Chạy lệnh SELECT COUNT(*) FROM rag_documents; để đảm bảo số lượng dòng dữ liệu tăng lên.
Truy vấn Similarity Search với SQL
Đây là trái tim của hệ thống RAG. Chúng ta cần viết câu lệnh SQL để tìm những tài liệu có vector gần nhất với vector của câu hỏi (query vector).
Mục đích: Tìm kiếm top-k tài liệu có độ tương đồng cao nhất với truy vấn đầu vào.
Kết quả mong đợi: Trả về danh sách các document có điểm số (score) giảm dần.
Sử dụng hàm vector_distance_cosine để tính độ tương tự Cosine. Hàm này trả về khoảng cách, giá trị càng nhỏ thì càng giống nhau.
SELECT
id,
content,
metadata,
vector_distance_cosine(embedding, '[0.45, 0.55, 0.65, ..., 0.96]') AS similarity_score
FROM rag_documents
ORDER BY similarity_score ASC
LIMIT 3;
Lưu ý: Thay thế vector trong câu lệnh trên bằng vector thực tế của query mà bạn đã tính toán từ LLM.
Để tối ưu hơn, ta có thể lọc trước bằng ngưỡng điểm số để giảm tải tính toán.
SELECT
id,
content,
vector_distance_cosine(embedding, '[0.45, 0.55, 0.65, ..., 0.96]') AS score
FROM rag_documents
WHERE vector_distance_cosine(embedding, '[0.45, 0.55, 0.65, ..., 0.96]') < 0.8
ORDER BY score ASC
LIMIT 5;
Verify: Chạy lệnh và kiểm tra EXPLAIN để đảm bảo Databend đang dùng index HNSW (thấy HNSW trong plan) thay vì quét toàn bộ bảng (Full Scan).
EXPLAIN SELECT id, content FROM rag_documents WHERE vector_distance_cosine(embedding, '[0.45, 0.55, 0.65, ..., 0.96]') < 0.8 ORDER BY score ASC LIMIT 5;
Tích hợp Python với Databend cho Pipeline RAG
Chúng ta sẽ xây dựng một script Python để đóng vai trò là trung gian: nhận câu hỏi, gọi LLM để tạo vector, gửi vector vào Databend tìm kiếm, và trả kết quả về cho LLM.
Mục đích: Tạo một microservice RAG hoàn chỉnh bằng Python.
Kết quả mong đợi: Script chạy thành công, in ra kết quả tìm kiếm từ Databend.
Trước hết, cài đặt các thư viện cần thiết: driver Databend, PyTorch (hoặc numpy) để xử lý vector, và OpenAI (hoặc local embedding model).
pip install databend-sqlalchemy openai numpy
Tạo file cấu hình kết nối Databend để không lộ thông tin nhạy cảm trong code.
# /etc/databend_rag/config.yaml
database:
host: '192.168.1.100'
port: 8000
user: 'admin'
password: 'your_secure_password'
database: 'default'
rag:
embedding_model: 'text-embedding-ada-002'
top_k: 3
similarity_threshold: 0.7
Bây giờ viết file chính rag_app.py để thực hiện logic tìm kiếm.
import yaml
import numpy as np
import databend_driver
from openai import OpenAI
# Load config
with open('/etc/databend_rag/config.yaml', 'r') as f:
config = yaml.safe_load(f)
db_config = config['database']
rag_config = config['rag']
# Setup Databend Connection
def get_connection():
conn = databend_driver.connect(
host=db_config['host'],
port=db_config['port'],
user=db_config['user'],
password=db_config['password'],
database=db_config['database']
)
return conn
# Generate embedding (Mock for local testing, replace with real API call)
def generate_embedding(text):
# In production: client.embeddings.create(input=text, model=rag_config['embedding_model'])
# Return a dummy 1536-dim vector for demo structure
return np.random.rand(1536).astype(np.float32).tolist()
def search_documents(query_text, top_k=3):
query_vector = generate_embedding(query_text)
# Convert numpy array/list to string format for SQL if needed,
# but Databend driver handles numpy array to Vector type automatically in newer versions.
# Here we format it as a string array for SQL injection safety demo.
vec_str = str(query_vector).replace(' ', '').replace('[', '').replace(']', '')
sql = f"""
SELECT id, content, metadata, vector_distance_cosine(embedding, '[{vec_str}]') AS score
FROM rag_documents
WHERE vector_distance_cosine(embedding, '[{vec_str}]') < {rag_config['similarity_threshold']}
ORDER BY score ASC
LIMIT {top_k}
"""
conn = get_connection()
cur = conn.cursor()
cur.execute(sql)
results = cur.fetchall()
conn.close()
return results
if __name__ == "__main__":
query = "Làm thế nào để Databend cải thiện RAG?"
print(f"Searching for: {query}")
docs = search_documents(query)
for doc in docs:
print(f"ID: {doc[0]}, Content: {doc[1]}, Score: {doc[3]}")
Chạy script để test kết nối và logic tìm kiếm.
python3 rag_app.py
Verify: Kiểm tra output console, đảm bảo in ra các document có score thấp (tương đồng cao) và nội dung liên quan đến từ khóa tìm kiếm.
Tối ưu hóa Pipeline và Context cho LLM
Không chỉ lấy dữ liệu, chúng ta cần đóng gói kết quả tìm kiếm thành một "Context" chuẩn để gửi cho LLM.
Mục đích: Định dạng chuỗi prompt từ dữ liệu SQL trả về để LLM hiểu và tổng hợp câu trả lời.
Kết quả mong đợi: Một chuỗi prompt hoàn chỉnh bao gồm câu hỏi và các tài liệu liên quan.
Sửa lại hàm search_documents để trả về định dạng JSON hoặc chuỗi formatted.
def build_rag_context(query_text, results):
context_parts = []
for row in results:
doc_id, content, metadata, score = row
context_parts.append(f"""
--- Document {doc_id} (Score: {score:.4f}) ---
{content}
""")
final_prompt = f"""
Dựa trên các tài liệu sau, hãy trả lời câu hỏi: "{query_text}"
Tài liệu tham khảo:
{''.join(context_parts)}
Nếu không tìm thấy thông tin liên quan trong tài liệu, hãy trả lời rằng bạn không biết.
"""
return final_prompt
Tích hợp vào main function để tạo prompt cuối cùng.
if __name__ == "__main__":
query = "Databend là gì?"
docs = search_documents(query)
prompt = build_rag_context(query, docs)
print("--- FINAL PROMPT FOR LLM ---")
print(prompt)
# Send 'prompt' to LLM API here
Verify: Chạy lại script, output phải là một đoạn văn bản có cấu trúc rõ ràng, chứa câu hỏi và nội dung các document đã tìm được, sẵn sàng để gửi vào API của LLM.
Triển khai và Verify Hiệu năng
Để đảm bảo hệ thống RAG hoạt động ổn định, ta cần kiểm tra thời gian phản hồi (latency) của toàn bộ pipeline.
Mục đích: Đo lường thời gian từ khi nhận query đến khi có kết quả từ Databend.
Kết quả mong đợi: Thời gian query dưới 100ms cho các truy vấn vector (không bao gồm thời gian gọi LLM).
Sử dụng thư viện time trong Python để đo thời gian thực thi.
import time
if __name__ == "__main__":
query = "Hiệu năng của Databend thế nào?"
start_time = time.time()
docs = search_documents(query)
db_time = time.time() - start_time
print(f"Databend Query Latency: {db_time:.4f} seconds")
# Process context
prompt = build_rag_context(query, docs)
print(f"Context Length: {len(prompt)} characters")
Verify: Chạy script nhiều lần, đảm bảo thời gian Databend Query Latency ổn định và không tăng đột biến khi dữ liệu lớn lên.
Kiểm tra lại index HNSW trong Databend để đảm bảo không bị fragmentation.
OPTIMIZE TABLE rag_documents FINAL;
Verify: Chạy OPTIMIZE xong, chạy lại query SQL trực tiếp, kiểm tra EXPLAIN xem có dùng index hay không.
Điều hướng series:
Mục lục: Series: Triển khai Database AI-native với Databend và Ubuntu 24.04
« Phần 4: Tích hợp tính năng AI-native: Vector Search và Embedding
Phần 6: Giám sát hiệu năng và Troubleshooting nâng cao »