# app.py import os import subprocess from datetime import datetime, timedelta from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, Response, current_app from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash import mysql.connector from mysql.connector import Error import random import string from io import BytesIO import zipfile import shutil import re from pypinyin import pinyin, Style from flask_mail import Mail from PIL import Image, ImageDraw, ImageFont from flask_migrate import Migrate from dotenv import load_dotenv from pathlib import Path from urllib.parse import urljoin # 先加载环境变量(必须在创建app之前) load_dotenv(Path(__file__).parent / '.env', override=True) def generate_full_url(endpoint, **values): """生成完整的URL,考虑自定义域名和协议""" base_url = f"{current_app.config['APP_PROTOCOL']}://{current_app.config['APP_DOMAIN']}" path = url_for(endpoint, **values) return urljoin(base_url, path) # 从配置文件中导入配置 from config import Config app = Flask(__name__, static_folder='static') app.config.from_object(Config) # 初始化邮件扩展 mail = Mail(app) @app.context_processor def inject_now(): return {'now': datetime.now()} # 确保证书存储目录存在 os.makedirs(Config.CERT_STORE, exist_ok=True) # 初始化数据库 from database import initialize_database initialize_database() # 初始化数据库迁移 #migrate = Migrate(app, db) # Flask-Login 配置 login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' from io import BytesIO from flask import send_file import random import string from PIL import Image, ImageDraw, ImageFont def generate_captcha_image(): # 生成4位随机验证码(字母和数字) captcha_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) # 创建图片(120x40像素,白色背景) image = Image.new('RGB', (120, 40), color=(255, 255, 255)) draw = ImageDraw.Draw(image) # 尝试加载字体,失败则使用默认字体 try: # 指定字体路径(假设字体文件是 static/arial.ttf) font_path = os.path.join(current_app.static_folder, "arial.ttf") font = ImageFont.truetype(font_path, 24) except Exception as e: print(f"加载字体失败: {e}") font = ImageFont.load_default() # 回退到默认字体 # 绘制验证码文本(每个字符随机颜色) for i, char in enumerate(captcha_code): text_color = ( random.randint(0, 150), # R random.randint(0, 150), # G random.randint(0, 150) # B ) draw.text( (10 + i * 25, 5), # 位置 char, # 文本 font=font, # 字体 fill=text_color # 颜色 ) # 添加5条干扰线(随机位置和颜色) for _ in range(5): line_color = ( random.randint(0, 255), # R random.randint(0, 255), # G random.randint(0, 255) # B ) draw.line( [ random.randint(0, 120), # x1 random.randint(0, 40), # y1 random.randint(0, 120), # x2 random.randint(0, 40) # y2 ], fill=line_color, width=1 ) # 保存验证码到数据库 conn = get_db_connection() if conn: try: cursor = conn.cursor() # 清除10分钟前的旧验证码 cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE") # 插入新验证码 cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,)) conn.commit() except Error as e: print(f"Database error: {e}") finally: if conn.is_connected(): cursor.close() conn.close() # 返回图片字节流 img_io = BytesIO() image.save(img_io, 'PNG') img_io.seek(0) return img_io, captcha_code @app.route('/captcha') def captcha(): img_io, _ = generate_captcha_image() return send_file(img_io, mimetype='image/png') def to_pinyin(text): """将中文转换为拼音""" if not text: return "" # 获取拼音列表,不带声调 pinyin_list = pinyin(text, style=Style.NORMAL) # 拼接成字符串 return "_".join([item[0] for item in pinyin_list]) # 注册模板过滤器 @app.template_filter('to_pinyin') def jinja2_to_pinyin(text): return to_pinyin(text) class User(UserMixin): pass @login_manager.user_loader def load_user(user_id): try: conn = mysql.connector.connect(**Config.DB_CONFIG) cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) user_data = cursor.fetchone() if user_data: user = User() user.id = user_data['id'] user.username = user_data['username'] user.is_admin = user_data['is_admin'] return user return None except Error as e: print(f"Database error: {e}") return None finally: if conn.is_connected(): cursor.close() conn.close() # 辅助函数 def get_db_connection(): try: conn = mysql.connector.connect(**Config.DB_CONFIG) return conn except Error as e: print(f"Database connection error: {e}") return None def verify_captcha(user_input): """验证用户输入的验证码是否正确(只验证最新的4位验证码)""" conn = get_db_connection() if conn: try: cursor = conn.cursor() # 只查询最新的验证码(确保是4位的) cursor.execute(""" SELECT code FROM captcha WHERE LENGTH(code) = 4 -- 只查询4位验证码 ORDER BY created_at DESC LIMIT 1 """) result = cursor.fetchone() if result and user_input.upper() == result[0]: # 验证成功后删除已使用的验证码 cursor.execute("DELETE FROM captcha WHERE code = %s", (result[0],)) conn.commit() return True return False except Error as e: print(f"Database error: {e}") return False finally: if conn.is_connected(): cursor.close() conn.close() return False def validate_name(name, max_length=64): """ 校验名称是否符合规范 规则: 1. 长度1-64个字符 2. 只能包含字母、数字、中文、下划线、短横线 3. 不能以短横线开头或结尾 """ if not name or len(name) > max_length: return False # 允许中文、字母、数字、下划线、短横线 pattern = r'^[a-zA-Z0-9_\-\u4e00-\u9fa5]+$' if not re.match(pattern, name): return False # 不能以短横线开头或结尾 if name.startswith('-') or name.endswith('-'): return False return True def validate_common_name(cn): """ 校验通用名(Common Name)是否符合规范 规则: 1. 长度1-64个字符 2. 只能包含字母、数字、点号(.)和短横线(-) 3. 不能以点号或短横线开头或结尾 4. 不能连续两个点号或短横线 """ if not cn or len(cn) > 64: return False # 只允许字母、数字、点号和短横线 if not re.match(r'^[a-zA-Z0-9.-]+$', cn): return False # 不能以点号或短横线开头或结尾 if cn.startswith('.') or cn.endswith('.') or cn.startswith('-') or cn.endswith('-'): return False # 不能连续两个点号或短横线 if '..' in cn or '--' in cn: return False return True def create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid, created_by): # 创建拼音格式的目录名 pinyin_name = to_pinyin(ca_name) ca_dir = os.path.join(Config.CERT_STORE, f"ca_{pinyin_name}") os.makedirs(ca_dir, exist_ok=True) # 使用原始common_name作为文件名 key_path = os.path.join(ca_dir, f"{common_name}.key") cert_path = os.path.join(ca_dir, f"{common_name}.crt") # 创建OpenSSL配置文件 openssl_cnf = f""" [ req ] default_bits = {key_size} distinguished_name = req_distinguished_name x509_extensions = v3_ca prompt = no [ req_distinguished_name ] C = {country} ST = {state} L = {locality} O = {organization} OU = {organizational_unit} CN = {common_name} [ v3_ca ] subjectKeyIdentifier = hash authorityKeyIdentifier = keyid:always,issuer:always basicConstraints = CA:TRUE keyUsage = digitalSignature, keyCertSign, cRLSign """ config_path = os.path.join(ca_dir, 'openssl.cnf') with open(config_path, 'w') as f: f.write(openssl_cnf.strip()) # 生成CA私钥 subprocess.run([ 'openssl', 'genrsa', '-out', key_path, str(key_size) ], check=True) # 生成CA自签名证书(使用配置文件) subprocess.run([ 'openssl', 'req', '-new', '-x509', '-days', str(days_valid), '-key', key_path, '-out', cert_path, '-config', config_path ], check=True) # 保存到数据库 conn = get_db_connection() if conn: try: cursor = conn.cursor() cursor.execute(""" INSERT INTO certificate_authorities (name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid, cert_path, key_path, created_by) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, (ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid, cert_path, key_path, created_by)) conn.commit() return cursor.lastrowid except Error as e: print(f"Database error: {e}") return None finally: if conn.is_connected(): cursor.close() conn.close() return None def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality, key_size, days_valid, created_by): """创建证书并返回证书ID""" # 获取CA信息 ca = get_ca_by_id(ca_id) if not ca: print(f"CA ID {ca_id} 不存在") return None # 创建证书目录 cert_dir = os.path.join(Config.CERT_STORE, f"certs_{common_name}") os.makedirs(cert_dir, exist_ok=True) key_path = os.path.join(cert_dir, f"{common_name}.key") csr_path = os.path.join(cert_dir, f"{common_name}.csr") cert_path = os.path.join(cert_dir, f"{common_name}.crt") # 1. 生成私钥 try: subprocess.run([ 'openssl', 'genrsa', '-out', key_path, str(key_size) ], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) except subprocess.CalledProcessError as e: print(f"生成私钥失败: {e.stderr.decode()}") return None # 2. 创建CSR配置文件 has_san = bool(san_dns or san_ip) dns_entries = [dns.strip() for dns in san_dns.split(',') if dns.strip()] if san_dns else [] ip_entries = [ip.strip() for ip in san_ip.split(',') if ip.strip()] if san_ip else [] # 构建CSR配置 csr_config = f"""[req] default_bits = {key_size} prompt = no default_md = sha256 distinguished_name = dn """ if has_san: csr_config += "req_extensions = req_ext\n" csr_config += f""" [dn] CN = {common_name} O = {organization} OU = {organizational_unit} C = {country} ST = {state} L = {locality} """ if has_san: csr_config += """ [req_ext] basicConstraints = CA:FALSE keyUsage = digitalSignature, keyEncipherment subjectAltName = @alt_names extendedKeyUsage = serverAuth, clientAuth [alt_names]""" # 添加DNS SAN条目 for i, dns in enumerate(dns_entries, 1): csr_config += f"\nDNS.{i} = {dns}" # 添加IP SAN条目 for i, ip in enumerate(ip_entries, 1): csr_config += f"\nIP.{i} = {ip}" # 写入CSR配置文件 csr_config_path = os.path.join(cert_dir, 'csr_config.cnf') with open(csr_config_path, 'w') as f: f.write(csr_config.strip()) # 确保没有多余空行 # 3. 生成CSR try: subprocess.run([ 'openssl', 'req', '-new', '-key', key_path, '-out', csr_path, '-config', csr_config_path ], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) except subprocess.CalledProcessError as e: print(f"生成CSR失败: {e.stderr.decode()}") print("CSR配置文件内容:") print(csr_config) return None # 4. 创建证书扩展配置文件 ext_config = """authorityKeyIdentifier=keyid,issuer basicConstraints=CA:FALSE keyUsage=digitalSignature, keyEncipherment extendedKeyUsage=serverAuth, clientAuth """ if has_san: ext_config += "subjectAltName=@alt_names\n\n[alt_names]\n" # 添加DNS SAN条目 for i, dns in enumerate(dns_entries, 1): ext_config += f"DNS.{i} = {dns}\n" # 添加IP SAN条目 for i, ip in enumerate(ip_entries, 1): ext_config += f"IP.{i} = {ip}\n" ext_config_path = os.path.join(cert_dir, 'ext.cnf') with open(ext_config_path, 'w') as f: f.write(ext_config.strip()) # 5. 使用CA签名证书 try: cmd = [ 'openssl', 'x509', '-req', '-in', csr_path, '-CA', ca['cert_path'], '-CAkey', ca['key_path'], '-CAcreateserial', '-out', cert_path, '-days', str(days_valid), '-sha256' ] # 只有有扩展内容时才添加-extfile参数 if os.path.getsize(ext_config_path) > 0: cmd.extend(['-extfile', ext_config_path]) subprocess.run(cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) except subprocess.CalledProcessError as e: print(f"证书签名失败: {e.stderr.decode()}") print("扩展配置文件内容:") print(ext_config) return None # 6. 保存到数据库 expires_at = datetime.now() + timedelta(days=days_valid) conn = get_db_connection() if conn: try: cursor = conn.cursor() cursor.execute(""" INSERT INTO certificates (common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality, key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality, key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at )) conn.commit() return cursor.lastrowid except Error as e: print(f"数据库错误: {e}") conn.rollback() return None finally: if conn.is_connected(): cursor.close() conn.close() return None def get_ca_by_id(ca_id): conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM certificate_authorities WHERE id = %s", (ca_id,)) return cursor.fetchone() except Error as e: print(f"Database error: {e}") return None finally: if conn.is_connected(): cursor.close() conn.close() return None def get_certificate_by_id(cert_id): conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT * FROM certificates WHERE id = %s", (cert_id,)) return cursor.fetchone() except Error as e: print(f"Database error: {e}") return None finally: if conn.is_connected(): cursor.close() conn.close() return None def revoke_certificate(cert_id, reason): conn = get_db_connection() if conn: try: cursor = conn.cursor() cursor.execute(""" UPDATE certificates SET status = 'revoked', revoked_at = NOW(), revocation_reason = %s WHERE id = %s """, (reason, cert_id)) conn.commit() return True except Error as e: print(f"Database error: {e}") return False finally: if conn.is_connected(): cursor.close() conn.close() return False def renew_certificate(cert_id, days_valid): cert = get_certificate_by_id(cert_id) if not cert: return False ca = get_ca_by_id(cert['ca_id']) if not ca: return False # 使用现有CSR重新签名 new_cert_path = os.path.join(os.path.dirname(cert['cert_path']), f"renewed_{cert['common_name']}.crt") subprocess.run([ 'openssl', 'x509', '-req', '-in', cert['csr_path'], '-CA', ca['cert_path'], '-CAkey', ca['key_path'], '-CAcreateserial', '-out', new_cert_path, '-days', str(days_valid), '-sha256' ], check=True) # 更新数据库 new_expires_at = datetime.now() + timedelta(days=days_valid) conn = get_db_connection() if conn: try: cursor = conn.cursor() cursor.execute(""" UPDATE certificates SET cert_path = %s, days_valid = %s, expires_at = %s, status = 'active', revoked_at = NULL, revocation_reason = NULL WHERE id = %s """, (new_cert_path, days_valid, new_expires_at, cert_id)) conn.commit() return True except Error as e: print(f"Database error: {e}") return False finally: if conn.is_connected(): cursor.close() conn.close() return False def generate_crl(ca_id): ca = get_ca_by_id(ca_id) if not ca: return False ca_dir = os.path.dirname(ca['cert_path']) crl_path = os.path.join(ca_dir, f"crl_{ca['name']}.pem") # 创建完整的OpenSSL配置文件 openssl_cnf = f""" [ ca ] default_ca = CA_default [ CA_default ] database = {os.path.join(ca_dir, 'index.txt')} certificate = {ca['cert_path']} private_key = {ca['key_path']} crl = {crl_path} RANDFILE = {os.path.join(ca_dir, '.rand')} default_days = 365 default_crl_days = 30 default_md = sha256 preserve = no policy = policy_anything [ policy_anything ] countryName = optional stateOrProvinceName = optional localityName = optional organizationName = optional organizationalUnitName = optional commonName = optional emailAddress = optional """ cnf_path = os.path.join(ca_dir, 'openssl.cnf') with open(cnf_path, 'w') as f: f.write(openssl_cnf) # 确保index.txt存在 index_file = os.path.join(ca_dir, 'index.txt') if not os.path.exists(index_file): open(index_file, 'a').close() # 生成CRL try: subprocess.run([ 'openssl', 'ca', '-gencrl', '-config', cnf_path, '-out', crl_path ], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) # 更新数据库 next_update = datetime.now() + timedelta(days=30) conn = get_db_connection() if conn: try: cursor = conn.cursor() cursor.execute(""" INSERT INTO certificate_revocation_list (ca_id, crl_path, next_update) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE crl_path = VALUES(crl_path), last_updated = NOW(), next_update = VALUES(next_update) """, (ca_id, crl_path, next_update)) conn.commit() return True except Error as e: print(f"Database error: {e}") return False finally: if conn.is_connected(): cursor.close() conn.close() except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else "Unknown OpenSSL error" print(f"OpenSSL error: {error_msg}") flash(f'CRL生成失败: {error_msg}', 'danger') except Exception as e: print(f"Error generating CRL: {str(e)}") flash(f'CRL生成异常: {str(e)}', 'danger') return False def export_pkcs12(cert_id, password): cert = get_certificate_by_id(cert_id) if not cert: return None ca = get_ca_by_id(cert['ca_id']) if not ca: return None pkcs12_path = os.path.join(os.path.dirname(cert['cert_path']), f"{cert['common_name']}.p12") subprocess.run([ 'openssl', 'pkcs12', '-export', '-out', pkcs12_path, '-inkey', cert['key_path'], '-in', cert['cert_path'], '-certfile', ca['cert_path'], '-passout', f'pass:{password}' ], check=True) return pkcs12_path @app.route('/register', methods=['GET', 'POST']) def register(): # 检查用户是否已登录 if current_user.is_authenticated: return redirect(url_for('index')) # 检查注册功能是否开放 if not current_app.config['REGISTRATION_OPEN']: flash('系统暂未开放注册', 'warning') return redirect(url_for('login')) if request.method == 'POST': username = request.form['username'].strip() password = request.form['password'] confirm_password = request.form['confirm_password'] email = request.form.get('email', '').strip() captcha = request.form['captcha'] # 验证验证码 if not verify_captcha(captcha): flash('验证码错误', 'danger') return redirect(url_for('register')) # 验证密码匹配 if password != confirm_password: flash('两次输入的密码不匹配', 'danger') return redirect(url_for('register')) # 密码策略检查 password_errors = [] policy = current_app.config['PASSWORD_POLICY'] if len(password) < policy['min_length']: password_errors.append(f"密码长度至少{policy['min_length']}位") if policy['require_uppercase'] and not re.search(r'[A-Z]', password): password_errors.append("必须包含大写字母") if policy['require_lowercase'] and not re.search(r'[a-z]', password): password_errors.append("必须包含小写字母") if policy['require_digits'] and not re.search(r'[0-9]', password): password_errors.append("必须包含数字") if policy['require_special_chars'] and not re.search(r'[^A-Za-z0-9]', password): password_errors.append("必须包含特殊字符") if password_errors: flash('密码不符合要求: ' + ', '.join(password_errors), 'danger') return redirect(url_for('register')) # 验证用户名是否已存在 conn = get_db_connection() if conn: try: cursor = conn.cursor() # 检查用户名和邮箱是否已存在 cursor.execute("SELECT id FROM users WHERE username = %s OR email = %s", (username, email)) if cursor.fetchone(): flash('用户名或邮箱已被注册', 'danger') return redirect(url_for('register')) # 根据配置决定是否立即激活账户 is_active = not current_app.config['EMAIL_VERIFICATION_REQUIRED'] verification_token = None # 创建新用户 password_hash = generate_password_hash(password) cursor.execute(""" INSERT INTO users (username, password_hash, email, is_admin, is_active, verification_token) VALUES (%s, %s, %s, %s, %s, %s) """, (username, password_hash, email, False, is_active, verification_token)) if current_app.config['EMAIL_VERIFICATION_REQUIRED']: # 生成验证令牌 from itsdangerous import URLSafeTimedSerializer serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) token = serializer.dumps({'username': username, 'email': email}, salt='email-verify') # 更新用户验证令牌 cursor.execute(""" UPDATE users SET verification_token = %s WHERE username = %s """, (token, username)) # 发送验证邮件 try: from flask_mail import Message msg = Message('请验证您的邮箱 - 自签证书管理系统', sender=current_app.config['MAIL_DEFAULT_SENDER'], recipients=[email]) confirm_url = generate_full_url('verify_email', token=token) # 使用HTML格式的邮件内容 msg.html = f"""

感谢注册!

请点击以下链接完成邮箱验证:

{confirm_url}

(链接24小时内有效)

如果链接无法点击,请复制到浏览器地址栏访问

""" mail = Mail(current_app) mail.send(msg) flash('验证邮件已发送至您的邮箱,请查收并完成验证', 'info') except Exception as e: current_app.logger.error(f'邮件发送失败: {str(e)}') conn.rollback() flash('发送验证邮件失败,请稍后再试或联系管理员', 'danger') return redirect(url_for('register')) conn.commit() if is_active: flash('注册成功,请登录', 'success') return redirect(url_for('login')) else: return redirect(url_for('register')) except Error as e: conn.rollback() current_app.logger.error(f'数据库错误: {str(e)}') flash('注册失败,请稍后再试', 'danger') finally: if conn.is_connected(): cursor.close() conn.close() # 生成新验证码 captcha_url = url_for('captcha') # 使用图片验证码 return render_template('register.html', captcha_url=captcha_url, # 前端改为显示图片验证码 registration_open=current_app.config['REGISTRATION_OPEN'], email_required=current_app.config['EMAIL_VERIFICATION_REQUIRED']) @app.route('/verify-email/') def verify_email(token): if current_user.is_authenticated: return redirect(generate_full_url('index')) try: from itsdangerous import URLSafeTimedSerializer serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY']) data = serializer.loads(token, salt='email-verify', max_age=86400) # 24小时有效期 username = data['username'] email = data['email'] conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) # 验证令牌是否匹配 cursor.execute(""" SELECT id FROM users WHERE username = %s AND email = %s AND verification_token = %s """, (username, email, token)) user = cursor.fetchone() if not user: flash('无效的验证链接', 'danger') return redirect(generate_full_url('login')) # 激活账户 cursor.execute(""" UPDATE users SET is_active = TRUE, verification_token = NULL WHERE id = %s """, (user['id'],)) conn.commit() flash('邮箱验证成功,您现在可以登录了', 'success') return redirect(generate_full_url('login')) except Error as e: conn.rollback() current_app.logger.error(f'数据库错误: {str(e)}') flash('验证过程中出现错误', 'danger') finally: if conn.is_connected(): cursor.close() conn.close() except Exception as e: current_app.logger.error(f'令牌验证失败: {str(e)}') flash('验证链接无效或已过期', 'danger') return redirect(generate_full_url('login')) # 路由定义 @app.route('/') @login_required def index(): conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) # 获取CA数量 if current_user.is_admin: cursor.execute("SELECT COUNT(*) as count FROM certificate_authorities") ca_count = cursor.fetchone()['count'] cursor.fetchall() # 确保清空结果集 cursor.execute(""" SELECT * FROM certificate_authorities ORDER BY created_at DESC LIMIT 5 """) else: cursor.execute(""" SELECT COUNT(*) as count FROM certificate_authorities WHERE created_by = %s """, (current_user.id,)) ca_count = cursor.fetchone()['count'] cursor.fetchall() # 确保清空结果集 cursor.execute(""" SELECT * FROM certificate_authorities WHERE created_by = %s ORDER BY created_at DESC LIMIT 5 """, (current_user.id,)) recent_cas = cursor.fetchall() # 获取证书数量 if current_user.is_admin: cursor.execute("SELECT COUNT(*) as count FROM certificates") cert_count = cursor.fetchone()['count'] cursor.fetchall() # 确保清空结果集 cursor.execute(""" SELECT * FROM certificates ORDER BY created_at DESC LIMIT 5 """) else: cursor.execute(""" SELECT COUNT(*) as count FROM certificates WHERE created_by = %s """, (current_user.id,)) cert_count = cursor.fetchone()['count'] cursor.fetchall() # 确保清空结果集 cursor.execute(""" SELECT * FROM certificates WHERE created_by = %s ORDER BY created_at DESC LIMIT 5 """, (current_user.id,)) recent_certs = cursor.fetchall() # 获取即将过期证书数量(30天内) if current_user.is_admin: cursor.execute(""" SELECT COUNT(*) as count FROM certificates WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY) AND status = 'active' """) else: cursor.execute(""" SELECT COUNT(*) as count FROM certificates WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY) AND status = 'active' AND created_by = %s """, (current_user.id,)) expiring_soon_count = cursor.fetchone()['count'] cursor.fetchall() # 确保清空结果集 # 获取活跃证书数量 if current_user.is_admin: cursor.execute(""" SELECT COUNT(*) as count FROM certificates WHERE status = 'active' """) else: cursor.execute(""" SELECT COUNT(*) as count FROM certificates WHERE status = 'active' AND created_by = %s """, (current_user.id,)) active_count = cursor.fetchone()['count'] cursor.fetchall() # 确保清空结果集 return render_template('index.html', ca_count=ca_count, cert_count=cert_count, expiring_soon_count=expiring_soon_count, active_count=active_count, recent_cas=recent_cas, recent_certs=recent_certs) except Error as e: print(f"Database error: {e}") flash('获取系统统计信息失败', 'danger') return render_template('index.html', ca_count=0, cert_count=0, expiring_soon_count=0, active_count=0, recent_cas=[], recent_certs=[]) finally: if conn.is_connected(): cursor.close() conn.close() return render_template('index.html', ca_count=0, cert_count=0, expiring_soon_count=0, active_count=0, recent_cas=[], recent_certs=[]) @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('index')) if request.method == 'POST': username = request.form['username'] password = request.form['password'] captcha = request.form['captcha'] if not verify_captcha(captcha): flash('验证码错误', 'danger') return redirect(url_for('login')) conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT * FROM users WHERE username = %s AND is_active = TRUE """, (username,)) user_data = cursor.fetchone() if user_data and check_password_hash(user_data['password_hash'], password): user = User() user.id = user_data['id'] user.username = user_data['username'] user.is_admin = user_data['is_admin'] login_user(user) flash('登录成功', 'success') return redirect(url_for('index')) else: flash('用户名或密码错误,或账户未激活', 'danger') except Error as e: print(f"Database error: {e}") flash('登录失败,请稍后再试', 'danger') finally: if conn.is_connected(): cursor.close() conn.close() captcha_url = url_for('captcha') return render_template('login.html', captcha_url=captcha_url) @app.route('/logout') @login_required def logout(): logout_user() flash('您已成功登出', 'success') return redirect(url_for('login')) from math import ceil # 每页显示的数量 PER_PAGE = 10 @app.route('/cas') @login_required def ca_list(): page = request.args.get('page', 1, type=int) conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) # 获取总数 if current_user.is_admin: cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities") else: cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities WHERE created_by = %s", (current_user.id,)) total = cursor.fetchone()['total'] total_pages = ceil(total / PER_PAGE) # 获取分页数据 offset = (page - 1) * PER_PAGE if current_user.is_admin: cursor.execute("SELECT * FROM certificate_authorities ORDER BY created_at DESC LIMIT %s OFFSET %s", (PER_PAGE, offset)) else: cursor.execute(""" SELECT * FROM certificate_authorities WHERE created_by = %s ORDER BY created_at DESC LIMIT %s OFFSET %s """, (current_user.id, PER_PAGE, offset)) cas = cursor.fetchall() return render_template('ca_list.html', cas=cas, page=page, total_pages=total_pages, total=total, get_username=get_username) except Error as e: print(f"Database error: {e}") flash('获取CA列表失败', 'danger') return redirect(url_for('index')) finally: if conn.is_connected(): cursor.close() conn.close() return redirect(url_for('index')) @app.route('/cas/batch_delete', methods=['POST']) @login_required def batch_delete_cas(): if not request.is_json: return jsonify({'success': False, 'message': 'Invalid request'}), 400 data = request.get_json() ca_ids = data.get('ids', []) if not ca_ids: return jsonify({'success': False, 'message': 'No CAs selected'}), 400 conn = get_db_connection() if not conn: return jsonify({'success': False, 'message': 'Database connection failed'}), 500 try: cursor = conn.cursor() # 检查权限并删除 for ca_id in ca_ids: # 验证CA存在且用户有权限 cursor.execute("SELECT created_by FROM certificate_authorities WHERE id = %s", (ca_id,)) ca = cursor.fetchone() if not ca: continue if not current_user.is_admin and ca['created_by'] != current_user.id: continue # 检查是否有关联证书 cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,)) result = cursor.fetchone() if result['count'] > 0: continue # 获取CA信息以便删除文件 cursor.execute("SELECT cert_path, key_path FROM certificate_authorities WHERE id = %s", (ca_id,)) ca_info = cursor.fetchone() if ca_info: # 删除文件 try: if os.path.exists(ca_info['cert_path']): os.remove(ca_info['cert_path']) if os.path.exists(ca_info['key_path']): os.remove(ca_info['key_path']) # 删除CA目录 ca_dir = os.path.dirname(ca_info['cert_path']) if os.path.exists(ca_dir): shutil.rmtree(ca_dir) # 递归删除目录 except OSError as e: print(f"文件删除错误: {e}") continue # 删除数据库记录 cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,)) cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,)) conn.commit() return jsonify({'success': True, 'message': '批量删除成功'}) except Error as e: conn.rollback() print(f"Database error: {e}") return jsonify({'success': False, 'message': '数据库操作失败'}), 500 finally: if conn.is_connected(): cursor.close() conn.close() @app.route('/certificates') @login_required def certificate_list(): page = request.args.get('page', 1, type=int) conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) # 获取总数 if current_user.is_admin: cursor.execute("SELECT COUNT(*) as total FROM certificates") else: cursor.execute("SELECT COUNT(*) as total FROM certificates WHERE created_by = %s", (current_user.id,)) total = cursor.fetchone()['total'] total_pages = ceil(total / PER_PAGE) # 获取分页数据 offset = (page - 1) * PER_PAGE if current_user.is_admin: cursor.execute(""" SELECT c.*, ca.name as ca_name FROM certificates c JOIN certificate_authorities ca ON c.ca_id = ca.id ORDER BY c.created_at DESC LIMIT %s OFFSET %s """, (PER_PAGE, offset)) else: cursor.execute(""" SELECT c.*, ca.name as ca_name FROM certificates c JOIN certificate_authorities ca ON c.ca_id = ca.id WHERE c.created_by = %s ORDER BY c.created_at DESC LIMIT %s OFFSET %s """, (current_user.id, PER_PAGE, offset)) certificates = cursor.fetchall() return render_template('certificate_list.html', certificates=certificates, page=page, total_pages=total_pages, total=total, get_username=get_username) except Error as e: print(f"Database error: {e}") flash('获取证书列表失败', 'danger') return redirect(url_for('index')) finally: if conn.is_connected(): cursor.close() conn.close() return redirect(url_for('index')) @app.route('/certificates/batch_delete', methods=['POST']) @login_required def batch_delete_certificates(): if not request.is_json: return jsonify({'success': False, 'message': 'Invalid request'}), 400 data = request.get_json() cert_ids = data.get('ids', []) if not cert_ids: return jsonify({'success': False, 'message': 'No certificates selected'}), 400 conn = get_db_connection() if not conn: return jsonify({'success': False, 'message': 'Database connection failed'}), 500 try: cursor = conn.cursor(dictionary=True) # 检查权限并删除 for cert_id in cert_ids: # 验证证书存在且用户有权限 cursor.execute("SELECT created_by, cert_path, key_path, csr_path FROM certificates WHERE id = %s", (cert_id,)) cert = cursor.fetchone() if not cert: continue if not current_user.is_admin and cert['created_by'] != current_user.id: continue # 删除文件 try: files_to_delete = [ cert['cert_path'], cert['key_path'], cert['csr_path'] ] # 删除所有指定文件 for file_path in files_to_delete: if file_path and os.path.exists(file_path): os.remove(file_path) # 删除证书目录 cert_dir = os.path.dirname(cert['cert_path']) if os.path.exists(cert_dir): shutil.rmtree(cert_dir) # 递归删除目录 except OSError as e: print(f"文件删除错误: {e}") continue # 删除数据库记录 cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,)) conn.commit() return jsonify({'success': True, 'message': '批量删除成功'}) except Error as e: conn.rollback() print(f"Database error: {e}") return jsonify({'success': False, 'message': '数据库操作失败'}), 500 finally: if conn.is_connected(): cursor.close() conn.close() @app.route('/cas/create', methods=['GET', 'POST']) @login_required def create_ca_view(): if request.method == 'POST': ca_name = request.form['ca_name'].strip() common_name = request.form['common_name'].strip() organization = request.form['organization'].strip() organizational_unit = request.form['organizational_unit'].strip() country = request.form['country'].strip() state = request.form['state'].strip() locality = request.form['locality'].strip() key_size = int(request.form['key_size']) days_valid = int(request.form['days_valid']) # 名称校验 if not validate_name(ca_name): flash('CA名称无效:只能包含中文、字母、数字、下划线和短横线,且不能以短横线开头或结尾', 'danger') return render_template('create_ca.html') if not validate_common_name(common_name): flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger') return render_template('create_ca.html') # 检查CA名称是否已存在 conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id FROM certificate_authorities WHERE name = %s", (ca_name,)) if cursor.fetchone(): flash('CA名称已存在,请使用其他名称', 'danger') return render_template('create_ca.html') except Error as e: print(f"Database error: {e}") flash('检查CA名称失败', 'danger') return render_template('create_ca.html') finally: if conn.is_connected(): cursor.close() conn.close() ca_id = create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid, current_user.id) if ca_id: flash('CA创建成功', 'success') return redirect(url_for('ca_list')) else: flash('CA创建失败', 'danger') return render_template('create_ca.html') from datetime import timedelta # 确保顶部已导入 @app.route('/cas/') @login_required def ca_detail(ca_id): page = request.args.get('page', 1, type=int) ca = get_ca_by_id(ca_id) if not ca: flash('CA不存在', 'danger') return redirect(url_for('ca_list')) # 检查权限 if not current_user.is_admin and ca['created_by'] != current_user.id: flash('无权访问此CA', 'danger') return redirect(url_for('ca_list')) # 获取该CA颁发的证书(分页) conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) # 获取证书总数 cursor.execute(""" SELECT COUNT(*) as total FROM certificates WHERE ca_id = %s """, (ca_id,)) total = cursor.fetchone()['total'] total_pages = ceil(total / PER_PAGE) # 获取分页数据 offset = (page - 1) * PER_PAGE cursor.execute(""" SELECT * FROM certificates WHERE ca_id = %s ORDER BY created_at DESC LIMIT %s OFFSET %s """, (ca_id, PER_PAGE, offset)) certificates = cursor.fetchall() # 获取CRL信息 cursor.execute(""" SELECT * FROM certificate_revocation_list WHERE ca_id = %s """, (ca_id,)) crl = cursor.fetchone() return render_template( 'ca_detail.html', ca=ca, certificates=certificates, crl=crl, page=page, total_pages=total_pages, total=total, timedelta=timedelta, get_username=get_username ) except Error as e: print(f"Database error: {e}") flash('获取CA详情失败', 'danger') return redirect(url_for('ca_list')) finally: if conn.is_connected(): cursor.close() conn.close() return redirect(url_for('ca_list')) def get_username(user_id): """根据用户ID获取用户名""" conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT username FROM users WHERE id = %s", (user_id,)) user = cursor.fetchone() return user['username'] if user else str(user_id) except Error as e: print(f"Database error: {e}") return str(user_id) finally: if conn.is_connected(): cursor.close() conn.close() return str(user_id) @app.route('/cas//generate_crl') @login_required def generate_crl_view(ca_id): ca = get_ca_by_id(ca_id) if not ca: flash('CA不存在', 'danger') return redirect(url_for('ca_list')) # 检查权限 if not current_user.is_admin and ca['created_by'] != current_user.id: flash('无权操作此CA', 'danger') return redirect(url_for('ca_list')) if generate_crl(ca_id): flash('CRL生成成功', 'success') else: flash('CRL生成失败', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) @app.route('/cas//download_crl') @login_required def download_crl(ca_id): ca = get_ca_by_id(ca_id) if not ca: flash('CA不存在', 'danger') return redirect(url_for('ca_list')) # 检查权限 if not current_user.is_admin and ca['created_by'] != current_user.id: flash('无权操作此CA', 'danger') return redirect(url_for('ca_list')) # 获取CRL路径 conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute(""" SELECT crl_path FROM certificate_revocation_list WHERE ca_id = %s """, (ca_id,)) crl = cursor.fetchone() if crl and os.path.exists(crl['crl_path']): return send_from_directory( os.path.dirname(crl['crl_path']), os.path.basename(crl['crl_path']), as_attachment=True ) else: flash('CRL文件不存在', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) except Error as e: print(f"Database error: {e}") flash('获取CRL失败', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) finally: if conn.is_connected(): cursor.close() conn.close() return redirect(url_for('ca_detail', ca_id=ca_id)) @app.route('/certificates/create', methods=['GET', 'POST']) @login_required def create_certificate_view(): if request.method == 'POST': common_name = request.form['common_name'].strip() san_dns = request.form.get('san_dns', '').strip() san_ip = request.form.get('san_ip', '').strip() organization = request.form['organization'].strip() organizational_unit = request.form['organizational_unit'].strip() country = request.form['country'].strip() state = request.form['state'].strip() locality = request.form['locality'].strip() key_size = int(request.form['key_size']) days_valid = int(request.form['days_valid']) ca_id = int(request.form['ca_id']) # 通用名校验 if not validate_common_name(common_name): flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger') return redirect(url_for('create_certificate_view')) # SAN DNS校验 if san_dns: for dns in san_dns.split(','): dns = dns.strip() if not validate_common_name(dns): flash(f'DNS SAN条目无效: {dns},只能包含字母、数字、点号和短横线', 'danger') return redirect(url_for('create_certificate_view')) # SAN IP校验 if san_ip: for ip in san_ip.split(','): ip = ip.strip() if not re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip): flash(f'IP SAN条目无效: {ip},请输入有效的IPv4地址', 'danger') return redirect(url_for('create_certificate_view')) # 检查证书是否已存在 conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT id FROM certificates WHERE common_name = %s AND ca_id = %s", (common_name, ca_id)) if cursor.fetchone(): flash('该CA下已存在相同通用名的证书', 'danger') return redirect(url_for('create_certificate_view')) except Error as e: print(f"Database error: {e}") flash('检查证书名称失败', 'danger') return redirect(url_for('create_certificate_view')) finally: if conn.is_connected(): cursor.close() conn.close() cert_id = create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality, key_size, days_valid, current_user.id) if cert_id: flash('证书创建成功', 'success') return redirect(url_for('certificate_detail', cert_id=cert_id)) else: flash('证书创建失败', 'danger') # 获取可用的CA conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) if current_user.is_admin: cursor.execute("SELECT id, name FROM certificate_authorities") else: cursor.execute(""" SELECT id, name FROM certificate_authorities WHERE created_by = %s """, (current_user.id,)) cas = cursor.fetchall() return render_template('create_certificate.html', cas=cas) except Error as e: print(f"Database error: {e}") flash('获取CA列表失败', 'danger') return redirect(url_for('certificate_list')) finally: if conn.is_connected(): cursor.close() conn.close() return redirect(url_for('certificate_list')) @app.route('/certificates/') @login_required def certificate_detail(cert_id): cert = get_certificate_by_id(cert_id) if not cert: flash('证书不存在', 'danger') return redirect(url_for('certificate_list')) # 检查权限 if not current_user.is_admin and cert['created_by'] != current_user.id: flash('无权访问此证书', 'danger') return redirect(url_for('certificate_list')) # 获取CA信息 ca = get_ca_by_id(cert['ca_id']) return render_template('certificate_detail.html', cert=cert, ca=ca, get_username=get_username) @app.route('/certificates//revoke', methods=['GET', 'POST']) @login_required def revoke_certificate_view(cert_id): cert = get_certificate_by_id(cert_id) if not cert: flash('证书不存在', 'danger') return redirect(url_for('certificate_list')) # 检查权限 if not current_user.is_admin and cert['created_by'] != current_user.id: flash('无权操作此证书', 'danger') return redirect(url_for('certificate_list')) if cert['status'] == 'revoked': flash('证书已被吊销', 'warning') return redirect(url_for('certificate_detail', cert_id=cert_id)) if request.method == 'POST': reason = request.form['reason'] if revoke_certificate(cert_id, reason): flash('证书吊销成功', 'success') # 更新CRL generate_crl(cert['ca_id']) return redirect(url_for('certificate_detail', cert_id=cert_id)) else: flash('证书吊销失败', 'danger') return render_template('revoke_certificate.html', cert=cert) @app.route('/certificates//renew', methods=['GET', 'POST']) @login_required def renew_certificate_view(cert_id): cert = get_certificate_by_id(cert_id) if not cert: flash('证书不存在', 'danger') return redirect(url_for('certificate_list')) # 检查权限 if not current_user.is_admin and cert['created_by'] != current_user.id: flash('无权操作此证书', 'danger') return redirect(url_for('certificate_list')) if request.method == 'POST': days_valid = int(request.form['days_valid']) if renew_certificate(cert_id, days_valid): flash('证书续期成功', 'success') return redirect(url_for('certificate_detail', cert_id=cert_id)) else: flash('证书续期失败', 'danger') return render_template('renew_certificate.html', cert=cert) @app.route('/cas//export') @login_required def export_ca_view(ca_id): ca = get_ca_by_id(ca_id) if not ca: flash('CA not found', 'danger') return redirect(url_for('ca_list')) # 检查权限 if not current_user.is_admin and ca['created_by'] != current_user.id: flash('No permission', 'danger') return redirect(url_for('ca_list')) # 创建临时zip文件 memory_file = BytesIO() try: with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf: # 添加CA证书文件 with open(ca['cert_path'], 'rb') as f: # 使用二进制模式读取 cert_content = f.read() zf.writestr(f"{ca['common_name']}_ca.crt", cert_content) # 添加CA私钥文件(可选,只有管理员可以导出私钥) if current_user.is_admin: with open(ca['key_path'], 'rb') as f: # 使用二进制模式读取 key_content = f.read() zf.writestr(f"{ca['common_name']}_ca.key", key_content) memory_file.seek(0) # 确保文件名只包含ASCII字符 safe_filename = f"{ca['name']}_ca_bundle.zip".encode('ascii', 'ignore').decode('ascii') return Response( memory_file.getvalue(), mimetype="application/zip", headers={ "Content-Disposition": f"attachment; filename={safe_filename}" } ) except Exception as e: flash(f'Export failed: {str(e)}', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) @app.route('/certificates//export', methods=['GET', 'POST']) @login_required def export_certificate_view(cert_id): cert = get_certificate_by_id(cert_id) if not cert: flash('证书不存在', 'danger') return redirect(url_for('certificate_list')) # 检查权限 if not current_user.is_admin and cert['created_by'] != current_user.id: flash('无权操作此证书', 'danger') return redirect(url_for('certificate_list')) if request.method == 'POST': format_type = request.form['format'] password = request.form.get('password', '') if format_type == 'pkcs12': if not password: flash('PKCS#12格式需要密码', 'danger') return redirect(url_for('export_certificate_view', cert_id=cert_id)) pkcs12_path = export_pkcs12(cert_id, password) if pkcs12_path: return send_from_directory( os.path.dirname(pkcs12_path), os.path.basename(pkcs12_path), as_attachment=True ) else: flash('导出PKCS#12失败', 'danger') elif format_type == 'pem': # 合并证书和私钥为PEM pem_content = "" with open(cert['cert_path'], 'r') as f: pem_content += f.read() with open(cert['key_path'], 'r') as f: pem_content += f.read() return Response( pem_content, mimetype="application/x-pem-file", headers={ "Content-Disposition": f"attachment; filename={cert['common_name']}.pem" } ) elif format_type == 'pem_separate': # 创建内存中的zip文件(.pem + .key) return generate_separate_files_zip( cert, cert_ext=".pem", zip_suffix="_pem_key" ) elif format_type == 'crt_separate': # 创建内存中的zip文件(.crt + .key) return generate_separate_files_zip( cert, cert_ext=".crt", zip_suffix="_crt_key" ) else: flash('不支持的导出格式', 'danger') return render_template('export_certificate.html', cert=cert) # 在app.py中添加以下路由 @app.route('/cas//delete', methods=['GET', 'POST']) @login_required def delete_ca(ca_id): ca = get_ca_by_id(ca_id) if not ca: flash('CA不存在', 'danger') return redirect(url_for('ca_list')) # 检查权限 if not current_user.is_admin and ca['created_by'] != current_user.id: flash('无权删除此CA', 'danger') return redirect(url_for('ca_list')) # 检查是否有关联的证书 conn = get_db_connection() if conn: try: cursor = conn.cursor(dictionary=True) cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,)) result = cursor.fetchone() if result['count'] > 0: flash('无法删除CA,因为存在关联的证书', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) except Error as e: print(f"Database error: {e}") flash('检查关联证书失败', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) finally: if conn.is_connected(): cursor.close() conn.close() if request.method == 'POST': # 删除文件 try: # 删除CA证书和私钥文件 if os.path.exists(ca['cert_path']): os.remove(ca['cert_path']) if os.path.exists(ca['key_path']): os.remove(ca['key_path']) # 删除CA目录及其所有内容 ca_dir = os.path.dirname(ca['cert_path']) if os.path.exists(ca_dir): # 删除目录中的所有文件和子目录 for filename in os.listdir(ca_dir): file_path = os.path.join(ca_dir, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: print(f'删除文件/目录失败 {file_path}. 原因: {e}') flash(f'删除文件/目录失败: {str(e)}', 'warning') # 现在删除空目录 try: os.rmdir(ca_dir) except OSError as e: print(f"删除目录失败: {e}") flash(f'删除目录失败: {str(e)}', 'warning') except OSError as e: print(f"文件删除错误: {e}") flash(f'删除文件时出错: {str(e)}', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) # 删除数据库记录 conn = get_db_connection() if conn: try: cursor = conn.cursor() # 先删除CRL记录 cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,)) # 再删除CA记录 cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,)) conn.commit() flash('CA删除成功', 'success') return redirect(url_for('ca_list')) except Error as e: print(f"Database error: {e}") conn.rollback() flash('删除CA记录失败', 'danger') return redirect(url_for('ca_detail', ca_id=ca_id)) finally: if conn.is_connected(): cursor.close() conn.close() return render_template('confirm_delete_ca.html', ca=ca) @app.route('/certificates//delete', methods=['GET', 'POST']) @login_required def delete_certificate(cert_id): cert = get_certificate_by_id(cert_id) if not cert: flash('证书不存在', 'danger') return redirect(url_for('certificate_list')) # 检查权限 if not current_user.is_admin and cert['created_by'] != current_user.id: flash('无权删除此证书', 'danger') return redirect(url_for('certificate_list')) if request.method == 'POST': # 删除文件 try: # 确保所有文件路径都存在 files_to_delete = [ cert['cert_path'], cert['key_path'], cert['csr_path'] ] # 删除所有指定文件 for file_path in files_to_delete: if file_path and os.path.exists(file_path): os.remove(file_path) # 删除证书目录及其所有内容 cert_dir = os.path.dirname(cert['cert_path']) if os.path.exists(cert_dir): # 删除目录中的所有文件 for filename in os.listdir(cert_dir): file_path = os.path.join(cert_dir, filename) try: if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: print(f'Failed to delete {file_path}. Reason: {e}') # 现在删除空目录 os.rmdir(cert_dir) except OSError as e: print(f"文件删除错误: {e}") flash(f'删除文件时出错: {str(e)}', 'danger') return redirect(url_for('certificate_detail', cert_id=cert_id)) # 删除数据库记录 conn = get_db_connection() if conn: try: cursor = conn.cursor() cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,)) conn.commit() flash('证书删除成功', 'success') return redirect(url_for('certificate_list')) except Error as e: print(f"Database error: {e}") flash('删除证书记录失败', 'danger') return redirect(url_for('certificate_detail', cert_id=cert_id)) finally: if conn.is_connected(): cursor.close() conn.close() return render_template('confirm_delete_certificate.html', cert=cert) def generate_separate_files_zip(cert, cert_ext, zip_suffix): """生成包含分开文件的ZIP包通用函数""" memory_file = BytesIO() try: with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf: # 添加证书文件 with open(cert['cert_path'], 'r') as f: cert_content = f.read() zf.writestr(f"{cert['common_name']}{cert_ext}", cert_content) # 添加私钥文件 with open(cert['key_path'], 'r') as f: key_content = f.read() zf.writestr(f"{cert['common_name']}.key", key_content) memory_file.seek(0) return Response( memory_file.getvalue(), mimetype="application/zip", headers={ "Content-Disposition": f"attachment; filename={cert['common_name']}{zip_suffix}.zip" } ) except Exception as e: flash(f'创建ZIP文件失败: {str(e)}', 'danger') return redirect(url_for('export_certificate_view', cert_id=cert['id'])) @app.route('/download/') @login_required def download_file(filename): # 安全检查:确保文件路径在证书存储目录内 file_path = os.path.join(Config.CERT_STORE, filename) if not os.path.exists(file_path): flash('文件不存在', 'danger') return redirect(url_for('index')) # 检查权限 # 这里需要根据实际业务逻辑检查用户是否有权下载该文件 # 简单示例:只允许下载自己的证书文件 return send_from_directory( os.path.dirname(file_path), os.path.basename(file_path), as_attachment=True ) if __name__ == '__main__': app.run(debug=Config.DEBUG, host=Config.APP_HOST, port=Config.APP_PORT)