修复根证书信任问题

This commit is contained in:
wzj 2025-06-15 07:19:50 +08:00
parent afa61f7a04
commit 47f0c572b3

178
app.py
View File

@ -365,9 +365,11 @@ def create_ca(ca_name, common_name, organization, organizational_unit, country,
def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit, def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organizational_unit,
country, state, locality, key_size, days_valid, created_by): country, state, locality, key_size, days_valid, created_by):
"""创建证书并返回证书ID"""
# 获取CA信息 # 获取CA信息
ca = get_ca_by_id(ca_id) ca = get_ca_by_id(ca_id)
if not ca: if not ca:
print(f"CA ID {ca_id} 不存在")
return None return None
# 创建证书目录 # 创建证书目录
@ -378,132 +380,146 @@ def create_certificate(ca_id, common_name, san_dns, san_ip, organization, organi
csr_path = os.path.join(cert_dir, f"{common_name}.csr") csr_path = os.path.join(cert_dir, f"{common_name}.csr")
cert_path = os.path.join(cert_dir, f"{common_name}.crt") cert_path = os.path.join(cert_dir, f"{common_name}.crt")
# 生成私钥 # 1. 生成私钥
subprocess.run([ try:
'openssl', 'genrsa', '-out', key_path, str(key_size) subprocess.run([
], check=True) 'openssl', 'genrsa', '-out', key_path, str(key_size)
], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f"生成私钥失败: {e.stderr.decode()}")
return None
# 创建CSR配置文件 # 2. 创建CSR配置文件
csr_config = f"""[req]
default_bits = {key_size}
prompt = no
default_md = sha256
distinguished_name = dn
"""
# 只有在有SAN时才添加扩展部分
has_san = bool(san_dns or san_ip) has_san = bool(san_dns or san_ip)
dns_entries = [dns.strip() for dns in san_dns.split(',') if dns.strip()] if san_dns else []
ip_entries = [ip.strip() for ip in san_ip.split(',') if ip.strip()] if san_ip else []
# 构建CSR配置
csr_config = f"""[req]
default_bits = {key_size}
prompt = no
default_md = sha256
distinguished_name = dn
"""
if has_san: if has_san:
csr_config += "req_extensions = req_ext\n" csr_config += "req_extensions = req_ext\n"
csr_config += f""" csr_config += f"""
[dn] [dn]
CN = {common_name} CN = {common_name}
O = {organization} O = {organization}
OU = {organizational_unit} OU = {organizational_unit}
C = {country} C = {country}
ST = {state} ST = {state}
L = {locality} L = {locality}
""" """
if has_san: if has_san:
csr_config += """ csr_config += """
[req_ext] [req_ext]
basicConstraints = CA:FALSE basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment keyUsage = digitalSignature, keyEncipherment
subjectAltName = @alt_names subjectAltName = @alt_names
extendedKeyUsage = serverAuth, clientAuth extendedKeyUsage = serverAuth, clientAuth
[alt_names]""" [alt_names]"""
# 添加DNS SAN条目 # 添加DNS SAN条目
if san_dns: for i, dns in enumerate(dns_entries, 1):
dns_entries = [dns.strip() for dns in san_dns.split(',') if dns.strip()] csr_config += f"\nDNS.{i} = {dns}"
for i, dns in enumerate(dns_entries, 1):
csr_config += f"\nDNS.{i} = {dns}"
# 添加IP SAN条目 # 添加IP SAN条目
if san_ip: for i, ip in enumerate(ip_entries, 1):
ip_entries = [ip.strip() for ip in san_ip.split(',') if ip.strip()] csr_config += f"\nIP.{i} = {ip}"
for i, ip in enumerate(ip_entries, 1):
csr_config += f"\nIP.{i} = {ip}"
# 确保配置文件不以空行结尾 # 写入CSR配置文件
csr_config = csr_config.strip() csr_config_path = os.path.join(cert_dir, 'csr_config.cnf')
with open(csr_config_path, 'w') as f:
f.write(csr_config.strip()) # 确保没有多余空行
config_path = os.path.join(cert_dir, 'csr_config.cnf') # 3. 生成CSR
with open(config_path, 'w') as f:
f.write(csr_config)
# 生成CSR
try: try:
subprocess.run([ subprocess.run([
'openssl', 'req', '-new', '-key', key_path, '-out', csr_path, 'openssl', 'req', '-new', '-key', key_path,
'-config', config_path '-out', csr_path, '-config', csr_config_path
], check=True) ], check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"OpenSSL错误: {e}") print(f"生成CSR失败: {e.stderr.decode()}")
print("CSR配置文件内容:") print("CSR配置文件内容:")
print(csr_config) print(csr_config)
return None return None
# 使用CA签名证书添加-extfile参数 # 4. 创建证书扩展配置文件
ext_config = """authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage=digitalSignature, keyEncipherment
extendedKeyUsage=serverAuth, clientAuth
"""
if has_san:
ext_config += "subjectAltName=@alt_names\n\n[alt_names]\n"
# 添加DNS SAN条目
for i, dns in enumerate(dns_entries, 1):
ext_config += f"DNS.{i} = {dns}\n"
# 添加IP SAN条目
for i, ip in enumerate(ip_entries, 1):
ext_config += f"IP.{i} = {ip}\n"
ext_config_path = os.path.join(cert_dir, 'ext.cnf')
with open(ext_config_path, 'w') as f:
f.write(ext_config.strip())
# 5. 使用CA签名证书
try: try:
# 创建扩展配置文件 cmd = [
ext_config = f""" 'openssl', 'x509', '-req', '-in', csr_path,
authorityKeyIdentifier=keyid,issuer '-CA', ca['cert_path'], '-CAkey', ca['key_path'],
basicConstraints=CA:FALSE '-CAcreateserial', '-out', cert_path,
keyUsage=digitalSignature, keyEncipherment '-days', str(days_valid), '-sha256'
extendedKeyUsage=serverAuth, clientAuth ]
subjectAltName=@alt_names
[alt_names]""" # 只有有扩展内容时才添加-extfile参数
if os.path.getsize(ext_config_path) > 0:
cmd.extend(['-extfile', ext_config_path])
if san_dns: subprocess.run(cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
for i, dns in enumerate(dns_entries, 1):
ext_config += f"\nDNS.{i} = {dns}"
if san_ip:
for i, ip in enumerate(ip_entries, 1):
ext_config += f"\nIP.{i} = {ip}"
ext_path = os.path.join(cert_dir, 'ext.cnf')
with open(ext_path, 'w') as f:
f.write(ext_config.strip())
subprocess.run([
'openssl', 'x509', '-req', '-in', csr_path, '-CA', ca['cert_path'],
'-CAkey', ca['key_path'], '-CAcreateserial', '-out', cert_path,
'-days', str(days_valid), '-sha256', '-extfile', ext_path
], check=True)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"签名证书错误: {e}") print(f"证书签名失败: {e.stderr.decode()}")
print("扩展配置文件内容:")
print(ext_config)
return None return None
# 计算过期时间 # 6. 保存到数据库
expires_at = datetime.now() + timedelta(days=days_valid) expires_at = datetime.now() + timedelta(days=days_valid)
# 保存到数据库
conn = get_db_connection() conn = get_db_connection()
if conn: if conn:
try: try:
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(""" cursor.execute("""
INSERT INTO certificates INSERT INTO certificates
(common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality, (common_name, san_dns, san_ip, organization, organizational_unit,
key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at) country, state, locality, key_size, days_valid, cert_path,
key_path, csr_path, ca_id, created_by, expires_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (common_name, san_dns, san_ip, organization, organizational_unit, country, state, locality, """, (
key_size, days_valid, cert_path, key_path, csr_path, ca_id, created_by, expires_at)) common_name, san_dns, san_ip, organization, organizational_unit,
country, state, locality, key_size, days_valid, cert_path,
key_path, csr_path, ca_id, created_by, expires_at
))
conn.commit() conn.commit()
return cursor.lastrowid return cursor.lastrowid
except Error as e: except Error as e:
print(f"Database error: {e}") print(f"数据库错误: {e}")
conn.rollback()
return None return None
finally: finally:
if conn.is_connected(): if conn.is_connected():
cursor.close() cursor.close()
conn.close() conn.close()
return None return None