AI Integration
Caching
Performance
Best Practices
Optimization

AI Caching Strategies: Boost Speed and Cut Costs

Learn caching strategies for AI applications, from response caching and embedding caching to semantic caching.

AI Unlocked Team
22/01/2568

AI Caching Strategies: Boost Speed and Cut Costs

Caching is a key technique for cutting costs and speeding up AI applications.

Why Cache AI Responses?

Benefits of Caching

1. Cost Reduction
   - No need to call the API again for identical requests
   - Typically saves 50-80% of API spend

2. Latency Reduction
   - Cached responses return almost instantly
   - Much better UX

3. Rate Limit Management
   - ลด API calls
   - Avoid hitting limits

4. Reliability
   - Serve cached when API down
   - Graceful degradation

When to Cache

✅ Good candidates for caching:
- Repeated queries
- Static content generation
- Embeddings
- FAQ responses
- Common classifications

❌ Don't cache:
- Real-time data
- Personalized responses
- Random/creative content
- Time-sensitive info

Response Caching

Basic In-Memory Cache

from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_completion(prompt, model):
    """Cache completions in memory, keyed on the (prompt, model) pair."""
    # lru_cache memoizes on the argument values, so an identical prompt
    # returns the stored result without another API call.
    return _call_api(prompt, model)  # _call_api: your actual API wrapper

def get_completion(prompt, model="gpt-4o-mini"):
    # Strings are hashable, so the prompt itself works as the cache key;
    # hashing it first would discard the text the API call needs.
    return cached_completion(prompt, model)

Redis Cache

import redis
import json
import hashlib

class RedisCache:
    def __init__(self, redis_url, ttl=3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def _get_key(self, prompt, model):
        content = f"{model}:{prompt}"
        return f"ai_cache:{hashlib.md5(content.encode()).hexdigest()}"

    def get(self, prompt, model):
        key = self._get_key(prompt, model)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, prompt, model, response):
        key = self._get_key(prompt, model)
        self.redis.setex(key, self.ttl, json.dumps(response))

    def get_or_call(self, prompt, model, call_fn):
        # Check cache
        cached = self.get(prompt, model)
        if cached:
            return cached

        # Call API
        response = call_fn(prompt, model)

        # Cache result
        self.set(prompt, model, response)

        return response

# Usage (assumes the OpenAI Python SDK and an API key in the environment)
from openai import OpenAI

client = OpenAI()
cache = RedisCache("redis://localhost:6379")

def get_ai_response(prompt):
    return cache.get_or_call(
        prompt,
        "gpt-4o-mini",
        lambda p, m: client.chat.completions.create(
            model=m,
            messages=[{"role": "user", "content": p}]
        ).choices[0].message.content
    )

Database Cache

import sqlite3
import json
import time

class SQLiteCache:
    def __init__(self, db_path="cache.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS cache (
                key TEXT PRIMARY KEY,
                value TEXT,
                created_at REAL,
                ttl INTEGER
            )
        """)

    def get(self, key):
        cursor = self.conn.execute(
            "SELECT value, created_at, ttl FROM cache WHERE key = ?",
            (key,)
        )
        row = cursor.fetchone()

        if row:
            value, created_at, ttl = row
            if time.time() - created_at < ttl:
                return json.loads(value)
            else:
                # Expired, delete
                self.conn.execute("DELETE FROM cache WHERE key = ?", (key,))
                self.conn.commit()

        return None

    def set(self, key, value, ttl=3600):
        self.conn.execute("""
            INSERT OR REPLACE INTO cache (key, value, created_at, ttl)
            VALUES (?, ?, ?, ?)
        """, (key, json.dumps(value), time.time(), ttl))
        self.conn.commit()
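
A quick usage sketch, mirroring the get_or_call pattern from the Redis example; call_model here is a hypothetical placeholder for whatever function actually calls your LLM API:

import hashlib

sqlite_cache = SQLiteCache()

def cached_answer(prompt, call_model):
    # call_model is a placeholder for your real API wrapper
    key = hashlib.md5(f"gpt-4o-mini:{prompt}".encode()).hexdigest()
    cached = sqlite_cache.get(key)
    if cached is not None:
        return cached

    response = call_model(prompt)
    sqlite_cache.set(key, response, ttl=86400)  # cache for one day
    return response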

Embedding Caching

Why Cache Embeddings?

Embedding costs:
- text-embedding-3-small: $0.02 per 1M tokens

For 1M queries/day at ~100 tokens each:
- Without cache: 100M tokens/day ≈ $2/day ≈ $60/month
- With cache: pay once per unique text (e.g. ~$0.02 if the unique texts total ~1M tokens)

Savings: 99%+ when the same texts are embedded repeatedly
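
The arithmetic above, spelled out as a short script (the traffic numbers are illustrative assumptions, not measurements):

# Rough cost sketch for text-embedding-3-small at $0.02 per 1M tokens
PRICE_PER_1M_TOKENS = 0.02
queries_per_day = 1_000_000
tokens_per_query = 100

daily_tokens = queries_per_day * tokens_per_query             # 100M tokens/day
daily_cost = daily_tokens / 1_000_000 * PRICE_PER_1M_TOKENS   # $2.00/day
print(f"Without cache: ${daily_cost:.2f}/day, ${daily_cost * 30:.2f}/month")

# With a cache, each unique text is embedded once; repeats are free
unique_tokens = 1_000_000   # e.g. 10k unique texts x 100 tokens (assumption)
one_time_cost = unique_tokens / 1_000_000 * PRICE_PER_1M_TOKENS
print(f"With cache: ${one_time_cost:.2f} one-time for the unique texts")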

Embedding Cache Implementation

import numpy as np
import hashlib

class EmbeddingCache:
    def __init__(self, cache_path="embeddings.npy"):
        self.cache_path = cache_path
        self.cache = self._load_cache()

    def _load_cache(self):
        try:
            # np.save stores the dict as a 0-d object array; .item() unwraps it
            return np.load(self.cache_path, allow_pickle=True).item()
        except (FileNotFoundError, OSError):
            return {}

    def _save_cache(self):
        np.save(self.cache_path, self.cache)

    def _get_key(self, text):
        return hashlib.md5(text.encode()).hexdigest()

    def get_embedding(self, text):
        key = self._get_key(text)

        if key in self.cache:
            return self.cache[key]

        # Generate embedding
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        embedding = np.array(response.data[0].embedding)

        # Cache
        self.cache[key] = embedding
        self._save_cache()

        return embedding

    def get_embeddings_batch(self, texts):
        """Get embeddings with batch optimization."""
        results = {}
        uncached = []

        # Check cache first
        for text in texts:
            key = self._get_key(text)
            if key in self.cache:
                results[text] = self.cache[key]
            else:
                uncached.append(text)

        # Batch generate uncached
        if uncached:
            response = client.embeddings.create(
                model="text-embedding-3-small",
                input=uncached
            )

            for text, data in zip(uncached, response.data):
                embedding = np.array(data.embedding)
                key = self._get_key(text)
                self.cache[key] = embedding
                results[text] = embedding

            self._save_cache()

        return [results[t] for t in texts]
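
A quick usage sketch (the FAQ strings below are just illustrative inputs):

emb_cache = EmbeddingCache()

faqs = ["How do I reset my password?", "What is your refund policy?"]
vectors = emb_cache.get_embeddings_batch(faqs)        # first call pays for the API
vectors_again = emb_cache.get_embeddings_batch(faqs)  # second call is served from the on-disk cache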

Semantic Caching

What is Semantic Caching?

Traditional cache: exact match only
"What is Python?" → cached response

Semantic cache: matches by meaning
"What is Python?" → cached
"Tell me about Python" → same cached response!

Embeddings are used to measure how similar a new query is to queries already in the cache.

Semantic Cache Implementation

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticCache:
    def __init__(self, similarity_threshold=0.95):
        self.threshold = similarity_threshold
        self.cache = []  # [(embedding, query, response)]

    def _get_embedding(self, text):
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(response.data[0].embedding)

    def get(self, query):
        if not self.cache:
            return None

        query_embedding = self._get_embedding(query)

        # Find most similar cached query
        best_match = None
        best_score = 0

        for cached_emb, cached_query, cached_response in self.cache:
            similarity = cosine_similarity(
                [query_embedding],
                [cached_emb]
            )[0][0]

            if similarity > best_score:
                best_score = similarity
                best_match = cached_response

        if best_score >= self.threshold:
            return best_match

        return None

    def set(self, query, response):
        embedding = self._get_embedding(query)
        self.cache.append((embedding, query, response))

    def get_or_call(self, query, call_fn):
        # Check semantic cache
        cached = self.get(query)
        if cached:
            return cached

        # Call API
        response = call_fn(query)

        # Cache
        self.set(query, response)

        return response
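
Usage might look like this, assuming the same OpenAI client used in the earlier examples:

semantic_cache = SemanticCache(similarity_threshold=0.92)

def ask(question):
    return semantic_cache.get_or_call(
        question,
        lambda q: client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": q}]
        ).choices[0].message.content
    )

ask("What is Python?")        # calls the API and caches the answer
ask("Tell me about Python")   # likely served from the semantic cache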

Using Vector Database for Semantic Cache

import chromadb

class VectorSemanticCache:
    def __init__(self, threshold=0.95):
        self.client = chromadb.Client()
        # Use cosine distance so the similarity threshold is easy to reason about
        self.collection = self.client.create_collection(
            "cache",
            metadata={"hnsw:space": "cosine"}
        )
        self.threshold = threshold

    def get(self, query):
        if self.collection.count() == 0:
            return None

        results = self.collection.query(
            query_texts=[query],
            n_results=1
        )

        if results['distances'][0]:
            # With cosine space, distance = 1 - cosine similarity
            distance = results['distances'][0][0]
            similarity = 1 - distance

            if similarity >= self.threshold:
                return results['metadatas'][0][0]['response']

        return None

    def set(self, query, response):
        self.collection.add(
            documents=[query],
            metadatas=[{"response": response}],
            ids=[f"cache_{self.collection.count()}"]
        )
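
A short usage sketch (the 0.9 threshold is an assumption to tune against your own traffic):

vcache = VectorSemanticCache(threshold=0.9)
vcache.set("What is Python?", "Python is a general-purpose programming language...")
print(vcache.get("Tell me about Python"))  # returned only if similarity clears the threshold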

Provider Caching

OpenAI Cached Prompts

# OpenAI prompt caching is automatic: prompts of 1,024+ tokens that share an
# identical prefix with a recent request reuse the cached prefix, and cached
# input tokens are billed at roughly a 50% discount.

# To maximize cache hits:
# - Put long, static content (system prompt, few-shot examples, documents) first
# - Keep that prefix byte-for-byte identical across requests
# - Put the parts that vary (the user's question) at the end
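
A minimal sketch of keeping the prefix stable; the system prompt text is an illustrative assumption:

from openai import OpenAI

client = OpenAI()

# Long, unchanging instructions go first so every request shares the same prefix
STATIC_SYSTEM_PROMPT = "You are a support assistant for an online store. ..."

def answer(question: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": STATIC_SYSTEM_PROMPT},  # identical every call
            {"role": "user", "content": question},                # only this part varies
        ],
    )
    return response.choices[0].message.content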

Claude Prompt Caching

import anthropic

client = anthropic.Anthropic()

# Explicit cache control: mark the long, reusable system prompt as cacheable
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "Long system prompt with instructions...",
            "cache_control": {"type": "ephemeral"}
        }
    ],
    messages=[{"role": "user", "content": user_input}]
)

# Check cache stats
print(f"Cache created: {response.usage.cache_creation_input_tokens}")
print(f"Cache read: {response.usage.cache_read_input_tokens}")

# Cache read tokens are 90% cheaper!

Cache Invalidation

Time-Based Invalidation

import time

class TTLCache:
    def __init__(self, default_ttl=3600):
        self.cache = {}   # key -> (value, expires_at)
        self.default_ttl = default_ttl

    def get(self, key):
        if key in self.cache:
            value, expires_at = self.cache[key]
            if time.time() < expires_at:
                return value
            else:
                del self.cache[key]
        return None

    def set(self, key, value, ttl=None):
        ttl = ttl or self.default_ttl
        expires_at = time.time() + ttl
        self.cache[key] = (value, expires_at)
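
A brief usage sketch (the key and value are illustrative):

ttl_cache = TTLCache(default_ttl=600)
ttl_cache.set("weather:bangkok", "32C, sunny")   # expires after 10 minutes
print(ttl_cache.get("weather:bangkok"))          # the value until it expires, then None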

Event-Based Invalidation

class SmartCache:
    def __init__(self):
        self.cache = {}
        self.tags = {}  # tag -> set of keys

    def set(self, key, value, tags=None):
        self.cache[key] = value

        if tags:
            for tag in tags:
                if tag not in self.tags:
                    self.tags[tag] = set()
                self.tags[tag].add(key)

    def invalidate_by_tag(self, tag):
        """Invalidate all entries with this tag."""
        if tag in self.tags:
            for key in self.tags[tag]:
                if key in self.cache:
                    del self.cache[key]
            del self.tags[tag]

# Usage
cache = SmartCache()
cache.set("user_123_summary", summary, tags=["user_123"])

# When user data changes
cache.invalidate_by_tag("user_123")

Best Practices

1. Cache Hit Rate Monitoring

class MonitoredCache:
    def __init__(self, cache):
        self.cache = cache
        self.hits = 0
        self.misses = 0

    def get(self, key):
        result = self.cache.get(key)
        if result is not None:  # count only "not found" as a miss, not falsy values
            self.hits += 1
        else:
            self.misses += 1
        return result

    def get_stats(self):
        total = self.hits + self.misses
        hit_rate = self.hits / total if total > 0 else 0
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate
        }
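
Usage sketch, wrapping the TTLCache from the invalidation section:

monitored = MonitoredCache(TTLCache())
monitored.get("missing-key")    # a miss
print(monitored.get_stats())    # {'hits': 0, 'misses': 1, 'hit_rate': 0.0}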

2. Layered Caching

import json
import redis

class LayeredCache:
    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.l1 = {}                          # L1: in-process memory (fastest)
        self.l2 = redis.from_url(redis_url)   # L2: Redis (shared, persistent)
        self.ttl = ttl

    def get(self, key):
        # Try L1 first
        if key in self.l1:
            return self.l1[key]

        # Fall back to L2
        cached = self.l2.get(key)
        if cached:
            value = json.loads(cached)
            self.l1[key] = value  # Promote to L1
            return value

        return None

    def set(self, key, value):
        self.l1[key] = value
        self.l2.setex(key, self.ttl, json.dumps(value))

3. Async Cache

import redis.asyncio as redis  # aioredis is deprecated and now lives in redis-py

class AsyncCache:
    def __init__(self, redis_url):
        # from_url is synchronous; the connection is opened lazily on the first command
        self.redis = redis.from_url(redis_url)

    async def get(self, key):
        return await self.redis.get(key)

    async def set(self, key, value, ttl=3600):
        await self.redis.setex(key, ttl, value)
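
Usage sketch with asyncio:

import asyncio

async def main():
    cache = AsyncCache("redis://localhost:6379")
    await cache.set("greeting", "hello")
    print(await cache.get("greeting"))   # b"hello" (raw bytes unless decode_responses=True is set)

asyncio.run(main())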

Summary

Caching Strategies:

  1. Response Cache: Cache complete responses
  2. Embedding Cache: Cache vectors
  3. Semantic Cache: Match by meaning
  4. Provider Cache: Use built-in caching

Best Practices:

  • Monitor cache hit rates
  • Use layered caching
  • Implement proper invalidation
  • Consider semantic similarity

Benefits:

  • 50-90% cost reduction
  • 10-100x faster responses
  • Better reliability
  • Rate limit management

Written by

AI Unlocked Team