95 lines
3.1 KiB
Python
95 lines
3.1 KiB
Python
import time
|
|
import sqlite3
|
|
from functools import wraps
|
|
from flask import g, jsonify, request, current_app
|
|
|
|
# In-process rate-limit bookkeeping used by the ``rate_limit`` decorator:
# maps a limiter key to the list of ``time.time()`` stamps of the requests
# that were let through for that key.
rate_limit_store = {}

# Defaults applied by ``rate_limit`` when the caller does not override them.
RATE_LIMIT_WINDOW = 60 * 60  # length of the counting window, in seconds
RATE_LIMIT_MAX_ATTEMPTS = 10  # requests allowed per key inside one window

# Hard-coded username -> badge assignments, looked up by ``get_user_badge``.
SYSTEM_BADGES = {
    "Beluga": "Dev",
    "Admin": "Admin",
    "TikyGaming": "Admin",
    "idkk_cali": "Dev",
    "ReBeluga": "Bot",
}
|
|
|
|
def get_user_badge(username):
    """Look up the system badge assigned to *username*.

    Returns the badge string stored in ``SYSTEM_BADGES`` for that exact
    username, or ``None`` when the username has no entry.
    """
    try:
        return SYSTEM_BADGES[username]
    except KeyError:
        # Not a system account: no badge.
        return None
|
|
|
|
def get_db():
    """Return whatever ``database.get_db`` yields for the current Flask app.

    The ``database`` module is imported at call time rather than at module
    import time (presumably to sidestep a circular import -- confirm).
    """
    from database import get_db as _open_for_app

    connection = _open_for_app(current_app)
    return connection
|
|
|
|
def rate_limit(key_func=None, max_attempts=None, window=None):
    """Build a decorator that throttles a view per key within a time window.

    Args:
        key_func: Optional zero-argument callable returning the limiter key.
            When omitted (or falsy) the key is ``request.remote_addr``.
        max_attempts: Calls allowed per key inside one window. Any falsy
            value (``None``, ``0``) falls back to ``RATE_LIMIT_MAX_ATTEMPTS``.
        window: Window length in seconds. Any falsy value falls back to
            ``RATE_LIMIT_WINDOW``.

    Returns:
        A decorator. The wrapped view responds with a JSON error body and
        status 429 once the key has ``max_attempts`` admitted calls inside
        the window; otherwise the call is recorded and the view runs.

    Timestamps live in the module-level ``rate_limit_store`` dict, keyed only
    by the limiter key, so every view decorated with this limiter shares the
    same per-key history.
    """
    if not max_attempts:
        max_attempts = RATE_LIMIT_MAX_ATTEMPTS
    if not window:
        window = RATE_LIMIT_WINDOW

    def decorator(f):
        @wraps(f)
        def limited(*args, **kwargs):
            if key_func:
                key = key_func()
            else:
                key = request.remote_addr
            now = time.time()

            # Keep only the stamps still inside the window and store the
            # pruned list back, even when the request ends up rejected.
            recent = [
                stamp
                for stamp in rate_limit_store.get(key, ())
                if now - stamp < window
            ]
            rate_limit_store[key] = recent

            if len(recent) >= max_attempts:
                return jsonify({'error': 'Too many requests. Please try again later.'}), 429

            # Rejected calls are not recorded; only admitted ones count.
            recent.append(now)
            return f(*args, **kwargs)

        return limited

    return decorator
|
|
|
|
def token_required(f):
    """Require a valid auth token before running the wrapped view.

    The token is read from the ``Authorization`` header, falling back to the
    ``auth_token`` cookie. A leading ``Bearer`` scheme (matched
    case-insensitively) is removed and the remaining credential is trimmed.

    On success the matching ``users`` row is stored as a dict in ``g.user``
    and the view is called. Otherwise a JSON error is returned:

    * 401 ``Token is missing`` -- no token supplied, or nothing left after
      removing the ``Bearer`` scheme.
    * 401 ``Invalid token`` -- no user row carries that token.
    * 403 ``Account banned`` -- the row's ``is_banned`` column equals 1.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization') or request.cookies.get('auth_token')
        if not token:
            return jsonify({'error': 'Token is missing'}), 401

        # Strip the "Bearer" scheme if present. HTTP auth scheme names are
        # case-insensitive, so "bearer x" and "BEARER x" are accepted too.
        scheme, separator, credentials = token.partition(' ')
        if separator and scheme.lower() == 'bearer':
            token = credentials.strip()
            # A bare "Bearer " must not reach the query below: an empty
            # string would match any row whose token column is ''.
            if not token:
                return jsonify({'error': 'Token is missing'}), 401

        db = get_db()
        user = db.execute('SELECT * FROM users WHERE token = ?', (token,)).fetchone()
        if not user:
            return jsonify({'error': 'Invalid token'}), 401

        # g.user is populated before the ban check, as in the original flow.
        g.user = dict(user)
        if g.user.get('is_banned', 0) == 1:
            return jsonify({'error': 'Account banned'}), 403
        return f(*args, **kwargs)

    return decorated
|
|
|
|
def role_required(min_role):
    """Build a decorator that restricts a view to users holding *min_role*.

    Role hierarchy: Dev (1) > Admin (2) > regular user (99). A lower number
    is a higher role, and higher roles inherit the permissions of lower
    ones, so a Dev passes an ``'Admin'`` check.

    The user's role is the badge ``get_user_badge`` returns for
    ``g.user['username']``; badges outside the hierarchy count as regular
    users. ``g.user`` must already be set (for example by ``token_required``
    applied outside this decorator).

    Args:
        min_role: ``'Dev'`` or ``'Admin'``. ``None`` admits any user that has
            a ``g.user`` context. Any other value is not a known role and is
            rejected for everyone (fail closed), so a misspelled role name
            cannot silently open the view to all users.

    Returns:
        A decorator. The wrapped view responds 401 when ``g.user`` is
        missing and 403 when the user's role is insufficient.
    """
    # Built once per decorated view instead of on every request.
    role_order = {'Dev': 1, 'Admin': 2}
    regular_level = 99

    if min_role is None:
        required_level = regular_level
    else:
        # None marks an unrecognised role name; it is denied below.
        required_level = role_order.get(min_role)

    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            if not hasattr(g, 'user'):
                return jsonify({'error': 'No user context'}), 401

            badge = get_user_badge(g.user.get('username'))
            user_level = role_order.get(badge, regular_level)

            # Dev (1) passes a check for Admin (2) because 1 <= 2.
            if required_level is not None and user_level <= required_level:
                return f(*args, **kwargs)

            return jsonify({'error': f'Insufficient permissions. Required: {min_role}'}), 403

        return decorated

    return decorator