AI Rate Limiting: Managing API Limits Like a Professional
Rate limits are an unavoidable challenge when you run AI APIs in production. This article covers how to handle them professionally.
Understanding Rate Limits
Why Do Rate Limits Exist?
Reasons providers impose limits:
1. Prevent Abuse
- Block spam and misuse
- Ensure fair usage
2. System Stability
- Prevent overload
- Protect quality of service
3. Cost Management
- Control spending
- Support usage monitoring
Types of Rate Limits
1. Requests Per Minute (RPM)
- Number of requests per minute
- e.g., 500 RPM
2. Tokens Per Minute (TPM)
- Number of tokens per minute
- e.g., 150,000 TPM
3. Tokens Per Day (TPD)
- Number of tokens per day
- e.g., 1,000,000 TPD
4. Concurrent Requests
- Number of simultaneous in-flight requests
- e.g., 25 concurrent requests (see the config sketch below)
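To make these limits concrete, the sketch below models the four limit types as one config object. RateLimitConfig and its numbers are illustrative assumptions for the examples in this article, not any provider's published values.

from dataclasses import dataclass

# Hypothetical config object for the four limit types above;
# the default values are illustrative, not provider-specific.
@dataclass
class RateLimitConfig:
    requests_per_minute: int = 500      # RPM
    tokens_per_minute: int = 150_000    # TPM
    tokens_per_day: int = 1_000_000     # TPD
    max_concurrent: int = 25            # concurrent requests

limits = RateLimitConfig()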
OpenAI Rate Limits
GPT-4o (Tier 1):
- 500 RPM
- 30,000 TPM
GPT-4o-mini (Tier 1):
- 500 RPM
- 200,000 TPM
Higher tiers = Higher limits
(Based on usage and payment history)
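To see the limits that actually apply to your key, you can inspect the rate-limit headers on a response. Below is a minimal sketch using the openai SDK's with_raw_response helper; the x-ratelimit-* header names follow OpenAI's documentation, so double-check them against your own responses.

from openai import OpenAI

client = OpenAI()

# Sketch: read rate-limit headers from a raw response
raw = client.chat.completions.with_raw_response.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "ping"}],
)
print("Requests remaining:", raw.headers.get("x-ratelimit-remaining-requests"))
print("Tokens remaining:", raw.headers.get("x-ratelimit-remaining-tokens"))

completion = raw.parse()  # the usual ChatCompletion object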
Claude Rate Limits
Claude (Standard):
- Requests: Based on tier
- Tokens: Based on tier
Rate limits vary by:
- Account type
- Model used
- Region
Handling Rate Limits
Basic Retry with Backoff
import random
import time

from openai import OpenAI, RateLimitError

client = OpenAI()

def call_with_retry(messages, max_retries=5):
    """Call API with exponential backoff on rate limit."""
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            return response
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with random jitter
            wait_time = (2 ** attempt) + random.random()
            print(f"Rate limited. Waiting {wait_time:.2f}s...")
            time.sleep(wait_time)
    raise Exception("Max retries exceeded")
Using Tenacity Library
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from openai import RateLimitError

@retry(
    retry=retry_if_exception_type(RateLimitError),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    stop=stop_after_attempt(5)
)
def call_api(prompt):
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
Reading Retry Headers
import time

from openai import RateLimitError

def call_with_header_retry(messages):
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        return response
    except RateLimitError as e:
        # Read the retry-after header if the provider sent one
        retry_after = e.response.headers.get("retry-after")
        if retry_after:
            wait_time = int(retry_after)
        else:
            wait_time = 60  # Default
        print(f"Rate limited. Retry after {wait_time}s")
        time.sleep(wait_time)
        # Retry (note: unbounded recursion; cap the retry count in production)
        return call_with_header_retry(messages)
Advanced Strategies
Token Bucket Rate Limiter
import time
import threading

class TokenBucket:
    def __init__(self, tokens_per_second, max_tokens):
        self.tokens_per_second = tokens_per_second
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self, tokens=1):
        with self.lock:
            now = time.time()
            # Add tokens based on time passed
            elapsed = now - self.last_update
            self.tokens = min(
                self.max_tokens,
                self.tokens + elapsed * self.tokens_per_second
            )
            self.last_update = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_tokens(self, tokens=1):
        while not self.acquire(tokens):
            time.sleep(0.1)

# Usage
rate_limiter = TokenBucket(
    tokens_per_second=8,  # ~500 RPM
    max_tokens=10
)

def call_api(prompt):
    rate_limiter.wait_for_tokens()
    return client.chat.completions.create(...)
Request Queue
import asyncio
import time
from collections import deque

class RequestQueue:
    def __init__(self, max_rpm=500, max_concurrent=25):
        self.max_rpm = max_rpm
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.request_times = deque()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def _clean_old_requests(self):
        # Drop timestamps that have fallen out of the 60-second window
        now = time.time()
        while self.request_times and now - self.request_times[0] > 60:
            self.request_times.popleft()

    async def _wait_for_capacity(self):
        await self._clean_old_requests()
        while len(self.request_times) >= self.max_rpm:
            await asyncio.sleep(0.1)
            await self._clean_old_requests()

    async def execute(self, coro):
        await self._wait_for_capacity()
        async with self.semaphore:
            self.request_times.append(time.time())
            return await coro

# Usage (async_client is an AsyncOpenAI instance)
queue = RequestQueue(max_rpm=500)

async def make_request(prompt):
    return await queue.execute(
        async_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
    )
Distributed Rate Limiting (Redis)
import redis
import time

class DistributedRateLimiter:
    def __init__(self, redis_url, key_prefix, max_rpm):
        self.redis = redis.from_url(redis_url)
        self.key_prefix = key_prefix
        self.max_rpm = max_rpm

    def acquire(self):
        key = f"{self.key_prefix}:requests"
        now = time.time()
        window_start = now - 60

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Count current requests
        pipe.zcard(key)
        # Add new request
        pipe.zadd(key, {str(now): now})
        # Set expiry
        pipe.expire(key, 61)
        results = pipe.execute()

        count = results[1]
        if count >= self.max_rpm:
            # Remove the request we just added
            self.redis.zrem(key, str(now))
            return False
        return True

    def wait_for_capacity(self):
        while not self.acquire():
            time.sleep(0.1)
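A usage sketch, assuming a hypothetical Redis URL; all workers that should share one budget must use the same key_prefix:

limiter = DistributedRateLimiter(
    redis_url="redis://localhost:6379/0",  # hypothetical Redis instance
    key_prefix="openai:chat",
    max_rpm=500,
)

def call_api_distributed(prompt):
    limiter.wait_for_capacity()
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )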
Request Prioritization
Priority Queue
import heapq
import asyncio

class PriorityRequestQueue:
    def __init__(self, max_rpm=500):
        self.max_rpm = max_rpm
        self.queue = []
        self.counter = 0

    async def enqueue(self, request, priority=5):
        """
        Priority: 1 = highest, 10 = lowest
        """
        self.counter += 1
        heapq.heappush(
            self.queue,
            (priority, self.counter, request)
        )

    async def _execute(self, request):
        # Assumes each queued request is an awaitable (e.g. an API coroutine)
        return await request

    async def process(self):
        while True:
            if self.queue:
                _, _, request = heapq.heappop(self.queue)
                await self._execute(request)
            await asyncio.sleep(60 / self.max_rpm)

# Usage (inside an async function)
queue = PriorityRequestQueue()

# High priority request
await queue.enqueue(important_request, priority=1)

# Normal request
await queue.enqueue(normal_request, priority=5)
Batch by Priority
import asyncio

class BatchProcessor:
    def __init__(self, batch_size=10, max_wait=5):
        self.batch_size = batch_size
        self.max_wait = max_wait
        self.high_priority = []
        self.normal_priority = []

    async def _process_batch(self, batch):
        # Assumes each queued item is an awaitable (e.g. an API coroutine)
        await asyncio.gather(*batch)

    async def add(self, request, high_priority=False):
        if high_priority:
            self.high_priority.append(request)
        else:
            self.normal_priority.append(request)

        # Process high priority immediately if batch ready
        if len(self.high_priority) >= self.batch_size:
            await self._process_batch(self.high_priority)
            self.high_priority = []

    async def flush(self):
        # Process high priority first
        if self.high_priority:
            await self._process_batch(self.high_priority)
            self.high_priority = []
        # Then normal
        if self.normal_priority:
            await self._process_batch(self.normal_priority)
            self.normal_priority = []
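A usage sketch, assuming async_client is an AsyncOpenAI instance and each queued item is an awaitable API call:

processor = BatchProcessor(batch_size=10)

async def summarize_all(prompts):
    for p in prompts:
        await processor.add(
            async_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": p}]
            )
        )
    await processor.flush()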
Monitoring & Alerting
Track Rate Limit Events
import logging
from datetime import datetime

class RateLimitMonitor:
    def __init__(self):
        self.events = []
        self.logger = logging.getLogger(__name__)

    def log_rate_limit(self, endpoint, wait_time):
        event = {
            "timestamp": datetime.now(),
            "endpoint": endpoint,
            "wait_time": wait_time
        }
        self.events.append(event)
        self.logger.warning(f"Rate limited on {endpoint}. Wait: {wait_time}s")

        # Alert if too many events in the last 5 minutes
        recent = [e for e in self.events
                  if (datetime.now() - e["timestamp"]).total_seconds() < 300]
        if len(recent) > 10:
            self.alert("High rate limit frequency")

    def alert(self, message):
        # Send to Slack, PagerDuty, etc.
        self.logger.critical(f"ALERT: {message}")
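One way to wire the monitor into the exponential-backoff pattern from earlier, as a sketch (call_with_retry_monitored is a hypothetical variant of call_with_retry):

monitor = RateLimitMonitor()

def call_with_retry_monitored(messages, max_retries=5):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
        except RateLimitError:
            wait_time = (2 ** attempt) + random.random()
            # Record every backoff so alerting can spot sustained throttling
            monitor.log_rate_limit("chat.completions", wait_time)
            time.sleep(wait_time)
    raise Exception("Max retries exceeded")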
Usage Dashboard
from datetime import datetime, timedelta

class UsageTracker:
    def __init__(self):
        self.requests = []
        self.tokens = []

    def log_request(self, tokens_used):
        now = datetime.now()
        self.requests.append(now)
        self.tokens.append({"time": now, "tokens": tokens_used})

    def get_current_rpm(self):
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        return len([r for r in self.requests if r > minute_ago])

    def get_current_tpm(self):
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)
        return sum(
            t["tokens"] for t in self.tokens
            if t["time"] > minute_ago
        )

    def get_utilization(self, max_rpm, max_tpm):
        return {
            "rpm_utilization": self.get_current_rpm() / max_rpm,
            "tpm_utilization": self.get_current_tpm() / max_tpm
        }
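A usage sketch that feeds the tracker with the token counts reported in each response; the 200,000 TPM budget is the Tier 1 gpt-4o-mini figure quoted earlier:

tracker = UsageTracker()

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello"}]
)
tracker.log_request(response.usage.total_tokens)

print(tracker.get_current_rpm())
print(tracker.get_utilization(max_rpm=500, max_tpm=200_000))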
Best Practices
1. Implement Graceful Degradation
async def get_response(prompt, timeout=30):
    try:
        return await asyncio.wait_for(
            call_api_with_retry(prompt),
            timeout=timeout
        )
    except (RateLimitError, asyncio.TimeoutError):
        # Fallback to cached/default response
        return get_fallback_response(prompt)
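get_fallback_response above is a placeholder; a minimal hypothetical implementation backed by an in-memory cache could look like this:

# Hypothetical fallback store; in production this might be Redis or a database
_fallback_cache = {}

def get_fallback_response(prompt):
    return _fallback_cache.get(
        prompt,
        "The service is currently busy. Please try again in a moment."
    )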
2. Pre-estimate Token Usage
import tiktoken

def estimate_tokens(text, model="gpt-4o-mini"):
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def can_process(prompts, max_tpm):
    total_tokens = sum(estimate_tokens(p) for p in prompts)
    return total_tokens < max_tpm
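A quick check against the Tier 1 TPM budget quoted earlier, reusing the call_api helper from the tenacity example:

prompts = ["Summarize this report", "Translate this paragraph"]
if can_process(prompts, max_tpm=200_000):
    results = [call_api(p) for p in prompts]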
3. Use Multiple API Keys
import time

class KeyRotator:
    def __init__(self, api_keys):
        self.keys = api_keys
        self.current_index = 0
        self.rate_limited = {}

    def get_key(self):
        # Skip rate-limited keys
        for _ in range(len(self.keys)):
            key = self.keys[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.keys)
            if key not in self.rate_limited:
                return key
            # Check if cooldown expired
            if time.time() > self.rate_limited[key]:
                del self.rate_limited[key]
                return key
        raise Exception("All keys rate limited")

    def mark_rate_limited(self, key, cooldown=60):
        self.rate_limited[key] = time.time() + cooldown
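A usage sketch with hypothetical keys; each call builds a client from the next healthy key and marks that key on RateLimitError:

rotator = KeyRotator(api_keys=["sk-key-1", "sk-key-2", "sk-key-3"])  # hypothetical keys

def call_with_rotation(messages):
    key = rotator.get_key()
    try:
        return OpenAI(api_key=key).chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
    except RateLimitError:
        rotator.mark_rate_limited(key)
        raise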
Summary
Rate Limit Strategies:
- Retry Logic: Exponential backoff
- Token Bucket: Smooth request flow
- Request Queue: Ordered processing
- Monitoring: Track and alert
Best Practices:
- Implement graceful degradation
- Pre-estimate token usage
- Use multiple API keys
- Monitor utilization
Remember:
- Rate limits protect everyone
- Plan for limits in architecture
- Test with realistic load
- Have fallback strategies
Written by
AI Unlocked Team