certmanager/app.py
2025-06-16 12:53:16 +08:00

2024 lines
70 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app.py
import os
import subprocess
from datetime import datetime, timedelta
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, Response, current_app
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
import mysql.connector
from mysql.connector import Error
import random
import string
from io import BytesIO
import zipfile
import shutil
import re
from pypinyin import pinyin, Style
from flask_mail import Mail
from PIL import Image, ImageDraw, ImageFont
from flask_migrate import Migrate
from dotenv import load_dotenv
from pathlib import Path
from urllib.parse import urljoin
# 先加载环境变量必须在创建app之前
load_dotenv(Path(__file__).parent / '.env', override=True)
def generate_full_url(endpoint, **values):
"""生成完整的URL考虑自定义域名和协议"""
base_url = f"{current_app.config['APP_PROTOCOL']}://{current_app.config['APP_DOMAIN']}"
path = url_for(endpoint, **values)
return urljoin(base_url, path)
# 从配置文件中导入配置
from config import Config
app = Flask(__name__, static_folder='static')
app.config.from_object(Config)
# 初始化邮件扩展
mail = Mail(app)
@app.context_processor
def inject_now():
return {'now': datetime.now()}
# 确保证书存储目录存在
os.makedirs(Config.CERT_STORE, exist_ok=True)
# 初始化数据库
from database import initialize_database
initialize_database()
# 初始化数据库迁移
#migrate = Migrate(app, db)
# Flask-Login 配置
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
from io import BytesIO
from flask import send_file
import random
import string
from PIL import Image, ImageDraw, ImageFont
def generate_captcha_image():
# 生成4位随机验证码字母和数字
captcha_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))
# 创建图片120x40像素白色背景
image = Image.new('RGB', (120, 40), color=(255, 255, 255))
draw = ImageDraw.Draw(image)
# 尝试加载字体,失败则使用默认字体
try:
# 指定字体路径(假设字体文件是 static/arial.ttf
font_path = os.path.join(current_app.static_folder, "arial.ttf")
font = ImageFont.truetype(font_path, 24)
except Exception as e:
print(f"加载字体失败: {e}")
font = ImageFont.load_default() # 回退到默认字体
# 绘制验证码文本(每个字符随机颜色)
for i, char in enumerate(captcha_code):
text_color = (
random.randint(0, 150), # R
random.randint(0, 150), # G
random.randint(0, 150) # B
)
draw.text(
(10 + i * 25, 5), # 位置
char, # 文本
font=font, # 字体
fill=text_color # 颜色
)
# 添加5条干扰线随机位置和颜色
for _ in range(5):
line_color = (
random.randint(0, 255), # R
random.randint(0, 255), # G
random.randint(0, 255) # B
)
draw.line(
[
random.randint(0, 120), # x1
random.randint(0, 40), # y1
random.randint(0, 120), # x2
random.randint(0, 40) # y2
],
fill=line_color,
width=1
)
# 保存验证码到数据库
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# 清除10分钟前的旧验证码
cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE")
# 插入新验证码
cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,))
conn.commit()
except Error as e:
print(f"Database error: {e}")
finally:
if conn.is_connected():
cursor.close()
conn.close()
# 返回图片字节流
img_io = BytesIO()
image.save(img_io, 'PNG')
img_io.seek(0)
return img_io, captcha_code
@app.route('/captcha')
def captcha():
img_io, _ = generate_captcha_image()
return send_file(img_io, mimetype='image/png')
def to_pinyin(text):
"""将中文转换为拼音"""
if not text:
return ""
# 获取拼音列表,不带声调
pinyin_list = pinyin(text, style=Style.NORMAL)
# 拼接成字符串
return "_".join([item[0] for item in pinyin_list])
# 注册模板过滤器
@app.template_filter('to_pinyin')
def jinja2_to_pinyin(text):
return to_pinyin(text)
class User(UserMixin):
pass
@login_manager.user_loader
def load_user(user_id):
try:
conn = mysql.connector.connect(**Config.DB_CONFIG)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user_data = cursor.fetchone()
if user_data:
user = User()
user.id = user_data['id']
user.username = user_data['username']
user.is_admin = user_data['is_admin']
return user
return None
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
# 辅助函数
def get_db_connection():
try:
conn = mysql.connector.connect(**Config.DB_CONFIG)
return conn
except Error as e:
print(f"Database connection error: {e}")
return None
def verify_captcha(user_input):
"""验证用户输入的验证码是否正确只验证最新的4位验证码"""
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# 只查询最新的验证码确保是4位的
cursor.execute("""
SELECT code FROM captcha
WHERE LENGTH(code) = 4 -- 只查询4位验证码
ORDER BY created_at DESC
LIMIT 1
""")
result = cursor.fetchone()
if result and user_input.upper() == result[0]:
# 验证成功后删除已使用的验证码
cursor.execute("DELETE FROM captcha WHERE code = %s", (result[0],))
conn.commit()
return True
return False
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
return False
def validate_name(name, max_length=64):
"""
校验名称是否符合规范
规则:
1. 长度1-64个字符
2. 只能包含字母、数字、中文、下划线、短横线
3. 不能以短横线开头或结尾
"""
if not name or len(name) > max_length:
return False
# 允许中文、字母、数字、下划线、短横线
pattern = r'^[a-zA-Z0-9_\-\u4e00-\u9fa5]+$'
if not re.match(pattern, name):
return False
# 不能以短横线开头或结尾
if name.startswith('-') or name.endswith('-'):
return False
return True
def validate_common_name(cn):
"""
校验通用名(Common Name)是否符合规范
规则:
1. 长度1-64个字符
2. 只能包含字母、数字、点号(.)和短横线(-)
3. 不能以点号或短横线开头或结尾
4. 不能连续两个点号或短横线
"""
if not cn or len(cn) > 64:
return False
# 只允许字母、数字、点号和短横线
if not re.match(r'^[a-zA-Z0-9.-]+$', cn):
return False
# 不能以点号或短横线开头或结尾
if cn.startswith('.') or cn.endswith('.') or cn.startswith('-') or cn.endswith('-'):
return False
# 不能连续两个点号或短横线
if '..' in cn or '--' in cn:
return False
return True
def create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid,
created_by):
# 创建拼音格式的目录名
pinyin_name = to_pinyin(ca_name)
ca_dir = os.path.join(Config.CERT_STORE, f"ca_{pinyin_name}")
os.makedirs(ca_dir, exist_ok=True)
# 使用原始common_name作为文件名
key_path = os.path.join(ca_dir, f"{common_name}.key")
cert_path = os.path.join(ca_dir, f"{common_name}.crt")
# 创建OpenSSL配置文件
openssl_cnf = f"""
[ req ]
default_bits = {key_size}
distinguished_name = req_distinguished_name
x509_extensions = v3_ca
prompt = no
[ req_distinguished_name ]
C = {country}
ST = {state}
L = {locality}
O = {organization}
OU = {organizational_unit}
CN = {common_name}
[ v3_ca ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always,issuer:always
basicConstraints = CA:TRUE
keyUsage = digitalSignature, keyCertSign, cRLSign
"""
config_path = os.path.join(ca_dir, 'openssl.cnf')
with open(config_path, 'w') as f:
f.write(openssl_cnf.strip())
# 生成CA私钥
subprocess.run([
'openssl', 'genrsa', '-out', key_path, str(key_size)
], check=True)
# 生成CA自签名证书使用配置文件
subprocess.run([
'openssl', 'req', '-new', '-x509', '-days', str(days_valid),
'-key', key_path, '-out', cert_path,
'-config', config_path
], check=True)
# 保存到数据库
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO certificate_authorities
(name, common_name, organization, organizational_unit, country, state, locality,
key_size, days_valid, cert_path, key_path, created_by)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (ca_name, common_name, organization, organizational_unit, country, state, locality,
key_size, days_valid, cert_path, key_path, created_by))
conn.commit()
return cursor.lastrowid
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit,
country, state, locality, key_size, days_valid, created_by):
"""创建证书并返回证书ID"""
# 获取CA信息
ca = get_ca_by_id(ca_id)
if not ca:
print(f"CA ID {ca_id} 不存在")
return None
# 创建证书目录
cert_dir = os.path.join(Config.CERT_STORE, f"certs_{common_name}")
os.makedirs(cert_dir, exist_ok=True)
key_path = os.path.join(cert_dir, f"{common_name}.key")
csr_path = os.path.join(cert_dir, f"{common_name}.csr")
cert_path = os.path.join(cert_dir, f"{common_name}.crt")
# 1. 生成私钥
try:
subprocess.run([
'openssl', 'genrsa', '-out', key_path, str(key_size)
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f"生成私钥失败: {e.stderr.decode()}")
return None
# 2. 创建CSR配置文件
has_san = bool(san_dns or san_ip)
dns_entries = [dns.strip() for dns in san_dns.split(',') if dns.strip()] if san_dns else []
ip_entries = [ip.strip() for ip in san_ip.split(',') if ip.strip()] if san_ip else []
# 构建CSR配置
csr_config = f"""[req]
default_bits = {key_size}
prompt = no
default_md = sha256
distinguished_name = dn
"""
if has_san:
csr_config += "req_extensions = req_ext\n"
csr_config += f"""
[dn]
CN = {common_name}
O = {organization}
OU = {organizational_unit}
C = {country}
ST = {state}
L = {locality}
"""
if has_san:
csr_config += """
[req_ext]
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alt_names
extendedKeyUsage = serverAuth, clientAuth
[alt_names]"""
# 添加DNS SAN条目
for i, dns in enumerate(dns_entries, 1):
csr_config += f"\nDNS.{i} = {dns}"
# 添加IP SAN条目
for i, ip in enumerate(ip_entries, 1):
csr_config += f"\nIP.{i} = {ip}"
# 写入CSR配置文件
csr_config_path = os.path.join(cert_dir, 'csr_config.cnf')
with open(csr_config_path, 'w') as f:
f.write(csr_config.strip()) # 确保没有多余空行
# 3. 生成CSR
try:
subprocess.run([
'openssl', 'req', '-new', '-key', key_path,
'-out', csr_path, '-config', csr_config_path
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f"生成CSR失败: {e.stderr.decode()}")
print("CSR配置文件内容:")
print(csr_config)
return None
# 4. 创建证书扩展配置文件
ext_config = """authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage=digitalSignature, keyEncipherment
extendedKeyUsage=serverAuth, clientAuth
"""
if has_san:
ext_config += "subjectAltName=@alt_names\n\n[alt_names]\n"
# 添加DNS SAN条目
for i, dns in enumerate(dns_entries, 1):
ext_config += f"DNS.{i} = {dns}\n"
# 添加IP SAN条目
for i, ip in enumerate(ip_entries, 1):
ext_config += f"IP.{i} = {ip}\n"
ext_config_path = os.path.join(cert_dir, 'ext.cnf')
with open(ext_config_path, 'w') as f:
f.write(ext_config.strip())
# 5. 使用CA签名证书
try:
cmd = [
'openssl', 'x509', '-req', '-in', csr_path,
'-CA', ca['cert_path'], '-CAkey', ca['key_path'],
'-CAcreateserial', '-out', cert_path,
'-days', str(days_valid), '-sha256'
]
# 只有有扩展内容时才添加-extfile参数
if os.path.getsize(ext_config_path) > 0:
cmd.extend(['-extfile', ext_config_path])
subprocess.run(cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f"证书签名失败: {e.stderr.decode()}")
print("扩展配置文件内容:")
print(ext_config)
return None
# 6. 保存到数据库
expires_at = datetime.now() + timedelta(days=days_valid)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO certificates
(common_name, san_dns, san_ip, organization, organizational_unit,
country, state, locality, key_size, days_valid, cert_path,
key_path, csr_path, ca_id, created_by, expires_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
common_name, san_dns, san_ip, organization, organizational_unit,
country, state, locality, key_size, days_valid, cert_path,
key_path, csr_path, ca_id, created_by, expires_at
))
conn.commit()
return cursor.lastrowid
except Error as e:
print(f"数据库错误: {e}")
conn.rollback()
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def get_ca_by_id(ca_id):
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM certificate_authorities WHERE id = %s", (ca_id,))
return cursor.fetchone()
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def get_certificate_by_id(cert_id):
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM certificates WHERE id = %s", (cert_id,))
return cursor.fetchone()
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def revoke_certificate(cert_id, reason):
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
UPDATE certificates
SET status = 'revoked', revoked_at = NOW(), revocation_reason = %s
WHERE id = %s
""", (reason, cert_id))
conn.commit()
return True
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
return False
def renew_certificate(cert_id, days_valid):
cert = get_certificate_by_id(cert_id)
if not cert:
return False
ca = get_ca_by_id(cert['ca_id'])
if not ca:
return False
# 使用现有CSR重新签名
new_cert_path = os.path.join(os.path.dirname(cert['cert_path']), f"renewed_{cert['common_name']}.crt")
subprocess.run([
'openssl', 'x509', '-req', '-in', cert['csr_path'], '-CA', ca['cert_path'],
'-CAkey', ca['key_path'], '-CAcreateserial', '-out', new_cert_path,
'-days', str(days_valid), '-sha256'
], check=True)
# 更新数据库
new_expires_at = datetime.now() + timedelta(days=days_valid)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
UPDATE certificates
SET cert_path = %s, days_valid = %s, expires_at = %s, status = 'active',
revoked_at = NULL, revocation_reason = NULL
WHERE id = %s
""", (new_cert_path, days_valid, new_expires_at, cert_id))
conn.commit()
return True
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
return False
def generate_crl(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
return False
ca_dir = os.path.dirname(ca['cert_path'])
crl_path = os.path.join(ca_dir, f"crl_{ca['name']}.pem")
# 创建完整的OpenSSL配置文件
openssl_cnf = f"""
[ ca ]
default_ca = CA_default
[ CA_default ]
database = {os.path.join(ca_dir, 'index.txt')}
certificate = {ca['cert_path']}
private_key = {ca['key_path']}
crl = {crl_path}
RANDFILE = {os.path.join(ca_dir, '.rand')}
default_days = 365
default_crl_days = 30
default_md = sha256
preserve = no
policy = policy_anything
[ policy_anything ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = optional
emailAddress = optional
"""
cnf_path = os.path.join(ca_dir, 'openssl.cnf')
with open(cnf_path, 'w') as f:
f.write(openssl_cnf)
# 确保index.txt存在
index_file = os.path.join(ca_dir, 'index.txt')
if not os.path.exists(index_file):
open(index_file, 'a').close()
# 生成CRL
try:
subprocess.run([
'openssl', 'ca', '-gencrl',
'-config', cnf_path,
'-out', crl_path
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
# 更新数据库
next_update = datetime.now() + timedelta(days=30)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO certificate_revocation_list
(ca_id, crl_path, next_update)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE
crl_path = VALUES(crl_path),
last_updated = NOW(),
next_update = VALUES(next_update)
""", (ca_id, crl_path, next_update))
conn.commit()
return True
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode() if e.stderr else "Unknown OpenSSL error"
print(f"OpenSSL error: {error_msg}")
flash(f'CRL生成失败: {error_msg}', 'danger')
except Exception as e:
print(f"Error generating CRL: {str(e)}")
flash(f'CRL生成异常: {str(e)}', 'danger')
return False
def export_pkcs12(cert_id, password):
cert = get_certificate_by_id(cert_id)
if not cert:
return None
ca = get_ca_by_id(cert['ca_id'])
if not ca:
return None
pkcs12_path = os.path.join(os.path.dirname(cert['cert_path']), f"{cert['common_name']}.p12")
subprocess.run([
'openssl', 'pkcs12', '-export', '-out', pkcs12_path,
'-inkey', cert['key_path'], '-in', cert['cert_path'],
'-certfile', ca['cert_path'], '-passout', f'pass:{password}'
], check=True)
return pkcs12_path
@app.route('/register', methods=['GET', 'POST'])
def register():
# 检查用户是否已登录
if current_user.is_authenticated:
return redirect(url_for('index'))
# 检查注册功能是否开放
if not current_app.config['REGISTRATION_OPEN']:
flash('系统暂未开放注册', 'warning')
return redirect(url_for('login'))
if request.method == 'POST':
username = request.form['username'].strip()
password = request.form['password']
confirm_password = request.form['confirm_password']
email = request.form.get('email', '').strip()
captcha = request.form['captcha']
# 验证验证码
if not verify_captcha(captcha):
flash('验证码错误', 'danger')
return redirect(url_for('register'))
# 验证密码匹配
if password != confirm_password:
flash('两次输入的密码不匹配', 'danger')
return redirect(url_for('register'))
# 密码策略检查
password_errors = []
policy = current_app.config['PASSWORD_POLICY']
if len(password) < policy['min_length']:
password_errors.append(f"密码长度至少{policy['min_length']}")
if policy['require_uppercase'] and not re.search(r'[A-Z]', password):
password_errors.append("必须包含大写字母")
if policy['require_lowercase'] and not re.search(r'[a-z]', password):
password_errors.append("必须包含小写字母")
if policy['require_digits'] and not re.search(r'[0-9]', password):
password_errors.append("必须包含数字")
if policy['require_special_chars'] and not re.search(r'[^A-Za-z0-9]', password):
password_errors.append("必须包含特殊字符")
if password_errors:
flash('密码不符合要求: ' + ', '.join(password_errors), 'danger')
return redirect(url_for('register'))
# 验证用户名是否已存在
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# 检查用户名和邮箱是否已存在
cursor.execute("SELECT id FROM users WHERE username = %s OR email = %s",
(username, email))
if cursor.fetchone():
flash('用户名或邮箱已被注册', 'danger')
return redirect(url_for('register'))
# 根据配置决定是否立即激活账户
is_active = not current_app.config['EMAIL_VERIFICATION_REQUIRED']
verification_token = None
# 创建新用户
password_hash = generate_password_hash(password)
cursor.execute("""
INSERT INTO users (username, password_hash, email, is_admin, is_active, verification_token)
VALUES (%s, %s, %s, %s, %s, %s)
""", (username, password_hash, email, False, is_active, verification_token))
if current_app.config['EMAIL_VERIFICATION_REQUIRED']:
# 生成验证令牌
from itsdangerous import URLSafeTimedSerializer
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
token = serializer.dumps({'username': username, 'email': email}, salt='email-verify')
# 更新用户验证令牌
cursor.execute("""
UPDATE users SET verification_token = %s WHERE username = %s
""", (token, username))
# 发送验证邮件
try:
from flask_mail import Message
msg = Message('请验证您的邮箱 - 自签证书管理系统',
sender=current_app.config['MAIL_DEFAULT_SENDER'],
recipients=[email])
confirm_url = generate_full_url('verify_email', token=token)
# 使用HTML格式的邮件内容
msg.html = f"""
<html>
<body>
<p>感谢注册!</p>
<p>请点击以下链接完成邮箱验证:</p>
<p><a href="{confirm_url}">{confirm_url}</a></p>
<p>(链接24小时内有效)</p>
<p>如果链接无法点击,请复制到浏览器地址栏访问</p>
</body>
</html>
"""
mail = Mail(current_app)
mail.send(msg)
flash('验证邮件已发送至您的邮箱,请查收并完成验证', 'info')
except Exception as e:
current_app.logger.error(f'邮件发送失败: {str(e)}')
conn.rollback()
flash('发送验证邮件失败,请稍后再试或联系管理员', 'danger')
return redirect(url_for('register'))
conn.commit()
if is_active:
flash('注册成功,请登录', 'success')
return redirect(url_for('login'))
else:
return redirect(url_for('register'))
except Error as e:
conn.rollback()
current_app.logger.error(f'数据库错误: {str(e)}')
flash('注册失败,请稍后再试', 'danger')
finally:
if conn.is_connected():
cursor.close()
conn.close()
# 生成新验证码
captcha_url = url_for('captcha') # 使用图片验证码
return render_template('register.html',
captcha_url=captcha_url, # 前端改为显示图片验证码
registration_open=current_app.config['REGISTRATION_OPEN'],
email_required=current_app.config['EMAIL_VERIFICATION_REQUIRED'])
@app.route('/verify-email/<token>')
def verify_email(token):
if current_user.is_authenticated:
return redirect(generate_full_url('index'))
try:
from itsdangerous import URLSafeTimedSerializer
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
data = serializer.loads(token, salt='email-verify', max_age=86400) # 24小时有效期
username = data['username']
email = data['email']
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
# 验证令牌是否匹配
cursor.execute("""
SELECT id FROM users
WHERE username = %s AND email = %s AND verification_token = %s
""", (username, email, token))
user = cursor.fetchone()
if not user:
flash('无效的验证链接', 'danger')
return redirect(generate_full_url('login'))
# 激活账户
cursor.execute("""
UPDATE users
SET is_active = TRUE, verification_token = NULL
WHERE id = %s
""", (user['id'],))
conn.commit()
flash('邮箱验证成功,您现在可以登录了', 'success')
return redirect(generate_full_url('login'))
except Error as e:
conn.rollback()
current_app.logger.error(f'数据库错误: {str(e)}')
flash('验证过程中出现错误', 'danger')
finally:
if conn.is_connected():
cursor.close()
conn.close()
except Exception as e:
current_app.logger.error(f'令牌验证失败: {str(e)}')
flash('验证链接无效或已过期', 'danger')
return redirect(generate_full_url('login'))
# 路由定义
@app.route('/')
@login_required
def index():
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
# 获取CA数量
if current_user.is_admin:
cursor.execute("SELECT COUNT(*) as count FROM certificate_authorities")
ca_count = cursor.fetchone()['count']
cursor.fetchall() # 确保清空结果集
cursor.execute("""
SELECT * FROM certificate_authorities
ORDER BY created_at DESC LIMIT 5
""")
else:
cursor.execute("""
SELECT COUNT(*) as count FROM certificate_authorities
WHERE created_by = %s
""", (current_user.id,))
ca_count = cursor.fetchone()['count']
cursor.fetchall() # 确保清空结果集
cursor.execute("""
SELECT * FROM certificate_authorities
WHERE created_by = %s
ORDER BY created_at DESC LIMIT 5
""", (current_user.id,))
recent_cas = cursor.fetchall()
# 获取证书数量
if current_user.is_admin:
cursor.execute("SELECT COUNT(*) as count FROM certificates")
cert_count = cursor.fetchone()['count']
cursor.fetchall() # 确保清空结果集
cursor.execute("""
SELECT * FROM certificates
ORDER BY created_at DESC LIMIT 5
""")
else:
cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE created_by = %s
""", (current_user.id,))
cert_count = cursor.fetchone()['count']
cursor.fetchall() # 确保清空结果集
cursor.execute("""
SELECT * FROM certificates
WHERE created_by = %s
ORDER BY created_at DESC LIMIT 5
""", (current_user.id,))
recent_certs = cursor.fetchall()
# 获取即将过期证书数量30天内
if current_user.is_admin:
cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY)
AND status = 'active'
""")
else:
cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY)
AND status = 'active' AND created_by = %s
""", (current_user.id,))
expiring_soon_count = cursor.fetchone()['count']
cursor.fetchall() # 确保清空结果集
# 获取活跃证书数量
if current_user.is_admin:
cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE status = 'active'
""")
else:
cursor.execute("""
SELECT COUNT(*) as count FROM certificates
WHERE status = 'active' AND created_by = %s
""", (current_user.id,))
active_count = cursor.fetchone()['count']
cursor.fetchall() # 确保清空结果集
return render_template('index.html',
ca_count=ca_count,
cert_count=cert_count,
expiring_soon_count=expiring_soon_count,
active_count=active_count,
recent_cas=recent_cas,
recent_certs=recent_certs)
except Error as e:
print(f"Database error: {e}")
flash('获取系统统计信息失败', 'danger')
return render_template('index.html',
ca_count=0,
cert_count=0,
expiring_soon_count=0,
active_count=0,
recent_cas=[],
recent_certs=[])
finally:
if conn.is_connected():
cursor.close()
conn.close()
return render_template('index.html',
ca_count=0,
cert_count=0,
expiring_soon_count=0,
active_count=0,
recent_cas=[],
recent_certs=[])
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
captcha = request.form['captcha']
if not verify_captcha(captcha):
flash('验证码错误', 'danger')
return redirect(url_for('login'))
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT * FROM users
WHERE username = %s AND is_active = TRUE
""", (username,))
user_data = cursor.fetchone()
if user_data and check_password_hash(user_data['password_hash'], password):
user = User()
user.id = user_data['id']
user.username = user_data['username']
user.is_admin = user_data['is_admin']
login_user(user)
flash('登录成功', 'success')
return redirect(url_for('index'))
else:
flash('用户名或密码错误,或账户未激活', 'danger')
except Error as e:
print(f"Database error: {e}")
flash('登录失败,请稍后再试', 'danger')
finally:
if conn.is_connected():
cursor.close()
conn.close()
captcha_url = url_for('captcha')
return render_template('login.html', captcha_url=captcha_url)
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('您已成功登出', 'success')
return redirect(url_for('login'))
from math import ceil
# 每页显示的数量
PER_PAGE = 10
@app.route('/cas')
@login_required
def ca_list():
page = request.args.get('page', 1, type=int)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
# 获取总数
if current_user.is_admin:
cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities")
else:
cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities WHERE created_by = %s",
(current_user.id,))
total = cursor.fetchone()['total']
total_pages = ceil(total / PER_PAGE)
# 获取分页数据
offset = (page - 1) * PER_PAGE
if current_user.is_admin:
cursor.execute("SELECT * FROM certificate_authorities ORDER BY created_at DESC LIMIT %s OFFSET %s",
(PER_PAGE, offset))
else:
cursor.execute("""
SELECT * FROM certificate_authorities
WHERE created_by = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (current_user.id, PER_PAGE, offset))
cas = cursor.fetchall()
return render_template('ca_list.html',
cas=cas,
page=page,
total_pages=total_pages,
total=total,
get_username=get_username)
except Error as e:
print(f"Database error: {e}")
flash('获取CA列表失败', 'danger')
return redirect(url_for('index'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('index'))
@app.route('/cas/batch_delete', methods=['POST'])
@login_required
def batch_delete_cas():
if not request.is_json:
return jsonify({'success': False, 'message': 'Invalid request'}), 400
data = request.get_json()
ca_ids = data.get('ids', [])
if not ca_ids:
return jsonify({'success': False, 'message': 'No CAs selected'}), 400
conn = get_db_connection()
if not conn:
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
try:
cursor = conn.cursor()
# 检查权限并删除
for ca_id in ca_ids:
# 验证CA存在且用户有权限
cursor.execute("SELECT created_by FROM certificate_authorities WHERE id = %s", (ca_id,))
ca = cursor.fetchone()
if not ca:
continue
if not current_user.is_admin and ca['created_by'] != current_user.id:
continue
# 检查是否有关联证书
cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
result = cursor.fetchone()
if result['count'] > 0:
continue
# 获取CA信息以便删除文件
cursor.execute("SELECT cert_path, key_path FROM certificate_authorities WHERE id = %s", (ca_id,))
ca_info = cursor.fetchone()
if ca_info:
# 删除文件
try:
if os.path.exists(ca_info['cert_path']):
os.remove(ca_info['cert_path'])
if os.path.exists(ca_info['key_path']):
os.remove(ca_info['key_path'])
# 删除CA目录
ca_dir = os.path.dirname(ca_info['cert_path'])
if os.path.exists(ca_dir):
shutil.rmtree(ca_dir) # 递归删除目录
except OSError as e:
print(f"文件删除错误: {e}")
continue
# 删除数据库记录
cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
conn.commit()
return jsonify({'success': True, 'message': '批量删除成功'})
except Error as e:
conn.rollback()
print(f"Database error: {e}")
return jsonify({'success': False, 'message': '数据库操作失败'}), 500
finally:
if conn.is_connected():
cursor.close()
conn.close()
@app.route('/certificates')
@login_required
def certificate_list():
page = request.args.get('page', 1, type=int)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
# 获取总数
if current_user.is_admin:
cursor.execute("SELECT COUNT(*) as total FROM certificates")
else:
cursor.execute("SELECT COUNT(*) as total FROM certificates WHERE created_by = %s", (current_user.id,))
total = cursor.fetchone()['total']
total_pages = ceil(total / PER_PAGE)
# 获取分页数据
offset = (page - 1) * PER_PAGE
if current_user.is_admin:
cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
ORDER BY c.created_at DESC
LIMIT %s OFFSET %s
""", (PER_PAGE, offset))
else:
cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
WHERE c.created_by = %s
ORDER BY c.created_at DESC
LIMIT %s OFFSET %s
""", (current_user.id, PER_PAGE, offset))
certificates = cursor.fetchall()
return render_template('certificate_list.html',
certificates=certificates,
page=page,
total_pages=total_pages,
total=total,
get_username=get_username)
except Error as e:
print(f"Database error: {e}")
flash('获取证书列表失败', 'danger')
return redirect(url_for('index'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('index'))
@app.route('/certificates/batch_delete', methods=['POST'])
@login_required
def batch_delete_certificates():
if not request.is_json:
return jsonify({'success': False, 'message': 'Invalid request'}), 400
data = request.get_json()
cert_ids = data.get('ids', [])
if not cert_ids:
return jsonify({'success': False, 'message': 'No certificates selected'}), 400
conn = get_db_connection()
if not conn:
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
try:
cursor = conn.cursor(dictionary=True)
# 检查权限并删除
for cert_id in cert_ids:
# 验证证书存在且用户有权限
cursor.execute("SELECT created_by, cert_path, key_path, csr_path FROM certificates WHERE id = %s",
(cert_id,))
cert = cursor.fetchone()
if not cert:
continue
if not current_user.is_admin and cert['created_by'] != current_user.id:
continue
# 删除文件
try:
files_to_delete = [
cert['cert_path'],
cert['key_path'],
cert['csr_path']
]
# 删除所有指定文件
for file_path in files_to_delete:
if file_path and os.path.exists(file_path):
os.remove(file_path)
# 删除证书目录
cert_dir = os.path.dirname(cert['cert_path'])
if os.path.exists(cert_dir):
shutil.rmtree(cert_dir) # 递归删除目录
except OSError as e:
print(f"文件删除错误: {e}")
continue
# 删除数据库记录
cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
conn.commit()
return jsonify({'success': True, 'message': '批量删除成功'})
except Error as e:
conn.rollback()
print(f"Database error: {e}")
return jsonify({'success': False, 'message': '数据库操作失败'}), 500
finally:
if conn.is_connected():
cursor.close()
conn.close()
@app.route('/cas/create', methods=['GET', 'POST'])
@login_required
def create_ca_view():
if request.method == 'POST':
ca_name = request.form['ca_name'].strip()
common_name = request.form['common_name'].strip()
organization = request.form['organization'].strip()
organizational_unit = request.form['organizational_unit'].strip()
country = request.form['country'].strip()
state = request.form['state'].strip()
locality = request.form['locality'].strip()
key_size = int(request.form['key_size'])
days_valid = int(request.form['days_valid'])
# 名称校验
if not validate_name(ca_name):
flash('CA名称无效只能包含中文、字母、数字、下划线和短横线且不能以短横线开头或结尾', 'danger')
return render_template('create_ca.html')
if not validate_common_name(common_name):
flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
return render_template('create_ca.html')
# 检查CA名称是否已存在
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id FROM certificate_authorities WHERE name = %s", (ca_name,))
if cursor.fetchone():
flash('CA名称已存在请使用其他名称', 'danger')
return render_template('create_ca.html')
except Error as e:
print(f"Database error: {e}")
flash('检查CA名称失败', 'danger')
return render_template('create_ca.html')
finally:
if conn.is_connected():
cursor.close()
conn.close()
ca_id = create_ca(ca_name, common_name, organization, organizational_unit,
country, state, locality, key_size, days_valid, current_user.id)
if ca_id:
flash('CA创建成功', 'success')
return redirect(url_for('ca_list'))
else:
flash('CA创建失败', 'danger')
return render_template('create_ca.html')
from datetime import timedelta # 确保顶部已导入
@app.route('/cas/<int:ca_id>')
@login_required
def ca_detail(ca_id):
page = request.args.get('page', 1, type=int)
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA不存在', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('无权访问此CA', 'danger')
return redirect(url_for('ca_list'))
# 获取该CA颁发的证书(分页)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
# 获取证书总数
cursor.execute("""
SELECT COUNT(*) as total FROM certificates
WHERE ca_id = %s
""", (ca_id,))
total = cursor.fetchone()['total']
total_pages = ceil(total / PER_PAGE)
# 获取分页数据
offset = (page - 1) * PER_PAGE
cursor.execute("""
SELECT * FROM certificates
WHERE ca_id = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (ca_id, PER_PAGE, offset))
certificates = cursor.fetchall()
# 获取CRL信息
cursor.execute("""
SELECT * FROM certificate_revocation_list
WHERE ca_id = %s
""", (ca_id,))
crl = cursor.fetchone()
return render_template(
'ca_detail.html',
ca=ca,
certificates=certificates,
crl=crl,
page=page,
total_pages=total_pages,
total=total,
timedelta=timedelta,
get_username=get_username
)
except Error as e:
print(f"Database error: {e}")
flash('获取CA详情失败', 'danger')
return redirect(url_for('ca_list'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('ca_list'))
def get_username(user_id):
"""根据用户ID获取用户名"""
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT username FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
return user['username'] if user else str(user_id)
except Error as e:
print(f"Database error: {e}")
return str(user_id)
finally:
if conn.is_connected():
cursor.close()
conn.close()
return str(user_id)
@app.route('/cas/<int:ca_id>/generate_crl')
@login_required
def generate_crl_view(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA不存在', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('无权操作此CA', 'danger')
return redirect(url_for('ca_list'))
if generate_crl(ca_id):
flash('CRL生成成功', 'success')
else:
flash('CRL生成失败', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/cas/<int:ca_id>/download_crl')
@login_required
def download_crl(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA不存在', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('无权操作此CA', 'danger')
return redirect(url_for('ca_list'))
# 获取CRL路径
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT crl_path FROM certificate_revocation_list
WHERE ca_id = %s
""", (ca_id,))
crl = cursor.fetchone()
if crl and os.path.exists(crl['crl_path']):
return send_from_directory(
os.path.dirname(crl['crl_path']),
os.path.basename(crl['crl_path']),
as_attachment=True
)
else:
flash('CRL文件不存在', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
except Error as e:
print(f"Database error: {e}")
flash('获取CRL失败', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/certificates/create', methods=['GET', 'POST'])
@login_required
def create_certificate_view():
if request.method == 'POST':
common_name = request.form['common_name'].strip()
san_dns = request.form.get('san_dns', '').strip()
san_ip = request.form.get('san_ip', '').strip()
organization = request.form['organization'].strip()
organizational_unit = request.form['organizational_unit'].strip()
country = request.form['country'].strip()
state = request.form['state'].strip()
locality = request.form['locality'].strip()
key_size = int(request.form['key_size'])
days_valid = int(request.form['days_valid'])
ca_id = int(request.form['ca_id'])
# 通用名校验
if not validate_common_name(common_name):
flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
return redirect(url_for('create_certificate_view'))
# SAN DNS校验
if san_dns:
for dns in san_dns.split(','):
dns = dns.strip()
if not validate_common_name(dns):
flash(f'DNS SAN条目无效: {dns},只能包含字母、数字、点号和短横线', 'danger')
return redirect(url_for('create_certificate_view'))
# SAN IP校验
if san_ip:
for ip in san_ip.split(','):
ip = ip.strip()
if not re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip):
flash(f'IP SAN条目无效: {ip}请输入有效的IPv4地址', 'danger')
return redirect(url_for('create_certificate_view'))
# 检查证书是否已存在
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id FROM certificates WHERE common_name = %s AND ca_id = %s",
(common_name, ca_id))
if cursor.fetchone():
flash('该CA下已存在相同通用名的证书', 'danger')
return redirect(url_for('create_certificate_view'))
except Error as e:
print(f"Database error: {e}")
flash('检查证书名称失败', 'danger')
return redirect(url_for('create_certificate_view'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
cert_id = create_certificate(ca_id, common_name, san_dns, san_ip, organization,
organizational_unit, country, state, locality,
key_size, days_valid, current_user.id)
if cert_id:
flash('证书创建成功', 'success')
return redirect(url_for('certificate_detail', cert_id=cert_id))
else:
flash('证书创建失败', 'danger')
# 获取可用的CA
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
if current_user.is_admin:
cursor.execute("SELECT id, name FROM certificate_authorities")
else:
cursor.execute("""
SELECT id, name FROM certificate_authorities
WHERE created_by = %s
""", (current_user.id,))
cas = cursor.fetchall()
return render_template('create_certificate.html', cas=cas)
except Error as e:
print(f"Database error: {e}")
flash('获取CA列表失败', 'danger')
return redirect(url_for('certificate_list'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('certificate_list'))
@app.route('/certificates/<int:cert_id>')
@login_required
def certificate_detail(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权访问此证书', 'danger')
return redirect(url_for('certificate_list'))
# 获取CA信息
ca = get_ca_by_id(cert['ca_id'])
return render_template('certificate_detail.html', cert=cert, ca=ca, get_username=get_username)
@app.route('/certificates/<int:cert_id>/revoke', methods=['GET', 'POST'])
@login_required
def revoke_certificate_view(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权操作此证书', 'danger')
return redirect(url_for('certificate_list'))
if cert['status'] == 'revoked':
flash('证书已被吊销', 'warning')
return redirect(url_for('certificate_detail', cert_id=cert_id))
if request.method == 'POST':
reason = request.form['reason']
if revoke_certificate(cert_id, reason):
flash('证书吊销成功', 'success')
# 更新CRL
generate_crl(cert['ca_id'])
return redirect(url_for('certificate_detail', cert_id=cert_id))
else:
flash('证书吊销失败', 'danger')
return render_template('revoke_certificate.html', cert=cert)
@app.route('/certificates/<int:cert_id>/renew', methods=['GET', 'POST'])
@login_required
def renew_certificate_view(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权操作此证书', 'danger')
return redirect(url_for('certificate_list'))
if request.method == 'POST':
days_valid = int(request.form['days_valid'])
if renew_certificate(cert_id, days_valid):
flash('证书续期成功', 'success')
return redirect(url_for('certificate_detail', cert_id=cert_id))
else:
flash('证书续期失败', 'danger')
return render_template('renew_certificate.html', cert=cert)
@app.route('/cas/<int:ca_id>/export')
@login_required
def export_ca_view(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA not found', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('No permission', 'danger')
return redirect(url_for('ca_list'))
# 创建临时zip文件
memory_file = BytesIO()
try:
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
# 添加CA证书文件
with open(ca['cert_path'], 'rb') as f: # 使用二进制模式读取
cert_content = f.read()
zf.writestr(f"{ca['common_name']}_ca.crt", cert_content)
# 添加CA私钥文件(可选,只有管理员可以导出私钥)
if current_user.is_admin:
with open(ca['key_path'], 'rb') as f: # 使用二进制模式读取
key_content = f.read()
zf.writestr(f"{ca['common_name']}_ca.key", key_content)
memory_file.seek(0)
# 确保文件名只包含ASCII字符
safe_filename = f"{ca['name']}_ca_bundle.zip".encode('ascii', 'ignore').decode('ascii')
return Response(
memory_file.getvalue(),
mimetype="application/zip",
headers={
"Content-Disposition": f"attachment; filename={safe_filename}"
}
)
except Exception as e:
flash(f'Export failed: {str(e)}', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/certificates/<int:cert_id>/export', methods=['GET', 'POST'])
@login_required
def export_certificate_view(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权操作此证书', 'danger')
return redirect(url_for('certificate_list'))
if request.method == 'POST':
format_type = request.form['format']
password = request.form.get('password', '')
if format_type == 'pkcs12':
if not password:
flash('PKCS#12格式需要密码', 'danger')
return redirect(url_for('export_certificate_view', cert_id=cert_id))
pkcs12_path = export_pkcs12(cert_id, password)
if pkcs12_path:
return send_from_directory(
os.path.dirname(pkcs12_path),
os.path.basename(pkcs12_path),
as_attachment=True
)
else:
flash('导出PKCS#12失败', 'danger')
elif format_type == 'pem':
# 合并证书和私钥为PEM
pem_content = ""
with open(cert['cert_path'], 'r') as f:
pem_content += f.read()
with open(cert['key_path'], 'r') as f:
pem_content += f.read()
return Response(
pem_content,
mimetype="application/x-pem-file",
headers={
"Content-Disposition": f"attachment; filename={cert['common_name']}.pem"
}
)
elif format_type == 'pem_separate':
# 创建内存中的zip文件(.pem + .key)
return generate_separate_files_zip(
cert,
cert_ext=".pem",
zip_suffix="_pem_key"
)
elif format_type == 'crt_separate':
# 创建内存中的zip文件(.crt + .key)
return generate_separate_files_zip(
cert,
cert_ext=".crt",
zip_suffix="_crt_key"
)
else:
flash('不支持的导出格式', 'danger')
return render_template('export_certificate.html', cert=cert)
# 在app.py中添加以下路由
@app.route('/cas/<int:ca_id>/delete', methods=['GET', 'POST'])
@login_required
def delete_ca(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA不存在', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('无权删除此CA', 'danger')
return redirect(url_for('ca_list'))
# 检查是否有关联的证书
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
result = cursor.fetchone()
if result['count'] > 0:
flash('无法删除CA因为存在关联的证书', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
except Error as e:
print(f"Database error: {e}")
flash('检查关联证书失败', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
finally:
if conn.is_connected():
cursor.close()
conn.close()
if request.method == 'POST':
# 删除文件
try:
# 删除CA证书和私钥文件
if os.path.exists(ca['cert_path']):
os.remove(ca['cert_path'])
if os.path.exists(ca['key_path']):
os.remove(ca['key_path'])
# 删除CA目录及其所有内容
ca_dir = os.path.dirname(ca['cert_path'])
if os.path.exists(ca_dir):
# 删除目录中的所有文件和子目录
for filename in os.listdir(ca_dir):
file_path = os.path.join(ca_dir, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print(f'删除文件/目录失败 {file_path}. 原因: {e}')
flash(f'删除文件/目录失败: {str(e)}', 'warning')
# 现在删除空目录
try:
os.rmdir(ca_dir)
except OSError as e:
print(f"删除目录失败: {e}")
flash(f'删除目录失败: {str(e)}', 'warning')
except OSError as e:
print(f"文件删除错误: {e}")
flash(f'删除文件时出错: {str(e)}', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
# 删除数据库记录
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# 先删除CRL记录
cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
# 再删除CA记录
cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
conn.commit()
flash('CA删除成功', 'success')
return redirect(url_for('ca_list'))
except Error as e:
print(f"Database error: {e}")
conn.rollback()
flash('删除CA记录失败', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return render_template('confirm_delete_ca.html', ca=ca)
@app.route('/certificates/<int:cert_id>/delete', methods=['GET', 'POST'])
@login_required
def delete_certificate(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权删除此证书', 'danger')
return redirect(url_for('certificate_list'))
if request.method == 'POST':
# 删除文件
try:
# 确保所有文件路径都存在
files_to_delete = [
cert['cert_path'],
cert['key_path'],
cert['csr_path']
]
# 删除所有指定文件
for file_path in files_to_delete:
if file_path and os.path.exists(file_path):
os.remove(file_path)
# 删除证书目录及其所有内容
cert_dir = os.path.dirname(cert['cert_path'])
if os.path.exists(cert_dir):
# 删除目录中的所有文件
for filename in os.listdir(cert_dir):
file_path = os.path.join(cert_dir, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print(f'Failed to delete {file_path}. Reason: {e}')
# 现在删除空目录
os.rmdir(cert_dir)
except OSError as e:
print(f"文件删除错误: {e}")
flash(f'删除文件时出错: {str(e)}', 'danger')
return redirect(url_for('certificate_detail', cert_id=cert_id))
# 删除数据库记录
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
conn.commit()
flash('证书删除成功', 'success')
return redirect(url_for('certificate_list'))
except Error as e:
print(f"Database error: {e}")
flash('删除证书记录失败', 'danger')
return redirect(url_for('certificate_detail', cert_id=cert_id))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return render_template('confirm_delete_certificate.html', cert=cert)
def generate_separate_files_zip(cert, cert_ext, zip_suffix):
"""生成包含分开文件的ZIP包通用函数"""
memory_file = BytesIO()
try:
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
# 添加证书文件
with open(cert['cert_path'], 'r') as f:
cert_content = f.read()
zf.writestr(f"{cert['common_name']}{cert_ext}", cert_content)
# 添加私钥文件
with open(cert['key_path'], 'r') as f:
key_content = f.read()
zf.writestr(f"{cert['common_name']}.key", key_content)
memory_file.seek(0)
return Response(
memory_file.getvalue(),
mimetype="application/zip",
headers={
"Content-Disposition": f"attachment; filename={cert['common_name']}{zip_suffix}.zip"
}
)
except Exception as e:
flash(f'创建ZIP文件失败: {str(e)}', 'danger')
return redirect(url_for('export_certificate_view', cert_id=cert['id']))
@app.route('/download/<path:filename>')
@login_required
def download_file(filename):
# 安全检查:确保文件路径在证书存储目录内
file_path = os.path.join(Config.CERT_STORE, filename)
if not os.path.exists(file_path):
flash('文件不存在', 'danger')
return redirect(url_for('index'))
# 检查权限
# 这里需要根据实际业务逻辑检查用户是否有权下载该文件
# 简单示例:只允许下载自己的证书文件
return send_from_directory(
os.path.dirname(file_path),
os.path.basename(file_path),
as_attachment=True
)
if __name__ == '__main__':
app.run(debug=Config.DEBUG, host=Config.APP_HOST, port=Config.APP_PORT)