Dnishe/database.py
2026-06-08 21:56:14 +00:00

112 lines
3.8 KiB
Python

"""Database initialization and connection management."""
import sqlite3
from flask import g
def get_db(app):
    """Return the SQLite connection stored on Flask's ``g``, opening it if needed.

    The first call in a given ``g`` context connects to the path found in
    ``app.config['DATABASE']``, sets ``sqlite3.Row`` as the row factory and
    stores the connection as ``g.db``. Later calls return that same object.

    Args:
        app: Object exposing a ``config`` mapping with a ``'DATABASE'`` entry.

    Returns:
        The ``sqlite3.Connection`` held in ``g.db``.
    """
    # Reuse the connection already attached to this context, if any.
    if 'db' in g:
        return g.db

    connection = sqlite3.connect(app.config['DATABASE'])
    connection.row_factory = sqlite3.Row
    g.db = connection
    return connection
def close_db(app, error=None):
    """Close and discard the connection stored on Flask's ``g``, if there is one.

    Args:
        app: Accepted for signature compatibility; not used by this function.
        error: Accepted for signature compatibility; not used by this function.
    """
    connection = g.pop('db', None)
    if connection is None:
        # Nothing was opened in this context, so there is nothing to close.
        return
    connection.close()
def _add_missing_columns(db, table, columns):
    """Add every column from *columns* that *table* does not already have.

    Args:
        db: Open ``sqlite3.Connection`` to run the statements on.
        table: Name of the table to alter. It is interpolated into SQL, so it
            must be a trusted, hard-coded identifier.
        columns: Mapping of column name -> SQL column definition. Both are
            interpolated into SQL and must be trusted, hard-coded values.
    """
    # Index 1 of each PRAGMA table_info row is the column name.
    existing = {row[1] for row in db.execute(f'PRAGMA table_info({table})')}
    for col_name, col_type in columns.items():
        if col_name in existing:
            continue
        try:
            # SQLite cannot bind identifiers as parameters, hence the f-string.
            db.execute(f'ALTER TABLE {table} ADD COLUMN {col_name} {col_type}')
        except sqlite3.OperationalError:
            # Deliberate best-effort: a failed ALTER for one column must not
            # abort the rest of the initialization.
            pass


def init_db(app):
    """Create all required tables, add missing columns and seed default rows.

    The function is safe to call repeatedly: tables are created with
    ``IF NOT EXISTS``, columns are only added when absent, and the default
    server/channel pair is inserted only when no server with ``id=1`` exists.

    All identifiers interpolated into SQL come from hard-coded whitelists in
    this function; values are passed as bound parameters.

    Args:
        app: Object exposing a ``config`` mapping with a ``'DATABASE'`` entry
            holding the SQLite database path.

    Raises:
        sqlite3.Error: If connecting or any statement other than the
            best-effort ``ALTER TABLE`` calls fails. The connection is closed
            before the exception propagates.
    """
    # Whitelists of columns to add to pre-existing tables.
    # Format: column_name -> SQL definition
    users_columns_whitelist = {
        'avatar_url': 'TEXT',
        'description': 'TEXT',
        'pronouns': 'TEXT',
        'is_banned': 'INTEGER DEFAULT 0',
    }
    messages_columns_whitelist = {
        'channel_id': 'INTEGER',
        'reply_to': 'INTEGER',
    }

    db = sqlite3.connect(app.config['DATABASE'])
    try:
        db.execute('''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL,
            token TEXT UNIQUE NOT NULL
        )''')
        db.execute('''CREATE TABLE IF NOT EXISTS registration_codes (
            id INTEGER PRIMARY KEY,
            code TEXT UNIQUE NOT NULL,
            is_used INTEGER DEFAULT 0
        )''')
        _add_missing_columns(db, 'users', users_columns_whitelist)

        db.execute('''CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY,
            sender_id INTEGER NOT NULL,
            receiver_id INTEGER,
            content TEXT NOT NULL,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            is_global INTEGER DEFAULT 1,
            FOREIGN KEY(sender_id) REFERENCES users(id)
        )''')
        _add_missing_columns(db, 'messages', messages_columns_whitelist)

        db.execute('''CREATE TABLE IF NOT EXISTS friends (
            id INTEGER PRIMARY KEY,
            user_id INTEGER NOT NULL,
            friend_id INTEGER NOT NULL,
            status TEXT DEFAULT 'pending',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(user_id) REFERENCES users(id),
            FOREIGN KEY(friend_id) REFERENCES users(id),
            UNIQUE(user_id, friend_id)
        )''')
        db.execute('''CREATE TABLE IF NOT EXISTS servers (
            id INTEGER PRIMARY KEY,
            name TEXT UNIQUE NOT NULL
        )''')
        db.execute('''CREATE TABLE IF NOT EXISTS channels (
            id INTEGER PRIMARY KEY,
            server_id INTEGER NOT NULL,
            name TEXT NOT NULL,
            FOREIGN KEY(server_id) REFERENCES servers(id)
        )''')

        # Seed the default server and its first channel exactly once.
        cur = db.execute('SELECT id FROM servers WHERE id=1')
        if not cur.fetchone():
            db.execute('INSERT INTO servers (id, name) VALUES (1, ?)', ('Это днище интернета 2',))
            db.execute('INSERT INTO channels (server_id, name) VALUES (1, ?)', ('general',))
        db.commit()
    finally:
        # Always release the connection, even when a statement above raised.
        db.close()