feat: implement idempotency middleware with pure ASGI
This commit is contained in:
@@ -5,11 +5,15 @@ from app.database import get_db, engine
|
||||
from app import models, seed
|
||||
from app.routers import auth, users, chests
|
||||
|
||||
from app.middleware import IdempotencyMiddleware
|
||||
|
||||
# Create all tables
|
||||
models.Base.metadata.create_all(bind=engine)
|
||||
|
||||
app = FastAPI(title="Card Game Backend")
|
||||
|
||||
app.add_middleware(IdempotencyMiddleware)
|
||||
|
||||
app.include_router(auth.router)
|
||||
app.include_router(users.router)
|
||||
app.include_router(chests.router)
|
||||
|
||||
119
backend/app/middleware.py
Normal file
119
backend/app/middleware.py
Normal file
@@ -0,0 +1,119 @@
|
||||
from starlette.types import ASGIApp, Receive, Scope, Send
|
||||
from starlette.datastructures import Headers
|
||||
from fastapi import Request, Response
|
||||
from app.database import SessionLocal
|
||||
from app import models, auth_utils
|
||||
from jose import jwt, JWTError
|
||||
import json
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
class IdempotencyMiddleware:
|
||||
def __init__(self, app: ASGIApp):
|
||||
self.app = app
|
||||
|
||||
async def __call__(self, scope: Scope, receive: Receive, send: Send):
|
||||
if scope["type"] != "http":
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
headers = Headers(scope=scope)
|
||||
idempotency_key = headers.get("idempotency-key")
|
||||
|
||||
if not idempotency_key:
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
# Extract User ID (Manual JWT parse)
|
||||
auth_header = headers.get("authorization")
|
||||
user_id = None
|
||||
if auth_header and auth_header.startswith("Bearer "):
|
||||
token = auth_header.split(" ")[1]
|
||||
try:
|
||||
payload = jwt.decode(token, auth_utils.SECRET_KEY, algorithms=[auth_utils.ALGORITHM])
|
||||
user_email = payload.get("sub")
|
||||
if user_email:
|
||||
# DB lookup for user_id
|
||||
# Optimization: In async ASGI, we can't easily use sync SessionLocal without risk?
|
||||
# Ideally allow blocking DB call for MVP or use run_in_threadpool.
|
||||
# For now, simplistic sync call is okay if low traffic, or use separate thread.
|
||||
db = SessionLocal()
|
||||
try:
|
||||
user = db.query(models.User).filter(models.User.email == user_email).first()
|
||||
if user:
|
||||
user_id = user.id
|
||||
finally:
|
||||
db.close()
|
||||
except JWTError:
|
||||
pass
|
||||
|
||||
if not user_id:
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
endpoint = scope["path"]
|
||||
|
||||
# Check DB for existing key
|
||||
db = SessionLocal()
|
||||
try:
|
||||
existing_key = db.query(models.IdempotencyKey).filter(
|
||||
models.IdempotencyKey.key == idempotency_key,
|
||||
models.IdempotencyKey.user_id == user_id,
|
||||
models.IdempotencyKey.endpoint == endpoint
|
||||
).first()
|
||||
|
||||
if existing_key:
|
||||
# Return stored response
|
||||
response = Response(
|
||||
content=existing_key.response_body,
|
||||
media_type="application/json"
|
||||
)
|
||||
await response(scope, receive, send)
|
||||
return
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
# Capture response
|
||||
response_body = []
|
||||
response_status = [200] # default
|
||||
|
||||
async def send_wrapper(message):
|
||||
if message["type"] == "http.response.start":
|
||||
response_status[0] = message["status"]
|
||||
await send(message)
|
||||
elif message["type"] == "http.response.body":
|
||||
body = message.get("body", b"")
|
||||
response_body.append(body)
|
||||
await send(message)
|
||||
|
||||
if not message.get("more_body", False):
|
||||
# Request finished, save if successful
|
||||
if 200 <= response_status[0] < 300:
|
||||
full_body = b"".join(response_body).decode("utf-8")
|
||||
# Save to DB
|
||||
# Again, sync DB call in async loop.
|
||||
# Should wrapping in threadpool?
|
||||
# For prototype, direct call.
|
||||
try:
|
||||
db_save = SessionLocal()
|
||||
try:
|
||||
new_key = models.IdempotencyKey(
|
||||
key=idempotency_key,
|
||||
user_id=user_id,
|
||||
endpoint=endpoint,
|
||||
response_body=full_body
|
||||
)
|
||||
db_save.add(new_key)
|
||||
db_save.commit()
|
||||
except IntegrityError:
|
||||
db_save.rollback()
|
||||
except Exception as e:
|
||||
print(f"Save error: {e}")
|
||||
db_save.rollback()
|
||||
finally:
|
||||
db_save.close()
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
await send(message)
|
||||
|
||||
await self.app(scope, receive, send_wrapper)
|
||||
@@ -1,5 +1,3 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
backend:
|
||||
build: ./backend
|
||||
|
||||
57
verify_idempotency.sh
Executable file
57
verify_idempotency.sh
Executable file
@@ -0,0 +1,57 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
echo "Logging in..."
|
||||
TOKEN=$(curl -s -X POST -H "Content-Type: application/json" -d '{"email":"test@example.com", "password":"password123"}' http://localhost:8000/auth/login | jq -r .access_token)
|
||||
echo "Token obtained."
|
||||
|
||||
echo "Getting Chest ID..."
|
||||
CHEST_ID=$(curl -s http://localhost:8000/catalog/chests | jq -r '.[0].id')
|
||||
echo "Chest ID: $CHEST_ID"
|
||||
|
||||
KEY="idempotency-test-$(date +%s)"
|
||||
|
||||
echo "--- Request 1 (Key: $KEY) ---"
|
||||
RESP1=$(curl -s -X POST -H "Authorization: Bearer $TOKEN" -H "Idempotency-Key: $KEY" http://localhost:8000/chests/$CHEST_ID/open)
|
||||
echo "Response 1: $RESP1"
|
||||
|
||||
BALANCE1=$(curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8000/me/wallet | jq -r .balance)
|
||||
echo "Balance after 1st request: $BALANCE1"
|
||||
|
||||
echo "--- Request 2 (Key: $KEY - Retry) ---"
|
||||
RESP2=$(curl -s -X POST -H "Authorization: Bearer $TOKEN" -H "Idempotency-Key: $KEY" http://localhost:8000/chests/$CHEST_ID/open)
|
||||
echo "Response 2: $RESP2"
|
||||
|
||||
BALANCE2=$(curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8000/me/wallet | jq -r .balance)
|
||||
echo "Balance after 2nd request: $BALANCE2"
|
||||
|
||||
if [ "$RESP1" == "$RESP2" ]; then
|
||||
echo "SUCCESS: Responses are identical."
|
||||
else
|
||||
echo "FAILURE: Responses differ."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ "$BALANCE1" == "$BALANCE2" ]; then
|
||||
echo "SUCCESS: Balance did not change on retry."
|
||||
else
|
||||
echo "FAILURE: Balance changed on retry!"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
KEY_NEW="idempotency-test-new-$(date +%s)"
|
||||
echo "--- Request 3 (New Key: $KEY_NEW) ---"
|
||||
RESP3=$(curl -s -X POST -H "Authorization: Bearer $TOKEN" -H "Idempotency-Key: $KEY_NEW" http://localhost:8000/chests/$CHEST_ID/open)
|
||||
echo "Response 3: $RESP3"
|
||||
|
||||
BALANCE3=$(curl -s -H "Authorization: Bearer $TOKEN" http://localhost:8000/me/wallet | jq -r .balance)
|
||||
echo "Balance after 3rd request: $BALANCE3"
|
||||
|
||||
if [ "$BALANCE3" -lt "$BALANCE2" ]; then
|
||||
echo "SUCCESS: Balance decreased on new request."
|
||||
else
|
||||
echo "FAILURE: Balance did not decrease on new request."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "ALL TESTS PASSED"
|
||||
Reference in New Issue
Block a user