# database.py
import mysql.connector
|
||
from mysql.connector import Error
|
||
from config import Config
|
||
|
||
|
||
def create_database():
    """Create the application database and its tables if they do not exist.

    Connects to the MySQL server without selecting a database, issues
    ``CREATE DATABASE IF NOT EXISTS`` for ``Config.DB_CONFIG['database']``,
    switches to that database and creates the ``users``,
    ``certificate_authorities``, ``certificates``,
    ``certificate_revocation_list`` and ``captcha`` tables. Every statement
    uses ``IF NOT EXISTS``, so tables that already exist are left untouched.

    Raises:
        mysql.connector.Error: Re-raised after being printed when the
            connection or any statement fails.
    """
    # Bound up front so the ``finally`` clause can run safely even when
    # ``connect()`` or ``cursor()`` raises before the names are assigned.
    conn = None
    cursor = None
    try:
        # Connect to the server itself: no database is selected because it
        # may not exist yet.
        server_args = {
            'host': Config.DB_CONFIG['host'],
            'user': Config.DB_CONFIG['user'],
            'password': Config.DB_CONFIG['password'],
        }
        # create_admin_user() connects with the full DB_CONFIG; forward a
        # configured port so both steps talk to the same server.
        if 'port' in Config.DB_CONFIG:
            server_args['port'] = Config.DB_CONFIG['port']
        conn = mysql.connector.connect(**server_args)

        cursor = conn.cursor()

        # Identifiers cannot be bound as query parameters, so quote the name
        # with backticks (doubling any embedded backtick). This keeps names
        # containing '-' or reserved words working.
        db_name = Config.DB_CONFIG['database']
        quoted_db = "`" + str(db_name).replace("`", "``") + "`"

        # Create the database.
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {quoted_db}")
        print(f"数据库 {Config.DB_CONFIG['database']} 已创建或已存在")

        # Switch to the new database.
        cursor.execute(f"USE {quoted_db}")

        # Table definitions, ordered so that referenced tables are created
        # before the tables whose foreign keys point at them.
        sql_scripts = [
            # password_hash is VARCHAR(255): Werkzeug's scrypt-format hashes
            # are longer than 128 characters and would not fit a narrower
            # column.
            """
            CREATE TABLE IF NOT EXISTS users (
                id INT AUTO_INCREMENT PRIMARY KEY,
                username VARCHAR(50) UNIQUE NOT NULL,
                password_hash VARCHAR(255) NOT NULL,
                email VARCHAR(100),
                is_admin BOOLEAN DEFAULT FALSE,
                is_active tinyint(1) DEFAULT '1',
                verification_token VARCHAR(255),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS certificate_authorities (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                common_name VARCHAR(100) NOT NULL,
                organization VARCHAR(100),
                organizational_unit VARCHAR(100),
                country VARCHAR(2),
                state VARCHAR(100),
                locality VARCHAR(100),
                key_size INT DEFAULT 2048,
                days_valid INT DEFAULT 3650,
                cert_path VARCHAR(255),
                key_path VARCHAR(255),
                created_by INT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (created_by) REFERENCES users(id)
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS certificates (
                id INT AUTO_INCREMENT PRIMARY KEY,
                common_name VARCHAR(100) NOT NULL,
                san_dns TEXT,
                san_ip TEXT,
                organization VARCHAR(100),
                organizational_unit VARCHAR(100),
                country VARCHAR(2),
                state VARCHAR(100),
                locality VARCHAR(100),
                key_size INT DEFAULT 2048,
                days_valid INT DEFAULT 365,
                cert_path VARCHAR(255),
                key_path VARCHAR(255),
                csr_path VARCHAR(255),
                ca_id INT,
                status ENUM('active', 'revoked', 'expired') DEFAULT 'active',
                created_by INT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                expires_at TIMESTAMP NULL,
                revoked_at TIMESTAMP NULL,
                revocation_reason VARCHAR(255),
                FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id),
                FOREIGN KEY (created_by) REFERENCES users(id)
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS certificate_revocation_list (
                id INT AUTO_INCREMENT PRIMARY KEY,
                ca_id INT NOT NULL,
                crl_path VARCHAR(255),
                last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                next_update TIMESTAMP NULL,
                FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id)
            )
            """,
            """
            CREATE TABLE IF NOT EXISTS captcha (
                id INT AUTO_INCREMENT PRIMARY KEY,
                code VARCHAR(10) NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """,
        ]

        for script in sql_scripts:
            cursor.execute(script)

        conn.commit()
        print("所有表已创建或已存在")

    except Error as e:
        print(f"数据库初始化错误: {e}")
        raise
    finally:
        # ``conn`` is still None when connect() itself failed; guarding here
        # lets the original connection error propagate instead of being
        # replaced by an UnboundLocalError.
        if conn is not None and conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
|
||
|
||
|
||
def create_admin_user():
    """Create the initial administrator account if it does not exist yet.

    Connects using ``Config.DB_CONFIG`` and looks up ``Config.ADMIN_USERNAME``
    in the ``users`` table. If a row is found the function reports that and
    returns without changes; otherwise it inserts an active admin user whose
    password hash is derived from ``Config.ADMIN_PASSWORD`` and whose email is
    ``Config.ADMIN_EMAIL``.

    Raises:
        mysql.connector.Error: Re-raised after being printed when the
            connection or any statement fails.
    """
    from werkzeug.security import generate_password_hash

    # Bound up front so the ``finally`` clause can run safely even when
    # ``connect()`` or ``cursor()`` raises before the names are assigned.
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**Config.DB_CONFIG)
        cursor = conn.cursor(dictionary=True)

        # Skip creation when the administrator already exists.
        cursor.execute(
            "SELECT id FROM users WHERE username = %s",
            (Config.ADMIN_USERNAME,),
        )
        if cursor.fetchone():
            print("管理员用户已存在")
            return

        # Create the administrator; only the hash is stored, never the
        # plain-text password.
        password_hash = generate_password_hash(Config.ADMIN_PASSWORD)
        cursor.execute(
            """
            INSERT INTO users (username, password_hash, email, is_admin, is_active)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (Config.ADMIN_USERNAME, password_hash, Config.ADMIN_EMAIL, True, True),
        )

        conn.commit()
        print("管理员用户创建成功")

    except Error as e:
        print(f"创建管理员用户错误: {e}")
        raise
    finally:
        # ``conn`` is still None when connect() itself failed; guarding here
        # lets the original connection error propagate instead of being
        # replaced by an UnboundLocalError.
        if conn is not None and conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
|
||
|
||
|
||
def initialize_database():
    """Run the database bootstrap steps in order.

    Calls ``create_database()`` and then ``create_admin_user()``. An exception
    from the first step propagates and prevents the second from running.
    """
    # Order matters: the admin row is inserted into the ``users`` table that
    # create_database() creates.
    for bootstrap_step in (create_database, create_admin_user):
        bootstrap_step()