AI Integration
Monitoring
Observability
DevOps
Production

AI Monitoring & Observability: Managing AI in Production

Learn how to monitor AI applications in production, from metrics tracking, logging, and tracing through to alerting

AI Unlocked Team
23/01/2568

Monitoring AI applications is essential to make sure the system performs well, produces quality output, and keeps costs under control.

Why Monitor AI?

AI-Specific Challenges

AI ≠ Traditional Software

Problems you will run into:
1. Non-deterministic outputs
2. Model degradation over time
3. Cost unpredictability
4. Latency variations
5. Quality drift

What to Monitor

Key Areas (a combined sketch follows this list):

1. Performance
   - Latency
   - Throughput
   - Error rates

2. Quality
   - Output quality
   - User satisfaction
   - Accuracy metrics

3. Cost
   - Token usage
   - API costs
   - Cost per request

4. Reliability
   - Availability
   - Rate limits
   - Failures
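
To make these four areas concrete, here is a minimal sketch of a single monitoring record that touches all of them; build_monitoring_record is a hypothetical helper and the field names are assumptions, not a fixed schema.

from datetime import datetime

def build_monitoring_record(model, latency_ms, input_tokens, output_tokens,
                            cost_usd, status, user_rating=None):
    """Hypothetical record combining performance, quality, cost, and reliability."""
    return {
        "timestamp": datetime.now().isoformat(),
        # Performance
        "latency_ms": latency_ms,
        "status": status,
        # Quality (user feedback, if collected)
        "user_rating": user_rating,
        # Cost
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_usd": cost_usd,
        # Reliability
        "model": model,
        "rate_limited": status == "rate_limited",
    }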

Core Metrics

Performance Metrics

import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

@dataclass
class RequestMetrics:
    request_id: str
    model: str
    latency_ms: float
    input_tokens: int
    output_tokens: int
    status: str
    error: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)

class PerformanceTracker:
    def __init__(self):
        self.metrics = []

    def track_request(self, model, messages):
        request_id = str(uuid.uuid4())
        start_time = time.time()

        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages
            )

            latency_ms = (time.time() - start_time) * 1000

            metric = RequestMetrics(
                request_id=request_id,
                model=model,
                latency_ms=latency_ms,
                input_tokens=response.usage.prompt_tokens,
                output_tokens=response.usage.completion_tokens,
                status="success"
            )

            self.metrics.append(metric)
            return response

        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000

            metric = RequestMetrics(
                request_id=request_id,
                model=model,
                latency_ms=latency_ms,
                input_tokens=0,
                output_tokens=0,
                status="error",
                error=str(e)
            )

            self.metrics.append(metric)
            raise
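
A minimal usage sketch of PerformanceTracker; the model name and message content are placeholders, and a valid OPENAI_API_KEY is assumed.

tracker = PerformanceTracker()

response = tracker.track_request(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Summarize our refund policy."}]
)

# Inspect the last recorded metric
last = tracker.metrics[-1]
print(last.latency_ms, last.input_tokens, last.output_tokens, last.status)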

Quality Metrics

from datetime import datetime, timedelta

class QualityTracker:
    def __init__(self):
        self.evaluations = []

    def evaluate_response(self, prompt, response, expected=None):
        """Evaluate response quality."""

        evaluation = {
            "timestamp": datetime.now(),
            "prompt_length": len(prompt),
            "response_length": len(response),
            "has_content": len(response.strip()) > 0
        }

        # Flag common refusal/disclaimer phrases (a rough proxy for low-value answers)
        evaluation["has_hallucination_markers"] = any(
            phrase in response.lower()
            for phrase in ["i cannot", "i don't have", "as an ai"]
        )

        # If expected output provided, compare
        if expected:
            evaluation["matches_expected"] = self._compare(response, expected)

        self.evaluations.append(evaluation)
        return evaluation

    def get_quality_score(self, window_hours=24):
        """Calculate quality score over time window."""
        cutoff = datetime.now() - timedelta(hours=window_hours)
        recent = [e for e in self.evaluations if e["timestamp"] > cutoff]

        if not recent:
            return None

        has_content = sum(1 for e in recent if e["has_content"])
        no_hallucination = sum(1 for e in recent if not e["has_hallucination_markers"])

        return {
            "content_rate": has_content / len(recent),
            "quality_rate": no_hallucination / len(recent),
            "sample_size": len(recent)
        }

    def _compare(self, response, expected):
        # Simple placeholder check: does the response contain the expected text?
        return expected.strip().lower() in response.strip().lower()
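
A quick usage sketch; the prompt, response, and expected strings are illustrative placeholders.

quality = QualityTracker()

quality.evaluate_response(
    prompt="What is our refund window?",
    response="Refunds are accepted within 30 days of purchase.",
    expected="30 days"
)

print(quality.get_quality_score(window_hours=24))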

Cost Metrics

from datetime import datetime

class CostTracker:
    # Approximate prices in USD per 1M tokens; check the provider's current pricing
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-sonnet-4": {"input": 3.00, "output": 15.00}
    }

    def __init__(self):
        self.costs = []

    def track_cost(self, model, input_tokens, output_tokens):
        if model not in self.PRICING:
            return None

        prices = self.PRICING[model]
        input_cost = (input_tokens / 1_000_000) * prices["input"]
        output_cost = (output_tokens / 1_000_000) * prices["output"]
        total_cost = input_cost + output_cost

        self.costs.append({
            "timestamp": datetime.now(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": total_cost
        })

        return total_cost

    def get_daily_cost(self, date=None):
        date = date or datetime.now().date()
        daily = [c for c in self.costs if c["timestamp"].date() == date]

        return {
            "total": sum(c["cost"] for c in daily),
            "by_model": self._group_by_model(daily),
            "request_count": len(daily)
        }

    def _group_by_model(self, costs):
        by_model = {}
        for c in costs:
            model = c["model"]
            if model not in by_model:
                by_model[model] = 0
            by_model[model] += c["cost"]
        return by_model
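
Usage sketch; the token counts below are illustrative and would normally come from the API response's usage field.

cost_tracker = CostTracker()

cost = cost_tracker.track_cost("gpt-4o-mini", input_tokens=1200, output_tokens=350)
print(f"Request cost: ${cost:.6f}")

print(cost_tracker.get_daily_cost())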

Logging

Structured Logging

import logging
import json

class AILogger:
    def __init__(self, name="ai_app"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)

    def log_request(self, request_id, model, prompt, metadata=None):
        self.logger.info(json.dumps({
            "event": "ai_request",
            "request_id": request_id,
            "model": model,
            "prompt_length": len(prompt),
            "metadata": metadata or {}
        }))

    def log_response(self, request_id, response, latency_ms, tokens):
        self.logger.info(json.dumps({
            "event": "ai_response",
            "request_id": request_id,
            "latency_ms": latency_ms,
            "input_tokens": tokens["input"],
            "output_tokens": tokens["output"],
            "response_length": len(response)
        }))

    def log_error(self, request_id, error, context=None):
        self.logger.error(json.dumps({
            "event": "ai_error",
            "request_id": request_id,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context or {}
        }))
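
Example usage; the request_id, prompt, response, and token counts are placeholders.

ai_logger = AILogger()

ai_logger.log_request("req-123", "gpt-4o-mini", "Summarize this document...")
ai_logger.log_response("req-123", "Here is the summary...", latency_ms=850,
                       tokens={"input": 1200, "output": 300})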

Prompt/Response Logging

import os
import json
from datetime import datetime

class PromptLogger:
    def __init__(self, storage_path="logs/prompts"):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def log(self, request_id, prompt, response, metadata=None):
        """Log full prompt and response for debugging."""

        log_entry = {
            "request_id": request_id,
            "timestamp": datetime.now().isoformat(),
            "prompt": prompt,
            "response": response,
            "metadata": metadata or {}
        }

        # Save to file (or send to logging service)
        filepath = os.path.join(
            self.storage_path,
            f"{datetime.now().strftime('%Y-%m-%d')}.jsonl"
        )

        with open(filepath, "a") as f:
            f.write(json.dumps(log_entry) + "\n")

    def search(self, query, date=None):
        """Search logs for debugging."""
        date = date or datetime.now().strftime('%Y-%m-%d')
        filepath = os.path.join(self.storage_path, f"{date}.jsonl")

        if not os.path.exists(filepath):
            return []

        results = []
        with open(filepath, "r") as f:
            for line in f:
                entry = json.loads(line)
                if query.lower() in entry["prompt"].lower():
                    results.append(entry)

        return results
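
Usage sketch; the prompt, response, and metadata values are placeholders.

prompt_logger = PromptLogger()

prompt_logger.log(
    request_id="req-123",
    prompt="Summarize this document...",
    response="Here is the summary...",
    metadata={"user_id": "u-42"}
)

matches = prompt_logger.search("summarize")
print(len(matches), "matching entries")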

Tracing

LangSmith Integration

import os

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-ai-app"

# All LangChain operations are now traced
# View at smith.langchain.com
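
With those variables set, any LangChain call is traced automatically. A minimal sketch, assuming the langchain-openai package is installed and an OpenAI key is configured:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

# This call appears as a trace in the LangSmith project configured above
result = llm.invoke("Summarize the key observability metrics for AI apps.")
print(result.content)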

Custom Tracing

import time
import uuid
from contextlib import contextmanager

class Tracer:
    def __init__(self):
        self.traces = {}

    @contextmanager
    def trace(self, name, parent_id=None):
        trace_id = str(uuid.uuid4())
        start_time = time.time()

        trace_data = {
            "id": trace_id,
            "name": name,
            "parent_id": parent_id,
            "start_time": start_time,
            "spans": []
        }

        self.traces[trace_id] = trace_data

        try:
            yield trace_id
        finally:
            trace_data["end_time"] = time.time()
            trace_data["duration_ms"] = (
                trace_data["end_time"] - start_time
            ) * 1000

    def add_span(self, trace_id, name, data):
        if trace_id in self.traces:
            self.traces[trace_id]["spans"].append({
                "name": name,
                "timestamp": time.time(),
                "data": data
            })

# Usage (call_ai and prompt are placeholders for your own request code)
tracer = Tracer()

with tracer.trace("ai_request") as trace_id:
    tracer.add_span(trace_id, "prompt_prepared", {"length": 100})

    response = call_ai(prompt)

    tracer.add_span(trace_id, "response_received", {"tokens": 500})

Alerting

Alert Rules

from datetime import datetime

class AlertManager:
    def __init__(self, notification_service):
        self.notification_service = notification_service
        self.rules = []

    def add_rule(self, name, condition, severity="warning"):
        self.rules.append({
            "name": name,
            "condition": condition,
            "severity": severity
        })

    def check_metrics(self, metrics):
        alerts = []

        for rule in self.rules:
            if rule["condition"](metrics):
                alert = {
                    "rule": rule["name"],
                    "severity": rule["severity"],
                    "timestamp": datetime.now(),
                    "metrics": metrics
                }
                alerts.append(alert)
                self.notification_service.send(alert)

        return alerts

# Setup alerts (slack_notifier is a SlackNotifier instance; see the next section)
alert_manager = AlertManager(slack_notifier)

# High latency alert
alert_manager.add_rule(
    "high_latency",
    lambda m: m.get("avg_latency_ms", 0) > 5000,
    severity="critical"
)

# High error rate alert
alert_manager.add_rule(
    "high_error_rate",
    lambda m: m.get("error_rate", 0) > 0.05,
    severity="critical"
)

# Cost spike alert (daily_cost is in USD)
alert_manager.add_rule(
    "cost_spike",
    lambda m: m.get("daily_cost", 0) > 100,
    severity="warning"
)
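
The rules can then be evaluated periodically against aggregated metrics, for example the summary produced by the dashboard shown later. A sketch with illustrative values; the field names must match whatever your rules expect.

current_metrics = {
    "avg_latency_ms": 6200,
    "error_rate": 0.02,
    "daily_cost": 35.0
}

triggered = alert_manager.check_metrics(current_metrics)
for alert in triggered:
    print(alert["rule"], alert["severity"])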

Slack Notifications

import requests

class SlackNotifier:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def send(self, alert):
        color = "#ff0000" if alert["severity"] == "critical" else "#ffaa00"

        payload = {
            "attachments": [{
                "color": color,
                "title": f"🚨 AI Alert: {alert['rule']}",
                "text": f"Severity: {alert['severity']}",
                "fields": [
                    {"title": k, "value": str(v), "short": True}
                    for k, v in alert["metrics"].items()
                ],
                "ts": alert["timestamp"].timestamp()
            }]
        }

        requests.post(self.webhook_url, json=payload)
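
To wire it up, construct the notifier with your Slack incoming-webhook URL; reading it from an environment variable, as below, is an assumption about configuration, not a requirement.

import os

slack_notifier = SlackNotifier(webhook_url=os.environ["SLACK_WEBHOOK_URL"])
alert_manager = AlertManager(slack_notifier)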

Dashboard

Metrics Aggregation

from datetime import datetime, timedelta

class MetricsDashboard:
    def __init__(self, tracker):
        self.tracker = tracker

    def get_summary(self, hours=24):
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [m for m in self.tracker.metrics
                  if m.timestamp > cutoff]

        if not recent:
            return None

        latencies = [m.latency_ms for m in recent]
        errors = [m for m in recent if m.status == "error"]

        return {
            "time_range": f"Last {hours} hours",
            "total_requests": len(recent),
            "success_rate": 1 - len(errors) / len(recent),
            "latency": {
                "avg_ms": sum(latencies) / len(latencies),
                "p50_ms": sorted(latencies)[len(latencies) // 2],
                "p95_ms": sorted(latencies)[int(len(latencies) * 0.95)],
                "p99_ms": sorted(latencies)[int(len(latencies) * 0.99)]
            },
            "tokens": {
                "total_input": sum(m.input_tokens for m in recent),
                "total_output": sum(m.output_tokens for m in recent),
                "avg_per_request": sum(m.input_tokens + m.output_tokens
                                       for m in recent) / len(recent)
            },
            "by_model": self._group_by_model(recent),
            "errors": {
                "count": len(errors),
                "types": self._count_error_types(errors)
            }
        }

    def _group_by_model(self, metrics):
        by_model = {}
        for m in metrics:
            if m.model not in by_model:
                by_model[m.model] = {"count": 0, "tokens": 0}
            by_model[m.model]["count"] += 1
            by_model[m.model]["tokens"] += m.input_tokens + m.output_tokens
        return by_model

    def _count_error_types(self, errors):
        types = {}
        for e in errors:
            error_type = e.error.split(":")[0] if e.error else "unknown"
            types[error_type] = types.get(error_type, 0) + 1
        return types
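
Usage sketch, building a summary from the PerformanceTracker instance defined earlier:

import json

dashboard = MetricsDashboard(tracker)

summary = dashboard.get_summary(hours=24)
if summary:
    print(json.dumps(summary, indent=2, default=str))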

Best Practices

1. Start with Key Metrics

Essential metrics to track:
- Latency (p50, p95, p99)
- Error rate
- Token usage
- Cost
- Request volume

2. Set Up Baselines

# Establish baselines during initial period
baselines = {
    "latency_p95_ms": 3000,
    "error_rate": 0.01,
    "daily_cost": 50
}

# Alert when deviating from baseline
def check_deviation(current, baseline, threshold=0.5):
    return abs(current - baseline) / baseline > threshold
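
For example, comparing the current p95 latency against its baseline (values illustrative):

current_p95_ms = 4800

if check_deviation(current_p95_ms, baselines["latency_p95_ms"], threshold=0.5):
    print("p95 latency deviates more than 50% from baseline")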

3. Monitor Quality

import random

# Sample and evaluate responses
# (queue_for_review, evaluate_with_llm, and log_quality_metric are placeholders
#  for your own review queue and evaluation pipeline)
def sample_for_quality(response, sample_rate=0.1):
    if random.random() < sample_rate:
        # Send for human evaluation
        queue_for_review(response)

        # Or automated evaluation
        quality_score = evaluate_with_llm(response)
        log_quality_metric(quality_score)

Summary

Key Monitoring Areas:

  1. Performance: Latency, throughput, errors
  2. Quality: Output quality, accuracy
  3. Cost: Token usage, spending
  4. Reliability: Availability, rate limits

Essential Components:

  • Structured logging
  • Metrics collection
  • Distributed tracing
  • Alerting system

Best Practices:

  • Start with key metrics
  • Set baselines
  • Monitor quality
  • Automate alerting

Written by

AI Unlocked Team