Dnishe/blueprints/users.py
2026-06-08 21:56:14 +00:00

173 lines
6 KiB
Python

"""User profile, friends, and user listing routes."""
import sqlite3
from flask import Blueprint, g, jsonify, request, current_app
from database import get_db
from decorators import token_required, get_user_badge
from werkzeug.utils import secure_filename
from PIL import Image
import os
# Blueprint that groups the routes of this module; all of them are served
# under the /api URL prefix.
users_bp = Blueprint('users', __name__, url_prefix='/api')
# Maximum side length, in pixels, of a stored avatar: process_avatar_upload
# downscales any larger square crop to this size.
AVATAR_SIZE = 520
def process_avatar_upload(file, user_id, current_app):
    """Crop, downscale and store an uploaded avatar as a PNG file.

    The image is center-cropped to a square, shrunk to AVATAR_SIZE pixels per
    side when the square is larger than that, and written to
    ``<app root>/static/avatars/avatar_<user_id>.png``, overwriting any file
    already stored under that name.

    Args:
        file: Uploaded file object exposing a readable ``stream`` attribute
            (presumably a Werkzeug ``FileStorage`` -- confirm against callers).
        user_id: Identifier used to build the stored file name.
        current_app: Application object whose ``root_path`` locates the
            ``static`` directory. Inside this function it shadows the
            module-level import of the same name.

    Returns:
        The URL path of the stored avatar, ``/static/avatars/avatar_<user_id>.png``.

    Raises:
        OSError: If the upload cannot be read as an image (Pillow's
            ``UnidentifiedImageError`` is an ``OSError``) or the file cannot
            be written.
    """
    avatar_dir = os.path.join(current_app.root_path, 'static', 'avatars')
    os.makedirs(avatar_dir, exist_ok=True)

    fname = f"avatar_{user_id}.png"
    path = os.path.join(avatar_dir, fname)

    with Image.open(file.stream) as img:
        # Center-crop to the largest square that fits inside the image.
        w, h = img.size
        side = min(w, h)
        left = (w - side) // 2
        top = (h - side) // 2
        square = img.crop((left, top, left + side, top + side))

        # The PNG writer cannot store these modes (a CMYK JPEG is the usual
        # case) and would raise on save, so convert them to RGB first.
        if square.mode in ('CMYK', 'YCbCr'):
            square = square.convert('RGB')

        # Only ever shrink: small images are stored at their own size.
        if side > AVATAR_SIZE:
            square = square.resize((AVATAR_SIZE, AVATAR_SIZE), Image.LANCZOS)

        square.save(path, format='PNG')

    return f'/static/avatars/{fname}'
@users_bp.route('/users', methods=['GET'])
@token_required
def get_users():
    """Return the id, username and badge of every user as a JSON array."""
    rows = get_db(current_app).execute('SELECT id, username FROM users').fetchall()
    listing = [
        {
            'id': row['id'],
            'username': row['username'],
            'badge': get_user_badge(row['username']),
        }
        for row in rows
    ]
    return jsonify(listing), 200
@users_bp.route('/users/<int:user_id>', methods=['GET'])
@token_required
def get_user_profile(user_id):
    """Return the public profile of the user with the given id.

    Responds with the user's id, username, avatar_url, description, pronouns
    and badge, or with a 404 error object when no such user exists.
    """
    query = 'SELECT id, username, avatar_url, description, pronouns FROM users WHERE id = ?'
    row = get_db(current_app).execute(query, (user_id,)).fetchone()
    if row:
        profile = dict(row)
        profile['badge'] = get_user_badge(row['username'])
        return jsonify(profile), 200
    return jsonify({'error': 'User not found'}), 404
@users_bp.route('/profile', methods=['GET'])
@token_required
def get_profile():
    """Return the current user's own record, minus secrets, plus a badge."""
    profile = g.user.copy()
    # Credentials must never leave the server; drop them from the copy only.
    for secret_key in ('password', 'token'):
        profile.pop(secret_key, None)
    profile['badge'] = get_user_badge(profile.get('username'))
    return jsonify(profile), 200
@users_bp.route('/profile', methods=['POST'])
@token_required
def update_profile():
    """Update the current user's avatar, description and pronouns.

    Accepts either form data (optionally with an ``avatar_file`` upload) or a
    JSON object with ``description``, ``pronouns`` and ``avatar_url`` keys.
    A field the request does not include keeps the value found on ``g.user``
    (or becomes empty when ``g.user`` does not carry it); a field sent empty
    or null is cleared.

    Returns:
        The stored id, username, avatar_url, description and pronouns as a
        JSON object with status 200, or a 400 error object when a JSON body
        is not an object.
    """
    avatar = None
    upload = request.files.get('avatar_file')
    if upload is not None and upload.filename:
        avatar = process_avatar_upload(upload, g.user['id'], current_app)

    if request.form:
        fields = request.form
    else:
        # Only parse JSON when the request declares it. An upload that
        # carries nothing but the avatar file has an empty form and no JSON
        # body, and must not be rejected for its content type.
        fields = (request.get_json() if request.is_json else None) or {}
        if not isinstance(fields, dict):
            return jsonify({'error': 'Invalid JSON body'}), 400
        avatar = avatar or fields.get('avatar_url')

    def stored_text(field):
        """Return the submitted value of a text field, else the current one."""
        source = fields if field in fields else g.user
        return source.get(field) or ''

    # Preserve existing avatar if no new one provided
    final_avatar = avatar if avatar is not None else g.user.get('avatar_url', '')

    db = get_db(current_app)
    db.execute('UPDATE users SET avatar_url = ?, description = ?, pronouns = ? WHERE id = ?',
               (final_avatar, stored_text('description'), stored_text('pronouns'), g.user['id']))
    db.commit()
    user = db.execute('SELECT id, username, avatar_url, description, pronouns FROM users WHERE id = ?',
                      (g.user['id'],)).fetchone()
    return jsonify(dict(user)), 200
@users_bp.route('/friends', methods=['GET'])
@token_required
def get_friends():
    """Return the users linked to the current user by an accepted friendship.

    A friendship counts in either direction (as sender or as recipient); each
    returned user carries id, username, avatar_url, pronouns and badge.
    """
    connection = get_db(current_app)
    me = g.user['id']
    rows = connection.execute('''
SELECT u.id, u.username, u.avatar_url, u.pronouns FROM friends f
JOIN users u ON f.friend_id = u.id WHERE f.user_id = ? AND f.status = 'accepted'
UNION
SELECT u.id, u.username, u.avatar_url, u.pronouns FROM friends f
JOIN users u ON f.user_id = u.id WHERE f.friend_id = ? AND f.status = 'accepted'
''', (me, me)).fetchall()
    payload = [
        {**dict(row), 'badge': get_user_badge(row['username'])}
        for row in rows
    ]
    return jsonify(payload), 200
@users_bp.route('/friend-requests', methods=['GET'])
@token_required
def get_friend_requests():
    """Return the pending friend requests addressed to the current user.

    Each entry holds the request ``id`` plus the sender's ``user_id`` and
    ``username``.
    """
    # 'pending' is single-quoted on purpose: in SQL double quotes denote an
    # identifier, and SQLite only treats "pending" as a string through a
    # legacy fallback that can be disabled at build time.
    reqs = get_db(current_app).execute(
        "SELECT f.id, u.id as user_id, u.username FROM friends f "
        "JOIN users u ON f.user_id = u.id "
        "WHERE f.friend_id = ? AND f.status = 'pending'",
        (g.user['id'],)
    ).fetchall()
    return jsonify([dict(r) for r in reqs]), 200
@users_bp.route('/friend-request', methods=['POST'])
@token_required
def send_friend_request():
    """Send a friend request from the current user to another user.

    Expects a JSON object with a ``username`` key naming the recipient.

    Returns:
        201 with ``{'status': 'sent'}`` on success; 400 when the username is
        missing or names the current user; 404 when no such user exists; 409
        when a request between the two users already exists in either
        direction.
    """
    # silent=True turns a missing, malformed or non-JSON body into None so it
    # is reported as a missing username rather than crashing on ``.get``.
    payload = request.get_json(silent=True)
    target_username = payload.get('username') if isinstance(payload, dict) else None
    if not isinstance(target_username, str) or not target_username:
        return jsonify({'error': 'Username required'}), 400

    db = get_db(current_app)
    target = db.execute('SELECT id FROM users WHERE username = ?', (target_username,)).fetchone()
    if not target:
        return jsonify({'error': 'User not found'}), 404
    if target['id'] == g.user['id']:
        return jsonify({'error': 'Cannot add yourself'}), 400

    # A row in the opposite direction already links the two users; adding a
    # second one would leave a stale duplicate once either is answered.
    reverse = db.execute('SELECT 1 FROM friends WHERE user_id = ? AND friend_id = ?',
                         (target['id'], g.user['id'])).fetchone()
    if reverse is not None:
        return jsonify({'error': 'Friend request already exists'}), 409

    try:
        db.execute("INSERT INTO friends (user_id, friend_id, status) VALUES (?, ?, 'pending')",
                   (g.user['id'], target['id']))
        db.commit()
    except sqlite3.IntegrityError:
        # Close the transaction opened for the failed INSERT.
        db.rollback()
        return jsonify({'error': 'Friend request already exists'}), 409
    return jsonify({'status': 'sent'}), 201
@users_bp.route('/friend-request/<int:request_id>', methods=['POST'])
@token_required
def respond_friend_request(request_id):
    """Accept or reject a friend request addressed to the current user.

    Expects a JSON object whose ``action`` is ``'accept'`` (marks the row as
    accepted) or ``'reject'`` (deletes the row). Only rows whose recipient is
    the current user are affected.

    Returns:
        200 with ``{'status': 'updated'}`` when a row was changed; 400 when
        the action is missing or unknown; 404 when no row with this id is
        addressed to the current user.
    """
    statements = {
        'accept': "UPDATE friends SET status = 'accepted' WHERE id = ? AND friend_id = ?",
        'reject': "DELETE FROM friends WHERE id = ? AND friend_id = ?",
    }

    # silent=True turns a missing, malformed or non-JSON body into None so it
    # is reported as an invalid action rather than crashing on ``.get``.
    payload = request.get_json(silent=True)
    action = payload.get('action') if isinstance(payload, dict) else None
    sql = statements.get(action) if isinstance(action, str) else None
    if sql is None:
        return jsonify({'error': "Action must be 'accept' or 'reject'"}), 400

    db = get_db(current_app)
    cursor = db.execute(sql, (request_id, g.user['id']))
    db.commit()

    # Nothing matched: the id is unknown or the request belongs to someone
    # else, so do not claim that anything was updated.
    if cursor.rowcount == 0:
        return jsonify({'error': 'Friend request not found'}), 404
    return jsonify({'status': 'updated'}), 200