diff --git a/app.py b/app.py index c16deed..d429237 100644 --- a/app.py +++ b/app.py @@ -10,7 +10,20 @@ import random import string from io import BytesIO import zipfile +from pypinyin import pinyin, Style import uuid +def to_pinyin(text): + """将中文转换为拼音""" + if not text: + return "" + # 获取拼音列表,不带声调 + pinyin_list = pinyin(text, style=Style.NORMAL) + # 拼接成字符串 + return "_".join([item[0] for item in pinyin_list]) + +@app.template_filter('to_pinyin') +def jinja2_to_pinyin(text): + return to_pinyin(text) app = Flask(__name__) app.secret_key = 'your-secret-key-here' @@ -116,10 +129,12 @@ def verify_captcha(user_input): def create_ca(ca_name, common_name, organization, organizational_unit, country, state, locality, key_size, days_valid, created_by): - # 创建CA目录 - ca_dir = os.path.join(CERT_STORE, f"ca_{ca_name}") + # 创建拼音格式的目录名 + pinyin_name = to_pinyin(ca_name) + ca_dir = os.path.join(CERT_STORE, f"ca_{pinyin_name}") os.makedirs(ca_dir, exist_ok=True) + # 使用原始common_name作为文件名 key_path = os.path.join(ca_dir, f"{common_name}.key") cert_path = os.path.join(ca_dir, f"{common_name}.crt") @@ -357,69 +372,77 @@ def generate_crl(ca_id): if not ca: return False - crl_path = os.path.join(os.path.dirname(ca['cert_path']), f"crl_{ca['name']}.pem") + # 将中文路径转换为拼音 + ca_dir = os.path.dirname(ca['cert_path']) + pinyin_name = to_pinyin(ca['name']) + crl_path = os.path.join(ca_dir, f"crl_{pinyin_name}.pem") - # 获取所有被吊销的证书 - revoked_certs = [] - conn = get_db_connection() - if conn: - try: - cursor = conn.cursor(dictionary=True) - cursor.execute(""" - SELECT cert_path FROM certificates - WHERE ca_id = %s AND status = 'revoked' - """, (ca_id,)) - revoked_certs = [row['cert_path'] for row in cursor.fetchall()] - except Error as e: - print(f"Database error: {e}") - return False - finally: - if conn.is_connected(): - cursor.close() - conn.close() + # 创建必要的配置文件 + openssl_cnf = f""" + [ ca ] + default_ca = CA_default - # 创建索引文件 - index_file = os.path.join(os.path.dirname(ca['cert_path']), 'index.txt') + [ CA_default ] + database = {os.path.join(ca_dir, 'index.txt')} + certificate = {ca['cert_path']} + private_key = {ca['key_path']} + crl = {crl_path} + RANDFILE = {os.path.join(ca_dir, '.rand')} + """ + + # 确保目录存在 + os.makedirs(ca_dir, exist_ok=True) + + cnf_path = os.path.join(ca_dir, 'openssl.cnf') + with open(cnf_path, 'w', encoding='utf-8') as f: + f.write(openssl_cnf) + + # 确保index.txt存在 + index_file = os.path.join(ca_dir, 'index.txt') if not os.path.exists(index_file): - open(index_file, 'w').close() + open(index_file, 'a').close() # 生成CRL - subprocess.run([ - 'openssl', 'ca', '-gencrl', '-out', crl_path, - '-keyfile', ca['key_path'], '-cert', ca['cert_path'], - '-crldays', '30' - ], check=True) + try: + subprocess.run([ + 'openssl', 'ca', '-gencrl', + '-config', cnf_path, + '-out', crl_path, + '-crldays', '30' + ], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, encoding='utf-8') - # 更新数据库 - next_update = datetime.now() + timedelta(days=30) - - conn = get_db_connection() - if conn: - try: - cursor = conn.cursor() - # 检查是否已有CRL记录 - cursor.execute("SELECT id FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,)) - if cursor.fetchone(): - cursor.execute(""" - UPDATE certificate_revocation_list - SET crl_path = %s, last_updated = NOW(), next_update = %s - WHERE ca_id = %s - """, (crl_path, next_update, ca_id)) - else: + # 更新数据库 + next_update = datetime.now() + timedelta(days=30) + conn = get_db_connection() + if conn: + try: + cursor = conn.cursor() cursor.execute(""" INSERT INTO certificate_revocation_list (ca_id, crl_path, next_update) VALUES (%s, %s, %s) + ON DUPLICATE KEY UPDATE + crl_path = VALUES(crl_path), + last_updated = NOW(), + next_update = VALUES(next_update) """, (ca_id, crl_path, next_update)) - conn.commit() - return True - except Error as e: - print(f"Database error: {e}") - return False - finally: - if conn.is_connected(): - cursor.close() - conn.close() + conn.commit() + return True + except Error as e: + print(f"Database error: {e}") + return False + finally: + if conn.is_connected(): + cursor.close() + conn.close() + except subprocess.CalledProcessError as e: + error_msg = f"OpenSSL错误: {e.stderr}" if e.stderr else "未知OpenSSL错误" + print(error_msg) + flash(f'CRL生成失败: {error_msg}', 'danger') + except Exception as e: + print(f"CRL生成异常: {str(e)}") + flash(f'CRL生成异常: {str(e)}', 'danger') + return False diff --git a/templates/ca_detail.html b/templates/ca_detail.html index 571d4ea..75b94bf 100644 --- a/templates/ca_detail.html +++ b/templates/ca_detail.html @@ -4,12 +4,29 @@ {% block content %}
-

CA机构详情: {{ ca.name }}

+

CA机构详情: {{ ca.name }} + (路径: {{ ca.name|to_pinyin }}) +

- 导出CA - 生成CRL + + 导出CA + + + 生成CRL + {% if crl %} - 下载CRL + + 下载CRL + {% endif %}
@@ -18,26 +35,28 @@
-
基本信息
+
+ 基本信息 +
-
通用名
+
通用名
{{ ca.common_name }}
-
组织
+
组织
{{ ca.organization }}
-
组织单位
+
组织单位
{{ ca.organizational_unit or 'N/A' }}
-
国家
+
国家
{{ ca.country }}
-
州/省
+
州/省
{{ ca.state or 'N/A' }}
-
城市
+
城市
{{ ca.locality or 'N/A' }}
@@ -47,27 +66,35 @@
-
技术信息
+
+ 技术信息 +
-
密钥长度
+
密钥长度
{{ ca.key_size }}位
-
有效期
-
{{ ca.days_valid }}天
+
有效期
+
{{ ca.days_valid }}天 (至 {{ (ca.created_at + timedelta(days=ca.days_valid)).strftime('%Y-%m-%d') }})
-
创建者
-
{{ ca.created_by }}
+
创建者
+
{{ get_username(ca.created_by) }}
-
创建时间
+
创建时间
{{ ca.created_at.strftime('%Y-%m-%d %H:%M') }}
-
证书路径
-
{{ ca.cert_path }}
+
证书路径
+
+ {{ ca.cert_path }} + 拼音路径: {{ ca.cert_path|to_pinyin }} +
-
私钥路径
-
{{ ca.key_path }}
+
私钥路径
+
+ {{ ca.key_path }} + 拼音路径: {{ ca.key_path|to_pinyin }} +
@@ -76,13 +103,24 @@
-
颁发的证书
- 创建证书 +
+ 颁发的证书 + {{ certificates|length }} +
+
+ {% if certificates %}
- + @@ -99,27 +137,58 @@ - {% else %} - - - {% endfor %}
ID 通用名{{ cert.common_name }} {% if cert.status == 'active' %} - 有效 + 有效 {% elif cert.status == 'revoked' %} - 已吊销 + 已吊销 {% else %} - 已过期 + 已过期 {% endif %} {{ cert.expires_at.strftime('%Y-%m-%d') }} {{ cert.created_at.strftime('%Y-%m-%d') }} - 详情 +
该CA尚未颁发任何证书
+ {% else %} +
+ +
该CA尚未颁发任何证书
+ + 立即创建证书 + +
+ {% endif %}
+{% endblock %} + +{% block scripts %} +{{ super() }} + {% endblock %} \ No newline at end of file