"""Database initialization and connection management.""" import sqlite3 from flask import g def get_db(app): """Get database connection from Flask app context.""" if 'db' not in g: g.db = sqlite3.connect(app.config['DATABASE']) g.db.row_factory = sqlite3.Row return g.db def close_db(app, error=None): """Close database connection on app teardown.""" db = g.pop('db', None) if db is not None: db.close() def init_db(app): """Initialize database with all required tables and columns. Uses parameterized queries and whitelist approach for all dynamic values to prevent SQL injection. """ db = sqlite3.connect(app.config['DATABASE']) db.execute('''CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL, token TEXT UNIQUE NOT NULL )''') db.execute('''CREATE TABLE IF NOT EXISTS registration_codes ( id INTEGER PRIMARY KEY, code TEXT UNIQUE NOT NULL, is_used INTEGER DEFAULT 0 )''') # Whitelist of allowed columns to add to users table # Format: column_name -> SQL definition users_columns_whitelist = { 'avatar_url': 'TEXT', 'description': 'TEXT', 'pronouns': 'TEXT', 'is_banned': 'INTEGER DEFAULT 0' } cols = [row[1] for row in db.execute('PRAGMA table_info(users)').fetchall()] # Add missing columns using explicit ALTER TABLE statements # (SQLite does not support parameterized column/table names) for col_name in users_columns_whitelist: if col_name not in cols: try: col_type = users_columns_whitelist[col_name] db.execute(f'ALTER TABLE users ADD COLUMN {col_name} {col_type}') except sqlite3.OperationalError: pass db.execute('''CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY, sender_id INTEGER NOT NULL, receiver_id INTEGER, content TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, is_global INTEGER DEFAULT 1, FOREIGN KEY(sender_id) REFERENCES users(id) )''') # Whitelist of allowed columns to add to messages table messages_columns_whitelist = { 'channel_id': 'INTEGER', 'reply_to': 'INTEGER' } cols = [row[1] for row in db.execute('PRAGMA table_info(messages)').fetchall()] for col_name in messages_columns_whitelist: if col_name not in cols: try: col_type = messages_columns_whitelist[col_name] db.execute(f'ALTER TABLE messages ADD COLUMN {col_name} {col_type}') except sqlite3.OperationalError: pass db.execute('''CREATE TABLE IF NOT EXISTS friends ( id INTEGER PRIMARY KEY, user_id INTEGER NOT NULL, friend_id INTEGER NOT NULL, status TEXT DEFAULT 'pending', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(user_id) REFERENCES users(id), FOREIGN KEY(friend_id) REFERENCES users(id), UNIQUE(user_id, friend_id) )''') db.execute('''CREATE TABLE IF NOT EXISTS servers ( id INTEGER PRIMARY KEY, name TEXT UNIQUE NOT NULL )''') db.execute('''CREATE TABLE IF NOT EXISTS channels ( id INTEGER PRIMARY KEY, server_id INTEGER NOT NULL, name TEXT NOT NULL, FOREIGN KEY(server_id) REFERENCES servers(id) )''') cur = db.execute('SELECT id FROM servers WHERE id=1') if not cur.fetchone(): db.execute('INSERT INTO servers (id, name) VALUES (1, ?)', ('Это днище интернета 2',)) db.execute('INSERT INTO channels (server_id, name) VALUES (1, ?)', ('general',)) db.commit() db.close()