certmanager/database.py
2025-06-14 19:22:42 +08:00

156 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# database.py
import mysql.connector
from mysql.connector import Error
from config import Config
def create_database():
"""创建数据库(如果不存在)"""
try:
# 连接到MySQL服务器不带数据库名
conn = mysql.connector.connect(
host=Config.DB_CONFIG['host'],
user=Config.DB_CONFIG['user'],
password=Config.DB_CONFIG['password']
)
cursor = conn.cursor()
# 创建数据库
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {Config.DB_CONFIG['database']}")
print(f"数据库 {Config.DB_CONFIG['database']} 已创建或已存在")
# 切换到新数据库
cursor.execute(f"USE {Config.DB_CONFIG['database']}")
# 创建表
sql_scripts = [
"""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
password_hash VARCHAR(128) NOT NULL,
email VARCHAR(100),
is_admin BOOLEAN DEFAULT FALSE,
is_active tinyint(1) DEFAULT '1',
verification_token VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""",
"""
CREATE TABLE IF NOT EXISTS certificate_authorities (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
common_name VARCHAR(100) NOT NULL,
organization VARCHAR(100),
organizational_unit VARCHAR(100),
country VARCHAR(2),
state VARCHAR(100),
locality VARCHAR(100),
key_size INT DEFAULT 2048,
days_valid INT DEFAULT 3650,
cert_path VARCHAR(255),
key_path VARCHAR(255),
created_by INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id)
)
""",
"""
CREATE TABLE IF NOT EXISTS certificates (
id INT AUTO_INCREMENT PRIMARY KEY,
common_name VARCHAR(100) NOT NULL,
san_dns TEXT,
san_ip TEXT,
organization VARCHAR(100),
organizational_unit VARCHAR(100),
country VARCHAR(2),
state VARCHAR(100),
locality VARCHAR(100),
key_size INT DEFAULT 2048,
days_valid INT DEFAULT 365,
cert_path VARCHAR(255),
key_path VARCHAR(255),
csr_path VARCHAR(255),
ca_id INT,
status ENUM('active', 'revoked', 'expired') DEFAULT 'active',
created_by INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NULL,
revoked_at TIMESTAMP NULL,
revocation_reason VARCHAR(255),
FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id),
FOREIGN KEY (created_by) REFERENCES users(id)
)
""",
"""
CREATE TABLE IF NOT EXISTS certificate_revocation_list (
id INT AUTO_INCREMENT PRIMARY KEY,
ca_id INT NOT NULL,
crl_path VARCHAR(255),
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
next_update TIMESTAMP NULL,
FOREIGN KEY (ca_id) REFERENCES certificate_authorities(id)
)
""",
"""
CREATE TABLE IF NOT EXISTS captcha (
id INT AUTO_INCREMENT PRIMARY KEY,
code VARCHAR(10) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
]
for script in sql_scripts:
cursor.execute(script)
conn.commit()
print("所有表已创建或已存在")
except Error as e:
print(f"数据库初始化错误: {e}")
raise
finally:
if conn.is_connected():
cursor.close()
conn.close()
def create_admin_user():
"""创建初始管理员用户"""
from werkzeug.security import generate_password_hash
try:
conn = mysql.connector.connect(**Config.DB_CONFIG)
cursor = conn.cursor(dictionary=True)
# 检查管理员是否已存在
cursor.execute("SELECT id FROM users WHERE username = %s", (Config.ADMIN_USERNAME,))
if cursor.fetchone():
print("管理员用户已存在")
return
# 创建管理员
password_hash = generate_password_hash(Config.ADMIN_PASSWORD)
cursor.execute("""
INSERT INTO users (username, password_hash, email, is_admin, is_active)
VALUES (%s, %s, %s, %s, %s)
""", (Config.ADMIN_USERNAME, password_hash, Config.ADMIN_EMAIL, True, True))
conn.commit()
print("管理员用户创建成功")
except Error as e:
print(f"创建管理员用户错误: {e}")
raise
finally:
if conn.is_connected():
cursor.close()
conn.close()
def initialize_database():
"""初始化数据库"""
create_database()
create_admin_user()