AI Integration
Rate Limiting
API
Best Practices
Reliability

AI Rate Limiting: Managing API Limits Like a Pro

Learn how to handle rate limits on AI APIs: retry strategies, queuing, backoff algorithms, and best practices.

AI Unlocked Team
21/01/2568
AI Rate Limiting: Managing API Limits Like a Pro

Rate limits are one of the challenges you have to manage when running AI APIs in production. This post covers how to handle them professionally.

Understanding Rate Limits

Why Do Rate Limits Exist?

Reasons providers impose limits:

1. Prevent Abuse
   - Block spam and misuse
   - Ensure fair usage

2. System Stability
   - Prevent overload
   - Maintain quality of service

3. Cost Management
   - Control costs
   - Enable usage monitoring

Types of Rate Limits

1. Requests Per Minute (RPM)
   - Number of requests per minute
   - e.g. 500 RPM

2. Tokens Per Minute (TPM)
   - Number of tokens per minute
   - e.g. 150,000 TPM

3. Tokens Per Day (TPD)
   - Number of tokens per day
   - e.g. 1,000,000 TPD

4. Concurrent Requests
   - Number of simultaneous requests
   - e.g. 25 concurrent
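
Most providers also report these limits back in response headers. A minimal sketch for inspecting them with the openai Python SDK; the x-ratelimit-* header names follow OpenAI's documented convention and are an assumption for other providers:

from openai import OpenAI

client = OpenAI()

# with_raw_response exposes the HTTP headers alongside the parsed result
raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}]
)

print("RPM remaining:", raw.headers.get("x-ratelimit-remaining-requests"))
print("TPM remaining:", raw.headers.get("x-ratelimit-remaining-tokens"))

response = raw.parse()  # the usual ChatCompletion object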

OpenAI Rate Limits

GPT-4o (Tier 1):
- 500 RPM
- 30,000 TPM

GPT-4o-mini (Tier 1):
- 500 RPM
- 200,000 TPM

Higher tiers = Higher limits
(Based on usage and payment history)

Claude Rate Limits

Claude (Standard):
- Requests: Based on tier
- Tokens: Based on tier

Rate limits vary by:
- Account type
- Model used
- Region

Handling Rate Limits

Basic Retry with Backoff

import random
import time

from openai import OpenAI, RateLimitError

client = OpenAI()

def call_with_retry(messages, max_retries=5):
    """Call API with exponential backoff on rate limit."""

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            return response

        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise

            # Exponential backoff
            wait_time = (2 ** attempt) + random.random()
            print(f"Rate limited. Waiting {wait_time:.2f}s...")
            time.sleep(wait_time)

    raise Exception("Max retries exceeded")

Using Tenacity Library

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from openai import RateLimitError

@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
def call_api(prompt):
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
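
Usage looks the same as a plain call; tenacity retries transparently whenever a RateLimitError is raised:

# Usage
response = call_api("Summarize this article in one sentence.")
print(response.choices[0].message.content)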

Reading Retry Headers

import time

from openai import RateLimitError

def call_with_header_retry(messages):
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        return response

    except RateLimitError as e:
        # Read retry-after header
        retry_after = e.response.headers.get("retry-after")

        if retry_after:
            wait_time = int(retry_after)
        else:
            wait_time = 60  # Default

        print(f"Rate limited. Retry after {wait_time}s")
        time.sleep(wait_time)

        # Retry
        return call_with_header_retry(messages)

Advanced Strategies

Token Bucket Rate Limiter

import time
import threading

class TokenBucket:
    def __init__(self, tokens_per_second, max_tokens):
        self.tokens_per_second = tokens_per_second
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self, tokens=1):
        with self.lock:
            now = time.time()
            # Add tokens based on time passed
            elapsed = now - self.last_update
            self.tokens = min(
                self.max_tokens,
                self.tokens + elapsed * self.tokens_per_second
            )
            self.last_update = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_tokens(self, tokens=1):
        while not self.acquire(tokens):
            time.sleep(0.1)

# Usage
rate_limiter = TokenBucket(
    tokens_per_second=8,  # ~500 RPM
    max_tokens=10
)

def call_api(prompt):
    rate_limiter.wait_for_tokens()
    return client.chat.completions.create(...)

Request Queue

import asyncio
import time
from collections import deque

class RequestQueue:
    def __init__(self, max_rpm=500, max_concurrent=25):
        self.max_rpm = max_rpm
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.request_times = deque()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def _clean_old_requests(self):
        now = time.time()
        while self.request_times and now - self.request_times[0] > 60:
            self.request_times.popleft()

    async def _wait_for_capacity(self):
        await self._clean_old_requests()

        while len(self.request_times) >= self.max_rpm:
            await asyncio.sleep(0.1)
            await self._clean_old_requests()

    async def execute(self, coro):
        await self._wait_for_capacity()

        async with self.semaphore:
            self.request_times.append(time.time())
            return await coro

# Usage
queue = RequestQueue(max_rpm=500)

async def make_request(prompt):
    return await queue.execute(
        async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
    )

Distributed Rate Limiting (Redis)

import redis
import time

class DistributedRateLimiter:
    def __init__(self, redis_url, key_prefix, max_rpm):
        self.redis = redis.from_url(redis_url)
        self.key_prefix = key_prefix
        self.max_rpm = max_rpm

    def acquire(self):
        key = f"{self.key_prefix}:requests"
        now = time.time()
        window_start = now - 60

        pipe = self.redis.pipeline()

        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)

        # Count current requests
        pipe.zcard(key)

        # Add new request
        pipe.zadd(key, {str(now): now})

        # Set expiry
        pipe.expire(key, 61)

        results = pipe.execute()
        count = results[1]

        if count >= self.max_rpm:
            # Remove the request we just added
            self.redis.zrem(key, str(now))
            return False

        return True

    def wait_for_capacity(self):
        while not self.acquire():
            time.sleep(0.1)
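
A usage sketch; the Redis URL and key prefix below are placeholders. Every worker process that shares them also shares the same 500 RPM budget:

# Usage (redis_url and key_prefix are placeholders)
limiter = DistributedRateLimiter(
    redis_url="redis://localhost:6379/0",
    key_prefix="openai",
    max_rpm=500
)

def call_api(prompt):
    limiter.wait_for_capacity()  # blocks until the shared window has room
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )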

Request Prioritization

Priority Queue

import heapq
import asyncio

class PriorityRequestQueue:
    def __init__(self, max_rpm=500):
        self.max_rpm = max_rpm
        self.queue = []
        self.counter = 0

    async def enqueue(self, request, priority=5):
        """
        Priority: 1 = highest, 10 = lowest
        """
        self.counter += 1
        heapq.heappush(
            self.queue,
            (priority, self.counter, request)
        )

    async def process(self):
        while True:
            if self.queue:
                _, _, request = heapq.heappop(self.queue)
                # _execute (not shown) is assumed to send the request to the API
                await self._execute(request)
            # Pace requests evenly across the minute
            await asyncio.sleep(60 / self.max_rpm)

# Usage
queue = PriorityRequestQueue()

# High priority request
await queue.enqueue(important_request, priority=1)

# Normal request
await queue.enqueue(normal_request, priority=5)

Batch by Priority

class BatchProcessor:
    def __init__(self, batch_size=10, max_wait=5):
        self.batch_size = batch_size
        self.max_wait = max_wait
        self.high_priority = []
        self.normal_priority = []

    async def add(self, request, high_priority=False):
        if high_priority:
            self.high_priority.append(request)
        else:
            self.normal_priority.append(request)

        # Process high priority immediately if batch ready
        if len(self.high_priority) >= self.batch_size:
            await self._process_batch(self.high_priority)
            self.high_priority = []

    async def flush(self):
        # Process high priority first
        if self.high_priority:
            await self._process_batch(self.high_priority)
            self.high_priority = []

        # Then normal
        if self.normal_priority:
            await self._process_batch(self.normal_priority)
            self.normal_priority = []
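
The class above assumes a _process_batch helper that is not shown. A minimal sketch that could be added to the class, assuming each queued request is a list of chat messages and an async OpenAI client named async_client (both are assumptions; it also needs "import asyncio" at module level):

    async def _process_batch(self, batch):
        # Send the whole batch concurrently; each queued item is assumed
        # to be a list of chat messages
        tasks = [
            async_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            for messages in batch
        ]
        return await asyncio.gather(*tasks)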

Monitoring & Alerting

Track Rate Limit Events

import logging
from datetime import datetime

class RateLimitMonitor:
    def __init__(self):
        self.events = []
        self.logger = logging.getLogger(__name__)

    def log_rate_limit(self, endpoint, wait_time):
        event = {
            "timestamp": datetime.now(),
            "endpoint": endpoint,
            "wait_time": wait_time
        }
        self.events.append(event)
        self.logger.warning(f"Rate limited on {endpoint}. Wait: {wait_time}s")

        # Alert if too many events
        recent = [e for e in self.events
                  if (datetime.now() - e["timestamp"]).total_seconds() < 300]

        if len(recent) > 10:
            self.alert("High rate limit frequency")

    def alert(self, message):
        # Send to Slack, PagerDuty, etc.
        self.logger.critical(f"ALERT: {message}")
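
A sketch of wiring the monitor into an API call; the endpoint label and 60-second wait are arbitrary placeholders:

monitor = RateLimitMonitor()

def call_with_monitoring(messages):
    try:
        return client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
    except RateLimitError:
        # Record the event, then let the retry layer handle the wait
        monitor.log_rate_limit("chat.completions", wait_time=60)
        raise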

Usage Dashboard

from datetime import datetime, timedelta

class UsageTracker:
    def __init__(self):
        self.requests = []
        self.tokens = []

    def log_request(self, tokens_used):
        now = datetime.now()
        self.requests.append(now)
        self.tokens.append({"time": now, "tokens": tokens_used})

    def get_current_rpm(self):
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        return len([r for r in self.requests if r > minute_ago])

    def get_current_tpm(self):
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        return sum(
            t["tokens"] for t in self.tokens
            if t["time"] > minute_ago
        )

    def get_utilization(self, max_rpm, max_tpm):
        return {
            "rpm_utilization": self.get_current_rpm() / max_rpm,
            "tpm_utilization": self.get_current_tpm() / max_tpm
        }
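
A usage sketch that throttles proactively when utilization gets close to the limit; the 90% threshold and the one-second pause are arbitrary choices:

import time

tracker = UsageTracker()

def call_api(prompt):
    usage = tracker.get_utilization(max_rpm=500, max_tpm=200_000)
    if usage["rpm_utilization"] > 0.9 or usage["tpm_utilization"] > 0.9:
        time.sleep(1)  # back off before hitting the hard limit

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    tracker.log_request(response.usage.total_tokens)
    return response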

Best Practices

1. Implement Graceful Degradation

import asyncio

from openai import RateLimitError

async def get_response(prompt, timeout=30):
    try:
        return await asyncio.wait_for(
            call_api_with_retry(prompt),
            timeout=timeout
        )
    except (RateLimitError, asyncio.TimeoutError):
        # Fallback to cached/default response
        return get_fallback_response(prompt)
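
get_fallback_response is not defined above; a minimal sketch, assuming a simple in-memory cache of earlier answers (both names here are placeholders, not a real API):

_response_cache = {}

def get_fallback_response(prompt):
    # Reuse a cached answer if we have one; otherwise degrade gracefully
    if prompt in _response_cache:
        return _response_cache[prompt]
    return "The service is busy right now. Please try again shortly."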

2. Pre-estimate Token Usage

import tiktoken

def estimate_tokens(text, model="gpt-4o-mini"):
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def can_process(prompts, max_tpm):
    total_tokens = sum(estimate_tokens(p) for p in prompts)
    return total_tokens < max_tpm
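
Building on that check, a hypothetical chunk_prompts helper (not part of any library) can split a workload into batches that each stay under the TPM budget:

def chunk_prompts(prompts, max_tpm):
    # Greedily pack prompts into chunks whose estimated tokens fit the budget
    chunks, current, current_tokens = [], [], 0
    for p in prompts:
        tokens = estimate_tokens(p)
        if current and current_tokens + tokens > max_tpm:
            chunks.append(current)
            current, current_tokens = [], 0
        current.append(p)
        current_tokens += tokens
    if current:
        chunks.append(current)
    return chunks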

3. Use Multiple API Keys

import time

class KeyRotator:
    def __init__(self, api_keys):
        self.keys = api_keys
        self.current_index = 0
        self.rate_limited = {}

    def get_key(self):
        # Skip rate-limited keys
        for _ in range(len(self.keys)):
            key = self.keys[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.keys)

            if key not in self.rate_limited:
                return key

            # Check if cooldown expired
            if time.time() > self.rate_limited[key]:
                del self.rate_limited[key]
                return key

        raise Exception("All keys rate limited")

    def mark_rate_limited(self, key, cooldown=60):
        self.rate_limited[key] = time.time() + cooldown
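
A usage sketch, with placeholder keys, that puts a key on cooldown when it hits a rate limit and lets the retry layer pick the next one:

from openai import OpenAI, RateLimitError

rotator = KeyRotator(api_keys=["sk-key-1", "sk-key-2", "sk-key-3"])

def call_api(prompt):
    key = rotator.get_key()
    try:
        return OpenAI(api_key=key).chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
    except RateLimitError:
        # Cool this key down and let the caller retry with another key
        rotator.mark_rate_limited(key, cooldown=60)
        raise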

Summary

Rate Limit Strategies:

  1. Retry Logic: Exponential backoff
  2. Token Bucket: Smooth request flow
  3. Request Queue: Ordered processing
  4. Monitoring: Track and alert

Best Practices:

  • Implement graceful degradation
  • Pre-estimate token usage
  • Use multiple API keys
  • Monitor utilization

Remember:

  • Rate limits protect everyone
  • Plan for limits in architecture
  • Test with realistic load
  • Have fallback strategies



Written by

AI Unlocked Team