AI Monitoring & Observability: Keeping AI Healthy in Production
Monitoring AI applications is essential to ensure the system performs well, maintains output quality, and keeps costs under control.
Why Monitor AI?
AI-Specific Challenges
AI ≠ Traditional Software
Problems you will run into:
1. Non-deterministic outputs
2. Model degradation over time
3. Cost unpredictability
4. Latency variations
5. Quality drift
What to Monitor
Key Areas:
1. Performance
- Latency
- Throughput
- Error rates
2. Quality
- Output quality
- User satisfaction
- Accuracy metrics
3. Cost
- Token usage
- API costs
- Cost per request
4. Reliability
- Availability
- Rate limits
- Failures
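One way to keep these four areas together is to record them per request. Below is a minimal sketch of such a record; the field names are illustrative, and each area gets a fuller treatment in the sections that follow.
from dataclasses import dataclass
from typing import Optional

@dataclass
class MonitoredRequest:
    # Performance
    latency_ms: float
    # Quality (e.g., a sampled 1-5 score; None if not evaluated)
    quality_score: Optional[int]
    # Cost
    input_tokens: int
    output_tokens: int
    cost_usd: float
    # Reliability
    status: str                      # "success", "error", "rate_limited"
    error: Optional[str] = None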
Core Metrics
Performance Metrics
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

from openai import OpenAI

# OpenAI client (assumes OPENAI_API_KEY is set in the environment)
client = OpenAI()

@dataclass
class RequestMetrics:
    request_id: str
    model: str
    latency_ms: float
    input_tokens: int
    output_tokens: int
    status: str
    error: Optional[str] = None
    # Recorded when the metric is created; used by the dashboard later
    timestamp: datetime = field(default_factory=datetime.now)

class PerformanceTracker:
    def __init__(self):
        self.metrics = []

    def track_request(self, model, messages):
        request_id = str(uuid.uuid4())
        start_time = time.time()
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages
            )
            latency_ms = (time.time() - start_time) * 1000
            metric = RequestMetrics(
                request_id=request_id,
                model=model,
                latency_ms=latency_ms,
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens,
                status="success"
            )
            self.metrics.append(metric)
            return response
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            metric = RequestMetrics(
                request_id=request_id,
                model=model,
                latency_ms=latency_ms,
                input_tokens=0,
                output_tokens=0,
                status="error",
                error=str(e)
            )
            self.metrics.append(metric)
            raise
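A minimal usage sketch. It assumes the PerformanceTracker above is in scope and OPENAI_API_KEY is set; the model name and prompt are only examples.
tracker = PerformanceTracker()
response = tracker.track_request(
    "gpt-4o-mini",
    [{"role": "user", "content": "Summarize observability in one sentence."}]
)
latest = tracker.metrics[-1]
print(f"{latest.latency_ms:.0f} ms, {latest.output_tokens} output tokens")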
Quality Metrics
from datetime import datetime, timedelta

class QualityTracker:
    def __init__(self):
        self.evaluations = []

    def evaluate_response(self, prompt, response, expected=None):
        """Evaluate response quality."""
        evaluation = {
            "timestamp": datetime.now(),
            "prompt_length": len(prompt),
            "response_length": len(response),
            "has_content": len(response.strip()) > 0
        }
        # Check for common refusal/disclaimer phrases
        # (a rough proxy for problematic responses)
        evaluation["has_hallucination_markers"] = any(
            phrase in response.lower()
            for phrase in ["i cannot", "i don't have", "as an ai"]
        )
        # If expected output provided, compare
        if expected:
            evaluation["matches_expected"] = self._compare(response, expected)
        self.evaluations.append(evaluation)
        return evaluation

    def get_quality_score(self, window_hours=24):
        """Calculate quality metrics over a time window."""
        cutoff = datetime.now() - timedelta(hours=window_hours)
        recent = [e for e in self.evaluations if e["timestamp"] > cutoff]
        if not recent:
            return None
        has_content = sum(1 for e in recent if e["has_content"])
        no_hallucination = sum(1 for e in recent if not e["has_hallucination_markers"])
        return {
            "content_rate": has_content / len(recent),
            "quality_rate": no_hallucination / len(recent),
            "sample_size": len(recent)
        }

    def _compare(self, response, expected):
        # Simple containment check; swap in semantic similarity if needed
        return expected.strip().lower() in response.strip().lower()
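A short usage sketch of the tracker above; the prompt, response, and expected values are illustrative.
quality = QualityTracker()
quality.evaluate_response(
    prompt="What is the capital of France?",
    response="The capital of France is Paris.",
    expected="Paris"
)
print(quality.get_quality_score(window_hours=24))
# -> {'content_rate': 1.0, 'quality_rate': 1.0, 'sample_size': 1}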
Cost Metrics
from datetime import datetime

class CostTracker:
    # USD per 1M tokens (example prices; check your provider's current pricing)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4": {"input": 3.00, "output": 15.00}
    }

    def __init__(self):
        self.costs = []

    def track_cost(self, model, input_tokens, output_tokens):
        if model not in self.PRICING:
            return None
        prices = self.PRICING[model]
        input_cost = (input_tokens / 1_000_000) * prices["input"]
        output_cost = (output_tokens / 1_000_000) * prices["output"]
        total_cost = input_cost + output_cost
        self.costs.append({
            "timestamp": datetime.now(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": total_cost
        })
        return total_cost

    def get_daily_cost(self, date=None):
        date = date or datetime.now().date()
        daily = [c for c in self.costs if c["timestamp"].date() == date]
        return {
            "total": sum(c["cost"] for c in daily),
            "by_model": self._group_by_model(daily),
            "request_count": len(daily)
        }

    def _group_by_model(self, costs):
        by_model = {}
        for c in costs:
            model = c["model"]
            if model not in by_model:
                by_model[model] = 0
            by_model[model] += c["cost"]
        return by_model
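A quick usage sketch; the token counts are made up to show the arithmetic.
costs = CostTracker()
costs.track_cost("gpt-4o-mini", input_tokens=1200, output_tokens=350)
print(costs.get_daily_cost())
# total ≈ $0.00039 (1200/1M * $0.15 + 350/1M * $0.60)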
Logging
Structured Logging
import logging
import json

class AILogger:
    def __init__(self, name="ai_app"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)

    def log_request(self, request_id, model, prompt, metadata=None):
        self.logger.info(json.dumps({
            "event": "ai_request",
            "request_id": request_id,
            "model": model,
            "prompt_length": len(prompt),
            "metadata": metadata or {}
        }))

    def log_response(self, request_id, response, latency_ms, tokens):
        self.logger.info(json.dumps({
            "event": "ai_response",
            "request_id": request_id,
            "latency_ms": latency_ms,
            "input_tokens": tokens["input"],
            "output_tokens": tokens["output"],
            "response_length": len(response)
        }))

    def log_error(self, request_id, error, context=None):
        self.logger.error(json.dumps({
            "event": "ai_error",
            "request_id": request_id,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context or {}
        }))
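A usage sketch showing how the same request_id ties the request and response log events together; the latency and token numbers are illustrative.
import uuid

ai_logger = AILogger()
request_id = str(uuid.uuid4())
ai_logger.log_request(request_id, "gpt-4o-mini", "Explain observability in one paragraph.")
# ... call the model here ...
ai_logger.log_response(request_id, "Observability is ...", latency_ms=850.0,
                       tokens={"input": 12, "output": 140})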
Prompt/Response Logging
import os
import json
from datetime import datetime

class PromptLogger:
    def __init__(self, storage_path="logs/prompts"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def log(self, request_id, prompt, response, metadata=None):
        """Log full prompt and response for debugging."""
        log_entry = {
            "request_id": request_id,
            "timestamp": datetime.now().isoformat(),
            "prompt": prompt,
            "response": response,
            "metadata": metadata or {}
        }
        # Save to file (or send to a logging service)
        filepath = os.path.join(
            self.storage_path,
            f"{datetime.now().strftime('%Y-%m-%d')}.jsonl"
        )
        with open(filepath, "a") as f:
            f.write(json.dumps(log_entry) + "\n")

    def search(self, query, date=None):
        """Search logs for debugging."""
        date = date or datetime.now().strftime('%Y-%m-%d')
        filepath = os.path.join(self.storage_path, f"{date}.jsonl")
        results = []
        if not os.path.exists(filepath):
            return results
        with open(filepath, "r") as f:
            for line in f:
                entry = json.loads(line)
                if query.lower() in entry["prompt"].lower():
                    results.append(entry)
        return results
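A minimal usage sketch; the request_id, prompt, and metadata values are only examples.
prompt_logger = PromptLogger()
prompt_logger.log(
    request_id="req-001",
    prompt="Translate 'good morning' to Thai",
    response="อรุณสวัสดิ์",
    metadata={"user_id": "u42"}
)
print(prompt_logger.search("translate"))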
Tracing
LangSmith Integration
import os
# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-ai-app"
# All LangChain operations are now traced
# View at smith.langchain.com
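With those environment variables set, LangChain calls are traced without extra code. A minimal sketch, assuming the langchain-openai package is installed and OPENAI_API_KEY is set:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")
# This call appears as a trace in the "my-ai-app" project on smith.langchain.com
llm.invoke("Ping")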
Custom Tracing
import time
import uuid
from contextlib import contextmanager

class Tracer:
    def __init__(self):
        self.traces = {}

    @contextmanager
    def trace(self, name, parent_id=None):
        trace_id = str(uuid.uuid4())
        start_time = time.time()
        trace_data = {
            "id": trace_id,
            "name": name,
            "parent_id": parent_id,
            "start_time": start_time,
            "spans": []
        }
        self.traces[trace_id] = trace_data
        try:
            yield trace_id
        finally:
            trace_data["end_time"] = time.time()
            trace_data["duration_ms"] = (
                trace_data["end_time"] - start_time
            ) * 1000

    def add_span(self, trace_id, name, data):
        if trace_id in self.traces:
            self.traces[trace_id]["spans"].append({
                "name": name,
                "timestamp": time.time(),
                "data": data
            })

# Usage (call_ai and prompt stand in for your own model call)
tracer = Tracer()
with tracer.trace("ai_request") as trace_id:
    tracer.add_span(trace_id, "prompt_prepared", {"length": 100})
    response = call_ai(prompt)
    tracer.add_span(trace_id, "response_received", {"tokens": 500})
Alerting
Alert Rules
from datetime import datetime

class AlertManager:
    def __init__(self, notification_service):
        self.notification_service = notification_service
        self.rules = []

    def add_rule(self, name, condition, severity="warning"):
        self.rules.append({
            "name": name,
            "condition": condition,
            "severity": severity
        })

    def check_metrics(self, metrics):
        alerts = []
        for rule in self.rules:
            if rule["condition"](metrics):
                alert = {
                    "rule": rule["name"],
                    "severity": rule["severity"],
                    "timestamp": datetime.now(),
                    "metrics": metrics
                }
                alerts.append(alert)
                self.notification_service.send(alert)
        return alerts

# Setup alerts (slack_notifier is an instance of the SlackNotifier class below)
alert_manager = AlertManager(slack_notifier)

# High latency alert
alert_manager.add_rule(
    "high_latency",
    lambda m: m.get("avg_latency_ms", 0) > 5000,
    severity="critical"
)

# High error rate alert
alert_manager.add_rule(
    "high_error_rate",
    lambda m: m.get("error_rate", 0) > 0.05,
    severity="critical"
)

# Cost spike alert (daily spend in USD)
alert_manager.add_rule(
    "cost_spike",
    lambda m: m.get("daily_cost", 0) > 100,
    severity="warning"
)
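A sketch of feeding aggregated metrics into the rules above. The numbers are made up, and in practice this check would run on a schedule (e.g., every few minutes).
current_metrics = {
    "avg_latency_ms": 6200,
    "error_rate": 0.02,
    "daily_cost": 35.0
}
triggered = alert_manager.check_metrics(current_metrics)
# -> the "high_latency" rule fires and a notification is sent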
Slack Notifications
import requests

class SlackNotifier:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def send(self, alert):
        color = "#ff0000" if alert["severity"] == "critical" else "#ffaa00"
        payload = {
            "attachments": [{
                "color": color,
                "title": f"🚨 AI Alert: {alert['rule']}",
                "text": f"Severity: {alert['severity']}",
                "fields": [
                    {"title": k, "value": str(v), "short": True}
                    for k, v in alert["metrics"].items()
                ],
                "ts": alert["timestamp"].timestamp()
            }]
        }
        requests.post(self.webhook_url, json=payload)
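The slack_notifier passed to AlertManager earlier would be created like this; the webhook URL is a placeholder for your own Slack incoming webhook.
slack_notifier = SlackNotifier("https://hooks.slack.com/services/T000/B000/XXXX")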
Dashboard
Metrics Aggregation
from datetime import datetime, timedelta

class MetricsDashboard:
    def __init__(self, tracker):
        self.tracker = tracker

    def get_summary(self, hours=24):
        cutoff = datetime.now() - timedelta(hours=hours)
        # Relies on the timestamp field of RequestMetrics
        recent = [m for m in self.tracker.metrics
                  if m.timestamp > cutoff]
        if not recent:
            return None
        latencies = sorted(m.latency_ms for m in recent)
        errors = [m for m in recent if m.status == "error"]
        return {
            "time_range": f"Last {hours} hours",
            "total_requests": len(recent),
            "success_rate": 1 - len(errors) / len(recent),
            "latency": {
                "avg_ms": sum(latencies) / len(latencies),
                "p50_ms": latencies[len(latencies) // 2],
                "p95_ms": latencies[int(len(latencies) * 0.95)],
                "p99_ms": latencies[int(len(latencies) * 0.99)]
            },
            "tokens": {
                "total_input": sum(m.input_tokens for m in recent),
                "total_output": sum(m.output_tokens for m in recent),
                "avg_per_request": sum(m.input_tokens + m.output_tokens
                                       for m in recent) / len(recent)
            },
            "by_model": self._group_by_model(recent),
            "errors": {
                "count": len(errors),
                "types": self._count_error_types(errors)
            }
        }

    def _group_by_model(self, metrics):
        by_model = {}
        for m in metrics:
            if m.model not in by_model:
                by_model[m.model] = {"count": 0, "tokens": 0}
            by_model[m.model]["count"] += 1
            by_model[m.model]["tokens"] += m.input_tokens + m.output_tokens
        return by_model

    def _count_error_types(self, errors):
        types = {}
        for e in errors:
            error_type = e.error.split(":")[0] if e.error else "unknown"
            types[error_type] = types.get(error_type, 0) + 1
        return types
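A quick usage sketch, assuming tracker is the PerformanceTracker from earlier with some recorded requests.
dashboard = MetricsDashboard(tracker)
summary = dashboard.get_summary(hours=24)
if summary:
    print(f"p95 latency: {summary['latency']['p95_ms']:.0f} ms, "
          f"success rate: {summary['success_rate']:.1%}")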
Best Practices
1. Start with Key Metrics
Essential metrics to track:
- Latency (p50, p95, p99)
- Error rate
- Token usage
- Cost
- Request volume
2. Set Up Baselines
# Establish baselines during an initial observation period
baselines = {
    "latency_p95_ms": 3000,
    "error_rate": 0.01,
    "daily_cost": 50
}

# Alert when deviating from baseline by more than the threshold (50% here)
def check_deviation(current, baseline, threshold=0.5):
    return abs(current - baseline) / baseline > threshold
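A sketch of comparing current values against the baselines above; the current numbers are illustrative.
current = {"latency_p95_ms": 4800, "error_rate": 0.012, "daily_cost": 55}
for name, baseline in baselines.items():
    if check_deviation(current[name], baseline):
        print(f"{name} deviates more than 50% from its baseline")
# -> only latency_p95_ms (4800 vs 3000) crosses the threshold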
3. Monitor Quality
import random

# Sample a fraction of responses and evaluate them
# (queue_for_review, evaluate_with_llm, and log_quality_metric are placeholders
# for your own review queue and evaluation pipeline)
def sample_for_quality(response, sample_rate=0.1):
    if random.random() < sample_rate:
        # Send for human evaluation
        queue_for_review(response)
        # Or automated evaluation
        quality_score = evaluate_with_llm(response)
        log_quality_metric(quality_score)
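One way to fill in the evaluate_with_llm placeholder is an LLM-as-judge call. A minimal sketch, assuming the OpenAI client from earlier and that a single-digit 1-5 rating is an acceptable scoring scheme:
from openai import OpenAI

client = OpenAI()

def evaluate_with_llm(response_text):
    """Ask a model to rate the response from 1 (poor) to 5 (excellent)."""
    judge = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system",
             "content": "Rate the quality of the following AI response from 1 to 5. "
                        "Reply with a single digit only."},
            {"role": "user", "content": response_text}
        ]
    )
    try:
        return int(judge.choices[0].message.content.strip()[0])
    except (ValueError, IndexError):
        return None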
Summary
Key Monitoring Areas:
- Performance: Latency, throughput, errors
- Quality: Output quality, accuracy
- Cost: Token usage, spending
- Reliability: Availability, rate limits
Essential Components:
- Structured logging
- Metrics collection
- Distributed tracing
- Alerting system
Best Practices:
- Start with key metrics
- Set baselines
- Monitor quality
- Automate alerting
Written by
AI Unlocked Team