2024 lines
70 KiB
Python
2024 lines
70 KiB
Python
# app.py
|
||
import os
|
||
import subprocess
|
||
from datetime import datetime, timedelta
|
||
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, Response, current_app
|
||
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
import mysql.connector
|
||
from mysql.connector import Error
|
||
import random
|
||
import string
|
||
from io import BytesIO
|
||
import zipfile
|
||
import shutil
|
||
import re
|
||
from pypinyin import pinyin, Style
|
||
from flask_mail import Mail
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
|
||
from flask_migrate import Migrate
|
||
|
||
from dotenv import load_dotenv
|
||
from pathlib import Path
|
||
|
||
from urllib.parse import urljoin
|
||
|
||
# 先加载环境变量(必须在创建app之前)
|
||
load_dotenv(Path(__file__).parent / '.env', override=True)
|
||
def generate_full_url(endpoint, **values):
|
||
"""生成完整的URL,考虑自定义域名和协议"""
|
||
base_url = f"{current_app.config['APP_PROTOCOL']}://{current_app.config['APP_DOMAIN']}"
|
||
path = url_for(endpoint, **values)
|
||
return urljoin(base_url, path)
|
||
|
||
# 从配置文件中导入配置
|
||
from config import Config
|
||
|
||
app = Flask(__name__, static_folder='static')
|
||
app.config.from_object(Config)
|
||
|
||
# 初始化邮件扩展
|
||
mail = Mail(app)
|
||
|
||
@app.context_processor
|
||
def inject_now():
|
||
return {'now': datetime.now()}
|
||
|
||
# 确保证书存储目录存在
|
||
os.makedirs(Config.CERT_STORE, exist_ok=True)
|
||
|
||
# 初始化数据库
|
||
from database import initialize_database
|
||
initialize_database()
|
||
|
||
# 初始化数据库迁移
|
||
#migrate = Migrate(app, db)
|
||
|
||
# Flask-Login 配置
|
||
login_manager = LoginManager()
|
||
login_manager.init_app(app)
|
||
login_manager.login_view = 'login'
|
||
|
||
from io import BytesIO
|
||
from flask import send_file
|
||
import random
|
||
import string
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
|
||
|
||
def generate_captcha_image():
|
||
# 生成4位随机验证码(字母和数字)
|
||
captcha_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
|
||
|
||
# 创建图片(120x40像素,白色背景)
|
||
image = Image.new('RGB', (120, 40), color=(255, 255, 255))
|
||
draw = ImageDraw.Draw(image)
|
||
|
||
# 尝试加载字体,失败则使用默认字体
|
||
try:
|
||
# 指定字体路径(假设字体文件是 static/arial.ttf)
|
||
font_path = os.path.join(current_app.static_folder, "arial.ttf")
|
||
font = ImageFont.truetype(font_path, 24)
|
||
except Exception as e:
|
||
print(f"加载字体失败: {e}")
|
||
font = ImageFont.load_default() # 回退到默认字体
|
||
|
||
# 绘制验证码文本(每个字符随机颜色)
|
||
for i, char in enumerate(captcha_code):
|
||
text_color = (
|
||
random.randint(0, 150), # R
|
||
random.randint(0, 150), # G
|
||
random.randint(0, 150) # B
|
||
)
|
||
draw.text(
|
||
(10 + i * 25, 5), # 位置
|
||
char, # 文本
|
||
font=font, # 字体
|
||
fill=text_color # 颜色
|
||
)
|
||
|
||
# 添加5条干扰线(随机位置和颜色)
|
||
for _ in range(5):
|
||
line_color = (
|
||
random.randint(0, 255), # R
|
||
random.randint(0, 255), # G
|
||
random.randint(0, 255) # B
|
||
)
|
||
draw.line(
|
||
[
|
||
random.randint(0, 120), # x1
|
||
random.randint(0, 40), # y1
|
||
random.randint(0, 120), # x2
|
||
random.randint(0, 40) # y2
|
||
],
|
||
fill=line_color,
|
||
width=1
|
||
)
|
||
|
||
# 保存验证码到数据库
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
# 清除10分钟前的旧验证码
|
||
cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE")
|
||
# 插入新验证码
|
||
cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,))
|
||
conn.commit()
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
# 返回图片字节流
|
||
img_io = BytesIO()
|
||
image.save(img_io, 'PNG')
|
||
img_io.seek(0)
|
||
return img_io, captcha_code
|
||
|
||
|
||
@app.route('/captcha')
|
||
def captcha():
|
||
img_io, _ = generate_captcha_image()
|
||
return send_file(img_io, mimetype='image/png')
|
||
|
||
|
||
def to_pinyin(text):
|
||
"""将中文转换为拼音"""
|
||
if not text:
|
||
return ""
|
||
# 获取拼音列表,不带声调
|
||
pinyin_list = pinyin(text, style=Style.NORMAL)
|
||
# 拼接成字符串
|
||
return "_".join([item[0] for item in pinyin_list])
|
||
|
||
# 注册模板过滤器
|
||
@app.template_filter('to_pinyin')
|
||
def jinja2_to_pinyin(text):
|
||
return to_pinyin(text)
|
||
|
||
class User(UserMixin):
|
||
pass
|
||
|
||
|
||
@login_manager.user_loader
|
||
def load_user(user_id):
|
||
try:
|
||
conn = mysql.connector.connect(**Config.DB_CONFIG)
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
|
||
user_data = cursor.fetchone()
|
||
|
||
if user_data:
|
||
user = User()
|
||
user.id = user_data['id']
|
||
user.username = user_data['username']
|
||
user.is_admin = user_data['is_admin']
|
||
return user
|
||
return None
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
|
||
# 辅助函数
|
||
def get_db_connection():
|
||
try:
|
||
conn = mysql.connector.connect(**Config.DB_CONFIG)
|
||
return conn
|
||
except Error as e:
|
||
print(f"Database connection error: {e}")
|
||
return None
|
||
|
||
|
||
def verify_captcha(user_input):
|
||
"""验证用户输入的验证码是否正确(只验证最新的4位验证码)"""
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
# 只查询最新的验证码(确保是4位的)
|
||
cursor.execute("""
|
||
SELECT code FROM captcha
|
||
WHERE LENGTH(code) = 4 -- 只查询4位验证码
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
""")
|
||
result = cursor.fetchone()
|
||
if result and user_input.upper() == result[0]:
|
||
# 验证成功后删除已使用的验证码
|
||
cursor.execute("DELETE FROM captcha WHERE code = %s", (result[0],))
|
||
conn.commit()
|
||
return True
|
||
return False
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return False
|
||
|
||
def validate_name(name, max_length=64):
|
||
"""
|
||
校验名称是否符合规范
|
||
规则:
|
||
1. 长度1-64个字符
|
||
2. 只能包含字母、数字、中文、下划线、短横线
|
||
3. 不能以短横线开头或结尾
|
||
"""
|
||
if not name or len(name) > max_length:
|
||
return False
|
||
|
||
# 允许中文、字母、数字、下划线、短横线
|
||
pattern = r'^[a-zA-Z0-9_\-\u4e00-\u9fa5]+$'
|
||
if not re.match(pattern, name):
|
||
return False
|
||
|
||
# 不能以短横线开头或结尾
|
||
if name.startswith('-') or name.endswith('-'):
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def validate_common_name(cn):
|
||
"""
|
||
校验通用名(Common Name)是否符合规范
|
||
规则:
|
||
1. 长度1-64个字符
|
||
2. 只能包含字母、数字、点号(.)和短横线(-)
|
||
3. 不能以点号或短横线开头或结尾
|
||
4. 不能连续两个点号或短横线
|
||
"""
|
||
if not cn or len(cn) > 64:
|
||
return False
|
||
|
||
# 只允许字母、数字、点号和短横线
|
||
if not re.match(r'^[a-zA-Z0-9.-]+$', cn):
|
||
return False
|
||
|
||
# 不能以点号或短横线开头或结尾
|
||
if cn.startswith('.') or cn.endswith('.') or cn.startswith('-') or cn.endswith('-'):
|
||
return False
|
||
|
||
# 不能连续两个点号或短横线
|
||
if '..' in cn or '--' in cn:
|
||
return False
|
||
|
||
return True
|
||
|
||
def create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid,
|
||
created_by):
|
||
# 创建拼音格式的目录名
|
||
pinyin_name = to_pinyin(ca_name)
|
||
ca_dir = os.path.join(Config.CERT_STORE, f"ca_{pinyin_name}")
|
||
os.makedirs(ca_dir, exist_ok=True)
|
||
|
||
# 使用原始common_name作为文件名
|
||
key_path = os.path.join(ca_dir, f"{common_name}.key")
|
||
cert_path = os.path.join(ca_dir, f"{common_name}.crt")
|
||
|
||
# 创建OpenSSL配置文件
|
||
openssl_cnf = f"""
|
||
[ req ]
|
||
default_bits = {key_size}
|
||
distinguished_name = req_distinguished_name
|
||
x509_extensions = v3_ca
|
||
prompt = no
|
||
|
||
[ req_distinguished_name ]
|
||
C = {country}
|
||
ST = {state}
|
||
L = {locality}
|
||
O = {organization}
|
||
OU = {organizational_unit}
|
||
CN = {common_name}
|
||
|
||
[ v3_ca ]
|
||
subjectKeyIdentifier = hash
|
||
authorityKeyIdentifier = keyid:always,issuer:always
|
||
basicConstraints = CA:TRUE
|
||
keyUsage = digitalSignature, keyCertSign, cRLSign
|
||
"""
|
||
|
||
config_path = os.path.join(ca_dir, 'openssl.cnf')
|
||
with open(config_path, 'w') as f:
|
||
f.write(openssl_cnf.strip())
|
||
|
||
# 生成CA私钥
|
||
subprocess.run([
|
||
'openssl', 'genrsa', '-out', key_path, str(key_size)
|
||
], check=True)
|
||
|
||
# 生成CA自签名证书(使用配置文件)
|
||
subprocess.run([
|
||
'openssl', 'req', '-new', '-x509', '-days', str(days_valid),
|
||
'-key', key_path, '-out', cert_path,
|
||
'-config', config_path
|
||
], check=True)
|
||
|
||
# 保存到数据库
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO certificate_authorities
|
||
(name, common_name, organization, organizational_unit, country, state, locality,
|
||
key_size, days_valid, cert_path, key_path, created_by)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
""", (ca_name, common_name, organization, organizational_unit, country, state, locality,
|
||
key_size, days_valid, cert_path, key_path, created_by))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit,
|
||
country, state, locality, key_size, days_valid, created_by):
|
||
"""创建证书并返回证书ID"""
|
||
# 获取CA信息
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
print(f"CA ID {ca_id} 不存在")
|
||
return None
|
||
|
||
# 创建证书目录
|
||
cert_dir = os.path.join(Config.CERT_STORE, f"certs_{common_name}")
|
||
os.makedirs(cert_dir, exist_ok=True)
|
||
|
||
key_path = os.path.join(cert_dir, f"{common_name}.key")
|
||
csr_path = os.path.join(cert_dir, f"{common_name}.csr")
|
||
cert_path = os.path.join(cert_dir, f"{common_name}.crt")
|
||
|
||
# 1. 生成私钥
|
||
try:
|
||
subprocess.run([
|
||
'openssl', 'genrsa', '-out', key_path, str(key_size)
|
||
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"生成私钥失败: {e.stderr.decode()}")
|
||
return None
|
||
|
||
# 2. 创建CSR配置文件
|
||
has_san = bool(san_dns or san_ip)
|
||
dns_entries = [dns.strip() for dns in san_dns.split(',') if dns.strip()] if san_dns else []
|
||
ip_entries = [ip.strip() for ip in san_ip.split(',') if ip.strip()] if san_ip else []
|
||
|
||
# 构建CSR配置
|
||
csr_config = f"""[req]
|
||
default_bits = {key_size}
|
||
prompt = no
|
||
default_md = sha256
|
||
distinguished_name = dn
|
||
"""
|
||
|
||
if has_san:
|
||
csr_config += "req_extensions = req_ext\n"
|
||
|
||
csr_config += f"""
|
||
[dn]
|
||
CN = {common_name}
|
||
O = {organization}
|
||
OU = {organizational_unit}
|
||
C = {country}
|
||
ST = {state}
|
||
L = {locality}
|
||
"""
|
||
|
||
if has_san:
|
||
csr_config += """
|
||
[req_ext]
|
||
basicConstraints = CA:FALSE
|
||
keyUsage = digitalSignature, keyEncipherment
|
||
subjectAltName = @alt_names
|
||
extendedKeyUsage = serverAuth, clientAuth
|
||
|
||
[alt_names]"""
|
||
|
||
# 添加DNS SAN条目
|
||
for i, dns in enumerate(dns_entries, 1):
|
||
csr_config += f"\nDNS.{i} = {dns}"
|
||
|
||
# 添加IP SAN条目
|
||
for i, ip in enumerate(ip_entries, 1):
|
||
csr_config += f"\nIP.{i} = {ip}"
|
||
|
||
# 写入CSR配置文件
|
||
csr_config_path = os.path.join(cert_dir, 'csr_config.cnf')
|
||
with open(csr_config_path, 'w') as f:
|
||
f.write(csr_config.strip()) # 确保没有多余空行
|
||
|
||
# 3. 生成CSR
|
||
try:
|
||
subprocess.run([
|
||
'openssl', 'req', '-new', '-key', key_path,
|
||
'-out', csr_path, '-config', csr_config_path
|
||
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"生成CSR失败: {e.stderr.decode()}")
|
||
print("CSR配置文件内容:")
|
||
print(csr_config)
|
||
return None
|
||
|
||
# 4. 创建证书扩展配置文件
|
||
ext_config = """authorityKeyIdentifier=keyid,issuer
|
||
basicConstraints=CA:FALSE
|
||
keyUsage=digitalSignature, keyEncipherment
|
||
extendedKeyUsage=serverAuth, clientAuth
|
||
"""
|
||
|
||
if has_san:
|
||
ext_config += "subjectAltName=@alt_names\n\n[alt_names]\n"
|
||
|
||
# 添加DNS SAN条目
|
||
for i, dns in enumerate(dns_entries, 1):
|
||
ext_config += f"DNS.{i} = {dns}\n"
|
||
|
||
# 添加IP SAN条目
|
||
for i, ip in enumerate(ip_entries, 1):
|
||
ext_config += f"IP.{i} = {ip}\n"
|
||
|
||
ext_config_path = os.path.join(cert_dir, 'ext.cnf')
|
||
with open(ext_config_path, 'w') as f:
|
||
f.write(ext_config.strip())
|
||
|
||
# 5. 使用CA签名证书
|
||
try:
|
||
cmd = [
|
||
'openssl', 'x509', '-req', '-in', csr_path,
|
||
'-CA', ca['cert_path'], '-CAkey', ca['key_path'],
|
||
'-CAcreateserial', '-out', cert_path,
|
||
'-days', str(days_valid), '-sha256'
|
||
]
|
||
|
||
# 只有有扩展内容时才添加-extfile参数
|
||
if os.path.getsize(ext_config_path) > 0:
|
||
cmd.extend(['-extfile', ext_config_path])
|
||
|
||
subprocess.run(cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"证书签名失败: {e.stderr.decode()}")
|
||
print("扩展配置文件内容:")
|
||
print(ext_config)
|
||
return None
|
||
|
||
# 6. 保存到数据库
|
||
expires_at = datetime.now() + timedelta(days=days_valid)
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO certificates
|
||
(common_name, san_dns, san_ip, organization, organizational_unit,
|
||
country, state, locality, key_size, days_valid, cert_path,
|
||
key_path, csr_path, ca_id, created_by, expires_at)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
""", (
|
||
common_name, san_dns, san_ip, organization, organizational_unit,
|
||
country, state, locality, key_size, days_valid, cert_path,
|
||
key_path, csr_path, ca_id, created_by, expires_at
|
||
))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
except Error as e:
|
||
print(f"数据库错误: {e}")
|
||
conn.rollback()
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return None
|
||
|
||
|
||
def get_ca_by_id(ca_id):
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT * FROM certificate_authorities WHERE id = %s", (ca_id,))
|
||
return cursor.fetchone()
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def get_certificate_by_id(cert_id):
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT * FROM certificates WHERE id = %s", (cert_id,))
|
||
return cursor.fetchone()
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def revoke_certificate(cert_id, reason):
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE certificates
|
||
SET status = 'revoked', revoked_at = NOW(), revocation_reason = %s
|
||
WHERE id = %s
|
||
""", (reason, cert_id))
|
||
conn.commit()
|
||
return True
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return False
|
||
|
||
|
||
def renew_certificate(cert_id, days_valid):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
return False
|
||
|
||
ca = get_ca_by_id(cert['ca_id'])
|
||
if not ca:
|
||
return False
|
||
|
||
# 使用现有CSR重新签名
|
||
new_cert_path = os.path.join(os.path.dirname(cert['cert_path']), f"renewed_{cert['common_name']}.crt")
|
||
|
||
subprocess.run([
|
||
'openssl', 'x509', '-req', '-in', cert['csr_path'], '-CA', ca['cert_path'],
|
||
'-CAkey', ca['key_path'], '-CAcreateserial', '-out', new_cert_path,
|
||
'-days', str(days_valid), '-sha256'
|
||
], check=True)
|
||
|
||
# 更新数据库
|
||
new_expires_at = datetime.now() + timedelta(days=days_valid)
|
||
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE certificates
|
||
SET cert_path = %s, days_valid = %s, expires_at = %s, status = 'active',
|
||
revoked_at = NULL, revocation_reason = NULL
|
||
WHERE id = %s
|
||
""", (new_cert_path, days_valid, new_expires_at, cert_id))
|
||
conn.commit()
|
||
return True
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return False
|
||
|
||
|
||
def generate_crl(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
return False
|
||
|
||
ca_dir = os.path.dirname(ca['cert_path'])
|
||
crl_path = os.path.join(ca_dir, f"crl_{ca['name']}.pem")
|
||
|
||
# 创建完整的OpenSSL配置文件
|
||
openssl_cnf = f"""
|
||
[ ca ]
|
||
default_ca = CA_default
|
||
|
||
[ CA_default ]
|
||
database = {os.path.join(ca_dir, 'index.txt')}
|
||
certificate = {ca['cert_path']}
|
||
private_key = {ca['key_path']}
|
||
crl = {crl_path}
|
||
RANDFILE = {os.path.join(ca_dir, '.rand')}
|
||
default_days = 365
|
||
default_crl_days = 30
|
||
default_md = sha256
|
||
preserve = no
|
||
policy = policy_anything
|
||
|
||
[ policy_anything ]
|
||
countryName = optional
|
||
stateOrProvinceName = optional
|
||
localityName = optional
|
||
organizationName = optional
|
||
organizationalUnitName = optional
|
||
commonName = optional
|
||
emailAddress = optional
|
||
"""
|
||
|
||
cnf_path = os.path.join(ca_dir, 'openssl.cnf')
|
||
with open(cnf_path, 'w') as f:
|
||
f.write(openssl_cnf)
|
||
|
||
# 确保index.txt存在
|
||
index_file = os.path.join(ca_dir, 'index.txt')
|
||
if not os.path.exists(index_file):
|
||
open(index_file, 'a').close()
|
||
|
||
# 生成CRL
|
||
try:
|
||
subprocess.run([
|
||
'openssl', 'ca', '-gencrl',
|
||
'-config', cnf_path,
|
||
'-out', crl_path
|
||
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
||
|
||
# 更新数据库
|
||
next_update = datetime.now() + timedelta(days=30)
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO certificate_revocation_list
|
||
(ca_id, crl_path, next_update)
|
||
VALUES (%s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE
|
||
crl_path = VALUES(crl_path),
|
||
last_updated = NOW(),
|
||
next_update = VALUES(next_update)
|
||
""", (ca_id, crl_path, next_update))
|
||
conn.commit()
|
||
return True
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
except subprocess.CalledProcessError as e:
|
||
error_msg = e.stderr.decode() if e.stderr else "Unknown OpenSSL error"
|
||
print(f"OpenSSL error: {error_msg}")
|
||
flash(f'CRL生成失败: {error_msg}', 'danger')
|
||
except Exception as e:
|
||
print(f"Error generating CRL: {str(e)}")
|
||
flash(f'CRL生成异常: {str(e)}', 'danger')
|
||
|
||
return False
|
||
|
||
def export_pkcs12(cert_id, password):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
return None
|
||
|
||
ca = get_ca_by_id(cert['ca_id'])
|
||
if not ca:
|
||
return None
|
||
|
||
pkcs12_path = os.path.join(os.path.dirname(cert['cert_path']), f"{cert['common_name']}.p12")
|
||
|
||
subprocess.run([
|
||
'openssl', 'pkcs12', '-export', '-out', pkcs12_path,
|
||
'-inkey', cert['key_path'], '-in', cert['cert_path'],
|
||
'-certfile', ca['cert_path'], '-passout', f'pass:{password}'
|
||
], check=True)
|
||
|
||
return pkcs12_path
|
||
|
||
|
||
@app.route('/register', methods=['GET', 'POST'])
|
||
def register():
|
||
# 检查用户是否已登录
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('index'))
|
||
|
||
# 检查注册功能是否开放
|
||
if not current_app.config['REGISTRATION_OPEN']:
|
||
flash('系统暂未开放注册', 'warning')
|
||
return redirect(url_for('login'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form['username'].strip()
|
||
password = request.form['password']
|
||
confirm_password = request.form['confirm_password']
|
||
email = request.form.get('email', '').strip()
|
||
captcha = request.form['captcha']
|
||
|
||
# 验证验证码
|
||
if not verify_captcha(captcha):
|
||
flash('验证码错误', 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
# 验证密码匹配
|
||
if password != confirm_password:
|
||
flash('两次输入的密码不匹配', 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
# 密码策略检查
|
||
password_errors = []
|
||
policy = current_app.config['PASSWORD_POLICY']
|
||
|
||
if len(password) < policy['min_length']:
|
||
password_errors.append(f"密码长度至少{policy['min_length']}位")
|
||
if policy['require_uppercase'] and not re.search(r'[A-Z]', password):
|
||
password_errors.append("必须包含大写字母")
|
||
if policy['require_lowercase'] and not re.search(r'[a-z]', password):
|
||
password_errors.append("必须包含小写字母")
|
||
if policy['require_digits'] and not re.search(r'[0-9]', password):
|
||
password_errors.append("必须包含数字")
|
||
if policy['require_special_chars'] and not re.search(r'[^A-Za-z0-9]', password):
|
||
password_errors.append("必须包含特殊字符")
|
||
|
||
if password_errors:
|
||
flash('密码不符合要求: ' + ', '.join(password_errors), 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
# 验证用户名是否已存在
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
|
||
# 检查用户名和邮箱是否已存在
|
||
cursor.execute("SELECT id FROM users WHERE username = %s OR email = %s",
|
||
(username, email))
|
||
if cursor.fetchone():
|
||
flash('用户名或邮箱已被注册', 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
# 根据配置决定是否立即激活账户
|
||
is_active = not current_app.config['EMAIL_VERIFICATION_REQUIRED']
|
||
verification_token = None
|
||
|
||
# 创建新用户
|
||
password_hash = generate_password_hash(password)
|
||
cursor.execute("""
|
||
INSERT INTO users (username, password_hash, email, is_admin, is_active, verification_token)
|
||
VALUES (%s, %s, %s, %s, %s, %s)
|
||
""", (username, password_hash, email, False, is_active, verification_token))
|
||
|
||
if current_app.config['EMAIL_VERIFICATION_REQUIRED']:
|
||
# 生成验证令牌
|
||
from itsdangerous import URLSafeTimedSerializer
|
||
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
|
||
token = serializer.dumps({'username': username, 'email': email}, salt='email-verify')
|
||
|
||
# 更新用户验证令牌
|
||
cursor.execute("""
|
||
UPDATE users SET verification_token = %s WHERE username = %s
|
||
""", (token, username))
|
||
|
||
# 发送验证邮件
|
||
try:
|
||
from flask_mail import Message
|
||
msg = Message('请验证您的邮箱 - 自签证书管理系统',
|
||
sender=current_app.config['MAIL_DEFAULT_SENDER'],
|
||
recipients=[email])
|
||
|
||
confirm_url = generate_full_url('verify_email', token=token)
|
||
|
||
# 使用HTML格式的邮件内容
|
||
msg.html = f"""
|
||
<html>
|
||
<body>
|
||
<p>感谢注册!</p>
|
||
<p>请点击以下链接完成邮箱验证:</p>
|
||
<p><a href="{confirm_url}">{confirm_url}</a></p>
|
||
<p>(链接24小时内有效)</p>
|
||
<p>如果链接无法点击,请复制到浏览器地址栏访问</p>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
mail = Mail(current_app)
|
||
mail.send(msg)
|
||
flash('验证邮件已发送至您的邮箱,请查收并完成验证', 'info')
|
||
except Exception as e:
|
||
current_app.logger.error(f'邮件发送失败: {str(e)}')
|
||
conn.rollback()
|
||
flash('发送验证邮件失败,请稍后再试或联系管理员', 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
conn.commit()
|
||
|
||
if is_active:
|
||
flash('注册成功,请登录', 'success')
|
||
return redirect(url_for('login'))
|
||
else:
|
||
return redirect(url_for('register'))
|
||
|
||
except Error as e:
|
||
conn.rollback()
|
||
current_app.logger.error(f'数据库错误: {str(e)}')
|
||
flash('注册失败,请稍后再试', 'danger')
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
# 生成新验证码
|
||
captcha_url = url_for('captcha') # 使用图片验证码
|
||
return render_template('register.html',
|
||
captcha_url=captcha_url, # 前端改为显示图片验证码
|
||
registration_open=current_app.config['REGISTRATION_OPEN'],
|
||
email_required=current_app.config['EMAIL_VERIFICATION_REQUIRED'])
|
||
|
||
|
||
@app.route('/verify-email/<token>')
|
||
def verify_email(token):
|
||
if current_user.is_authenticated:
|
||
return redirect(generate_full_url('index'))
|
||
|
||
try:
|
||
from itsdangerous import URLSafeTimedSerializer
|
||
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
|
||
data = serializer.loads(token, salt='email-verify', max_age=86400) # 24小时有效期
|
||
|
||
username = data['username']
|
||
email = data['email']
|
||
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 验证令牌是否匹配
|
||
cursor.execute("""
|
||
SELECT id FROM users
|
||
WHERE username = %s AND email = %s AND verification_token = %s
|
||
""", (username, email, token))
|
||
|
||
user = cursor.fetchone()
|
||
if not user:
|
||
flash('无效的验证链接', 'danger')
|
||
return redirect(generate_full_url('login'))
|
||
|
||
# 激活账户
|
||
cursor.execute("""
|
||
UPDATE users
|
||
SET is_active = TRUE, verification_token = NULL
|
||
WHERE id = %s
|
||
""", (user['id'],))
|
||
conn.commit()
|
||
|
||
flash('邮箱验证成功,您现在可以登录了', 'success')
|
||
return redirect(generate_full_url('login'))
|
||
|
||
except Error as e:
|
||
conn.rollback()
|
||
current_app.logger.error(f'数据库错误: {str(e)}')
|
||
flash('验证过程中出现错误', 'danger')
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
except Exception as e:
|
||
current_app.logger.error(f'令牌验证失败: {str(e)}')
|
||
flash('验证链接无效或已过期', 'danger')
|
||
|
||
return redirect(generate_full_url('login'))
|
||
|
||
# 路由定义
|
||
@app.route('/')
|
||
@login_required
|
||
def index():
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 获取CA数量
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT COUNT(*) as count FROM certificate_authorities")
|
||
ca_count = cursor.fetchone()['count']
|
||
cursor.fetchall() # 确保清空结果集
|
||
|
||
cursor.execute("""
|
||
SELECT * FROM certificate_authorities
|
||
ORDER BY created_at DESC LIMIT 5
|
||
""")
|
||
else:
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as count FROM certificate_authorities
|
||
WHERE created_by = %s
|
||
""", (current_user.id,))
|
||
ca_count = cursor.fetchone()['count']
|
||
cursor.fetchall() # 确保清空结果集
|
||
|
||
cursor.execute("""
|
||
SELECT * FROM certificate_authorities
|
||
WHERE created_by = %s
|
||
ORDER BY created_at DESC LIMIT 5
|
||
""", (current_user.id,))
|
||
recent_cas = cursor.fetchall()
|
||
|
||
# 获取证书数量
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT COUNT(*) as count FROM certificates")
|
||
cert_count = cursor.fetchone()['count']
|
||
cursor.fetchall() # 确保清空结果集
|
||
|
||
cursor.execute("""
|
||
SELECT * FROM certificates
|
||
ORDER BY created_at DESC LIMIT 5
|
||
""")
|
||
else:
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as count FROM certificates
|
||
WHERE created_by = %s
|
||
""", (current_user.id,))
|
||
cert_count = cursor.fetchone()['count']
|
||
cursor.fetchall() # 确保清空结果集
|
||
|
||
cursor.execute("""
|
||
SELECT * FROM certificates
|
||
WHERE created_by = %s
|
||
ORDER BY created_at DESC LIMIT 5
|
||
""", (current_user.id,))
|
||
recent_certs = cursor.fetchall()
|
||
|
||
# 获取即将过期证书数量(30天内)
|
||
if current_user.is_admin:
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as count FROM certificates
|
||
WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY)
|
||
AND status = 'active'
|
||
""")
|
||
else:
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as count FROM certificates
|
||
WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY)
|
||
AND status = 'active' AND created_by = %s
|
||
""", (current_user.id,))
|
||
expiring_soon_count = cursor.fetchone()['count']
|
||
cursor.fetchall() # 确保清空结果集
|
||
|
||
# 获取活跃证书数量
|
||
if current_user.is_admin:
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as count FROM certificates
|
||
WHERE status = 'active'
|
||
""")
|
||
else:
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as count FROM certificates
|
||
WHERE status = 'active' AND created_by = %s
|
||
""", (current_user.id,))
|
||
active_count = cursor.fetchone()['count']
|
||
cursor.fetchall() # 确保清空结果集
|
||
|
||
return render_template('index.html',
|
||
ca_count=ca_count,
|
||
cert_count=cert_count,
|
||
expiring_soon_count=expiring_soon_count,
|
||
active_count=active_count,
|
||
recent_cas=recent_cas,
|
||
recent_certs=recent_certs)
|
||
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取系统统计信息失败', 'danger')
|
||
return render_template('index.html',
|
||
ca_count=0,
|
||
cert_count=0,
|
||
expiring_soon_count=0,
|
||
active_count=0,
|
||
recent_cas=[],
|
||
recent_certs=[])
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return render_template('index.html',
|
||
ca_count=0,
|
||
cert_count=0,
|
||
expiring_soon_count=0,
|
||
active_count=0,
|
||
recent_cas=[],
|
||
recent_certs=[])
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def login():
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('index'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
captcha = request.form['captcha']
|
||
|
||
if not verify_captcha(captcha):
|
||
flash('验证码错误', 'danger')
|
||
return redirect(url_for('login'))
|
||
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("""
|
||
SELECT * FROM users
|
||
WHERE username = %s AND is_active = TRUE
|
||
""", (username,))
|
||
user_data = cursor.fetchone()
|
||
|
||
if user_data and check_password_hash(user_data['password_hash'], password):
|
||
user = User()
|
||
user.id = user_data['id']
|
||
user.username = user_data['username']
|
||
user.is_admin = user_data['is_admin']
|
||
login_user(user)
|
||
flash('登录成功', 'success')
|
||
return redirect(url_for('index'))
|
||
else:
|
||
flash('用户名或密码错误,或账户未激活', 'danger')
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('登录失败,请稍后再试', 'danger')
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
captcha_url = url_for('captcha')
|
||
return render_template('login.html', captcha_url=captcha_url)
|
||
|
||
|
||
@app.route('/logout')
|
||
@login_required
|
||
def logout():
|
||
logout_user()
|
||
flash('您已成功登出', 'success')
|
||
return redirect(url_for('login'))
|
||
|
||
|
||
from math import ceil
|
||
|
||
# 每页显示的数量
|
||
PER_PAGE = 10
|
||
|
||
|
||
@app.route('/cas')
|
||
@login_required
|
||
def ca_list():
|
||
page = request.args.get('page', 1, type=int)
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 获取总数
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities")
|
||
else:
|
||
cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities WHERE created_by = %s",
|
||
(current_user.id,))
|
||
total = cursor.fetchone()['total']
|
||
total_pages = ceil(total / PER_PAGE)
|
||
|
||
# 获取分页数据
|
||
offset = (page - 1) * PER_PAGE
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT * FROM certificate_authorities ORDER BY created_at DESC LIMIT %s OFFSET %s",
|
||
(PER_PAGE, offset))
|
||
else:
|
||
cursor.execute("""
|
||
SELECT * FROM certificate_authorities
|
||
WHERE created_by = %s
|
||
ORDER BY created_at DESC
|
||
LIMIT %s OFFSET %s
|
||
""", (current_user.id, PER_PAGE, offset))
|
||
cas = cursor.fetchall()
|
||
|
||
return render_template('ca_list.html',
|
||
cas=cas,
|
||
page=page,
|
||
total_pages=total_pages,
|
||
total=total,
|
||
get_username=get_username)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CA列表失败', 'danger')
|
||
return redirect(url_for('index'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('index'))
|
||
|
||
|
||
@app.route('/cas/batch_delete', methods=['POST'])
|
||
@login_required
|
||
def batch_delete_cas():
|
||
if not request.is_json:
|
||
return jsonify({'success': False, 'message': 'Invalid request'}), 400
|
||
|
||
data = request.get_json()
|
||
ca_ids = data.get('ids', [])
|
||
|
||
if not ca_ids:
|
||
return jsonify({'success': False, 'message': 'No CAs selected'}), 400
|
||
|
||
conn = get_db_connection()
|
||
if not conn:
|
||
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
|
||
|
||
try:
|
||
cursor = conn.cursor()
|
||
|
||
# 检查权限并删除
|
||
for ca_id in ca_ids:
|
||
# 验证CA存在且用户有权限
|
||
cursor.execute("SELECT created_by FROM certificate_authorities WHERE id = %s", (ca_id,))
|
||
ca = cursor.fetchone()
|
||
|
||
if not ca:
|
||
continue
|
||
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
continue
|
||
|
||
# 检查是否有关联证书
|
||
cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
|
||
result = cursor.fetchone()
|
||
if result['count'] > 0:
|
||
continue
|
||
|
||
# 获取CA信息以便删除文件
|
||
cursor.execute("SELECT cert_path, key_path FROM certificate_authorities WHERE id = %s", (ca_id,))
|
||
ca_info = cursor.fetchone()
|
||
|
||
if ca_info:
|
||
# 删除文件
|
||
try:
|
||
if os.path.exists(ca_info['cert_path']):
|
||
os.remove(ca_info['cert_path'])
|
||
if os.path.exists(ca_info['key_path']):
|
||
os.remove(ca_info['key_path'])
|
||
|
||
# 删除CA目录
|
||
ca_dir = os.path.dirname(ca_info['cert_path'])
|
||
if os.path.exists(ca_dir):
|
||
shutil.rmtree(ca_dir) # 递归删除目录
|
||
except OSError as e:
|
||
print(f"文件删除错误: {e}")
|
||
continue
|
||
|
||
# 删除数据库记录
|
||
cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
|
||
cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
|
||
|
||
conn.commit()
|
||
return jsonify({'success': True, 'message': '批量删除成功'})
|
||
|
||
except Error as e:
|
||
conn.rollback()
|
||
print(f"Database error: {e}")
|
||
return jsonify({'success': False, 'message': '数据库操作失败'}), 500
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
|
||
@app.route('/certificates')
|
||
@login_required
|
||
def certificate_list():
|
||
page = request.args.get('page', 1, type=int)
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 获取总数
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT COUNT(*) as total FROM certificates")
|
||
else:
|
||
cursor.execute("SELECT COUNT(*) as total FROM certificates WHERE created_by = %s", (current_user.id,))
|
||
total = cursor.fetchone()['total']
|
||
total_pages = ceil(total / PER_PAGE)
|
||
|
||
# 获取分页数据
|
||
offset = (page - 1) * PER_PAGE
|
||
if current_user.is_admin:
|
||
cursor.execute("""
|
||
SELECT c.*, ca.name as ca_name
|
||
FROM certificates c
|
||
JOIN certificate_authorities ca ON c.ca_id = ca.id
|
||
ORDER BY c.created_at DESC
|
||
LIMIT %s OFFSET %s
|
||
""", (PER_PAGE, offset))
|
||
else:
|
||
cursor.execute("""
|
||
SELECT c.*, ca.name as ca_name
|
||
FROM certificates c
|
||
JOIN certificate_authorities ca ON c.ca_id = ca.id
|
||
WHERE c.created_by = %s
|
||
ORDER BY c.created_at DESC
|
||
LIMIT %s OFFSET %s
|
||
""", (current_user.id, PER_PAGE, offset))
|
||
certificates = cursor.fetchall()
|
||
|
||
return render_template('certificate_list.html',
|
||
certificates=certificates,
|
||
page=page,
|
||
total_pages=total_pages,
|
||
total=total,
|
||
get_username=get_username)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取证书列表失败', 'danger')
|
||
return redirect(url_for('index'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('index'))
|
||
|
||
|
||
@app.route('/certificates/batch_delete', methods=['POST'])
|
||
@login_required
|
||
def batch_delete_certificates():
|
||
if not request.is_json:
|
||
return jsonify({'success': False, 'message': 'Invalid request'}), 400
|
||
|
||
data = request.get_json()
|
||
cert_ids = data.get('ids', [])
|
||
|
||
if not cert_ids:
|
||
return jsonify({'success': False, 'message': 'No certificates selected'}), 400
|
||
|
||
conn = get_db_connection()
|
||
if not conn:
|
||
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
|
||
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 检查权限并删除
|
||
for cert_id in cert_ids:
|
||
# 验证证书存在且用户有权限
|
||
cursor.execute("SELECT created_by, cert_path, key_path, csr_path FROM certificates WHERE id = %s",
|
||
(cert_id,))
|
||
cert = cursor.fetchone()
|
||
|
||
if not cert:
|
||
continue
|
||
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
continue
|
||
|
||
# 删除文件
|
||
try:
|
||
files_to_delete = [
|
||
cert['cert_path'],
|
||
cert['key_path'],
|
||
cert['csr_path']
|
||
]
|
||
|
||
# 删除所有指定文件
|
||
for file_path in files_to_delete:
|
||
if file_path and os.path.exists(file_path):
|
||
os.remove(file_path)
|
||
|
||
# 删除证书目录
|
||
cert_dir = os.path.dirname(cert['cert_path'])
|
||
if os.path.exists(cert_dir):
|
||
shutil.rmtree(cert_dir) # 递归删除目录
|
||
except OSError as e:
|
||
print(f"文件删除错误: {e}")
|
||
continue
|
||
|
||
# 删除数据库记录
|
||
cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
|
||
|
||
conn.commit()
|
||
return jsonify({'success': True, 'message': '批量删除成功'})
|
||
|
||
except Error as e:
|
||
conn.rollback()
|
||
print(f"Database error: {e}")
|
||
return jsonify({'success': False, 'message': '数据库操作失败'}), 500
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
|
||
@app.route('/cas/create', methods=['GET', 'POST'])
|
||
@login_required
|
||
def create_ca_view():
|
||
if request.method == 'POST':
|
||
ca_name = request.form['ca_name'].strip()
|
||
common_name = request.form['common_name'].strip()
|
||
organization = request.form['organization'].strip()
|
||
organizational_unit = request.form['organizational_unit'].strip()
|
||
country = request.form['country'].strip()
|
||
state = request.form['state'].strip()
|
||
locality = request.form['locality'].strip()
|
||
key_size = int(request.form['key_size'])
|
||
days_valid = int(request.form['days_valid'])
|
||
|
||
# 名称校验
|
||
if not validate_name(ca_name):
|
||
flash('CA名称无效:只能包含中文、字母、数字、下划线和短横线,且不能以短横线开头或结尾', 'danger')
|
||
return render_template('create_ca.html')
|
||
|
||
if not validate_common_name(common_name):
|
||
flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
|
||
return render_template('create_ca.html')
|
||
|
||
# 检查CA名称是否已存在
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT id FROM certificate_authorities WHERE name = %s", (ca_name,))
|
||
if cursor.fetchone():
|
||
flash('CA名称已存在,请使用其他名称', 'danger')
|
||
return render_template('create_ca.html')
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('检查CA名称失败', 'danger')
|
||
return render_template('create_ca.html')
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
ca_id = create_ca(ca_name, common_name, organization, organizational_unit,
|
||
country, state, locality, key_size, days_valid, current_user.id)
|
||
|
||
if ca_id:
|
||
flash('CA创建成功', 'success')
|
||
return redirect(url_for('ca_list'))
|
||
else:
|
||
flash('CA创建失败', 'danger')
|
||
|
||
return render_template('create_ca.html')
|
||
|
||
from datetime import timedelta # 确保顶部已导入
|
||
|
||
|
||
@app.route('/cas/<int:ca_id>')
|
||
@login_required
|
||
def ca_detail(ca_id):
|
||
page = request.args.get('page', 1, type=int)
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权访问此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 获取该CA颁发的证书(分页)
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
|
||
# 获取证书总数
|
||
cursor.execute("""
|
||
SELECT COUNT(*) as total FROM certificates
|
||
WHERE ca_id = %s
|
||
""", (ca_id,))
|
||
total = cursor.fetchone()['total']
|
||
total_pages = ceil(total / PER_PAGE)
|
||
|
||
# 获取分页数据
|
||
offset = (page - 1) * PER_PAGE
|
||
cursor.execute("""
|
||
SELECT * FROM certificates
|
||
WHERE ca_id = %s
|
||
ORDER BY created_at DESC
|
||
LIMIT %s OFFSET %s
|
||
""", (ca_id, PER_PAGE, offset))
|
||
certificates = cursor.fetchall()
|
||
|
||
# 获取CRL信息
|
||
cursor.execute("""
|
||
SELECT * FROM certificate_revocation_list
|
||
WHERE ca_id = %s
|
||
""", (ca_id,))
|
||
crl = cursor.fetchone()
|
||
|
||
return render_template(
|
||
'ca_detail.html',
|
||
ca=ca,
|
||
certificates=certificates,
|
||
crl=crl,
|
||
page=page,
|
||
total_pages=total_pages,
|
||
total=total,
|
||
timedelta=timedelta,
|
||
get_username=get_username
|
||
)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CA详情失败', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('ca_list'))
|
||
|
||
def get_username(user_id):
|
||
"""根据用户ID获取用户名"""
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT username FROM users WHERE id = %s", (user_id,))
|
||
user = cursor.fetchone()
|
||
return user['username'] if user else str(user_id)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return str(user_id)
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return str(user_id)
|
||
|
||
@app.route('/cas/<int:ca_id>/generate_crl')
|
||
@login_required
|
||
def generate_crl_view(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权操作此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
if generate_crl(ca_id):
|
||
flash('CRL生成成功', 'success')
|
||
else:
|
||
flash('CRL生成失败', 'danger')
|
||
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
|
||
|
||
@app.route('/cas/<int:ca_id>/download_crl')
|
||
@login_required
|
||
def download_crl(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权操作此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 获取CRL路径
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("""
|
||
SELECT crl_path FROM certificate_revocation_list
|
||
WHERE ca_id = %s
|
||
""", (ca_id,))
|
||
crl = cursor.fetchone()
|
||
|
||
if crl and os.path.exists(crl['crl_path']):
|
||
return send_from_directory(
|
||
os.path.dirname(crl['crl_path']),
|
||
os.path.basename(crl['crl_path']),
|
||
as_attachment=True
|
||
)
|
||
else:
|
||
flash('CRL文件不存在', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CRL失败', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
|
||
|
||
@app.route('/certificates/create', methods=['GET', 'POST'])
|
||
@login_required
|
||
def create_certificate_view():
|
||
if request.method == 'POST':
|
||
common_name = request.form['common_name'].strip()
|
||
san_dns = request.form.get('san_dns', '').strip()
|
||
san_ip = request.form.get('san_ip', '').strip()
|
||
organization = request.form['organization'].strip()
|
||
organizational_unit = request.form['organizational_unit'].strip()
|
||
country = request.form['country'].strip()
|
||
state = request.form['state'].strip()
|
||
locality = request.form['locality'].strip()
|
||
key_size = int(request.form['key_size'])
|
||
days_valid = int(request.form['days_valid'])
|
||
ca_id = int(request.form['ca_id'])
|
||
|
||
# 通用名校验
|
||
if not validate_common_name(common_name):
|
||
flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
|
||
# SAN DNS校验
|
||
if san_dns:
|
||
for dns in san_dns.split(','):
|
||
dns = dns.strip()
|
||
if not validate_common_name(dns):
|
||
flash(f'DNS SAN条目无效: {dns},只能包含字母、数字、点号和短横线', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
|
||
# SAN IP校验
|
||
if san_ip:
|
||
for ip in san_ip.split(','):
|
||
ip = ip.strip()
|
||
if not re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip):
|
||
flash(f'IP SAN条目无效: {ip},请输入有效的IPv4地址', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
|
||
# 检查证书是否已存在
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT id FROM certificates WHERE common_name = %s AND ca_id = %s",
|
||
(common_name, ca_id))
|
||
if cursor.fetchone():
|
||
flash('该CA下已存在相同通用名的证书', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('检查证书名称失败', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
cert_id = create_certificate(ca_id, common_name, san_dns, san_ip, organization,
|
||
organizational_unit, country, state, locality,
|
||
key_size, days_valid, current_user.id)
|
||
|
||
if cert_id:
|
||
flash('证书创建成功', 'success')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
else:
|
||
flash('证书创建失败', 'danger')
|
||
|
||
# 获取可用的CA
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT id, name FROM certificate_authorities")
|
||
else:
|
||
cursor.execute("""
|
||
SELECT id, name FROM certificate_authorities
|
||
WHERE created_by = %s
|
||
""", (current_user.id,))
|
||
cas = cursor.fetchall()
|
||
return render_template('create_certificate.html', cas=cas)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CA列表失败', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
@app.route('/certificates/<int:cert_id>')
|
||
@login_required
|
||
def certificate_detail(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权访问此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 获取CA信息
|
||
ca = get_ca_by_id(cert['ca_id'])
|
||
|
||
return render_template('certificate_detail.html', cert=cert, ca=ca, get_username=get_username)
|
||
|
||
|
||
@app.route('/certificates/<int:cert_id>/revoke', methods=['GET', 'POST'])
|
||
@login_required
|
||
def revoke_certificate_view(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权操作此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if cert['status'] == 'revoked':
|
||
flash('证书已被吊销', 'warning')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
|
||
if request.method == 'POST':
|
||
reason = request.form['reason']
|
||
if revoke_certificate(cert_id, reason):
|
||
flash('证书吊销成功', 'success')
|
||
# 更新CRL
|
||
generate_crl(cert['ca_id'])
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
else:
|
||
flash('证书吊销失败', 'danger')
|
||
|
||
return render_template('revoke_certificate.html', cert=cert)
|
||
|
||
|
||
@app.route('/certificates/<int:cert_id>/renew', methods=['GET', 'POST'])
|
||
@login_required
|
||
def renew_certificate_view(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权操作此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if request.method == 'POST':
|
||
days_valid = int(request.form['days_valid'])
|
||
if renew_certificate(cert_id, days_valid):
|
||
flash('证书续期成功', 'success')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
else:
|
||
flash('证书续期失败', 'danger')
|
||
|
||
return render_template('renew_certificate.html', cert=cert)
|
||
|
||
|
||
@app.route('/cas/<int:ca_id>/export')
|
||
@login_required
|
||
def export_ca_view(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA not found', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('No permission', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 创建临时zip文件
|
||
memory_file = BytesIO()
|
||
|
||
try:
|
||
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
|
||
# 添加CA证书文件
|
||
with open(ca['cert_path'], 'rb') as f: # 使用二进制模式读取
|
||
cert_content = f.read()
|
||
zf.writestr(f"{ca['common_name']}_ca.crt", cert_content)
|
||
|
||
# 添加CA私钥文件(可选,只有管理员可以导出私钥)
|
||
if current_user.is_admin:
|
||
with open(ca['key_path'], 'rb') as f: # 使用二进制模式读取
|
||
key_content = f.read()
|
||
zf.writestr(f"{ca['common_name']}_ca.key", key_content)
|
||
|
||
memory_file.seek(0)
|
||
|
||
# 确保文件名只包含ASCII字符
|
||
safe_filename = f"{ca['name']}_ca_bundle.zip".encode('ascii', 'ignore').decode('ascii')
|
||
|
||
return Response(
|
||
memory_file.getvalue(),
|
||
mimetype="application/zip",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={safe_filename}"
|
||
}
|
||
)
|
||
except Exception as e:
|
||
flash(f'Export failed: {str(e)}', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
@app.route('/certificates/<int:cert_id>/export', methods=['GET', 'POST'])
|
||
@login_required
|
||
def export_certificate_view(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权操作此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if request.method == 'POST':
|
||
format_type = request.form['format']
|
||
password = request.form.get('password', '')
|
||
|
||
if format_type == 'pkcs12':
|
||
if not password:
|
||
flash('PKCS#12格式需要密码', 'danger')
|
||
return redirect(url_for('export_certificate_view', cert_id=cert_id))
|
||
|
||
pkcs12_path = export_pkcs12(cert_id, password)
|
||
if pkcs12_path:
|
||
return send_from_directory(
|
||
os.path.dirname(pkcs12_path),
|
||
os.path.basename(pkcs12_path),
|
||
as_attachment=True
|
||
)
|
||
else:
|
||
flash('导出PKCS#12失败', 'danger')
|
||
elif format_type == 'pem':
|
||
# 合并证书和私钥为PEM
|
||
pem_content = ""
|
||
with open(cert['cert_path'], 'r') as f:
|
||
pem_content += f.read()
|
||
with open(cert['key_path'], 'r') as f:
|
||
pem_content += f.read()
|
||
|
||
return Response(
|
||
pem_content,
|
||
mimetype="application/x-pem-file",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={cert['common_name']}.pem"
|
||
}
|
||
)
|
||
elif format_type == 'pem_separate':
|
||
# 创建内存中的zip文件(.pem + .key)
|
||
return generate_separate_files_zip(
|
||
cert,
|
||
cert_ext=".pem",
|
||
zip_suffix="_pem_key"
|
||
)
|
||
elif format_type == 'crt_separate':
|
||
# 创建内存中的zip文件(.crt + .key)
|
||
return generate_separate_files_zip(
|
||
cert,
|
||
cert_ext=".crt",
|
||
zip_suffix="_crt_key"
|
||
)
|
||
else:
|
||
flash('不支持的导出格式', 'danger')
|
||
|
||
return render_template('export_certificate.html', cert=cert)
|
||
|
||
# 在app.py中添加以下路由
|
||
|
||
@app.route('/cas/<int:ca_id>/delete', methods=['GET', 'POST'])
|
||
@login_required
|
||
def delete_ca(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权删除此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查是否有关联的证书
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
|
||
result = cursor.fetchone()
|
||
if result['count'] > 0:
|
||
flash('无法删除CA,因为存在关联的证书', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('检查关联证书失败', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
if request.method == 'POST':
|
||
# 删除文件
|
||
try:
|
||
# 删除CA证书和私钥文件
|
||
if os.path.exists(ca['cert_path']):
|
||
os.remove(ca['cert_path'])
|
||
if os.path.exists(ca['key_path']):
|
||
os.remove(ca['key_path'])
|
||
|
||
# 删除CA目录及其所有内容
|
||
ca_dir = os.path.dirname(ca['cert_path'])
|
||
if os.path.exists(ca_dir):
|
||
# 删除目录中的所有文件和子目录
|
||
for filename in os.listdir(ca_dir):
|
||
file_path = os.path.join(ca_dir, filename)
|
||
try:
|
||
if os.path.isfile(file_path) or os.path.islink(file_path):
|
||
os.unlink(file_path)
|
||
elif os.path.isdir(file_path):
|
||
shutil.rmtree(file_path)
|
||
except Exception as e:
|
||
print(f'删除文件/目录失败 {file_path}. 原因: {e}')
|
||
flash(f'删除文件/目录失败: {str(e)}', 'warning')
|
||
|
||
# 现在删除空目录
|
||
try:
|
||
os.rmdir(ca_dir)
|
||
except OSError as e:
|
||
print(f"删除目录失败: {e}")
|
||
flash(f'删除目录失败: {str(e)}', 'warning')
|
||
except OSError as e:
|
||
print(f"文件删除错误: {e}")
|
||
flash(f'删除文件时出错: {str(e)}', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
|
||
# 删除数据库记录
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
# 先删除CRL记录
|
||
cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
|
||
# 再删除CA记录
|
||
cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
|
||
conn.commit()
|
||
flash('CA删除成功', 'success')
|
||
return redirect(url_for('ca_list'))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
conn.rollback()
|
||
flash('删除CA记录失败', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return render_template('confirm_delete_ca.html', ca=ca)
|
||
|
||
@app.route('/certificates/<int:cert_id>/delete', methods=['GET', 'POST'])
|
||
@login_required
|
||
def delete_certificate(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权删除此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if request.method == 'POST':
|
||
# 删除文件
|
||
try:
|
||
# 确保所有文件路径都存在
|
||
files_to_delete = [
|
||
cert['cert_path'],
|
||
cert['key_path'],
|
||
cert['csr_path']
|
||
]
|
||
|
||
# 删除所有指定文件
|
||
for file_path in files_to_delete:
|
||
if file_path and os.path.exists(file_path):
|
||
os.remove(file_path)
|
||
|
||
# 删除证书目录及其所有内容
|
||
cert_dir = os.path.dirname(cert['cert_path'])
|
||
if os.path.exists(cert_dir):
|
||
# 删除目录中的所有文件
|
||
for filename in os.listdir(cert_dir):
|
||
file_path = os.path.join(cert_dir, filename)
|
||
try:
|
||
if os.path.isfile(file_path) or os.path.islink(file_path):
|
||
os.unlink(file_path)
|
||
elif os.path.isdir(file_path):
|
||
shutil.rmtree(file_path)
|
||
except Exception as e:
|
||
print(f'Failed to delete {file_path}. Reason: {e}')
|
||
|
||
# 现在删除空目录
|
||
os.rmdir(cert_dir)
|
||
|
||
except OSError as e:
|
||
print(f"文件删除错误: {e}")
|
||
flash(f'删除文件时出错: {str(e)}', 'danger')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
|
||
# 删除数据库记录
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
|
||
conn.commit()
|
||
flash('证书删除成功', 'success')
|
||
return redirect(url_for('certificate_list'))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('删除证书记录失败', 'danger')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return render_template('confirm_delete_certificate.html', cert=cert)
|
||
def generate_separate_files_zip(cert, cert_ext, zip_suffix):
|
||
"""生成包含分开文件的ZIP包通用函数"""
|
||
memory_file = BytesIO()
|
||
|
||
try:
|
||
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
|
||
# 添加证书文件
|
||
with open(cert['cert_path'], 'r') as f:
|
||
cert_content = f.read()
|
||
zf.writestr(f"{cert['common_name']}{cert_ext}", cert_content)
|
||
|
||
# 添加私钥文件
|
||
with open(cert['key_path'], 'r') as f:
|
||
key_content = f.read()
|
||
zf.writestr(f"{cert['common_name']}.key", key_content)
|
||
|
||
memory_file.seek(0)
|
||
|
||
return Response(
|
||
memory_file.getvalue(),
|
||
mimetype="application/zip",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={cert['common_name']}{zip_suffix}.zip"
|
||
}
|
||
)
|
||
except Exception as e:
|
||
flash(f'创建ZIP文件失败: {str(e)}', 'danger')
|
||
return redirect(url_for('export_certificate_view', cert_id=cert['id']))
|
||
|
||
@app.route('/download/<path:filename>')
|
||
@login_required
|
||
def download_file(filename):
|
||
# 安全检查:确保文件路径在证书存储目录内
|
||
file_path = os.path.join(Config.CERT_STORE, filename)
|
||
if not os.path.exists(file_path):
|
||
flash('文件不存在', 'danger')
|
||
return redirect(url_for('index'))
|
||
|
||
# 检查权限
|
||
# 这里需要根据实际业务逻辑检查用户是否有权下载该文件
|
||
# 简单示例:只允许下载自己的证书文件
|
||
|
||
return send_from_directory(
|
||
os.path.dirname(file_path),
|
||
os.path.basename(file_path),
|
||
as_attachment=True
|
||
)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
app.run(debug=Config.DEBUG, host=Config.APP_HOST, port=Config.APP_PORT) |