Cấu hình Script Huấn luyện để Load Dữ liệu từ DVC
Chúng ta sẽ viết một script Python chuẩn để lấy dữ liệu đã được versioning trong DVC. Thay vì hardcode đường dẫn, script sẽ sử dụng API của DVC để pull dữ liệu về đúng directory huấn luyện, đảm bảo tính nhất quán giữa môi trường dev và production.
Việc này giải quyết bài toán "data drift" khi dữ liệu thay đổi nhưng script huấn luyện vẫn chạy trên data cũ. DVC đảm bảo script luôn load đúng hash version của dataset.
Tạo file train.py trong thư mục dự án với nội dung hoàn chỉnh sau:
import dvc.api
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import joblib
# Cấu hình
DATA_FILE = "data/processed/train.csv"
MODEL_PATH = "models/best_model.pkl"
PARAMS_FILE = "params.yaml"
def load_data():
"""
Load dữ liệu từ DVC.
Nếu data chưa được pull, DVC sẽ tự động fetch về từ remote storage.
"""
print("Loading data from DVC...")
try:
# Sử dụng API của DVC để load data trực tiếp
# Nếu chạy trong pipeline DVC, nó sẽ tự động resolve version
df = pd.read_csv(DATA_FILE)
print(f"Data loaded successfully. Shape: {df.shape}")
return df
except FileNotFoundError:
print("Data file not found. Ensure 'dvc pull' has been run or DVC is initialized.")
# Fallback: Force pull nếu file chưa tồn tại
dvc.api.pull(DATA_FILE)
df = pd.read_csv(DATA_FILE)
return df
def train_model(df):
"""
Huấn luyện mô hình Random Forest đơn giản.
"""
print("Starting model training...")
X = df.drop(columns=['target'])
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
score = model.score(X_test, y_test)
print(f"Model accuracy: {score:.4f}")
# Lưu mô hình
joblib.dump(model, MODEL_PATH)
print(f"Model saved to {MODEL_PATH}")
return score
if __name__ == "__main__":
df = load_data()
accuracy = train_model(df)
print(f"Training finished. Final accuracy: {accuracy}")
Kết quả mong đợi: Script chạy thành công, không báo lỗi FileNotFoundError nếu dữ liệu đã được sync, và xuất ra đường dẫn file mô hình đã lưu.
Verify kết quả load dữ liệu
Chạy lệnh dưới đây để kiểm tra xem script có nhận diện đúng file dữ liệu từ DVC không:
python train.py
Nếu thành công, bạn sẽ thấy log "Data loaded successfully" và "Model saved to models/best_model.pkl". Nếu chưa có data, DVC sẽ tự động pull về.
Cấu hình DVC tự động cập nhật sau khi huấn luyện
Tạo cấu hình params.yaml và metrics.yaml
Để DVC tự động theo dõi sự thay đổi của tham số (params) và kết quả (metrics) sau mỗi lần huấn luyện, chúng ta cần định nghĩa rõ các file cấu hình này.
Tham số sẽ được DVC theo dõi để xác định khi nào cần chạy lại pipeline, còn metrics sẽ được ghi vào DVC để so sánh hiệu suất giữa các version.
Tạo file params.yaml tại thư mục gốc dự án:
n_estimators: 100
random_state: 42
test_size: 0.2
Tạo file metrics.yaml để lưu kết quả đánh giá:
accuracy: 0.85
Lưu ý: Trong script train.py, chúng ta cần sửa lại phần lưu metrics để ghi vào file này thay vì chỉ print ra console.
Cập nhật script huấn luyện để ghi Metrics
Sửa hàm train_model trong train.py để ghi kết quả accuracy vào file metrics.yaml bằng thư viện yaml.
import yaml
# ... (import ở trên giữ nguyên)
def train_model(df):
# ... (code huấn luyện giữ nguyên)
score = model.score(X_test, y_test)
# Ghi metrics vào file
with open("metrics.yaml", "w") as f:
yaml.dump({"accuracy": score}, f)
print(f"Model accuracy: {score:.4f}")
print(f"Metrics saved to metrics.yaml")
return score
Kết quả mong đợi: Sau khi chạy train.py, file metrics.yaml sẽ được tự động cập nhật với giá trị accuracy mới nhất.
Register pipeline vào DVC
Để DVC tự động theo dõi sự phụ thuộc giữa input (data, params) và output (model, metrics), ta cần tạo file dvc.yaml định nghĩa pipeline.
File này đóng vai trò như một DAG (Directed Acyclic Graph), giúp DVC biết khi nào data thay đổi thì cần chạy lại bước huấn luyện.
Tạo file dvc.yaml tại thư mục gốc:
stages:
train:
cmd: python train.py
deps:
- train.py
- data/processed/train.csv
- params.yaml
outs:
- models/best_model.pkl
- metrics.yaml
Kết quả mong đợi: DVC đã nhận diện pipeline. Khi chạy dvc repro, nó sẽ chỉ chạy lại nếu train.csv hoặc params.yaml thay đổi.
Verify kết quả tự động cập nhật
Chạy lệnh để kích hoạt pipeline và kiểm tra trạng thái:
dvc repro train -v
Bạn sẽ thấy log chi tiết của DVC kiểm tra hash của các file input. Nếu không có gì thay đổi, DVC sẽ báo "nothing to reproduce". Nếu bạn sửa params.yaml (ví dụ tăng n_estimators), chạy lại lệnh này sẽ kích hoạt huấn luyện mới và cập nhật metrics.yaml.
Sử dụng DVC Pipeline để tự động hóa tiền xử lý dữ liệu
Tạo script tiền xử lý (Preprocessing)
Chúng ta cần tách biệt bước tiền xử lý dữ liệu ra khỏi bước huấn luyện để tạo thành một pipeline đầy đủ.
Việc này giúp tái sử dụng bước tiền xử lý cho các mô hình khác nhau và đảm bảo dữ liệu đầu vào cho bước huấn luyện luôn ở định dạng chuẩn.
Tạo file preprocess.py trong thư mục dự án:
import pandas as pd
import numpy as np
import os
def preprocess():
# Giả sử raw data nằm ở data/raw/dataset.csv
raw_path = "data/raw/dataset.csv"
processed_path = "data/processed/train.csv"
# Tạo thư mục nếu chưa có
os.makedirs(os.path.dirname(processed_path), exist_ok=True)
print(f"Loading raw data from {raw_path}...")
df = pd.read_csv(raw_path)
# Thực hiện các bước xử lý đơn giản
# 1. Xóa hàng null
df = df.dropna()
# 2. Tạo biến target giả lập (nếu chưa có)
if 'target' not in df.columns:
df['target'] = np.random.randint(0, 2, size=len(df))
# 3. Chuẩn hóa (ví dụ: normalize các cột số)
numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
for col in numeric_cols:
if col != 'target':
df[col] = (df[col] - df[col].min()) / (df[col].max() - df[col].min())
print(f"Saving processed data to {processed_path}...")
df.to_csv(processed_path, index=False)
print("Preprocessing done.")
if __name__ == "__main__":
preprocess()
Kết quả mong đợi: Script chạy xong sẽ tạo ra file data/processed/train.csv từ dữ liệu thô.
Cấu hình Pipeline DVC cho Preprocessing
Bây giờ ta sẽ mở rộng file dvc.yaml để bao gồm cả bước preprocess và thiết lập dependency giữa preprocess và train.
DVC sẽ tự động chạy preprocess trước, sau đó mới chạy train nếu dữ liệu thô thay đổi.
Thay thế nội dung file dvc.yaml bằng cấu hình mới:
stages:
preprocess:
cmd: python preprocess.py
deps:
- preprocess.py
- data/raw/dataset.csv
outs:
- data/processed/train.csv
train:
cmd: python train.py
deps:
- train.py
- data/processed/train.csv
- params.yaml
outs:
- models/best_model.pkl
- metrics.yaml
Kết quả mong đợi: DVC hiểu rằng train phụ thuộc vào output của preprocess.
Verify kết quả pipeline tự động
Chạy lệnh để kích hoạt toàn bộ pipeline từ đầu:
dvc repro -v
Bạn sẽ thấy DVC tự động chạy theo thứ tự:
1. Kiểm tra data/raw/dataset.csv. Nếu mới, chạy preprocess.
2. Khi data/processed/train.csv được tạo, DVC nhận ra input của train đã thay đổi.
3. Tự động chạy train để huấn luyện lại mô hình.
Chạy thử nghiệm End-to-End từ Dữ liệu đến Mô hình
Chuẩn bị dữ liệu mẫu và Remote Storage
Để test toàn bộ quy trình, chúng ta cần một file dữ liệu thô và cấu hình remote storage (ví dụ: S3 hoặc local storage) để lưu trữ version của data và model.
Ở đây ta giả định đã có remote storage được config trong phần trước. Nếu chưa, ta dùng local storage để test nhanh.
mkdir -p data/raw
echo "col1,col2,target
1.0,2.0,0
3.0,4.0,1
5.0,6.0,0" > data/raw/dataset.csv
# Config remote storage local (nếu chưa có)
dvc remote add -d myremote ./dvc_storage
dvc push
Kết quả mong đợi: Dữ liệu thô đã được tạo, và DVC đã push các version hiện tại lên remote storage local.
Chạy Pipeline End-to-End
Bây giờ ta sẽ xóa các file output (processed data, model, metrics) để mô phỏng môi trường mới và chạy lại toàn bộ quy trình.
rm -rf data/processed
rm -rf models
rm metrics.yaml
# Kích hoạt pipeline
dvc repro -v
Kết quả mong đợi:
1. DVC phát hiện data/processed/train.csv bị thiếu -> Chạy preprocess.
2. DVC phát hiện models/best_model.pkl bị thiếu -> Chạy train.
3. Xuất ra log thành công của cả 2 bước.
Commit và Push toàn bộ thay đổi lên Version Control
Sau khi pipeline chạy xong, chúng ta cần commit các thay đổi của DVC (file .dvc và dvc.lock) vào Git để lưu lại state của pipeline.
git add dvc.yaml dvc.lock data/processed/train.csv.dvc models/best_model.pkl.dvc metrics.yaml.dvc
git commit -m "Add ML pipeline with preprocessing and training"
dvc push
Kết quả mong đợi: Git commit thành công và DVC push các artifact (data, model) lên remote storage.
Verify kết quả cuối cùng
Kiểm tra xem pipeline đã được lưu trữ đúng cách chưa bằng lệnh:
dvc dag
dvc status
Bạn sẽ thấy:
- dvc dag: Hiển thị đồ thị phụ thuộc giữa preprocess và train.
- dvc status: Báo "clean" nếu mọi thứ đã được sync lên remote.
Để kiểm tra lịch sử các lần chạy (nếu bạn đã chạy nhiều lần với params khác nhau), sử dụng:
dvc metrics show --all
Lệnh này sẽ hiển thị bảng so sánh accuracy của các version mô hình khác nhau, xác nhận quy trình DataOps đã hoạt động trọn vẹn.
Điều hướng series:
Mục lục: Series: Xây dựng nền tảng DataOps với DVC, MLflow và Kubernetes cho vòng đời AI
« Phần 3: Tích hợp DVC vào quy trình huấn luyện ML
Phần 4: Cài đặt và cấu hình MLflow trên Kubernetes »