certmanager/app.py
2025-06-14 09:46:15 +08:00

1038 lines
34 KiB
Python

import os
import subprocess
from datetime import datetime, timedelta
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, Response
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
import mysql.connector
from mysql.connector import Error
import random
import string
from io import BytesIO
import zipfile
import uuid
app = Flask(__name__)
app.secret_key = 'your-secret-key-here'
# 数据库配置
db_config = {
'host': '192.168.31.11',
'database': 'cert_manager',
'user': 'root',
'password': 'Home123#$.'
}
# Flask-Login 配置
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
# 确保证书存储目录存在
CERT_STORE = os.path.join(os.path.dirname(__file__), 'cert_store')
os.makedirs(CERT_STORE, exist_ok=True)
class User(UserMixin):
pass
@login_manager.user_loader
def load_user(user_id):
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user_data = cursor.fetchone()
if user_data:
user = User()
user.id = user_data['id']
user.username = user_data['username']
user.is_admin = user_data['is_admin']
return user
return None
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
# 辅助函数
def get_db_connection():
try:
conn = mysql.connector.connect(**db_config)
return conn
except Error as e:
print(f"Database connection error: {e}")
return None
def generate_captcha():
# 生成6位随机验证码
captcha_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# 清除旧的验证码
cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE")
# 插入新验证码
cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,))
conn.commit()
return captcha_code
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def verify_captcha(user_input):
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("SELECT code FROM captcha ORDER BY created_at DESC LIMIT 1")
result = cursor.fetchone()
if result and user_input.upper() == result[0]:
return True
return False
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
return False
def create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid,
created_by):
# 创建CA目录
ca_dir = os.path.join(CERT_STORE, f"ca_{ca_name}")
os.makedirs(ca_dir, exist_ok=True)
key_path = os.path.join(ca_dir, f"{common_name}.key")
cert_path = os.path.join(ca_dir, f"{common_name}.crt")
# 生成CA私钥
subprocess.run([
'openssl', 'genrsa', '-out', key_path, str(key_size)
], check=True)
# 生成CA自签名证书
subprocess.run([
'openssl', 'req', '-new', '-x509', '-days', str(days_valid),
'-key', key_path, '-out', cert_path,
'-subj', f'/CN={common_name}/O={organization}/OU={organizational_unit}/C={country}/ST={state}/L={locality}'
], check=True)
# 保存到数据库
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO certificate_authorities
(name, common_name, organization, organizational_unit, country, state, locality,
key_size, days_valid, cert_path, key_path, created_by)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (ca_name, common_name, organization, organizational_unit, country, state, locality,
key_size, days_valid, cert_path, key_path, created_by))
conn.commit()
return cursor.lastrowid
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit,
country, state, locality, key_size, days_valid, created_by):
# 获取CA信息
ca = get_ca_by_id(ca_id)
if not ca:
return None
# 创建证书目录
cert_dir = os.path.join(CERT_STORE, f"certs_{common_name}")
os.makedirs(cert_dir, exist_ok=True)
key_path = os.path.join(cert_dir, f"{common_name}.key")
csr_path = os.path.join(cert_dir, f"{common_name}.csr")
cert_path = os.path.join(cert_dir, f"{common_name}.crt")
# 生成私钥
subprocess.run([
'openssl', 'genrsa', '-out', key_path, str(key_size)
], check=True)
# 创建CSR配置文件
csr_config = f"""
[req]
default_bits = {key_size}
prompt = no
default_md = sha256
distinguished_name = dn
[dn]
CN = {common_name}
O = {organization}
OU = {organizational_unit}
C = {country}
ST = {state}
L = {locality}
"""
if san_dns or san_ip:
csr_config += "\nreq_extensions = req_ext\n[req_ext]\nsubjectAltName = @alt_names\n[alt_names]\n"
if san_dns:
dns_entries = san_dns.split(',')
for i, dns in enumerate(dns_entries, 1):
csr_config += f"DNS.{i} = {dns.strip()}\n"
if san_ip:
ip_entries = san_ip.split(',')
for i, ip in enumerate(ip_entries, 1):
csr_config += f"IP.{i} = {ip.strip()}\n"
config_path = os.path.join(cert_dir, 'csr_config.cnf')
with open(config_path, 'w') as f:
f.write(csr_config)
# 生成CSR
subprocess.run([
'openssl', 'req', '-new', '-key', key_path, '-out', csr_path,
'-config', config_path
], check=True)
# 使用CA签名证书
subprocess.run([
'openssl', 'x509', '-req', '-in', csr_path, '-CA', ca['cert_path'],
'-CAkey', ca['key_path'], '-CAcreateserial', '-out', cert_path,
'-days', str(days_valid), '-sha256'
], check=True)
# 计算过期时间
expires_at = datetime.now() + timedelta(days=days_valid)
# 保存到数据库
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO certificates
(common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality,
key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality,
key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at))
conn.commit()
return cursor.lastrowid
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def get_ca_by_id(ca_id):
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM certificate_authorities WHERE id = %s", (ca_id,))
return cursor.fetchone()
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def get_certificate_by_id(cert_id):
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM certificates WHERE id = %s", (cert_id,))
return cursor.fetchone()
except Error as e:
print(f"Database error: {e}")
return None
finally:
if conn.is_connected():
cursor.close()
conn.close()
return None
def revoke_certificate(cert_id, reason):
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
UPDATE certificates
SET status = 'revoked', revoked_at = NOW(), revocation_reason = %s
WHERE id = %s
""", (reason, cert_id))
conn.commit()
return True
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
return False
def renew_certificate(cert_id, days_valid):
cert = get_certificate_by_id(cert_id)
if not cert:
return False
ca = get_ca_by_id(cert['ca_id'])
if not ca:
return False
# 使用现有CSR重新签名
new_cert_path = os.path.join(os.path.dirname(cert['cert_path']), f"renewed_{cert['common_name']}.crt")
subprocess.run([
'openssl', 'x509', '-req', '-in', cert['csr_path'], '-CA', ca['cert_path'],
'-CAkey', ca['key_path'], '-CAcreateserial', '-out', new_cert_path,
'-days', str(days_valid), '-sha256'
], check=True)
# 更新数据库
new_expires_at = datetime.now() + timedelta(days=days_valid)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("""
UPDATE certificates
SET cert_path = %s, days_valid = %s, expires_at = %s, status = 'active',
revoked_at = NULL, revocation_reason = NULL
WHERE id = %s
""", (new_cert_path, days_valid, new_expires_at, cert_id))
conn.commit()
return True
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
return False
def generate_crl(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
return False
crl_path = os.path.join(os.path.dirname(ca['cert_path']), f"crl_{ca['name']}.pem")
# 获取所有被吊销的证书
revoked_certs = []
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT cert_path FROM certificates
WHERE ca_id = %s AND status = 'revoked'
""", (ca_id,))
revoked_certs = [row['cert_path'] for row in cursor.fetchall()]
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
# 创建索引文件
index_file = os.path.join(os.path.dirname(ca['cert_path']), 'index.txt')
if not os.path.exists(index_file):
open(index_file, 'w').close()
# 生成CRL
subprocess.run([
'openssl', 'ca', '-gencrl', '-out', crl_path,
'-keyfile', ca['key_path'], '-cert', ca['cert_path'],
'-crldays', '30'
], check=True)
# 更新数据库
next_update = datetime.now() + timedelta(days=30)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
# 检查是否已有CRL记录
cursor.execute("SELECT id FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
if cursor.fetchone():
cursor.execute("""
UPDATE certificate_revocation_list
SET crl_path = %s, last_updated = NOW(), next_update = %s
WHERE ca_id = %s
""", (crl_path, next_update, ca_id))
else:
cursor.execute("""
INSERT INTO certificate_revocation_list
(ca_id, crl_path, next_update)
VALUES (%s, %s, %s)
""", (ca_id, crl_path, next_update))
conn.commit()
return True
except Error as e:
print(f"Database error: {e}")
return False
finally:
if conn.is_connected():
cursor.close()
conn.close()
return False
def export_pkcs12(cert_id, password):
cert = get_certificate_by_id(cert_id)
if not cert:
return None
ca = get_ca_by_id(cert['ca_id'])
if not ca:
return None
pkcs12_path = os.path.join(os.path.dirname(cert['cert_path']), f"{cert['common_name']}.p12")
subprocess.run([
'openssl', 'pkcs12', '-export', '-out', pkcs12_path,
'-inkey', cert['key_path'], '-in', cert['cert_path'],
'-certfile', ca['cert_path'], '-passout', f'pass:{password}'
], check=True)
return pkcs12_path
@app.route('/register', methods=['GET', 'POST'])
def register():
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
confirm_password = request.form['confirm_password']
email = request.form.get('email', '')
captcha = request.form['captcha']
# 验证验证码
if not verify_captcha(captcha):
flash('验证码错误', 'danger')
return redirect(url_for('register'))
# 验证密码匹配
if password != confirm_password:
flash('两次输入的密码不匹配', 'danger')
return redirect(url_for('register'))
# 验证用户名是否已存在
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
if cursor.fetchone():
flash('用户名已存在', 'danger')
return redirect(url_for('register'))
# 创建新用户
password_hash = generate_password_hash(password)
cursor.execute("""
INSERT INTO users (username, password_hash, email, is_admin, is_active)
VALUES (%s, %s, %s, %s, %s)
""", (username, password_hash, email, False, True))
conn.commit()
flash('注册成功,请登录', 'success')
return redirect(url_for('login'))
except Error as e:
print(f"Database error: {e}")
flash('注册失败,请稍后再试', 'danger')
finally:
if conn.is_connected():
cursor.close()
conn.close()
captcha_code = generate_captcha()
return render_template('register.html', captcha_code=captcha_code)
# 路由定义
@app.route('/')
@login_required
def index():
return render_template('index.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('index'))
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
captcha = request.form['captcha']
if not verify_captcha(captcha):
flash('验证码错误', 'danger')
return redirect(url_for('login'))
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT * FROM users
WHERE username = %s AND is_active = TRUE
""", (username,))
user_data = cursor.fetchone()
if user_data and check_password_hash(user_data['password_hash'], password):
user = User()
user.id = user_data['id']
user.username = user_data['username']
user.is_admin = user_data['is_admin']
login_user(user)
flash('登录成功', 'success')
return redirect(url_for('index'))
else:
flash('用户名或密码错误,或账户未激活', 'danger')
except Error as e:
print(f"Database error: {e}")
flash('登录失败,请稍后再试', 'danger')
finally:
if conn.is_connected():
cursor.close()
conn.close()
captcha_code = generate_captcha()
return render_template('login.html', captcha_code=captcha_code)
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('您已成功登出', 'success')
return redirect(url_for('login'))
@app.route('/cas')
@login_required
def ca_list():
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
if current_user.is_admin:
cursor.execute("SELECT * FROM certificate_authorities")
else:
cursor.execute("SELECT * FROM certificate_authorities WHERE created_by = %s", (current_user.id,))
cas = cursor.fetchall()
return render_template('ca_list.html', cas=cas)
except Error as e:
print(f"Database error: {e}")
flash('获取CA列表失败', 'danger')
return redirect(url_for('index'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('index'))
@app.route('/cas/create', methods=['GET', 'POST'])
@login_required
def create_ca_view():
if request.method == 'POST':
ca_name = request.form['ca_name']
common_name = request.form['common_name']
organization = request.form['organization']
organizational_unit = request.form['organizational_unit']
country = request.form['country']
state = request.form['state']
locality = request.form['locality']
key_size = int(request.form['key_size'])
days_valid = int(request.form['days_valid'])
ca_id = create_ca(ca_name, common_name, organization, organizational_unit,
country, state, locality, key_size, days_valid, current_user.id)
if ca_id:
flash('CA创建成功', 'success')
return redirect(url_for('ca_list'))
else:
flash('CA创建失败', 'danger')
return render_template('create_ca.html')
@app.route('/cas/<int:ca_id>')
@login_required
def ca_detail(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA不存在', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('无权访问此CA', 'danger')
return redirect(url_for('ca_list'))
# 获取该CA颁发的证书
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT * FROM certificates
WHERE ca_id = %s
ORDER BY created_at DESC
""", (ca_id,))
certificates = cursor.fetchall()
# 获取CRL信息
cursor.execute("""
SELECT * FROM certificate_revocation_list
WHERE ca_id = %s
""", (ca_id,))
crl = cursor.fetchone()
return render_template('ca_detail.html', ca=ca, certificates=certificates, crl=crl)
except Error as e:
print(f"Database error: {e}")
flash('获取CA详情失败', 'danger')
return redirect(url_for('ca_list'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('ca_list'))
@app.route('/cas/<int:ca_id>/generate_crl')
@login_required
def generate_crl_view(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA不存在', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('无权操作此CA', 'danger')
return redirect(url_for('ca_list'))
if generate_crl(ca_id):
flash('CRL生成成功', 'success')
else:
flash('CRL生成失败', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/cas/<int:ca_id>/download_crl')
@login_required
def download_crl(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA不存在', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('无权操作此CA', 'danger')
return redirect(url_for('ca_list'))
# 获取CRL路径
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT crl_path FROM certificate_revocation_list
WHERE ca_id = %s
""", (ca_id,))
crl = cursor.fetchone()
if crl and os.path.exists(crl['crl_path']):
return send_from_directory(
os.path.dirname(crl['crl_path']),
os.path.basename(crl['crl_path']),
as_attachment=True
)
else:
flash('CRL文件不存在', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
except Error as e:
print(f"Database error: {e}")
flash('获取CRL失败', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/certificates')
@login_required
def certificate_list():
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
if current_user.is_admin:
cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
ORDER BY c.created_at DESC
""")
else:
cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
WHERE c.created_by = %s
ORDER BY c.created_at DESC
""", (current_user.id,))
certificates = cursor.fetchall()
return render_template('certificate_list.html', certificates=certificates)
except Error as e:
print(f"Database error: {e}")
flash('获取证书列表失败', 'danger')
return redirect(url_for('index'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('index'))
@app.route('/certificates/create', methods=['GET', 'POST'])
@login_required
def create_certificate_view():
if request.method == 'POST':
common_name = request.form['common_name']
san_dns = request.form.get('san_dns', '')
san_ip = request.form.get('san_ip', '')
organization = request.form['organization']
organizational_unit = request.form['organizational_unit']
country = request.form['country']
state = request.form['state']
locality = request.form['locality']
key_size = int(request.form['key_size'])
days_valid = int(request.form['days_valid'])
ca_id = int(request.form['ca_id'])
cert_id = create_certificate(ca_id, common_name, san_dns, san_ip, organization,
organizational_unit, country, state, locality,
key_size, days_valid, current_user.id)
if cert_id:
flash('证书创建成功', 'success')
return redirect(url_for('certificate_detail', cert_id=cert_id))
else:
flash('证书创建失败', 'danger')
# 获取可用的CA
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
if current_user.is_admin:
cursor.execute("SELECT id, name FROM certificate_authorities")
else:
cursor.execute("""
SELECT id, name FROM certificate_authorities
WHERE created_by = %s
""", (current_user.id,))
cas = cursor.fetchall()
return render_template('create_certificate.html', cas=cas)
except Error as e:
print(f"Database error: {e}")
flash('获取CA列表失败', 'danger')
return redirect(url_for('certificate_list'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('certificate_list'))
@app.route('/certificates/<int:cert_id>')
@login_required
def certificate_detail(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权访问此证书', 'danger')
return redirect(url_for('certificate_list'))
# 获取CA信息
ca = get_ca_by_id(cert['ca_id'])
return render_template('certificate_detail.html', cert=cert, ca=ca)
@app.route('/certificates/<int:cert_id>/revoke', methods=['GET', 'POST'])
@login_required
def revoke_certificate_view(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权操作此证书', 'danger')
return redirect(url_for('certificate_list'))
if cert['status'] == 'revoked':
flash('证书已被吊销', 'warning')
return redirect(url_for('certificate_detail', cert_id=cert_id))
if request.method == 'POST':
reason = request.form['reason']
if revoke_certificate(cert_id, reason):
flash('证书吊销成功', 'success')
# 更新CRL
generate_crl(cert['ca_id'])
return redirect(url_for('certificate_detail', cert_id=cert_id))
else:
flash('证书吊销失败', 'danger')
return render_template('revoke_certificate.html', cert=cert)
@app.route('/certificates/<int:cert_id>/renew', methods=['GET', 'POST'])
@login_required
def renew_certificate_view(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权操作此证书', 'danger')
return redirect(url_for('certificate_list'))
if request.method == 'POST':
days_valid = int(request.form['days_valid'])
if renew_certificate(cert_id, days_valid):
flash('证书续期成功', 'success')
return redirect(url_for('certificate_detail', cert_id=cert_id))
else:
flash('证书续期失败', 'danger')
return render_template('renew_certificate.html', cert=cert)
@app.route('/cas/<int:ca_id>/export')@app.route('/cas/<int:ca_id>/export')
@login_required
def export_ca_view(ca_id):
ca = get_ca_by_id(ca_id)
if not ca:
flash('CA not found', 'danger')
return redirect(url_for('ca_list'))
# 检查权限
if not current_user.is_admin and ca['created_by'] != current_user.id:
flash('No permission', 'danger')
return redirect(url_for('ca_list'))
# 创建临时zip文件
memory_file = BytesIO()
try:
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
# 添加CA证书文件
with open(ca['cert_path'], 'rb') as f: # 使用二进制模式读取
cert_content = f.read()
zf.writestr(f"{ca['common_name']}_ca.crt", cert_content)
# 添加CA私钥文件(可选,只有管理员可以导出私钥)
if current_user.is_admin:
with open(ca['key_path'], 'rb') as f: # 使用二进制模式读取
key_content = f.read()
zf.writestr(f"{ca['common_name']}_ca.key", key_content)
memory_file.seek(0)
# 确保文件名只包含ASCII字符
safe_filename = f"{ca['name']}_ca_bundle.zip".encode('ascii', 'ignore').decode('ascii')
return Response(
memory_file.getvalue(),
mimetype="application/zip",
headers={
"Content-Disposition": f"attachment; filename={safe_filename}"
}
)
except Exception as e:
flash(f'Export failed: {str(e)}', 'danger')
return redirect(url_for('ca_detail', ca_id=ca_id))
@app.route('/certificates/<int:cert_id>/export', methods=['GET', 'POST'])
@login_required
def export_certificate_view(cert_id):
cert = get_certificate_by_id(cert_id)
if not cert:
flash('证书不存在', 'danger')
return redirect(url_for('certificate_list'))
# 检查权限
if not current_user.is_admin and cert['created_by'] != current_user.id:
flash('无权操作此证书', 'danger')
return redirect(url_for('certificate_list'))
if request.method == 'POST':
format_type = request.form['format']
password = request.form.get('password', '')
if format_type == 'pkcs12':
if not password:
flash('PKCS#12格式需要密码', 'danger')
return redirect(url_for('export_certificate_view', cert_id=cert_id))
pkcs12_path = export_pkcs12(cert_id, password)
if pkcs12_path:
return send_from_directory(
os.path.dirname(pkcs12_path),
os.path.basename(pkcs12_path),
as_attachment=True
)
else:
flash('导出PKCS#12失败', 'danger')
elif format_type == 'pem':
# 合并证书和私钥为PEM
pem_content = ""
with open(cert['cert_path'], 'r') as f:
pem_content += f.read()
with open(cert['key_path'], 'r') as f:
pem_content += f.read()
return Response(
pem_content,
mimetype="application/x-pem-file",
headers={
"Content-Disposition": f"attachment; filename={cert['common_name']}.pem"
}
)
elif format_type == 'pem_separate':
# 创建内存中的zip文件(.pem + .key)
return generate_separate_files_zip(
cert,
cert_ext=".pem",
zip_suffix="_pem_key"
)
elif format_type == 'crt_separate':
# 创建内存中的zip文件(.crt + .key)
return generate_separate_files_zip(
cert,
cert_ext=".crt",
zip_suffix="_crt_key"
)
else:
flash('不支持的导出格式', 'danger')
return render_template('export_certificate.html', cert=cert)
def generate_separate_files_zip(cert, cert_ext, zip_suffix):
"""生成包含分开文件的ZIP包通用函数"""
memory_file = BytesIO()
try:
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
# 添加证书文件
with open(cert['cert_path'], 'r') as f:
cert_content = f.read()
zf.writestr(f"{cert['common_name']}{cert_ext}", cert_content)
# 添加私钥文件
with open(cert['key_path'], 'r') as f:
key_content = f.read()
zf.writestr(f"{cert['common_name']}.key", key_content)
memory_file.seek(0)
return Response(
memory_file.getvalue(),
mimetype="application/zip",
headers={
"Content-Disposition": f"attachment; filename={cert['common_name']}{zip_suffix}.zip"
}
)
except Exception as e:
flash(f'创建ZIP文件失败: {str(e)}', 'danger')
return redirect(url_for('export_certificate_view', cert_id=cert['id']))
@app.route('/download/<path:filename>')
@login_required
def download_file(filename):
# 安全检查:确保文件路径在证书存储目录内
file_path = os.path.join(CERT_STORE, filename)
if not os.path.exists(file_path):
flash('文件不存在', 'danger')
return redirect(url_for('index'))
# 检查权限
# 这里需要根据实际业务逻辑检查用户是否有权下载该文件
# 简单示例:只允许下载自己的证书文件
return send_from_directory(
os.path.dirname(file_path),
os.path.basename(file_path),
as_attachment=True
)
if __name__ == '__main__':
app.run(debug=True, ssl_context='adhoc', host='0.0.0.0', port='9875')