# database.py
"""Database bootstrap: create the schema and seed the initial admin user."""
import mysql.connector
from mysql.connector import Error

from config import Config

# DDL statements run in order by create_database(). Order matters: later
# tables declare foreign keys that reference earlier ones.
_SCHEMA_STATEMENTS = (
    # password_hash is VARCHAR(255): Werkzeug's default scrypt hashes are
    # about 162 characters long and would not fit in the previous VARCHAR(128).
    # NOTE(review): CREATE TABLE IF NOT EXISTS does not alter an existing
    # table, so deployments created with the old width need a manual ALTER.
    """
    CREATE TABLE IF NOT EXISTS users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        username VARCHAR(50) UNIQUE NOT NULL,
        password_hash VARCHAR(255) NOT NULL,
        email VARCHAR(100),
        is_admin BOOLEAN DEFAULT FALSE,
        is_active tinyint(1) DEFAULT '1',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS certificate_authorities (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        common_name VARCHAR(100) NOT NULL,
        organization VARCHAR(100),
        organizational_unit VARCHAR(100),
        country VARCHAR(2),
        state VARCHAR(100),
        locality VARCHAR(100),
        key_size INT DEFAULT 2048,
        days_valid INT DEFAULT 3650,
        cert_path VARCHAR(255),
        key_path VARCHAR(255),
        created_by INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        FOREIGN KEY (created_by) REFERENCES users(id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS certificates (
        id INT AUTO_INCREMENT PRIMARY KEY,
        common_name VARCHAR(100) NOT NULL,
        san_dns TEXT,
        san_ip TEXT,
        organization VARCHAR(100),
        organizational_unit VARCHAR(100),
        country VARCHAR(2),
        state VARCHAR(100),
        locality VARCHAR(100),
        key_size INT DEFAULT 2048,
        days_valid INT DEFAULT 365,
        cert_path VARCHAR(255),
        key_path VARCHAR(255),
        csr_path VARCHAR(255),
        ca_id INT,
        status ENUM('active', 'revoked', 'expired') DEFAULT 'active',
        created_by INT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        expires_at TIMESTAMP NULL,
        revoked_at TIMESTAMP NULL,
        revocation_reason VARCHAR(255),
        FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id),
        FOREIGN KEY (created_by) REFERENCES users(id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS certificate_revocation_list (
        id INT AUTO_INCREMENT PRIMARY KEY,
        ca_id INT NOT NULL,
        crl_path VARCHAR(255),
        last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        next_update TIMESTAMP NULL,
        FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS captcha (
        id INT AUTO_INCREMENT PRIMARY KEY,
        code VARCHAR(10) NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """,
)


def _quote_identifier(name):
    """Return *name* as a backtick-quoted MySQL identifier.

    Identifiers cannot be bound as query parameters, so the name is quoted
    and any embedded backtick is doubled before interpolation.
    """
    return "`" + str(name).replace("`", "``") + "`"


def _release(conn, cursor):
    """Close *cursor* and *conn* if the connection was opened and is alive.

    Either argument may be ``None`` when the failure happened before it was
    created; in that case there is nothing to release.
    """
    if conn is not None and conn.is_connected():
        if cursor is not None:
            cursor.close()
        conn.close()


def create_database() -> None:
    """Create the configured database and all application tables if missing.

    Connects to the MySQL server without selecting a database, issues
    ``CREATE DATABASE IF NOT EXISTS``, switches to it and runs every
    statement in ``_SCHEMA_STATEMENTS``.

    Raises:
        mysql.connector.Error: printed and then re-raised when connecting
            or executing any statement fails.
    """
    db_name = Config.DB_CONFIG['database']
    quoted_db = _quote_identifier(db_name)
    # Reuse every configured connection option (port, charset, ...) except
    # the database itself, which may not exist yet.
    server_params = {
        key: value
        for key, value in Config.DB_CONFIG.items()
        if key != 'database'
    }

    # Pre-bind so the finally clause is safe even if connect() itself fails.
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**server_params)
        cursor = conn.cursor()

        # Create the database.
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {quoted_db}")
        print(f"数据库 {db_name} 已创建或已存在")

        # Switch to the new database.
        cursor.execute(f"USE {quoted_db}")

        # Create the tables.
        for statement in _SCHEMA_STATEMENTS:
            cursor.execute(statement)

        conn.commit()
        print("所有表已创建或已存在")
    except Error as e:
        print(f"数据库初始化错误: {e}")
        raise
    finally:
        _release(conn, cursor)


def create_admin_user() -> None:
    """Insert the initial administrator account unless it already exists.

    The username, password and e-mail are read from ``Config``; the password
    is stored as a Werkzeug password hash.

    Raises:
        mysql.connector.Error: printed and then re-raised when connecting
            or executing any statement fails.
    """
    # Imported lazily so the dependency is only needed when seeding the admin.
    from werkzeug.security import generate_password_hash

    # Pre-bind so the finally clause is safe even if connect() itself fails.
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**Config.DB_CONFIG)
        cursor = conn.cursor(dictionary=True)

        # Skip creation when the administrator is already present.
        cursor.execute(
            "SELECT id FROM users WHERE username = %s",
            (Config.ADMIN_USERNAME,),
        )
        if cursor.fetchone():
            print("管理员用户已存在")
            return

        # Create the administrator.
        password_hash = generate_password_hash(Config.ADMIN_PASSWORD)
        cursor.execute(
            """
            INSERT INTO users (username, password_hash, email, is_admin, is_active)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (Config.ADMIN_USERNAME, password_hash, Config.ADMIN_EMAIL, True, True),
        )

        conn.commit()
        print("管理员用户创建成功")
    except Error as e:
        print(f"创建管理员用户错误: {e}")
        raise
    finally:
        _release(conn, cursor)


def initialize_database() -> None:
    """Initialize the database: create the schema, then seed the admin user."""
    create_database()
    create_admin_user()