# app.py
import os
import subprocess
from datetime import datetime, timedelta
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, Response, current_app
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
import mysql.connector
from mysql.connector import Error
import random
import string
from io import BytesIO
import zipfile
import shutil
import re
from pypinyin import pinyin, Style
from flask_mail import Mail
from PIL import Image, ImageDraw, ImageFont
from flask_migrate import Migrate
from dotenv import load_dotenv
from pathlib import Path
from urllib.parse import urljoin
# Load environment variables first; this has to happen before the app is
# created.
load_dotenv(Path(__file__).parent / '.env', override=True)
def generate_full_url(endpoint, **values):
    """Build an absolute URL for ``endpoint`` from the configured protocol and domain.

    The scheme and host are read from the ``APP_PROTOCOL`` and ``APP_DOMAIN``
    entries of the current application's config; the path is produced by
    ``url_for(endpoint, **values)`` and joined onto that origin.
    """
    settings = current_app.config
    origin = "{}://{}".format(settings['APP_PROTOCOL'], settings['APP_DOMAIN'])
    relative_path = url_for(endpoint, **values)
    return urljoin(origin, relative_path)
# Import the settings class from the project's config module.
from config import Config
app = Flask(__name__, static_folder='static')
app.config.from_object(Config)
# Initialise the mail extension.
mail = Mail(app)
@app.context_processor
def inject_now():
    """Template context processor: expose the current local time as ``now``."""
    current_time = datetime.now()
    return dict(now=current_time)
# Make sure the certificate storage directory exists.
os.makedirs(Config.CERT_STORE, exist_ok=True)
# Initialise the database.
from database import initialize_database
initialize_database()
# Database migrations (currently disabled).
#migrate = Migrate(app, db)
# Flask-Login configuration; unauthenticated users are sent to the 'login' view.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
from io import BytesIO
from flask import send_file
import random
import string
from PIL import Image, ImageDraw, ImageFont
def generate_captcha_image():
    """Render a 4-character CAPTCHA as a PNG and store its code in the ``captcha`` table.

    The code is drawn on a 120x40 white canvas, each character in its own
    random colour, followed by five random noise lines. Rows of ``captcha``
    older than ten minutes are purged before the new code is inserted.
    Database errors are printed and otherwise ignored, so an image is still
    returned when the code could not be stored.

    Returns:
        tuple: ``(img_io, captcha_code)`` - a ``BytesIO`` rewound to offset 0
        that holds the PNG bytes, and the code that was drawn.
    """
    # The code is the secret a bot has to guess, so draw it from the OS
    # entropy source instead of the predictable default generator. The purely
    # cosmetic colours and noise lines below keep using ``random``.
    secure_rng = random.SystemRandom()
    captcha_code = ''.join(secure_rng.choices(string.ascii_uppercase + string.digits, k=4))

    # 120x40 pixel image on a white background.
    image = Image.new('RGB', (120, 40), color=(255, 255, 255))
    draw = ImageDraw.Draw(image)

    # Try the bundled font (static/arial.ttf); fall back to PIL's default.
    try:
        font_path = os.path.join(current_app.static_folder, "arial.ttf")
        font = ImageFont.truetype(font_path, 24)
    except Exception as e:
        print(f"加载字体失败: {e}")
        font = ImageFont.load_default()

    # Draw each character 25 px apart in a darkish random colour.
    for i, char in enumerate(captcha_code):
        text_color = (
            random.randint(0, 150),
            random.randint(0, 150),
            random.randint(0, 150),
        )
        draw.text((10 + i * 25, 5), char, font=font, fill=text_color)

    # Five noise lines with random end points and colours.
    for _ in range(5):
        line_color = (
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(0, 255),
        )
        draw.line(
            [
                random.randint(0, 120),  # x1
                random.randint(0, 40),   # y1
                random.randint(0, 120),  # x2
                random.randint(0, 40),   # y2
            ],
            fill=line_color,
            width=1,
        )

    # Persist the code so verify_captcha() can look it up later.
    conn = get_db_connection()
    if conn:
        # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
        cursor = None
        try:
            cursor = conn.cursor()
            # Drop codes older than ten minutes, then store the new one.
            cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE")
            cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,))
            conn.commit()
        except Error as e:
            print(f"Database error: {e}")
        finally:
            if conn.is_connected():
                if cursor is not None:
                    cursor.close()
                conn.close()

    # Serialise the image to an in-memory PNG stream.
    img_io = BytesIO()
    image.save(img_io, 'PNG')
    img_io.seek(0)
    return img_io, captcha_code
@app.route('/captcha')
def captcha():
    """Serve a freshly generated CAPTCHA image as a PNG response."""
    png_stream = generate_captcha_image()[0]
    return send_file(png_stream, mimetype='image/png')
def to_pinyin(text):
    """Convert Chinese text to tone-less pinyin, syllables joined with ``_``.

    Returns an empty string when ``text`` is falsy.
    """
    if text:
        # Style.NORMAL yields pinyin without tone marks; the first element of
        # each returned entry is used.
        syllables = pinyin(text, style=Style.NORMAL)
        return "_".join(entry[0] for entry in syllables)
    return ""
# 注册模板过滤器
@app.template_filter('to_pinyin')
def jinja2_to_pinyin(text):
    """Template filter ``to_pinyin``: delegates to :func:`to_pinyin`."""
    converted = to_pinyin(text)
    return converted
class User(UserMixin):
    """Flask-Login user object; its attributes are assigned by the code that builds it."""
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: rebuild a ``User`` from the ``users`` table.

    Args:
        user_id: Primary key of the user row to load.

    Returns:
        A ``User`` with ``id``, ``username`` and ``is_admin`` set, or ``None``
        when no row matches or a database error occurs (the error is printed).
    """
    # Pre-bind both names: if connect() itself raises, the ``finally`` clause
    # would otherwise hit an unbound ``conn`` and replace the handled
    # database error with an UnboundLocalError.
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**Config.DB_CONFIG)
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        user_data = cursor.fetchone()
        if not user_data:
            return None
        user = User()
        user.id = user_data['id']
        user.username = user_data['username']
        user.is_admin = user_data['is_admin']
        return user
    except Error as e:
        print(f"Database error: {e}")
        return None
    finally:
        if conn is not None and conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
# 辅助函数
def get_db_connection():
    """Open a MySQL connection using ``Config.DB_CONFIG``.

    Returns the connection, or ``None`` (after printing the error) when
    connecting raises ``mysql.connector.Error``.
    """
    try:
        return mysql.connector.connect(**Config.DB_CONFIG)
    except Error as exc:
        print(f"Database connection error: {exc}")
    return None
def verify_captcha(user_input):
    """Check ``user_input`` against the most recently stored 4-character CAPTCHA.

    The comparison is case-insensitive and ignores surrounding whitespace. On
    a match, rows carrying that code are deleted so it cannot be reused.

    Args:
        user_input: Text typed by the user. Anything that is not a non-blank
            string is rejected without touching the database.

    Returns:
        bool: ``True`` on a match; ``False`` on mismatch, unusable input,
        missing connection or database error.
    """
    # Reject missing/blank input up front: calling .upper() on None would
    # raise, and there is no point opening a connection for an empty answer.
    if not isinstance(user_input, str) or not user_input.strip():
        return False
    submitted = user_input.strip().upper()

    conn = get_db_connection()
    if not conn:
        return False
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor()
        # Only the newest 4-character code is considered.
        # NOTE(review): expiry is only enforced by the purge that runs when a
        # new image is generated; this query does not filter on age.
        cursor.execute("""
SELECT code FROM captcha
WHERE LENGTH(code) = 4 -- 只查询4位验证码
ORDER BY created_at DESC
LIMIT 1
""")
        result = cursor.fetchone()
        if result and submitted == result[0]:
            # A code is single-use: remove it once it has been accepted.
            cursor.execute("DELETE FROM captcha WHERE code = %s", (result[0],))
            conn.commit()
            return True
        return False
    except Error as e:
        print(f"Database error: {e}")
        return False
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
def validate_name(name, max_length=64):
    """Return ``True`` when ``name`` is an acceptable display name.

    Rules:
      1. Between 1 and ``max_length`` characters (64 by default).
      2. Only ASCII letters, digits, underscore, hyphen and CJK characters in
         the range U+4E00..U+9FA5.
      3. Must not start or end with a hyphen.

    Args:
        name: Candidate name; non-string values are rejected.
        max_length: Maximum allowed length.
    """
    if not isinstance(name, str) or not name or len(name) > max_length:
        return False
    # fullmatch() rather than match() with ``$``: ``$`` also matches just
    # before a trailing newline, which would let "abc\n" through.
    if not re.fullmatch(r'[a-zA-Z0-9_\-\u4e00-\u9fa5]+', name):
        return False
    return not (name.startswith('-') or name.endswith('-'))
def validate_common_name(cn):
    """Return ``True`` when ``cn`` is an acceptable certificate Common Name.

    Rules:
      1. Between 1 and 64 characters.
      2. Only ASCII letters, digits, dots and hyphens.
      3. Must not start or end with a dot or a hyphen.
      4. Must not contain two consecutive dots or two consecutive hyphens.

    Args:
        cn: Candidate common name; non-string values are rejected.
    """
    if not isinstance(cn, str) or not cn or len(cn) > 64:
        return False
    # fullmatch() rather than match() with ``$``: ``$`` also matches just
    # before a trailing newline, which would let "example.com\n" through.
    if not re.fullmatch(r'[a-zA-Z0-9.-]+', cn):
        return False
    if cn.startswith(('.', '-')) or cn.endswith(('.', '-')):
        return False
    return '..' not in cn and '--' not in cn
def create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid,
              created_by):
    """Create a self-signed CA with the ``openssl`` CLI and record it in the database.

    A directory ``ca_<pinyin of ca_name>`` is created under
    ``Config.CERT_STORE``; it receives the generated OpenSSL config, the RSA
    private key ``<common_name>.key`` and the certificate
    ``<common_name>.crt``.

    Args:
        ca_name: Display name of the CA (used, as pinyin, for the directory).
        common_name: Subject CN; also used as the key/cert file stem.
        organization, organizational_unit, country, state, locality: Subject
            fields written into the OpenSSL config.
        key_size: RSA key size in bits.
        days_valid: Validity period of the certificate in days.
        created_by: Id of the user creating the CA.

    Returns:
        The id of the inserted ``certificate_authorities`` row, or ``None``
        when a subject field is rejected, OpenSSL fails, or the database is
        unavailable / raises.
    """
    # The subject fields are written verbatim into an OpenSSL config file; a
    # line break in any of them would let the caller inject extra directives
    # or whole sections, so such values are refused outright.
    subject_fields = (common_name, organization, organizational_unit, country, state, locality)
    if any('\n' in str(value) or '\r' in str(value) for value in subject_fields):
        print("CA subject fields must not contain line breaks")
        return None

    # Directory name is derived from the pinyin form of the CA name.
    pinyin_name = to_pinyin(ca_name)
    ca_dir = os.path.join(Config.CERT_STORE, f"ca_{pinyin_name}")
    os.makedirs(ca_dir, exist_ok=True)
    # Key and certificate are named after the raw common name.
    key_path = os.path.join(ca_dir, f"{common_name}.key")
    cert_path = os.path.join(ca_dir, f"{common_name}.crt")

    openssl_cnf = f"""
[ req ]
default_bits = {key_size}
distinguished_name = req_distinguished_name
x509_extensions = v3_ca
prompt = no
[ req_distinguished_name ]
C = {country}
ST = {state}
L = {locality}
O = {organization}
OU = {organizational_unit}
CN = {common_name}
[ v3_ca ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always,issuer:always
basicConstraints = CA:TRUE
keyUsage = digitalSignature, keyCertSign, cRLSign
"""
    config_path = os.path.join(ca_dir, 'openssl.cnf')
    with open(config_path, 'w') as f:
        f.write(openssl_cnf.strip())

    # Report OpenSSL failures the same way create_certificate() does: print
    # the diagnostics and return None instead of letting the exception escape.
    try:
        # Generate the CA private key.
        subprocess.run(
            ['openssl', 'genrsa', '-out', key_path, str(key_size)],
            check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
        )
        # The CA key signs every certificate issued later; keep it readable
        # by the service account only.
        os.chmod(key_path, 0o600)
        # Generate the self-signed CA certificate from the config file.
        subprocess.run(
            ['openssl', 'req', '-new', '-x509', '-days', str(days_valid),
             '-key', key_path, '-out', cert_path,
             '-config', config_path],
            check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        detail = e.stderr.decode(errors='replace') if e.stderr else str(e)
        print(f"OpenSSL error while creating CA: {detail}")
        return None
    except OSError as e:
        # E.g. the openssl binary is missing or the key file is inaccessible.
        print(f"OpenSSL error while creating CA: {e}")
        return None

    # Record the CA in the database.
    conn = get_db_connection()
    if not conn:
        return None
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
INSERT INTO certificate_authorities
(name, common_name, organization, organizational_unit, country, state, locality,
key_size, days_valid, cert_path, key_path, created_by)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (ca_name, common_name, organization, organizational_unit, country, state, locality,
      key_size, days_valid, cert_path, key_path, created_by))
        conn.commit()
        return cursor.lastrowid
    except Error as e:
        print(f"Database error: {e}")
        return None
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit,
                       country, state, locality, key_size, days_valid, created_by):
    """Issue a certificate signed by CA ``ca_id`` and return the new row id.

    Files are written to ``certs_<common_name>`` under ``Config.CERT_STORE``:
    the private key, the CSR, the certificate and the two generated OpenSSL
    config files (``csr_config.cnf`` and ``ext.cnf``).

    Args:
        ca_id: Id of the issuing CA.
        common_name: Subject CN; also used for the directory and file names.
        san_dns: Comma-separated DNS subject alternative names (may be empty).
        san_ip: Comma-separated IP subject alternative names (may be empty).
        organization, organizational_unit, country, state, locality: Subject
            fields written into the CSR config.
        key_size: RSA key size in bits.
        days_valid: Validity period in days.
        created_by: Id of the requesting user.

    Returns:
        The id of the inserted ``certificates`` row, or ``None`` when the input
        is rejected, the CA does not exist, an OpenSSL step fails, or the
        database is unavailable / raises.
    """
    # All of these values are written verbatim into OpenSSL config files; a
    # line break would let the caller inject directives or sections.
    text_fields = (common_name, san_dns, san_ip, organization, organizational_unit,
                   country, state, locality)
    if any('\n' in str(value) or '\r' in str(value) for value in text_fields if value):
        print("证书字段不能包含换行符")
        return None
    # common_name becomes part of file-system paths; a separator would let
    # the files land outside the certificate store.
    if any(sep in str(common_name) for sep in ('/', '\\', '\0')):
        print("通用名不能包含路径分隔符")
        return None

    # Look up the issuing CA.
    ca = get_ca_by_id(ca_id)
    if not ca:
        print(f"CA ID {ca_id} 不存在")
        return None

    # Per-certificate directory and file paths.
    cert_dir = os.path.join(Config.CERT_STORE, f"certs_{common_name}")
    os.makedirs(cert_dir, exist_ok=True)
    key_path = os.path.join(cert_dir, f"{common_name}.key")
    csr_path = os.path.join(cert_dir, f"{common_name}.csr")
    cert_path = os.path.join(cert_dir, f"{common_name}.crt")

    # 1. Generate the private key.
    try:
        subprocess.run([
            'openssl', 'genrsa', '-out', key_path, str(key_size)
        ], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print(f"生成私钥失败: {e.stderr.decode()}")
        return None

    # 2. Build the CSR config.
    dns_entries = [dns.strip() for dns in san_dns.split(',') if dns.strip()] if san_dns else []
    ip_entries = [ip.strip() for ip in san_ip.split(',') if ip.strip()] if san_ip else []
    # Decide on the parsed entries, not the raw strings: input such as " , "
    # is truthy but yields no entries, and an empty [alt_names] section makes
    # OpenSSL fail.
    has_san = bool(dns_entries or ip_entries)
    # The same alt-name lines are used in both the CSR and the extension file.
    alt_name_lines = [f"DNS.{i} = {dns}" for i, dns in enumerate(dns_entries, 1)]
    alt_name_lines += [f"IP.{i} = {ip}" for i, ip in enumerate(ip_entries, 1)]

    csr_config = f"""[req]
default_bits = {key_size}
prompt = no
default_md = sha256
distinguished_name = dn
"""
    if has_san:
        csr_config += "req_extensions = req_ext\n"
    csr_config += f"""
[dn]
CN = {common_name}
O = {organization}
OU = {organizational_unit}
C = {country}
ST = {state}
L = {locality}
"""
    if has_san:
        csr_config += """
[req_ext]
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alt_names
extendedKeyUsage = serverAuth, clientAuth
[alt_names]"""
        csr_config += "".join(f"\n{line}" for line in alt_name_lines)

    csr_config_path = os.path.join(cert_dir, 'csr_config.cnf')
    with open(csr_config_path, 'w') as f:
        f.write(csr_config.strip())  # no stray leading/trailing blank lines

    # 3. Generate the CSR.
    try:
        subprocess.run([
            'openssl', 'req', '-new', '-key', key_path,
            '-out', csr_path, '-config', csr_config_path
        ], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print(f"生成CSR失败: {e.stderr.decode()}")
        print("CSR配置文件内容:")
        print(csr_config)
        return None

    # 4. Build the extension file applied when the CA signs the CSR.
    ext_config = """authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage=digitalSignature, keyEncipherment
extendedKeyUsage=serverAuth, clientAuth
"""
    if has_san:
        ext_config += "subjectAltName=@alt_names\n\n[alt_names]\n"
        ext_config += "".join(f"{line}\n" for line in alt_name_lines)
    ext_config_path = os.path.join(cert_dir, 'ext.cnf')
    with open(ext_config_path, 'w') as f:
        f.write(ext_config.strip())

    # 5. Sign the CSR with the CA key.
    try:
        cmd = [
            'openssl', 'x509', '-req', '-in', csr_path,
            '-CA', ca['cert_path'], '-CAkey', ca['key_path'],
            '-CAcreateserial', '-out', cert_path,
            '-days', str(days_valid), '-sha256',
            # The extension file always carries at least the base extensions.
            '-extfile', ext_config_path,
        ]
        subprocess.run(cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print(f"证书签名失败: {e.stderr.decode()}")
        print("扩展配置文件内容:")
        print(ext_config)
        return None

    # 6. Record the certificate in the database.
    expires_at = datetime.now() + timedelta(days=days_valid)
    conn = get_db_connection()
    if not conn:
        return None
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
INSERT INTO certificates
(common_name, san_dns, san_ip, organization, organizational_unit,
country, state, locality, key_size, days_valid, cert_path,
key_path, csr_path, ca_id, created_by, expires_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
            common_name, san_dns, san_ip, organization, organizational_unit,
            country, state, locality, key_size, days_valid, cert_path,
            key_path, csr_path, ca_id, created_by, expires_at
        ))
        conn.commit()
        return cursor.lastrowid
    except Error as e:
        print(f"数据库错误: {e}")
        conn.rollback()
        return None
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
def get_ca_by_id(ca_id):
    """Fetch the ``certificate_authorities`` row whose id is ``ca_id``.

    Returns the row as a dict, or ``None`` when there is no such row, no
    connection could be opened, or the query raised (the error is printed).
    """
    conn = get_db_connection()
    if not conn:
        return None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM certificate_authorities WHERE id = %s", (ca_id,))
        row = cursor.fetchone()
        return row
    except Error as exc:
        print(f"Database error: {exc}")
        return None
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()
def get_certificate_by_id(cert_id):
    """Fetch the ``certificates`` row whose id is ``cert_id``.

    Returns the row as a dict, or ``None`` when there is no such row, no
    connection could be opened, or the query raised (the error is printed).
    """
    conn = get_db_connection()
    if not conn:
        return None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM certificates WHERE id = %s", (cert_id,))
        row = cursor.fetchone()
        return row
    except Error as exc:
        print(f"Database error: {exc}")
        return None
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()
def revoke_certificate(cert_id, reason):
    """Mark certificate ``cert_id`` as revoked in the database.

    Sets ``status`` to ``'revoked'``, stamps ``revoked_at`` with the database
    clock and stores ``reason``. Returns ``True`` once the update has been
    committed, ``False`` when no connection is available or the update raised.
    """
    conn = get_db_connection()
    if not conn:
        return False
    params = (reason, cert_id)
    try:
        cursor = conn.cursor()
        cursor.execute("""
UPDATE certificates
SET status = 'revoked', revoked_at = NOW(), revocation_reason = %s
WHERE id = %s
""", params)
        conn.commit()
    except Error as exc:
        print(f"Database error: {exc}")
        return False
    else:
        return True
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()
def renew_certificate(cert_id, days_valid):
    """Re-sign the stored CSR of certificate ``cert_id`` for ``days_valid`` days.

    The renewed certificate is written next to the current one as
    ``renewed_<common_name>.crt``. The database row is then pointed at it and
    reset to ``active`` with the revocation fields cleared.

    Args:
        cert_id: Id of the certificate to renew.
        days_valid: New validity period in days.

    Returns:
        bool: ``True`` when the certificate was re-signed and the row updated;
        ``False`` when the certificate or its CA is missing, OpenSSL fails, or
        the database is unavailable / raises.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        return False
    ca = get_ca_by_id(cert['ca_id'])
    if not ca:
        return False

    # Re-sign the existing CSR.
    cert_dir = os.path.dirname(cert['cert_path'])
    new_cert_path = os.path.join(cert_dir, f"renewed_{cert['common_name']}.crt")
    cmd = [
        'openssl', 'x509', '-req', '-in', cert['csr_path'], '-CA', ca['cert_path'],
        '-CAkey', ca['key_path'], '-CAcreateserial', '-out', new_cert_path,
        '-days', str(days_valid), '-sha256'
    ]
    # `openssl x509 -req` does not copy extensions out of the CSR. Without
    # the extension file written at issuance (ext.cnf, in the certificate's
    # directory) the renewed certificate would lose its SAN and key-usage
    # extensions.
    ext_config_path = os.path.join(cert_dir, 'ext.cnf')
    if os.path.isfile(ext_config_path) and os.path.getsize(ext_config_path) > 0:
        cmd.extend(['-extfile', ext_config_path])
    try:
        subprocess.run(cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # Report failure through the return value like every other branch.
        detail = e.stderr.decode(errors='replace') if e.stderr else str(e)
        print(f"证书签名失败: {detail}")
        return False

    # Update the database row.
    new_expires_at = datetime.now() + timedelta(days=days_valid)
    conn = get_db_connection()
    if not conn:
        return False
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
UPDATE certificates
SET cert_path = %s, days_valid = %s, expires_at = %s, status = 'active',
revoked_at = NULL, revocation_reason = NULL
WHERE id = %s
""", (new_cert_path, days_valid, new_expires_at, cert_id))
        conn.commit()
        return True
    except Error as e:
        print(f"Database error: {e}")
        return False
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
def generate_crl(ca_id):
    """Generate a CRL for CA ``ca_id`` via ``openssl ca -gencrl`` and record it.

    An ``openssl.cnf`` describing the CA is written into the CA's directory,
    an empty ``index.txt`` is created there if it is missing, and the CRL is
    written to ``crl_<ca name>.pem`` in the same directory. On success the
    ``certificate_revocation_list`` row for the CA is inserted or refreshed
    with a next-update time 30 days ahead.

    Returns ``True`` when the CRL was generated and recorded, ``False``
    otherwise. OpenSSL and unexpected errors are printed and flashed.

    NOTE(review): the CRL content comes from index.txt; nothing visible in
    this part of the file writes revocations into it - confirm that revoked
    certificates actually end up in the generated CRL.
    """
    ca = get_ca_by_id(ca_id)
    if not ca:
        return False
    ca_dir = os.path.dirname(ca['cert_path'])
    crl_path = os.path.join(ca_dir, f"crl_{ca['name']}.pem")
    # Full OpenSSL configuration for the `ca` sub-command.
    openssl_cnf = f"""
[ ca ]
default_ca = CA_default
[ CA_default ]
database = {os.path.join(ca_dir, 'index.txt')}
certificate = {ca['cert_path']}
private_key = {ca['key_path']}
crl = {crl_path}
RANDFILE = {os.path.join(ca_dir, '.rand')}
default_days = 365
default_crl_days = 30
default_md = sha256
preserve = no
policy = policy_anything
[ policy_anything ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = optional
emailAddress = optional
"""
    cnf_path = os.path.join(ca_dir, 'openssl.cnf')
    with open(cnf_path, 'w') as cnf_file:
        cnf_file.write(openssl_cnf)
    # The database file referenced by the config has to exist.
    index_file = os.path.join(ca_dir, 'index.txt')
    if not os.path.exists(index_file):
        with open(index_file, 'a'):
            pass
    gencrl_cmd = ['openssl', 'ca', '-gencrl', '-config', cnf_path, '-out', crl_path]
    try:
        subprocess.run(gencrl_cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        # Record the new CRL.
        next_update = datetime.now() + timedelta(days=30)
        conn = get_db_connection()
        if conn:
            try:
                cursor = conn.cursor()
                cursor.execute("""
INSERT INTO certificate_revocation_list
(ca_id, crl_path, next_update)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
crl_path = VALUES(crl_path),
last_updated = NOW(),
next_update = VALUES(next_update)
""", (ca_id, crl_path, next_update))
                conn.commit()
                return True
            except Error as db_exc:
                print(f"Database error: {db_exc}")
                return False
            finally:
                if conn.is_connected():
                    cursor.close()
                    conn.close()
    except subprocess.CalledProcessError as proc_exc:
        if proc_exc.stderr:
            error_msg = proc_exc.stderr.decode()
        else:
            error_msg = "Unknown OpenSSL error"
        print(f"OpenSSL error: {error_msg}")
        flash(f'CRL生成失败: {error_msg}', 'danger')
    except Exception as exc:
        print(f"Error generating CRL: {str(exc)}")
        flash(f'CRL生成异常: {str(exc)}', 'danger')
    return False
def export_pkcs12(cert_id, password):
    """Bundle certificate ``cert_id``, its key and its CA certificate into a PKCS#12 file.

    The bundle is written next to the certificate as ``<common_name>.p12`` and
    protected with ``password``.

    Args:
        cert_id: Id of the certificate to export.
        password: Export password for the PKCS#12 file.

    Returns:
        Path of the written ``.p12`` file, or ``None`` when the certificate or
        its CA cannot be found.

    Raises:
        subprocess.CalledProcessError: If ``openssl pkcs12`` exits non-zero.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        return None
    ca = get_ca_by_id(cert['ca_id'])
    if not ca:
        return None
    pkcs12_path = os.path.join(os.path.dirname(cert['cert_path']), f"{cert['common_name']}.p12")
    # Hand the password to OpenSSL on stdin rather than as a `pass:` argument:
    # command-line arguments are visible to other local users through the
    # process list. OpenSSL reads the first line of stdin as the password.
    subprocess.run([
        'openssl', 'pkcs12', '-export', '-out', pkcs12_path,
        '-inkey', cert['key_path'], '-in', cert['cert_path'],
        '-certfile', ca['cert_path'], '-passout', 'stdin'
    ], input=f"{password}\n".encode(), check=True)
    return pkcs12_path
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Registration view: show the form (GET) or create an account (POST).

    POST flow: verify the CAPTCHA, check the password against
    ``PASSWORD_POLICY``, make sure the username (and e-mail, when given) is
    not taken, insert the user and - when ``EMAIL_VERIFICATION_REQUIRED`` is
    set - store a signed verification token and mail the confirmation link.
    The account is inserted as active only when no e-mail verification is
    required. A failure to send the mail rolls the insert back.
    """
    # Already signed in: nothing to register.
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    # Registration may be switched off in the configuration.
    if not current_app.config['REGISTRATION_OPEN']:
        flash('系统暂未开放注册', 'warning')
        return redirect(url_for('login'))
    if request.method == 'POST':
        username = request.form['username'].strip()
        password = request.form['password']
        confirm_password = request.form['confirm_password']
        email = request.form.get('email', '').strip()
        captcha = request.form['captcha']
        verification_required = current_app.config['EMAIL_VERIFICATION_REQUIRED']
        # CAPTCHA first, as it is consumed by the check.
        if not verify_captcha(captcha):
            flash('验证码错误', 'danger')
            return redirect(url_for('register'))
        # A blank username would otherwise be stored as-is.
        if not username:
            flash('用户名不能为空', 'danger')
            return redirect(url_for('register'))
        # Without an address the verification mail cannot be delivered and
        # the account could never be activated.
        if verification_required and not email:
            flash('请填写邮箱地址', 'danger')
            return redirect(url_for('register'))
        if password != confirm_password:
            flash('两次输入的密码不匹配', 'danger')
            return redirect(url_for('register'))
        # Password policy.
        password_errors = []
        policy = current_app.config['PASSWORD_POLICY']
        if len(password) < policy['min_length']:
            password_errors.append(f"密码长度至少{policy['min_length']}位")
        if policy['require_uppercase'] and not re.search(r'[A-Z]', password):
            password_errors.append("必须包含大写字母")
        if policy['require_lowercase'] and not re.search(r'[a-z]', password):
            password_errors.append("必须包含小写字母")
        if policy['require_digits'] and not re.search(r'[0-9]', password):
            password_errors.append("必须包含数字")
        if policy['require_special_chars'] and not re.search(r'[^A-Za-z0-9]', password):
            password_errors.append("必须包含特殊字符")
        if password_errors:
            flash('密码不符合要求: ' + ', '.join(password_errors), 'danger')
            return redirect(url_for('register'))
        conn = get_db_connection()
        if conn:
            # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
            cursor = None
            try:
                cursor = conn.cursor()
                # Uniqueness check. Only compare e-mails when one was given:
                # matching on an empty string would collide with every
                # earlier account registered without an address.
                if email:
                    cursor.execute("SELECT id FROM users WHERE username = %s OR email = %s",
                                   (username, email))
                else:
                    cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
                if cursor.fetchone():
                    flash('用户名或邮箱已被注册', 'danger')
                    return redirect(url_for('register'))
                # Accounts start active only when no e-mail verification is needed.
                is_active = not verification_required
                verification_token = None
                password_hash = generate_password_hash(password)
                cursor.execute("""
INSERT INTO users (username, password_hash, email, is_admin, is_active, verification_token)
VALUES (%s, %s, %s, %s, %s, %s)
""", (username, password_hash, email, False, is_active, verification_token))
                if verification_required:
                    # Signed, time-limited token carrying the identity to confirm.
                    from itsdangerous import URLSafeTimedSerializer
                    serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
                    token = serializer.dumps({'username': username, 'email': email}, salt='email-verify')
                    cursor.execute("""
UPDATE users SET verification_token = %s WHERE username = %s
""", (token, username))
                    try:
                        from flask_mail import Message
                        msg = Message('请验证您的邮箱 - 自签证书管理系统',
                                      sender=current_app.config['MAIL_DEFAULT_SENDER'],
                                      recipients=[email])
                        confirm_url = generate_full_url('verify_email', token=token)
                        msg.html = f"""
感谢注册!
请点击以下链接完成邮箱验证:
{confirm_url}
(链接24小时内有效)
如果链接无法点击,请复制到浏览器地址栏访问
"""
                        # Use the module-level Mail extension instead of
                        # building a new one for every request.
                        mail.send(msg)
                        flash('验证邮件已发送至您的邮箱,请查收并完成验证', 'info')
                    except Exception as e:
                        # No mail means no way to activate: undo the insert.
                        current_app.logger.error(f'邮件发送失败: {str(e)}')
                        conn.rollback()
                        flash('发送验证邮件失败,请稍后再试或联系管理员', 'danger')
                        return redirect(url_for('register'))
                conn.commit()
                if is_active:
                    flash('注册成功,请登录', 'success')
                    return redirect(url_for('login'))
                else:
                    return redirect(url_for('register'))
            except Error as e:
                conn.rollback()
                current_app.logger.error(f'数据库错误: {str(e)}')
                flash('注册失败,请稍后再试', 'danger')
            finally:
                if conn.is_connected():
                    if cursor is not None:
                        cursor.close()
                    conn.close()
    # GET, or a POST that fell through: render the form with a CAPTCHA image URL.
    captcha_url = url_for('captcha')
    return render_template('register.html',
                           captcha_url=captcha_url,
                           registration_open=current_app.config['REGISTRATION_OPEN'],
                           email_required=current_app.config['EMAIL_VERIFICATION_REQUIRED'])
# The rule must declare <token>: the view takes ``token`` as a required
# argument and register() builds the link with ``token=...``.
@app.route('/verify-email/<token>')
def verify_email(token):
    """Activate the account identified by a signed e-mail verification token.

    The token is valid for 24 hours and must match the ``verification_token``
    stored for the user it names. On success the user is activated and the
    stored token is cleared. Every outcome redirects to the login page (or to
    the index page when the visitor is already signed in).

    Args:
        token: Signed token taken from the URL.
    """
    if current_user.is_authenticated:
        return redirect(generate_full_url('index'))
    try:
        from itsdangerous import URLSafeTimedSerializer
        serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
        # max_age is in seconds: 86400 s = 24 h.
        data = serializer.loads(token, salt='email-verify', max_age=86400)
        username = data['username']
        email = data['email']
        conn = get_db_connection()
        if conn:
            # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
            cursor = None
            try:
                cursor = conn.cursor(dictionary=True)
                # The token must be the one currently stored for this user.
                cursor.execute("""
SELECT id FROM users
WHERE username = %s AND email = %s AND verification_token = %s
""", (username, email, token))
                user = cursor.fetchone()
                if not user:
                    flash('无效的验证链接', 'danger')
                    return redirect(generate_full_url('login'))
                # Activate the account and invalidate the token.
                cursor.execute("""
UPDATE users
SET is_active = TRUE, verification_token = NULL
WHERE id = %s
""", (user['id'],))
                conn.commit()
                flash('邮箱验证成功,您现在可以登录了', 'success')
                return redirect(generate_full_url('login'))
            except Error as e:
                conn.rollback()
                current_app.logger.error(f'数据库错误: {str(e)}')
                flash('验证过程中出现错误', 'danger')
            finally:
                if conn.is_connected():
                    if cursor is not None:
                        cursor.close()
                    conn.close()
        else:
            # Tell the user something went wrong instead of redirecting silently.
            flash('验证过程中出现错误', 'danger')
    except Exception as e:
        # Bad signature, expired token, malformed payload, ...
        current_app.logger.error(f'令牌验证失败: {str(e)}')
        flash('验证链接无效或已过期', 'danger')
    return redirect(generate_full_url('login'))
# 路由定义
@app.route('/')
@login_required
def index():
    """Dashboard: CA / certificate counts and the five most recent of each.

    Administrators see figures for all records; other users only for records
    they created. When the database is unavailable or a query fails, the page
    is rendered with zeroed statistics.
    """
    def render_empty():
        # Fallback page used when no statistics could be loaded.
        return render_template('index.html',
                               ca_count=0,
                               cert_count=0,
                               expiring_soon_count=0,
                               active_count=0,
                               recent_cas=[],
                               recent_certs=[])

    conn = get_db_connection()
    if not conn:
        return render_empty()
    try:
        cursor = conn.cursor(dictionary=True)

        # --- certificate authorities: total and latest five ---
        if not current_user.is_admin:
            cursor.execute("""
SELECT COUNT(*) as count FROM certificate_authorities
WHERE created_by = %s
""", (current_user.id,))
            ca_count = cursor.fetchone()['count']
            cursor.fetchall()  # drain the result set
            cursor.execute("""
SELECT * FROM certificate_authorities
WHERE created_by = %s
ORDER BY created_at DESC LIMIT 5
""", (current_user.id,))
        else:
            cursor.execute("SELECT COUNT(*) as count FROM certificate_authorities")
            ca_count = cursor.fetchone()['count']
            cursor.fetchall()  # drain the result set
            cursor.execute("""
SELECT * FROM certificate_authorities
ORDER BY created_at DESC LIMIT 5
""")
        recent_cas = cursor.fetchall()

        # --- certificates: total and latest five ---
        if not current_user.is_admin:
            cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE created_by = %s
""", (current_user.id,))
            cert_count = cursor.fetchone()['count']
            cursor.fetchall()  # drain the result set
            cursor.execute("""
SELECT * FROM certificates
WHERE created_by = %s
ORDER BY created_at DESC LIMIT 5
""", (current_user.id,))
        else:
            cursor.execute("SELECT COUNT(*) as count FROM certificates")
            cert_count = cursor.fetchone()['count']
            cursor.fetchall()  # drain the result set
            cursor.execute("""
SELECT * FROM certificates
ORDER BY created_at DESC LIMIT 5
""")
        recent_certs = cursor.fetchall()

        # --- active certificates expiring within 30 days ---
        if not current_user.is_admin:
            cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY)
AND status = 'active' AND created_by = %s
""", (current_user.id,))
        else:
            cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY)
AND status = 'active'
""")
        expiring_soon_count = cursor.fetchone()['count']
        cursor.fetchall()  # drain the result set

        # --- active certificates ---
        if not current_user.is_admin:
            cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE status = 'active' AND created_by = %s
""", (current_user.id,))
        else:
            cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE status = 'active'
""")
        active_count = cursor.fetchone()['count']
        cursor.fetchall()  # drain the result set

        return render_template('index.html',
                               ca_count=ca_count,
                               cert_count=cert_count,
                               expiring_soon_count=expiring_soon_count,
                               active_count=active_count,
                               recent_cas=recent_cas,
                               recent_certs=recent_certs)
    except Error as exc:
        print(f"Database error: {exc}")
        flash('获取系统统计信息失败', 'danger')
        return render_empty()
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login view: render the form (GET) or authenticate the user (POST).

    A POST first verifies the CAPTCHA, then looks up an active user with the
    given username and checks the password hash. On success the user is
    logged in and redirected to the index page; otherwise the form is shown
    again with a flashed message.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        captcha_input = request.form['captcha']
        if not verify_captcha(captcha_input):
            flash('验证码错误', 'danger')
            return redirect(url_for('login'))
        conn = get_db_connection()
        if conn:
            try:
                cursor = conn.cursor(dictionary=True)
                cursor.execute("""
SELECT * FROM users
WHERE username = %s AND is_active = TRUE
""", (username,))
                row = cursor.fetchone()
                if not (row and check_password_hash(row['password_hash'], password)):
                    flash('用户名或密码错误,或账户未激活', 'danger')
                else:
                    account = User()
                    account.id = row['id']
                    account.username = row['username']
                    account.is_admin = row['is_admin']
                    login_user(account)
                    flash('登录成功', 'success')
                    return redirect(url_for('index'))
            except Error as exc:
                print(f"Database error: {exc}")
                flash('登录失败,请稍后再试', 'danger')
            finally:
                if conn.is_connected():
                    cursor.close()
                    conn.close()
    # GET, or a POST that did not log the user in.
    return render_template('login.html', captcha_url=url_for('captcha'))
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and send them to the login page."""
    logout_user()
    flash('您已成功登出', 'success')
    login_page = url_for('login')
    return redirect(login_page)
from math import ceil
# Number of items shown per page.
PER_PAGE = 10
@app.route('/cas')
@login_required
def ca_list():
    """Paginated list of certificate authorities.

    Administrators see every CA; other users only those they created. The
    page number comes from the ``page`` query parameter (default 1). On a
    database problem the user is redirected to the index page.
    """
    # Clamp to the first page: page <= 0 would produce a negative OFFSET,
    # which MySQL rejects.
    page = max(request.args.get('page', 1, type=int), 1)
    conn = get_db_connection()
    if not conn:
        return redirect(url_for('index'))
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        # Total number of visible CAs.
        if current_user.is_admin:
            cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities")
        else:
            cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities WHERE created_by = %s",
                           (current_user.id,))
        total = cursor.fetchone()['total']
        total_pages = ceil(total / PER_PAGE)
        # Rows of the requested page.
        offset = (page - 1) * PER_PAGE
        if current_user.is_admin:
            cursor.execute("SELECT * FROM certificate_authorities ORDER BY created_at DESC LIMIT %s OFFSET %s",
                           (PER_PAGE, offset))
        else:
            cursor.execute("""
SELECT * FROM certificate_authorities
WHERE created_by = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (current_user.id, PER_PAGE, offset))
        cas = cursor.fetchall()
        return render_template('ca_list.html',
                               cas=cas,
                               page=page,
                               total_pages=total_pages,
                               total=total,
                               get_username=get_username)
    except Error as e:
        print(f"Database error: {e}")
        flash('获取CA列表失败', 'danger')
        return redirect(url_for('index'))
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
@app.route('/cas/batch_delete', methods=['POST'])
@login_required
def batch_delete_cas():
    """Delete several CAs named in a JSON body of the form ``{"ids": [...]}``.

    A CA is skipped when it does not exist, when the current user is neither
    an administrator nor its creator, when certificates still reference it,
    or when its files cannot be removed. For every other CA the files, its
    directory, its CRL row and its own row are deleted.

    Returns:
        A JSON response with ``success`` and ``message`` (plus ``deleted``,
        the number of CAs actually removed, on success); HTTP 400 for a bad
        request and 500 for database failures.
    """
    # Imported here because the import block visible at the top of this file
    # does not bring jsonify into scope.
    from flask import jsonify

    if not request.is_json:
        return jsonify({'success': False, 'message': 'Invalid request'}), 400
    data = request.get_json(silent=True)
    # The body must be an object carrying a non-empty list of ids.
    ca_ids = data.get('ids') if isinstance(data, dict) else None
    if not ca_ids or not isinstance(ca_ids, list):
        return jsonify({'success': False, 'message': 'No CAs selected'}), 400
    conn = get_db_connection()
    if not conn:
        return jsonify({'success': False, 'message': 'Database connection failed'}), 500
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        # Rows are read by column name below, so a dictionary cursor is needed.
        cursor = conn.cursor(dictionary=True)
        deleted = 0
        for ca_id in ca_ids:
            # The CA must exist and the user must be allowed to delete it.
            cursor.execute(
                "SELECT created_by, cert_path, key_path FROM certificate_authorities WHERE id = %s",
                (ca_id,))
            ca = cursor.fetchone()
            if not ca:
                continue
            if not current_user.is_admin and ca['created_by'] != current_user.id:
                continue
            # A CA that still has issued certificates is kept.
            cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
            if cursor.fetchone()['count'] > 0:
                continue
            # Remove the files, then the whole CA directory.
            try:
                for file_path in (ca['cert_path'], ca['key_path']):
                    if file_path and os.path.exists(file_path):
                        os.remove(file_path)
                if ca['cert_path']:
                    ca_dir = os.path.dirname(ca['cert_path'])
                    if os.path.exists(ca_dir):
                        shutil.rmtree(ca_dir)
            except OSError as e:
                print(f"文件删除错误: {e}")
                continue
            # Remove the database records.
            cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
            cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
            deleted += 1
        conn.commit()
        return jsonify({'success': True, 'message': '批量删除成功', 'deleted': deleted})
    except Error as e:
        conn.rollback()
        print(f"Database error: {e}")
        return jsonify({'success': False, 'message': '数据库操作失败'}), 500
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
@app.route('/certificates')
@login_required
def certificate_list():
    """Paginated list of certificates together with the name of their CA.

    Administrators see every certificate; other users only those they
    created. The page number comes from the ``page`` query parameter
    (default 1). On a database problem the user is redirected to the index
    page.
    """
    # Clamp to the first page: page <= 0 would produce a negative OFFSET,
    # which MySQL rejects.
    page = max(request.args.get('page', 1, type=int), 1)
    conn = get_db_connection()
    if not conn:
        return redirect(url_for('index'))
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        # Total number of visible certificates.
        if current_user.is_admin:
            cursor.execute("SELECT COUNT(*) as total FROM certificates")
        else:
            cursor.execute("SELECT COUNT(*) as total FROM certificates WHERE created_by = %s", (current_user.id,))
        total = cursor.fetchone()['total']
        total_pages = ceil(total / PER_PAGE)
        # Rows of the requested page.
        offset = (page - 1) * PER_PAGE
        if current_user.is_admin:
            cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
ORDER BY c.created_at DESC
LIMIT %s OFFSET %s
""", (PER_PAGE, offset))
        else:
            cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
WHERE c.created_by = %s
ORDER BY c.created_at DESC
LIMIT %s OFFSET %s
""", (current_user.id, PER_PAGE, offset))
        certificates = cursor.fetchall()
        return render_template('certificate_list.html',
                               certificates=certificates,
                               page=page,
                               total_pages=total_pages,
                               total=total,
                               get_username=get_username)
    except Error as e:
        print(f"Database error: {e}")
        flash('获取证书列表失败', 'danger')
        return redirect(url_for('index'))
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
@app.route('/certificates/batch_delete', methods=['POST'])
@login_required
def batch_delete_certificates():
    """Delete several certificates named in a JSON body of the form ``{"ids": [...]}``.

    A certificate is skipped when it does not exist, when the current user is
    neither an administrator nor its creator, or when its files cannot be
    removed. For every other certificate the key, CSR and certificate files,
    the directory holding the certificate, and the database row are deleted.

    Returns:
        A JSON response with ``success`` and ``message`` (plus ``deleted``,
        the number of certificates actually removed, on success); HTTP 400
        for a bad request and 500 for database failures.
    """
    # Imported here because the import block visible at the top of this file
    # does not bring jsonify into scope.
    from flask import jsonify

    if not request.is_json:
        return jsonify({'success': False, 'message': 'Invalid request'}), 400
    data = request.get_json(silent=True)
    # The body must be an object carrying a non-empty list of ids.
    cert_ids = data.get('ids') if isinstance(data, dict) else None
    if not cert_ids or not isinstance(cert_ids, list):
        return jsonify({'success': False, 'message': 'No certificates selected'}), 400
    conn = get_db_connection()
    if not conn:
        return jsonify({'success': False, 'message': 'Database connection failed'}), 500
    # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        deleted = 0
        for cert_id in cert_ids:
            # The certificate must exist and the user must be allowed to delete it.
            cursor.execute("SELECT created_by, cert_path, key_path, csr_path FROM certificates WHERE id = %s",
                           (cert_id,))
            cert = cursor.fetchone()
            if not cert:
                continue
            if not current_user.is_admin and cert['created_by'] != current_user.id:
                continue
            # Remove the files, then the directory that held the certificate.
            try:
                for file_path in (cert['cert_path'], cert['key_path'], cert['csr_path']):
                    if file_path and os.path.exists(file_path):
                        os.remove(file_path)
                # cert_path may be empty, just like the other paths guarded
                # above; dirname() on None would raise a TypeError.
                if cert['cert_path']:
                    cert_dir = os.path.dirname(cert['cert_path'])
                    if os.path.exists(cert_dir):
                        shutil.rmtree(cert_dir)
            except OSError as e:
                print(f"文件删除错误: {e}")
                continue
            # Remove the database record.
            cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
            deleted += 1
        conn.commit()
        return jsonify({'success': True, 'message': '批量删除成功', 'deleted': deleted})
    except Error as e:
        conn.rollback()
        print(f"Database error: {e}")
        return jsonify({'success': False, 'message': '数据库操作失败'}), 500
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
@app.route('/cas/create', methods=['GET', 'POST'])
@login_required
def create_ca_view():
    """CA creation view: show the form (GET) or create a CA (POST).

    A POST validates the numeric fields, the CA name and the common name,
    refuses a CA name that already exists, then delegates to ``create_ca``.
    Success redirects to the CA list; any failure re-renders the form with a
    flashed message.
    """
    if request.method == 'POST':
        ca_name = request.form['ca_name'].strip()
        common_name = request.form['common_name'].strip()
        organization = request.form['organization'].strip()
        organizational_unit = request.form['organizational_unit'].strip()
        country = request.form['country'].strip()
        state = request.form['state'].strip()
        locality = request.form['locality'].strip()
        # Non-numeric input must produce a form error, not an unhandled
        # ValueError.
        try:
            key_size = int(request.form['key_size'])
            days_valid = int(request.form['days_valid'])
        except ValueError:
            flash('密钥长度或有效期无效', 'danger')
            return render_template('create_ca.html')
        # Sanity bounds: an absurd key size would tie the server up in key
        # generation, and non-positive values make OpenSSL fail.
        if not (512 <= key_size <= 16384) or not (1 <= days_valid <= 36500):
            flash('密钥长度或有效期无效', 'danger')
            return render_template('create_ca.html')
        # Name validation.
        if not validate_name(ca_name):
            flash('CA名称无效:只能包含中文、字母、数字、下划线和短横线,且不能以短横线开头或结尾', 'danger')
            return render_template('create_ca.html')
        if not validate_common_name(common_name):
            flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
            return render_template('create_ca.html')
        # Refuse a CA name that is already taken.
        conn = get_db_connection()
        if conn:
            # Pre-bind so the ``finally`` clause is safe even if cursor() raises.
            cursor = None
            try:
                cursor = conn.cursor(dictionary=True)
                cursor.execute("SELECT id FROM certificate_authorities WHERE name = %s", (ca_name,))
                if cursor.fetchone():
                    flash('CA名称已存在,请使用其他名称', 'danger')
                    return render_template('create_ca.html')
            except Error as e:
                print(f"Database error: {e}")
                flash('检查CA名称失败', 'danger')
                return render_template('create_ca.html')
            finally:
                if conn.is_connected():
                    if cursor is not None:
                        cursor.close()
                    conn.close()
        # OpenSSL failures inside create_ca may surface as exceptions; turn
        # them into the ordinary "creation failed" message.
        try:
            ca_id = create_ca(ca_name, common_name, organization, organizational_unit,
                              country, state, locality, key_size, days_valid, current_user.id)
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"OpenSSL error: {e}")
            ca_id = None
        if ca_id:
            flash('CA创建成功', 'success')
            return redirect(url_for('ca_list'))
        else:
            flash('CA创建失败', 'danger')
    return render_template('create_ca.html')
from datetime import timedelta # 确保顶部已导入
@app.route('/cas/<int:ca_id>')
@login_required
def ca_detail(ca_id):
    """Show one CA with a page of the certificates it issued and its CRL row.

    The ``page`` query argument selects the page (``PER_PAGE`` rows each).
    Non-admin users may only view CAs they created. Any lookup, permission
    or database failure flashes a message and redirects to the CA list.
    """
    # Clamp to the first page: page <= 0 would produce a negative OFFSET
    # in the paging query below.
    page = max(request.args.get('page', 1, type=int), 1)

    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA不存在', 'danger')
        return redirect(url_for('ca_list'))

    # Permission check
    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('无权访问此CA', 'danger')
        return redirect(url_for('ca_list'))

    conn = get_db_connection()
    if not conn:
        return redirect(url_for('ca_list'))

    # Pre-bind so the finally clause is safe even if conn.cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)

        # Total number of certificates issued by this CA
        cursor.execute("""
            SELECT COUNT(*) as total FROM certificates
            WHERE ca_id = %s
        """, (ca_id,))
        total = cursor.fetchone()['total']
        total_pages = ceil(total / PER_PAGE)

        # Requested page of certificates, newest first
        offset = (page - 1) * PER_PAGE
        cursor.execute("""
            SELECT * FROM certificates
            WHERE ca_id = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """, (ca_id, PER_PAGE, offset))
        certificates = cursor.fetchall()

        # CRL information for this CA
        cursor.execute("""
            SELECT * FROM certificate_revocation_list
            WHERE ca_id = %s
        """, (ca_id,))
        crl = cursor.fetchone()

        # timedelta and get_username are handed to the template as helpers.
        return render_template(
            'ca_detail.html',
            ca=ca,
            certificates=certificates,
            crl=crl,
            page=page,
            total_pages=total_pages,
            total=total,
            timedelta=timedelta,
            get_username=get_username
        )
    except Error as e:
        print(f"Database error: {e}")
        flash('获取CA详情失败', 'danger')
        return redirect(url_for('ca_list'))
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
def get_username(user_id):
    """Return the username stored for ``user_id``.

    Falls back to ``str(user_id)`` when no database connection is
    available, the query fails, or no matching row exists in ``users``.
    """
    fallback = str(user_id)
    conn = get_db_connection()
    if not conn:
        return fallback

    # Pre-bind so the finally clause cannot hit an unbound name (and mask
    # the fallback return) when conn.cursor() itself raises.
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT username FROM users WHERE id = %s", (user_id,))
        user = cursor.fetchone()
        return user['username'] if user else fallback
    except Error as e:
        print(f"Database error: {e}")
        return fallback
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
@app.route('/cas/<int:ca_id>/generate_crl')
@login_required
def generate_crl_view(ca_id):
    """Regenerate the CRL of a CA and return to its detail page.

    The route carries the ``ca_id`` URL variable that this view's signature
    and the ``url_for('ca_detail', ca_id=...)`` calls require. Non-admin
    users may only operate on CAs they created.
    """
    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA不存在', 'danger')
        return redirect(url_for('ca_list'))

    # Permission check
    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('无权操作此CA', 'danger')
        return redirect(url_for('ca_list'))

    if generate_crl(ca_id):
        flash('CRL生成成功', 'success')
    else:
        flash('CRL生成失败', 'danger')
    return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/cas/<int:ca_id>/download_crl')
@login_required
def download_crl(ca_id):
    """Send the CRL file of a CA as an attachment.

    The file location is read from ``certificate_revocation_list.crl_path``.
    Non-admin users may only download the CRL of CAs they created. When the
    row or the file is missing, or the query fails, a message is flashed
    and the user is redirected to the CA detail page.
    """
    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA不存在', 'danger')
        return redirect(url_for('ca_list'))

    # Permission check
    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('无权操作此CA', 'danger')
        return redirect(url_for('ca_list'))

    # Look up the CRL path
    conn = get_db_connection()
    if not conn:
        return redirect(url_for('ca_detail', ca_id=ca_id))

    # Pre-bind so the finally clause is safe even if conn.cursor() raises.
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT crl_path FROM certificate_revocation_list
            WHERE ca_id = %s
        """, (ca_id,))
        crl = cursor.fetchone()
        if crl and os.path.exists(crl['crl_path']):
            return send_from_directory(
                os.path.dirname(crl['crl_path']),
                os.path.basename(crl['crl_path']),
                as_attachment=True
            )
        flash('CRL文件不存在', 'danger')
        return redirect(url_for('ca_detail', ca_id=ca_id))
    except Error as e:
        print(f"Database error: {e}")
        flash('获取CRL失败', 'danger')
        return redirect(url_for('ca_detail', ca_id=ca_id))
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
@app.route('/certificates/create', methods=['GET', 'POST'])
@login_required
def create_certificate_view():
    """Show the certificate creation form (GET) and process it (POST).

    On POST the form is validated (numeric fields, permission on the chosen
    CA, common name, DNS and IPv4 SAN entries, duplicate common name under
    the same CA) and ``create_certificate`` is called. Validation failures
    flash a message and redirect back to this view; success redirects to
    the new certificate's detail page. On GET, or after a failed creation,
    the form is rendered with the CAs the current user may use.
    """
    # Local import: only this view needs it.
    import ipaddress

    if request.method == 'POST':
        form = request.form
        common_name = form['common_name'].strip()
        san_dns = form.get('san_dns', '').strip()
        san_ip = form.get('san_ip', '').strip()
        organization = form['organization'].strip()
        organizational_unit = form['organizational_unit'].strip()
        country = form['country'].strip()
        state = form['state'].strip()
        locality = form['locality'].strip()

        # Non-numeric values used to escape as an unhandled ValueError.
        try:
            key_size = int(form['key_size'])
            days_valid = int(form['days_valid'])
            ca_id = int(form['ca_id'])
        except ValueError:
            flash('密钥长度、有效期或CA参数无效', 'danger')
            return redirect(url_for('create_certificate_view'))

        # The GET form only lists CAs the user may use, but ca_id comes
        # from the client; apply the same ownership rule as the CA views.
        ca = get_ca_by_id(ca_id)
        if not ca:
            flash('CA不存在', 'danger')
            return redirect(url_for('create_certificate_view'))
        if not current_user.is_admin and ca['created_by'] != current_user.id:
            flash('无权操作此CA', 'danger')
            return redirect(url_for('create_certificate_view'))

        # Common name validation
        if not validate_common_name(common_name):
            flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
            return redirect(url_for('create_certificate_view'))

        # SAN DNS validation
        if san_dns:
            for dns in san_dns.split(','):
                dns = dns.strip()
                if not validate_common_name(dns):
                    flash(f'DNS SAN条目无效: {dns},只能包含字母、数字、点号和短横线', 'danger')
                    return redirect(url_for('create_certificate_view'))

        # SAN IP validation: IPv4Address also rejects out-of-range octets
        # such as 999.1.1.1, which a digits-and-dots pattern lets through.
        if san_ip:
            for ip in san_ip.split(','):
                ip = ip.strip()
                try:
                    ipaddress.IPv4Address(ip)
                except ValueError:
                    flash(f'IP SAN条目无效: {ip},请输入有效的IPv4地址', 'danger')
                    return redirect(url_for('create_certificate_view'))

        # Reject a duplicate common name under the same CA
        conn = get_db_connection()
        if conn:
            # Pre-bind so the finally clause is safe if conn.cursor() raises.
            cursor = None
            try:
                cursor = conn.cursor(dictionary=True)
                cursor.execute("SELECT id FROM certificates WHERE common_name = %s AND ca_id = %s",
                               (common_name, ca_id))
                if cursor.fetchone():
                    flash('该CA下已存在相同通用名的证书', 'danger')
                    return redirect(url_for('create_certificate_view'))
            except Error as e:
                print(f"Database error: {e}")
                flash('检查证书名称失败', 'danger')
                return redirect(url_for('create_certificate_view'))
            finally:
                if conn.is_connected():
                    if cursor is not None:
                        cursor.close()
                    conn.close()

        cert_id = create_certificate(ca_id, common_name, san_dns, san_ip, organization,
                                     organizational_unit, country, state, locality,
                                     key_size, days_valid, current_user.id)
        if cert_id:
            flash('证书创建成功', 'success')
            return redirect(url_for('certificate_detail', cert_id=cert_id))
        # Fall through and render the form again.
        flash('证书创建失败', 'danger')

    # Load the CAs this user may issue from: all for admins, own otherwise
    conn = get_db_connection()
    if not conn:
        return redirect(url_for('certificate_list'))

    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        if current_user.is_admin:
            cursor.execute("SELECT id, name FROM certificate_authorities")
        else:
            cursor.execute("""
                SELECT id, name FROM certificate_authorities
                WHERE created_by = %s
            """, (current_user.id,))
        cas = cursor.fetchall()
        return render_template('create_certificate.html', cas=cas)
    except Error as e:
        print(f"Database error: {e}")
        flash('获取CA列表失败', 'danger')
        return redirect(url_for('certificate_list'))
    finally:
        if conn.is_connected():
            if cursor is not None:
                cursor.close()
            conn.close()
@app.route('/certificates/<int:cert_id>')
@login_required
def certificate_detail(cert_id):
    """Render the detail page of one certificate together with its CA.

    The route carries the ``cert_id`` URL variable that this view's
    signature and the ``url_for('certificate_detail', cert_id=...)`` calls
    require. Non-admin users may only view certificates they created.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    # Permission check
    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权访问此证书', 'danger')
        return redirect(url_for('certificate_list'))

    # Issuing CA; get_username is handed to the template as a helper
    ca = get_ca_by_id(cert['ca_id'])
    return render_template('certificate_detail.html', cert=cert, ca=ca, get_username=get_username)
@app.route('/certificates/<int:cert_id>/revoke', methods=['GET', 'POST'])
@login_required
def revoke_certificate_view(cert_id):
    """Show the revocation form (GET) and revoke the certificate (POST).

    Non-admin users may only revoke certificates they created, and an
    already revoked certificate is redirected to its detail page. After a
    successful revocation the CA's CRL is regenerated.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    # Permission check
    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权操作此证书', 'danger')
        return redirect(url_for('certificate_list'))

    if cert['status'] == 'revoked':
        flash('证书已被吊销', 'warning')
        return redirect(url_for('certificate_detail', cert_id=cert_id))

    if request.method == 'POST':
        reason = request.form['reason']
        if revoke_certificate(cert_id, reason):
            flash('证书吊销成功', 'success')
            # Refresh the CRL; surface a failure instead of ignoring it,
            # because the published CRL would then miss this revocation.
            if not generate_crl(cert['ca_id']):
                flash('证书已吊销,但CRL更新失败', 'warning')
            return redirect(url_for('certificate_detail', cert_id=cert_id))
        flash('证书吊销失败', 'danger')

    return render_template('revoke_certificate.html', cert=cert)
@app.route('/certificates/<int:cert_id>/renew', methods=['GET', 'POST'])
@login_required
def renew_certificate_view(cert_id):
    """Show the renewal form (GET) and renew the certificate (POST).

    Non-admin users may only renew certificates they created. ``days_valid``
    from the form must be a positive integer; it is passed on to
    ``renew_certificate``.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    # Permission check
    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权操作此证书', 'danger')
        return redirect(url_for('certificate_list'))

    if request.method == 'POST':
        # A non-numeric value used to escape as an unhandled ValueError.
        try:
            days_valid = int(request.form['days_valid'])
        except ValueError:
            days_valid = 0
        if days_valid <= 0:
            flash('有效期必须为正整数', 'danger')
        elif renew_certificate(cert_id, days_valid):
            flash('证书续期成功', 'success')
            return redirect(url_for('certificate_detail', cert_id=cert_id))
        else:
            flash('证书续期失败', 'danger')

    return render_template('renew_certificate.html', cert=cert)
@app.route('/cas/<int:ca_id>/export')
@login_required
def export_ca_view(ca_id):
    """Download a ZIP bundle containing the CA certificate.

    The private key is added to the bundle only for admin users. Non-admin
    users may only export CAs they created. Any error while building the
    archive is flashed and the user is redirected to the CA detail page.
    """
    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA not found', 'danger')
        return redirect(url_for('ca_list'))

    # Permission check
    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('No permission', 'danger')
        return redirect(url_for('ca_list'))

    # Build the archive in memory
    memory_file = BytesIO()
    try:
        with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
            # CA certificate, read as raw bytes
            with open(ca['cert_path'], 'rb') as f:
                zf.writestr(f"{ca['common_name']}_ca.crt", f.read())

            # CA private key: only admins may export it
            if current_user.is_admin:
                with open(ca['key_path'], 'rb') as f:
                    zf.writestr(f"{ca['common_name']}_ca.key", f.read())

        # Drop non-ASCII characters so the header value stays ASCII-only
        safe_filename = f"{ca['name']}_ca_bundle.zip".encode('ascii', 'ignore').decode('ascii')
        return Response(
            memory_file.getvalue(),
            mimetype="application/zip",
            headers={
                "Content-Disposition": f"attachment; filename={safe_filename}"
            }
        )
    except Exception as e:
        flash(f'Export failed: {str(e)}', 'danger')
        return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/certificates/<int:cert_id>/export', methods=['GET', 'POST'])
@login_required
def export_certificate_view(cert_id):
    """Show the export form (GET) and deliver the certificate (POST).

    The ``format`` form field selects the output:

    * ``pkcs12`` - file produced by ``export_pkcs12`` (password required)
    * ``pem`` - certificate and private key concatenated in one file
    * ``pem_separate`` / ``crt_separate`` - ZIP with separate cert and key

    Non-admin users may only export certificates they created. Failures
    flash a message and the export form is shown again.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    # Permission check
    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权操作此证书', 'danger')
        return redirect(url_for('certificate_list'))

    if request.method == 'POST':
        format_type = request.form['format']
        password = request.form.get('password', '')

        if format_type == 'pkcs12':
            if not password:
                flash('PKCS#12格式需要密码', 'danger')
                return redirect(url_for('export_certificate_view', cert_id=cert_id))
            pkcs12_path = export_pkcs12(cert_id, password)
            if pkcs12_path:
                return send_from_directory(
                    os.path.dirname(pkcs12_path),
                    os.path.basename(pkcs12_path),
                    as_attachment=True
                )
            flash('导出PKCS#12失败', 'danger')
        elif format_type == 'pem':
            # Certificate followed by private key in a single PEM file.
            # A missing or unreadable file is reported to the user rather
            # than escaping as an unhandled error.
            try:
                with open(cert['cert_path'], 'r') as f:
                    pem_content = f.read()
                with open(cert['key_path'], 'r') as f:
                    pem_content += f.read()
            except OSError as e:
                flash(f'导出PEM失败: {str(e)}', 'danger')
            else:
                return Response(
                    pem_content,
                    mimetype="application/x-pem-file",
                    headers={
                        "Content-Disposition": f"attachment; filename={cert['common_name']}.pem"
                    }
                )
        elif format_type == 'pem_separate':
            # In-memory ZIP (.pem + .key)
            return generate_separate_files_zip(
                cert,
                cert_ext=".pem",
                zip_suffix="_pem_key"
            )
        elif format_type == 'crt_separate':
            # In-memory ZIP (.crt + .key)
            return generate_separate_files_zip(
                cert,
                cert_ext=".crt",
                zip_suffix="_crt_key"
            )
        else:
            flash('不支持的导出格式', 'danger')

    return render_template('export_certificate.html', cert=cert)
# Routes for deleting CAs and certificates (added to app.py)
@app.route('/cas/<int:ca_id>/delete', methods=['GET', 'POST'])
@login_required
def delete_ca(ca_id):
    """Confirm (GET) and perform (POST) the deletion of a CA.

    Deletion is refused while certificates still reference the CA.
    Non-admin users may only delete CAs they created. On POST the CA's
    certificate, key and containing directory are removed from disk first,
    then its CRL row and CA row are deleted from the database.
    """
    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA不存在', 'danger')
        return redirect(url_for('ca_list'))

    # Permission check
    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('无权删除此CA', 'danger')
        return redirect(url_for('ca_list'))

    # Refuse to delete a CA that still has certificates
    conn = get_db_connection()
    if conn:
        # Pre-bind so the finally clause is safe even if conn.cursor() raises.
        cursor = None
        try:
            cursor = conn.cursor(dictionary=True)
            cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
            result = cursor.fetchone()
            if result['count'] > 0:
                flash('无法删除CA,因为存在关联的证书', 'danger')
                return redirect(url_for('ca_detail', ca_id=ca_id))
        except Error as e:
            print(f"Database error: {e}")
            flash('检查关联证书失败', 'danger')
            return redirect(url_for('ca_detail', ca_id=ca_id))
        finally:
            if conn.is_connected():
                if cursor is not None:
                    cursor.close()
                conn.close()

    if request.method == 'POST':
        # Remove files from disk
        try:
            # CA certificate and private key
            if os.path.exists(ca['cert_path']):
                os.remove(ca['cert_path'])
            if os.path.exists(ca['key_path']):
                os.remove(ca['key_path'])

            # The CA directory and everything left inside it
            ca_dir = os.path.dirname(ca['cert_path'])
            if os.path.exists(ca_dir):
                for filename in os.listdir(ca_dir):
                    file_path = os.path.join(ca_dir, filename)
                    # Best effort per entry: report the failure and carry on
                    try:
                        if os.path.isfile(file_path) or os.path.islink(file_path):
                            os.unlink(file_path)
                        elif os.path.isdir(file_path):
                            shutil.rmtree(file_path)
                    except Exception as e:
                        print(f'删除文件/目录失败 {file_path}. 原因: {e}')
                        flash(f'删除文件/目录失败: {str(e)}', 'warning')

                # Remove the directory itself once it has been emptied
                try:
                    os.rmdir(ca_dir)
                except OSError as e:
                    print(f"删除目录失败: {e}")
                    flash(f'删除目录失败: {str(e)}', 'warning')
        except OSError as e:
            print(f"文件删除错误: {e}")
            flash(f'删除文件时出错: {str(e)}', 'danger')
            return redirect(url_for('ca_detail', ca_id=ca_id))

        # Remove database records
        conn = get_db_connection()
        if conn:
            cursor = None
            try:
                cursor = conn.cursor()
                # CRL record first, then the CA record
                cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
                cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
                conn.commit()
                flash('CA删除成功', 'success')
                return redirect(url_for('ca_list'))
            except Error as e:
                print(f"Database error: {e}")
                conn.rollback()
                flash('删除CA记录失败', 'danger')
                return redirect(url_for('ca_detail', ca_id=ca_id))
            finally:
                if conn.is_connected():
                    if cursor is not None:
                        cursor.close()
                    conn.close()
        else:
            # Files are already gone at this point; do not fail silently.
            flash('删除CA记录失败', 'danger')

    return render_template('confirm_delete_ca.html', ca=ca)
@app.route('/certificates/<int:cert_id>/delete', methods=['GET', 'POST'])
@login_required
def delete_certificate(cert_id):
    """Confirm (GET) and perform (POST) the deletion of a certificate.

    Non-admin users may only delete certificates they created. On POST the
    certificate, key and CSR files and the directory holding the
    certificate are removed from disk first, then the ``certificates`` row
    is deleted.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    # Permission check
    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权删除此证书', 'danger')
        return redirect(url_for('certificate_list'))

    if request.method == 'POST':
        # Remove files from disk
        try:
            # Any of the stored paths may be empty, so skip those
            for file_path in (cert['cert_path'], cert['key_path'], cert['csr_path']):
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)

            # The certificate directory and everything left inside it.
            # Guard against an empty cert_path: os.path.dirname(None)
            # raises TypeError, which the OSError handler would not catch.
            cert_dir = os.path.dirname(cert['cert_path']) if cert['cert_path'] else ''
            if cert_dir and os.path.exists(cert_dir):
                for filename in os.listdir(cert_dir):
                    file_path = os.path.join(cert_dir, filename)
                    # Best effort per entry: log the failure and carry on
                    try:
                        if os.path.isfile(file_path) or os.path.islink(file_path):
                            os.unlink(file_path)
                        elif os.path.isdir(file_path):
                            shutil.rmtree(file_path)
                    except Exception as e:
                        print(f'Failed to delete {file_path}. Reason: {e}')

                # Remove the directory itself once it has been emptied
                os.rmdir(cert_dir)
        except OSError as e:
            print(f"文件删除错误: {e}")
            flash(f'删除文件时出错: {str(e)}', 'danger')
            return redirect(url_for('certificate_detail', cert_id=cert_id))

        # Remove the database record
        conn = get_db_connection()
        if conn:
            # Pre-bind so the finally clause is safe if conn.cursor() raises.
            cursor = None
            try:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
                conn.commit()
                flash('证书删除成功', 'success')
                return redirect(url_for('certificate_list'))
            except Error as e:
                print(f"Database error: {e}")
                # Roll back the failed transaction, as delete_ca does
                conn.rollback()
                flash('删除证书记录失败', 'danger')
                return redirect(url_for('certificate_detail', cert_id=cert_id))
            finally:
                if conn.is_connected():
                    if cursor is not None:
                        cursor.close()
                    conn.close()
        else:
            # Files are already gone at this point; do not fail silently.
            flash('删除证书记录失败', 'danger')

    return render_template('confirm_delete_certificate.html', cert=cert)
def generate_separate_files_zip(cert, cert_ext, zip_suffix):
    """Build an in-memory ZIP with the certificate and key as separate files.

    The archive holds ``<common_name><cert_ext>`` (from ``cert['cert_path']``)
    and ``<common_name>.key`` (from ``cert['key_path']``) and is returned as
    an attachment named ``<common_name><zip_suffix>.zip``. If anything goes
    wrong, the error is flashed and the user is redirected to the export
    page of the certificate.
    """
    archive_buffer = BytesIO()
    try:
        with zipfile.ZipFile(archive_buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
            base_name = cert['common_name']
            # (key of the source path in cert, member name inside the ZIP),
            # written in this order: certificate first, then private key
            members = (
                ('cert_path', f"{base_name}{cert_ext}"),
                ('key_path', f"{base_name}.key"),
            )
            for path_key, member_name in members:
                with open(cert[path_key], 'r') as source:
                    archive.writestr(member_name, source.read())

        archive_buffer.seek(0)
        disposition = f"attachment; filename={cert['common_name']}{zip_suffix}.zip"
        return Response(
            archive_buffer.getvalue(),
            mimetype="application/zip",
            headers={"Content-Disposition": disposition},
        )
    except Exception as e:
        flash(f'创建ZIP文件失败: {str(e)}', 'danger')
        return redirect(url_for('export_certificate_view', cert_id=cert['id']))
@app.route('/download/<path:filename>')
@login_required
def download_file(filename):
    """Send a file from the certificate store as an attachment.

    ``filename`` is a path relative to ``Config.CERT_STORE``. A path that
    resolves outside the store, or that is not an existing regular file,
    is reported as missing and the user is redirected to the index page.
    """
    store_root = os.path.realpath(Config.CERT_STORE)
    file_path = os.path.realpath(os.path.join(store_root, filename))

    # Security check: the resolved path must stay inside the certificate
    # store, so "..", absolute paths and symlinks cannot escape it.
    # os.path.join(root, '') yields the root with a trailing separator.
    inside_store = file_path.startswith(os.path.join(store_root, ''))
    if not inside_store or not os.path.isfile(file_path):
        flash('文件不存在', 'danger')
        return redirect(url_for('index'))

    # NOTE(review): there is still no per-user permission check here, so
    # any logged-in user can fetch any file in the store - TODO restrict
    # this to the owner of the file according to the business rules.
    return send_from_directory(
        os.path.dirname(file_path),
        os.path.basename(file_path),
        as_attachment=True
    )
if __name__ == '__main__':
    # Start the Flask server only when this file is executed directly;
    # debug flag, host and port all come from Config.
    run_options = {
        'debug': Config.DEBUG,
        'host': Config.APP_HOST,
        'port': Config.APP_PORT,
    }
    app.run(**run_options)