1355 lines
45 KiB
Python
1355 lines
45 KiB
Python
import os
|
||
import subprocess
|
||
from datetime import datetime, timedelta
|
||
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, Response
|
||
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
import mysql.connector
|
||
from mysql.connector import Error
|
||
import random
|
||
import string
|
||
from io import BytesIO
|
||
import zipfile
|
||
import shutil
|
||
from pypinyin import pinyin, Style
|
||
import uuid
|
||
|
||
app = Flask(__name__)
|
||
app.secret_key = 'your-secret-key-here'
|
||
from pypinyin import pinyin, Style
|
||
|
||
def to_pinyin(text):
|
||
"""将中文转换为拼音"""
|
||
if not text:
|
||
return ""
|
||
# 获取拼音列表,不带声调
|
||
pinyin_list = pinyin(text, style=Style.NORMAL)
|
||
# 拼接成字符串
|
||
return "_".join([item[0] for item in pinyin_list])
|
||
|
||
# 注册模板过滤器
|
||
@app.template_filter('to_pinyin')
|
||
def jinja2_to_pinyin(text):
|
||
return to_pinyin(text)
|
||
|
||
# 数据库配置
|
||
db_config = {
|
||
'host': '192.168.31.11',
|
||
'database': 'cert_manager',
|
||
'user': 'root',
|
||
'password': 'Home123#$.'
|
||
}
|
||
|
||
# Flask-Login 配置
|
||
login_manager = LoginManager()
|
||
login_manager.init_app(app)
|
||
login_manager.login_view = 'login'
|
||
|
||
# 确保证书存储目录存在
|
||
CERT_STORE = os.path.join(os.path.dirname(__file__), 'cert_store')
|
||
os.makedirs(CERT_STORE, exist_ok=True)
|
||
|
||
|
||
class User(UserMixin):
|
||
pass
|
||
|
||
|
||
@login_manager.user_loader
|
||
def load_user(user_id):
|
||
try:
|
||
conn = mysql.connector.connect(**db_config)
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
|
||
user_data = cursor.fetchone()
|
||
|
||
if user_data:
|
||
user = User()
|
||
user.id = user_data['id']
|
||
user.username = user_data['username']
|
||
user.is_admin = user_data['is_admin']
|
||
return user
|
||
return None
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
|
||
# 辅助函数
|
||
def get_db_connection():
|
||
try:
|
||
conn = mysql.connector.connect(**db_config)
|
||
return conn
|
||
except Error as e:
|
||
print(f"Database connection error: {e}")
|
||
return None
|
||
|
||
|
||
def generate_captcha():
|
||
# 生成6位随机验证码
|
||
captcha_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
# 清除旧的验证码
|
||
cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE")
|
||
# 插入新验证码
|
||
cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,))
|
||
conn.commit()
|
||
return captcha_code
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def verify_captcha(user_input):
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT code FROM captcha ORDER BY created_at DESC LIMIT 1")
|
||
result = cursor.fetchone()
|
||
if result and user_input.upper() == result[0]:
|
||
return True
|
||
return False
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return False
|
||
|
||
def validate_name(name, max_length=64):
|
||
"""
|
||
校验名称是否符合规范
|
||
规则:
|
||
1. 长度1-64个字符
|
||
2. 只能包含字母、数字、中文、下划线、短横线
|
||
3. 不能以短横线开头或结尾
|
||
"""
|
||
if not name or len(name) > max_length:
|
||
return False
|
||
|
||
# 允许中文、字母、数字、下划线、短横线
|
||
pattern = r'^[a-zA-Z0-9_\-\u4e00-\u9fa5]+$'
|
||
if not re.match(pattern, name):
|
||
return False
|
||
|
||
# 不能以短横线开头或结尾
|
||
if name.startswith('-') or name.endswith('-'):
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def validate_common_name(cn):
|
||
"""
|
||
校验通用名(Common Name)是否符合规范
|
||
规则:
|
||
1. 长度1-64个字符
|
||
2. 只能包含字母、数字、点号(.)和短横线(-)
|
||
3. 不能以点号或短横线开头或结尾
|
||
4. 不能连续两个点号或短横线
|
||
"""
|
||
if not cn or len(cn) > 64:
|
||
return False
|
||
|
||
# 只允许字母、数字、点号和短横线
|
||
if not re.match(r'^[a-zA-Z0-9.-]+$', cn):
|
||
return False
|
||
|
||
# 不能以点号或短横线开头或结尾
|
||
if cn.startswith('.') or cn.endswith('.') or cn.startswith('-') or cn.endswith('-'):
|
||
return False
|
||
|
||
# 不能连续两个点号或短横线
|
||
if '..' in cn or '--' in cn:
|
||
return False
|
||
|
||
return True
|
||
|
||
def create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid,
|
||
created_by):
|
||
# 创建拼音格式的目录名
|
||
pinyin_name = to_pinyin(ca_name)
|
||
ca_dir = os.path.join(CERT_STORE, f"ca_{pinyin_name}")
|
||
os.makedirs(ca_dir, exist_ok=True)
|
||
|
||
# 使用原始common_name作为文件名
|
||
key_path = os.path.join(ca_dir, f"{common_name}.key")
|
||
cert_path = os.path.join(ca_dir, f"{common_name}.crt")
|
||
|
||
# 生成CA私钥
|
||
subprocess.run([
|
||
'openssl', 'genrsa', '-out', key_path, str(key_size)
|
||
], check=True)
|
||
|
||
# 生成CA自签名证书
|
||
subprocess.run([
|
||
'openssl', 'req', '-new', '-x509', '-days', str(days_valid),
|
||
'-key', key_path, '-out', cert_path,
|
||
'-subj', f'/CN={common_name}/O={organization}/OU={organizational_unit}/C={country}/ST={state}/L={locality}'
|
||
], check=True)
|
||
|
||
# 保存到数据库
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO certificate_authorities
|
||
(name, common_name, organization, organizational_unit, country, state, locality,
|
||
key_size, days_valid, cert_path, key_path, created_by)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
""", (ca_name, common_name, organization, organizational_unit, country, state, locality,
|
||
key_size, days_valid, cert_path, key_path, created_by))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit,
|
||
country, state, locality, key_size, days_valid, created_by):
|
||
# 获取CA信息
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
return None
|
||
|
||
# 创建证书目录
|
||
cert_dir = os.path.join(CERT_STORE, f"certs_{common_name}")
|
||
os.makedirs(cert_dir, exist_ok=True)
|
||
|
||
key_path = os.path.join(cert_dir, f"{common_name}.key")
|
||
csr_path = os.path.join(cert_dir, f"{common_name}.csr")
|
||
cert_path = os.path.join(cert_dir, f"{common_name}.crt")
|
||
|
||
# 生成私钥
|
||
subprocess.run([
|
||
'openssl', 'genrsa', '-out', key_path, str(key_size)
|
||
], check=True)
|
||
|
||
# 创建CSR配置文件
|
||
csr_config = f"""[req]
|
||
default_bits = {key_size}
|
||
prompt = no
|
||
default_md = sha256
|
||
distinguished_name = dn
|
||
req_extensions = req_ext
|
||
|
||
[dn]
|
||
CN = {common_name}
|
||
O = {organization}
|
||
OU = {organizational_unit}
|
||
C = {country}
|
||
ST = {state}
|
||
L = {locality}
|
||
|
||
[req_ext]
|
||
subjectAltName = @alt_names
|
||
|
||
[alt_names]"""
|
||
|
||
# 添加DNS SAN条目
|
||
if san_dns:
|
||
dns_entries = [dns.strip() for dns in san_dns.split(',') if dns.strip()]
|
||
for i, dns in enumerate(dns_entries, 1):
|
||
csr_config += f"\nDNS.{i} = {dns}"
|
||
|
||
# 添加IP SAN条目
|
||
if san_ip:
|
||
ip_entries = [ip.strip() for ip in san_ip.split(',') if ip.strip()]
|
||
for i, ip in enumerate(ip_entries, 1):
|
||
csr_config += f"\nIP.{i} = {ip}"
|
||
|
||
# 确保配置文件不以空行结尾
|
||
csr_config = csr_config.strip()
|
||
|
||
config_path = os.path.join(cert_dir, 'csr_config.cnf')
|
||
with open(config_path, 'w') as f:
|
||
f.write(csr_config)
|
||
|
||
# 生成CSR
|
||
subprocess.run([
|
||
'openssl', 'req', '-new', '-key', key_path, '-out', csr_path,
|
||
'-config', config_path
|
||
], check=True)
|
||
|
||
# 使用CA签名证书
|
||
subprocess.run([
|
||
'openssl', 'x509', '-req', '-in', csr_path, '-CA', ca['cert_path'],
|
||
'-CAkey', ca['key_path'], '-CAcreateserial', '-out', cert_path,
|
||
'-days', str(days_valid), '-sha256'
|
||
], check=True)
|
||
|
||
# 计算过期时间
|
||
expires_at = datetime.now() + timedelta(days=days_valid)
|
||
|
||
# 保存到数据库
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO certificates
|
||
(common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality,
|
||
key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
""", (common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality,
|
||
key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def get_ca_by_id(ca_id):
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT * FROM certificate_authorities WHERE id = %s", (ca_id,))
|
||
return cursor.fetchone()
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def get_certificate_by_id(cert_id):
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT * FROM certificates WHERE id = %s", (cert_id,))
|
||
return cursor.fetchone()
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return None
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return None
|
||
|
||
|
||
def revoke_certificate(cert_id, reason):
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE certificates
|
||
SET status = 'revoked', revoked_at = NOW(), revocation_reason = %s
|
||
WHERE id = %s
|
||
""", (reason, cert_id))
|
||
conn.commit()
|
||
return True
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return False
|
||
|
||
|
||
def renew_certificate(cert_id, days_valid):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
return False
|
||
|
||
ca = get_ca_by_id(cert['ca_id'])
|
||
if not ca:
|
||
return False
|
||
|
||
# 使用现有CSR重新签名
|
||
new_cert_path = os.path.join(os.path.dirname(cert['cert_path']), f"renewed_{cert['common_name']}.crt")
|
||
|
||
subprocess.run([
|
||
'openssl', 'x509', '-req', '-in', cert['csr_path'], '-CA', ca['cert_path'],
|
||
'-CAkey', ca['key_path'], '-CAcreateserial', '-out', new_cert_path,
|
||
'-days', str(days_valid), '-sha256'
|
||
], check=True)
|
||
|
||
# 更新数据库
|
||
new_expires_at = datetime.now() + timedelta(days=days_valid)
|
||
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE certificates
|
||
SET cert_path = %s, days_valid = %s, expires_at = %s, status = 'active',
|
||
revoked_at = NULL, revocation_reason = NULL
|
||
WHERE id = %s
|
||
""", (new_cert_path, days_valid, new_expires_at, cert_id))
|
||
conn.commit()
|
||
return True
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return False
|
||
|
||
|
||
def generate_crl(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
return False
|
||
|
||
ca_dir = os.path.dirname(ca['cert_path'])
|
||
crl_path = os.path.join(ca_dir, f"crl_{ca['name']}.pem")
|
||
|
||
# 创建完整的OpenSSL配置文件
|
||
openssl_cnf = f"""
|
||
[ ca ]
|
||
default_ca = CA_default
|
||
|
||
[ CA_default ]
|
||
database = {os.path.join(ca_dir, 'index.txt')}
|
||
certificate = {ca['cert_path']}
|
||
private_key = {ca['key_path']}
|
||
crl = {crl_path}
|
||
RANDFILE = {os.path.join(ca_dir, '.rand')}
|
||
default_days = 365
|
||
default_crl_days = 30
|
||
default_md = sha256
|
||
preserve = no
|
||
policy = policy_anything
|
||
|
||
[ policy_anything ]
|
||
countryName = optional
|
||
stateOrProvinceName = optional
|
||
localityName = optional
|
||
organizationName = optional
|
||
organizationalUnitName = optional
|
||
commonName = optional
|
||
emailAddress = optional
|
||
"""
|
||
|
||
cnf_path = os.path.join(ca_dir, 'openssl.cnf')
|
||
with open(cnf_path, 'w') as f:
|
||
f.write(openssl_cnf)
|
||
|
||
# 确保index.txt存在
|
||
index_file = os.path.join(ca_dir, 'index.txt')
|
||
if not os.path.exists(index_file):
|
||
open(index_file, 'a').close()
|
||
|
||
# 生成CRL
|
||
try:
|
||
subprocess.run([
|
||
'openssl', 'ca', '-gencrl',
|
||
'-config', cnf_path,
|
||
'-out', crl_path
|
||
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
|
||
|
||
# 更新数据库
|
||
next_update = datetime.now() + timedelta(days=30)
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO certificate_revocation_list
|
||
(ca_id, crl_path, next_update)
|
||
VALUES (%s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE
|
||
crl_path = VALUES(crl_path),
|
||
last_updated = NOW(),
|
||
next_update = VALUES(next_update)
|
||
""", (ca_id, crl_path, next_update))
|
||
conn.commit()
|
||
return True
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return False
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
except subprocess.CalledProcessError as e:
|
||
error_msg = e.stderr.decode() if e.stderr else "Unknown OpenSSL error"
|
||
print(f"OpenSSL error: {error_msg}")
|
||
flash(f'CRL生成失败: {error_msg}', 'danger')
|
||
except Exception as e:
|
||
print(f"Error generating CRL: {str(e)}")
|
||
flash(f'CRL生成异常: {str(e)}', 'danger')
|
||
|
||
return False
|
||
|
||
def export_pkcs12(cert_id, password):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
return None
|
||
|
||
ca = get_ca_by_id(cert['ca_id'])
|
||
if not ca:
|
||
return None
|
||
|
||
pkcs12_path = os.path.join(os.path.dirname(cert['cert_path']), f"{cert['common_name']}.p12")
|
||
|
||
subprocess.run([
|
||
'openssl', 'pkcs12', '-export', '-out', pkcs12_path,
|
||
'-inkey', cert['key_path'], '-in', cert['cert_path'],
|
||
'-certfile', ca['cert_path'], '-passout', f'pass:{password}'
|
||
], check=True)
|
||
|
||
return pkcs12_path
|
||
|
||
|
||
@app.route('/register', methods=['GET', 'POST'])
|
||
def register():
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('index'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
confirm_password = request.form['confirm_password']
|
||
email = request.form.get('email', '')
|
||
captcha = request.form['captcha']
|
||
|
||
# 验证验证码
|
||
if not verify_captcha(captcha):
|
||
flash('验证码错误', 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
# 验证密码匹配
|
||
if password != confirm_password:
|
||
flash('两次输入的密码不匹配', 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
# 验证用户名是否已存在
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
|
||
if cursor.fetchone():
|
||
flash('用户名已存在', 'danger')
|
||
return redirect(url_for('register'))
|
||
|
||
# 创建新用户
|
||
password_hash = generate_password_hash(password)
|
||
cursor.execute("""
|
||
INSERT INTO users (username, password_hash, email, is_admin, is_active)
|
||
VALUES (%s, %s, %s, %s, %s)
|
||
""", (username, password_hash, email, False, True))
|
||
conn.commit()
|
||
|
||
flash('注册成功,请登录', 'success')
|
||
return redirect(url_for('login'))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('注册失败,请稍后再试', 'danger')
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
captcha_code = generate_captcha()
|
||
return render_template('register.html', captcha_code=captcha_code)
|
||
|
||
# 路由定义
|
||
@app.route('/')
|
||
@login_required
|
||
def index():
|
||
return render_template('index.html')
|
||
|
||
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def login():
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('index'))
|
||
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
captcha = request.form['captcha']
|
||
|
||
if not verify_captcha(captcha):
|
||
flash('验证码错误', 'danger')
|
||
return redirect(url_for('login'))
|
||
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("""
|
||
SELECT * FROM users
|
||
WHERE username = %s AND is_active = TRUE
|
||
""", (username,))
|
||
user_data = cursor.fetchone()
|
||
|
||
if user_data and check_password_hash(user_data['password_hash'], password):
|
||
user = User()
|
||
user.id = user_data['id']
|
||
user.username = user_data['username']
|
||
user.is_admin = user_data['is_admin']
|
||
login_user(user)
|
||
flash('登录成功', 'success')
|
||
return redirect(url_for('index'))
|
||
else:
|
||
flash('用户名或密码错误,或账户未激活', 'danger')
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('登录失败,请稍后再试', 'danger')
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
captcha_code = generate_captcha()
|
||
return render_template('login.html', captcha_code=captcha_code)
|
||
|
||
|
||
@app.route('/logout')
|
||
@login_required
|
||
def logout():
|
||
logout_user()
|
||
flash('您已成功登出', 'success')
|
||
return redirect(url_for('login'))
|
||
|
||
|
||
@app.route('/cas')
|
||
@login_required
|
||
def ca_list():
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT * FROM certificate_authorities")
|
||
else:
|
||
cursor.execute("SELECT * FROM certificate_authorities WHERE created_by = %s", (current_user.id,))
|
||
cas = cursor.fetchall()
|
||
return render_template('ca_list.html', cas=cas)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CA列表失败', 'danger')
|
||
return redirect(url_for('index'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('index'))
|
||
|
||
|
||
@app.route('/cas/create', methods=['GET', 'POST'])
|
||
@login_required
|
||
def create_ca_view():
|
||
if request.method == 'POST':
|
||
ca_name = request.form['ca_name'].strip()
|
||
common_name = request.form['common_name'].strip()
|
||
organization = request.form['organization'].strip()
|
||
organizational_unit = request.form['organizational_unit'].strip()
|
||
country = request.form['country'].strip()
|
||
state = request.form['state'].strip()
|
||
locality = request.form['locality'].strip()
|
||
key_size = int(request.form['key_size'])
|
||
days_valid = int(request.form['days_valid'])
|
||
|
||
# 名称校验
|
||
if not validate_name(ca_name):
|
||
flash('CA名称无效:只能包含中文、字母、数字、下划线和短横线,且不能以短横线开头或结尾', 'danger')
|
||
return render_template('create_ca.html')
|
||
|
||
if not validate_common_name(common_name):
|
||
flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
|
||
return render_template('create_ca.html')
|
||
|
||
# 检查CA名称是否已存在
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT id FROM certificate_authorities WHERE name = %s", (ca_name,))
|
||
if cursor.fetchone():
|
||
flash('CA名称已存在,请使用其他名称', 'danger')
|
||
return render_template('create_ca.html')
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('检查CA名称失败', 'danger')
|
||
return render_template('create_ca.html')
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
ca_id = create_ca(ca_name, common_name, organization, organizational_unit,
|
||
country, state, locality, key_size, days_valid, current_user.id)
|
||
|
||
if ca_id:
|
||
flash('CA创建成功', 'success')
|
||
return redirect(url_for('ca_list'))
|
||
else:
|
||
flash('CA创建失败', 'danger')
|
||
|
||
return render_template('create_ca.html')
|
||
|
||
from datetime import timedelta # 确保顶部已导入
|
||
|
||
@app.route('/cas/<int:ca_id>')
|
||
@login_required
|
||
def ca_detail(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权访问此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 获取该CA颁发的证书
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("""
|
||
SELECT * FROM certificates
|
||
WHERE ca_id = %s
|
||
ORDER BY created_at DESC
|
||
""", (ca_id,))
|
||
certificates = cursor.fetchall()
|
||
|
||
# 获取CRL信息
|
||
cursor.execute("""
|
||
SELECT * FROM certificate_revocation_list
|
||
WHERE ca_id = %s
|
||
""", (ca_id,))
|
||
crl = cursor.fetchone()
|
||
|
||
return render_template(
|
||
'ca_detail.html',
|
||
ca=ca,
|
||
certificates=certificates,
|
||
crl=crl,
|
||
timedelta=timedelta, # 传递timedelta到模板
|
||
get_username=get_username # 确保这个函数已定义
|
||
)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CA详情失败', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('ca_list'))
|
||
|
||
def get_username(user_id):
|
||
"""根据用户ID获取用户名"""
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT username FROM users WHERE id = %s", (user_id,))
|
||
user = cursor.fetchone()
|
||
return user['username'] if user else str(user_id)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
return str(user_id)
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return str(user_id)
|
||
|
||
@app.route('/cas/<int:ca_id>/generate_crl')
|
||
@login_required
|
||
def generate_crl_view(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权操作此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
if generate_crl(ca_id):
|
||
flash('CRL生成成功', 'success')
|
||
else:
|
||
flash('CRL生成失败', 'danger')
|
||
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
|
||
|
||
@app.route('/cas/<int:ca_id>/download_crl')
|
||
@login_required
|
||
def download_crl(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权操作此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 获取CRL路径
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("""
|
||
SELECT crl_path FROM certificate_revocation_list
|
||
WHERE ca_id = %s
|
||
""", (ca_id,))
|
||
crl = cursor.fetchone()
|
||
|
||
if crl and os.path.exists(crl['crl_path']):
|
||
return send_from_directory(
|
||
os.path.dirname(crl['crl_path']),
|
||
os.path.basename(crl['crl_path']),
|
||
as_attachment=True
|
||
)
|
||
else:
|
||
flash('CRL文件不存在', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CRL失败', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
|
||
|
||
@app.route('/certificates')
|
||
@login_required
|
||
def certificate_list():
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
if current_user.is_admin:
|
||
cursor.execute("""
|
||
SELECT c.*, ca.name as ca_name
|
||
FROM certificates c
|
||
JOIN certificate_authorities ca ON c.ca_id = ca.id
|
||
ORDER BY c.created_at DESC
|
||
""")
|
||
else:
|
||
cursor.execute("""
|
||
SELECT c.*, ca.name as ca_name
|
||
FROM certificates c
|
||
JOIN certificate_authorities ca ON c.ca_id = ca.id
|
||
WHERE c.created_by = %s
|
||
ORDER BY c.created_at DESC
|
||
""", (current_user.id,))
|
||
certificates = cursor.fetchall()
|
||
return render_template('certificate_list.html', certificates=certificates)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取证书列表失败', 'danger')
|
||
return redirect(url_for('index'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('index'))
|
||
|
||
|
||
@app.route('/certificates/create', methods=['GET', 'POST'])
|
||
@login_required
|
||
def create_certificate_view():
|
||
if request.method == 'POST':
|
||
common_name = request.form['common_name'].strip()
|
||
san_dns = request.form.get('san_dns', '').strip()
|
||
san_ip = request.form.get('san_ip', '').strip()
|
||
organization = request.form['organization'].strip()
|
||
organizational_unit = request.form['organizational_unit'].strip()
|
||
country = request.form['country'].strip()
|
||
state = request.form['state'].strip()
|
||
locality = request.form['locality'].strip()
|
||
key_size = int(request.form['key_size'])
|
||
days_valid = int(request.form['days_valid'])
|
||
ca_id = int(request.form['ca_id'])
|
||
|
||
# 通用名校验
|
||
if not validate_common_name(common_name):
|
||
flash('通用名无效:只能包含字母、数字、点号和短横线,且不能以点号或短横线开头或结尾', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
|
||
# SAN DNS校验
|
||
if san_dns:
|
||
for dns in san_dns.split(','):
|
||
dns = dns.strip()
|
||
if not validate_common_name(dns):
|
||
flash(f'DNS SAN条目无效: {dns},只能包含字母、数字、点号和短横线', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
|
||
# SAN IP校验
|
||
if san_ip:
|
||
for ip in san_ip.split(','):
|
||
ip = ip.strip()
|
||
if not re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip):
|
||
flash(f'IP SAN条目无效: {ip},请输入有效的IPv4地址', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
|
||
# 检查证书是否已存在
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT id FROM certificates WHERE common_name = %s AND ca_id = %s",
|
||
(common_name, ca_id))
|
||
if cursor.fetchone():
|
||
flash('该CA下已存在相同通用名的证书', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('检查证书名称失败', 'danger')
|
||
return redirect(url_for('create_certificate_view'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
cert_id = create_certificate(ca_id, common_name, san_dns, san_ip, organization,
|
||
organizational_unit, country, state, locality,
|
||
key_size, days_valid, current_user.id)
|
||
|
||
if cert_id:
|
||
flash('证书创建成功', 'success')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
else:
|
||
flash('证书创建失败', 'danger')
|
||
|
||
# 获取可用的CA
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
if current_user.is_admin:
|
||
cursor.execute("SELECT id, name FROM certificate_authorities")
|
||
else:
|
||
cursor.execute("""
|
||
SELECT id, name FROM certificate_authorities
|
||
WHERE created_by = %s
|
||
""", (current_user.id,))
|
||
cas = cursor.fetchall()
|
||
return render_template('create_certificate.html', cas=cas)
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('获取CA列表失败', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
@app.route('/certificates/<int:cert_id>')
|
||
@login_required
|
||
def certificate_detail(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权访问此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 获取CA信息
|
||
ca = get_ca_by_id(cert['ca_id'])
|
||
|
||
return render_template('certificate_detail.html', cert=cert, ca=ca, get_username=get_username)
|
||
|
||
|
||
@app.route('/certificates/<int:cert_id>/revoke', methods=['GET', 'POST'])
|
||
@login_required
|
||
def revoke_certificate_view(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权操作此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if cert['status'] == 'revoked':
|
||
flash('证书已被吊销', 'warning')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
|
||
if request.method == 'POST':
|
||
reason = request.form['reason']
|
||
if revoke_certificate(cert_id, reason):
|
||
flash('证书吊销成功', 'success')
|
||
# 更新CRL
|
||
generate_crl(cert['ca_id'])
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
else:
|
||
flash('证书吊销失败', 'danger')
|
||
|
||
return render_template('revoke_certificate.html', cert=cert)
|
||
|
||
|
||
@app.route('/certificates/<int:cert_id>/renew', methods=['GET', 'POST'])
|
||
@login_required
|
||
def renew_certificate_view(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权操作此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if request.method == 'POST':
|
||
days_valid = int(request.form['days_valid'])
|
||
if renew_certificate(cert_id, days_valid):
|
||
flash('证书续期成功', 'success')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
else:
|
||
flash('证书续期失败', 'danger')
|
||
|
||
return render_template('renew_certificate.html', cert=cert)
|
||
|
||
|
||
@app.route('/cas/<int:ca_id>/export')
|
||
@login_required
|
||
def export_ca_view(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA not found', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('No permission', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 创建临时zip文件
|
||
memory_file = BytesIO()
|
||
|
||
try:
|
||
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
|
||
# 添加CA证书文件
|
||
with open(ca['cert_path'], 'rb') as f: # 使用二进制模式读取
|
||
cert_content = f.read()
|
||
zf.writestr(f"{ca['common_name']}_ca.crt", cert_content)
|
||
|
||
# 添加CA私钥文件(可选,只有管理员可以导出私钥)
|
||
if current_user.is_admin:
|
||
with open(ca['key_path'], 'rb') as f: # 使用二进制模式读取
|
||
key_content = f.read()
|
||
zf.writestr(f"{ca['common_name']}_ca.key", key_content)
|
||
|
||
memory_file.seek(0)
|
||
|
||
# 确保文件名只包含ASCII字符
|
||
safe_filename = f"{ca['name']}_ca_bundle.zip".encode('ascii', 'ignore').decode('ascii')
|
||
|
||
return Response(
|
||
memory_file.getvalue(),
|
||
mimetype="application/zip",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={safe_filename}"
|
||
}
|
||
)
|
||
except Exception as e:
|
||
flash(f'Export failed: {str(e)}', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
@app.route('/certificates/<int:cert_id>/export', methods=['GET', 'POST'])
|
||
@login_required
|
||
def export_certificate_view(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权操作此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if request.method == 'POST':
|
||
format_type = request.form['format']
|
||
password = request.form.get('password', '')
|
||
|
||
if format_type == 'pkcs12':
|
||
if not password:
|
||
flash('PKCS#12格式需要密码', 'danger')
|
||
return redirect(url_for('export_certificate_view', cert_id=cert_id))
|
||
|
||
pkcs12_path = export_pkcs12(cert_id, password)
|
||
if pkcs12_path:
|
||
return send_from_directory(
|
||
os.path.dirname(pkcs12_path),
|
||
os.path.basename(pkcs12_path),
|
||
as_attachment=True
|
||
)
|
||
else:
|
||
flash('导出PKCS#12失败', 'danger')
|
||
elif format_type == 'pem':
|
||
# 合并证书和私钥为PEM
|
||
pem_content = ""
|
||
with open(cert['cert_path'], 'r') as f:
|
||
pem_content += f.read()
|
||
with open(cert['key_path'], 'r') as f:
|
||
pem_content += f.read()
|
||
|
||
return Response(
|
||
pem_content,
|
||
mimetype="application/x-pem-file",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={cert['common_name']}.pem"
|
||
}
|
||
)
|
||
elif format_type == 'pem_separate':
|
||
# 创建内存中的zip文件(.pem + .key)
|
||
return generate_separate_files_zip(
|
||
cert,
|
||
cert_ext=".pem",
|
||
zip_suffix="_pem_key"
|
||
)
|
||
elif format_type == 'crt_separate':
|
||
# 创建内存中的zip文件(.crt + .key)
|
||
return generate_separate_files_zip(
|
||
cert,
|
||
cert_ext=".crt",
|
||
zip_suffix="_crt_key"
|
||
)
|
||
else:
|
||
flash('不支持的导出格式', 'danger')
|
||
|
||
return render_template('export_certificate.html', cert=cert)
|
||
|
||
# 在app.py中添加以下路由
|
||
|
||
@app.route('/cas/<int:ca_id>/delete', methods=['GET', 'POST'])
|
||
@login_required
|
||
def delete_ca(ca_id):
|
||
ca = get_ca_by_id(ca_id)
|
||
if not ca:
|
||
flash('CA不存在', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and ca['created_by'] != current_user.id:
|
||
flash('无权删除此CA', 'danger')
|
||
return redirect(url_for('ca_list'))
|
||
|
||
# 检查是否有关联的证书
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor(dictionary=True)
|
||
cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
|
||
result = cursor.fetchone()
|
||
if result['count'] > 0:
|
||
flash('无法删除CA,因为存在关联的证书', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('检查关联证书失败', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
if request.method == 'POST':
|
||
# 删除文件
|
||
try:
|
||
if os.path.exists(ca['cert_path']):
|
||
os.remove(ca['cert_path'])
|
||
if os.path.exists(ca['key_path']):
|
||
os.remove(ca['key_path'])
|
||
# 删除CA目录
|
||
ca_dir = os.path.dirname(ca['cert_path'])
|
||
if os.path.exists(ca_dir):
|
||
os.rmdir(ca_dir)
|
||
except OSError as e:
|
||
print(f"文件删除错误: {e}")
|
||
flash('删除文件时出错', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
|
||
# 删除数据库记录
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
|
||
conn.commit()
|
||
flash('CA删除成功', 'success')
|
||
return redirect(url_for('ca_list'))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('删除CA记录失败', 'danger')
|
||
return redirect(url_for('ca_detail', ca_id=ca_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return render_template('confirm_delete_ca.html', ca=ca)
|
||
|
||
@app.route('/certificates/<int:cert_id>/delete', methods=['GET', 'POST'])
|
||
@login_required
|
||
def delete_certificate(cert_id):
|
||
cert = get_certificate_by_id(cert_id)
|
||
if not cert:
|
||
flash('证书不存在', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
# 检查权限
|
||
if not current_user.is_admin and cert['created_by'] != current_user.id:
|
||
flash('无权删除此证书', 'danger')
|
||
return redirect(url_for('certificate_list'))
|
||
|
||
if request.method == 'POST':
|
||
# 删除文件
|
||
try:
|
||
# 确保所有文件路径都存在
|
||
files_to_delete = [
|
||
cert['cert_path'],
|
||
cert['key_path'],
|
||
cert['csr_path']
|
||
]
|
||
|
||
# 删除所有指定文件
|
||
for file_path in files_to_delete:
|
||
if file_path and os.path.exists(file_path):
|
||
os.remove(file_path)
|
||
|
||
# 删除证书目录及其所有内容
|
||
cert_dir = os.path.dirname(cert['cert_path'])
|
||
if os.path.exists(cert_dir):
|
||
# 删除目录中的所有文件
|
||
for filename in os.listdir(cert_dir):
|
||
file_path = os.path.join(cert_dir, filename)
|
||
try:
|
||
if os.path.isfile(file_path) or os.path.islink(file_path):
|
||
os.unlink(file_path)
|
||
elif os.path.isdir(file_path):
|
||
shutil.rmtree(file_path)
|
||
except Exception as e:
|
||
print(f'Failed to delete {file_path}. Reason: {e}')
|
||
|
||
# 现在删除空目录
|
||
os.rmdir(cert_dir)
|
||
|
||
except OSError as e:
|
||
print(f"文件删除错误: {e}")
|
||
flash(f'删除文件时出错: {str(e)}', 'danger')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
|
||
# 删除数据库记录
|
||
conn = get_db_connection()
|
||
if conn:
|
||
try:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
|
||
conn.commit()
|
||
flash('证书删除成功', 'success')
|
||
return redirect(url_for('certificate_list'))
|
||
except Error as e:
|
||
print(f"Database error: {e}")
|
||
flash('删除证书记录失败', 'danger')
|
||
return redirect(url_for('certificate_detail', cert_id=cert_id))
|
||
finally:
|
||
if conn.is_connected():
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return render_template('confirm_delete_certificate.html', cert=cert)
|
||
def generate_separate_files_zip(cert, cert_ext, zip_suffix):
|
||
"""生成包含分开文件的ZIP包通用函数"""
|
||
memory_file = BytesIO()
|
||
|
||
try:
|
||
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
|
||
# 添加证书文件
|
||
with open(cert['cert_path'], 'r') as f:
|
||
cert_content = f.read()
|
||
zf.writestr(f"{cert['common_name']}{cert_ext}", cert_content)
|
||
|
||
# 添加私钥文件
|
||
with open(cert['key_path'], 'r') as f:
|
||
key_content = f.read()
|
||
zf.writestr(f"{cert['common_name']}.key", key_content)
|
||
|
||
memory_file.seek(0)
|
||
|
||
return Response(
|
||
memory_file.getvalue(),
|
||
mimetype="application/zip",
|
||
headers={
|
||
"Content-Disposition": f"attachment; filename={cert['common_name']}{zip_suffix}.zip"
|
||
}
|
||
)
|
||
except Exception as e:
|
||
flash(f'创建ZIP文件失败: {str(e)}', 'danger')
|
||
return redirect(url_for('export_certificate_view', cert_id=cert['id']))
|
||
|
||
@app.route('/download/<path:filename>')
|
||
@login_required
|
||
def download_file(filename):
|
||
# 安全检查:确保文件路径在证书存储目录内
|
||
file_path = os.path.join(CERT_STORE, filename)
|
||
if not os.path.exists(file_path):
|
||
flash('文件不存在', 'danger')
|
||
return redirect(url_for('index'))
|
||
|
||
# 检查权限
|
||
# 这里需要根据实际业务逻辑检查用户是否有权下载该文件
|
||
# 简单示例:只允许下载自己的证书文件
|
||
|
||
return send_from_directory(
|
||
os.path.dirname(file_path),
|
||
os.path.basename(file_path),
|
||
as_attachment=True
|
||
)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
app.run(debug=True, ssl_context='adhoc', host='0.0.0.0', port='9875') |