"""Flask web application for managing a small private PKI.

Users register and log in (with a database-backed captcha), create
certificate authorities, issue/renew/revoke certificates signed by those
CAs, generate CRLs and export certificates. All key material is produced by
shelling out to the ``openssl`` command-line tool and stored under
``CERT_STORE``; metadata lives in a MySQL database.
"""
import os
import random
import secrets
import string
import subprocess
import uuid
import zipfile
from datetime import datetime, timedelta
from io import BytesIO

import mysql.connector
from flask import (
    Flask,
    Response,
    flash,
    redirect,
    render_template,
    request,
    send_from_directory,
    url_for,
)
from flask_login import (
    LoginManager,
    UserMixin,
    current_user,
    login_required,
    login_user,
    logout_user,
)
from mysql.connector import Error
from werkzeug.security import check_password_hash, generate_password_hash

app = Flask(__name__)
# NOTE(review): the fallback value is a placeholder; set SECRET_KEY in the
# environment for any real deployment so session cookies cannot be forged.
app.secret_key = os.environ.get('SECRET_KEY', 'your-secret-key-here')

# Database configuration. Every value can be overridden from the environment;
# the defaults are the values that were previously hard-coded.
db_config = {
    'host': os.environ.get('CERT_MANAGER_DB_HOST', '192.168.31.11'),
    'database': os.environ.get('CERT_MANAGER_DB_NAME', 'cert_manager'),
    'user': os.environ.get('CERT_MANAGER_DB_USER', 'root'),
    'password': os.environ.get('CERT_MANAGER_DB_PASSWORD', 'Home123#$.'),
}

# Flask-Login configuration.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'

# Make sure the certificate storage directory exists.
CERT_STORE = os.path.join(os.path.dirname(__file__), 'cert_store')
os.makedirs(CERT_STORE, exist_ok=True)

CAPTCHA_LENGTH = 6
CAPTCHA_ALPHABET = string.ascii_uppercase + string.digits
CRL_VALID_DAYS = 30
CSR_CONFIG_NAME = 'csr_config.cnf'
SAN_EXTENSION_SECTION = 'req_ext'

# Failures that mean "the openssl invocation did not succeed": a non-zero exit
# status, or the binary / a target file not being usable at all.
OPENSSL_ERRORS = (subprocess.CalledProcessError, OSError)


class User(UserMixin):
    """Session user object; ``id``, ``username`` and ``is_admin`` are set ad hoc."""


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _close_db(conn, cursor=None):
    """Close *cursor* and *conn*, tolerating objects that were never created."""
    if conn is None or not conn.is_connected():
        return
    try:
        if cursor is not None:
            cursor.close()
    finally:
        # Always release the connection, even if closing the cursor fails.
        conn.close()


def _user_from_row(row):
    """Build a ``User`` from a ``users`` table row (dict cursor)."""
    user = User()
    user.id = row['id']
    user.username = row['username']
    user.is_admin = row['is_admin']
    return user


def _fetch_one_dict(query, params):
    """Run *query* and return the first row as a dict, or ``None`` on any failure."""
    conn = get_db_connection()
    if not conn:
        return None
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(query, params)
        return cursor.fetchone()
    except Error as e:
        app.logger.error("Database error: %s", e)
        return None
    finally:
        _close_db(conn, cursor)


def _is_safe_filename_part(value):
    """Return True if *value* can be embedded in a file name without escaping its directory."""
    if not value or value in ('.', '..'):
        return False
    forbidden = {'/', '\\', os.sep}
    if os.altsep:
        forbidden.add(os.altsep)
    return not any(ch in forbidden or ord(ch) < 32 for ch in value)


def _has_control_chars(*values):
    """Return True if any of *values* contains a control character (e.g. a newline)."""
    return any(
        ord(ch) < 32 or ord(ch) == 127
        for value in values if value
        for ch in str(value)
    )


def _parse_positive_int(raw):
    """Return *raw* as a positive int, or ``None`` if it is not one."""
    try:
        number = int(raw)
    except (TypeError, ValueError):
        return None
    return number if number > 0 else None


def _dn_fields(common_name, organization, organizational_unit, country, state, locality):
    """Return the non-empty distinguished-name fields as ordered (key, value) pairs."""
    pairs = [
        ('CN', common_name),
        ('O', organization),
        ('OU', organizational_unit),
        ('C', country),
        ('ST', state),
        ('L', locality),
    ]
    return [(key, value) for key, value in pairs if value]


def _build_subject(dn_fields):
    """Format *dn_fields* as an ``openssl -subj`` string, escaping its separators."""
    def escape(value):
        # Backslash first, so the escapes added below are not escaped again.
        return value.replace('\\', '\\\\').replace('/', '\\/').replace('+', '\\+')

    return ''.join(f'/{key}={escape(value)}' for key, value in dn_fields)


def _split_san(raw):
    """Split a comma-separated SAN list into stripped, non-empty entries."""
    return [part.strip() for part in (raw or '').split(',') if part.strip()]


def _build_csr_config(key_size, dn_fields, dns_names, ip_addresses):
    """Return the text of an openssl request config, with a SAN section if needed."""
    lines = [
        '[req]',
        f'default_bits = {key_size}',
        'prompt = no',
        'default_md = sha256',
        'distinguished_name = dn',
    ]
    has_san = bool(dns_names or ip_addresses)
    if has_san:
        # Must live in [req]; placed anywhere else openssl treats it as a DN field.
        lines.append(f'req_extensions = {SAN_EXTENSION_SECTION}')
    lines += ['', '[dn]']
    lines += [f'{key} = {value}' for key, value in dn_fields]
    if has_san:
        lines += [
            '',
            f'[{SAN_EXTENSION_SECTION}]',
            'subjectAltName = @alt_names',
            '',
            '[alt_names]',
        ]
        lines += [f'DNS.{i} = {name}' for i, name in enumerate(dns_names, 1)]
        lines += [f'IP.{i} = {addr}' for i, addr in enumerate(ip_addresses, 1)]
    return '\n'.join(lines) + '\n'


def _build_crl_config(index_file):
    """Return a minimal ``openssl ca`` config that uses *index_file* as its database."""
    return '\n'.join([
        '[ca]',
        'default_ca = CA_default',
        '',
        '[CA_default]',
        f'database = {index_file}',
        'default_md = sha256',
        f'default_crl_days = {CRL_VALID_DAYS}',
        # Renewed certificates reuse the subject of the one they replace.
        'unique_subject = no',
        '',
    ])


def _run_openssl(args, **kwargs):
    """Run ``openssl`` with *args*; raises ``CalledProcessError`` on non-zero exit."""
    return subprocess.run(['openssl', *args], check=True, **kwargs)


def _sign_csr(csr_path, ca, cert_path, days_valid, extfile=None):
    """Sign *csr_path* with *ca*, copying SAN extensions from *extfile* when given."""
    args = [
        'x509', '-req',
        '-in', csr_path,
        '-CA', ca['cert_path'],
        '-CAkey', ca['key_path'],
        '-CAcreateserial',
        '-out', cert_path,
        '-days', str(days_valid),
        '-sha256',
    ]
    if extfile:
        # `x509 -req` drops CSR extensions unless they are supplied explicitly.
        args += ['-extfile', extfile, '-extensions', SAN_EXTENSION_SECTION]
    _run_openssl(args)


def _read_text(path):
    """Return the text content of *path*."""
    with open(path, 'r') as f:
        return f.read()


def _user_owns_store_file(user_id, file_path):
    """Return True if *file_path* (a real path) is recorded as belonging to *user_id*.

    A file belongs to a user when it is the certificate, key or CSR of one of
    their certificates, the certificate or key of one of their CAs, or the CRL
    of one of their CAs.
    """
    conn = get_db_connection()
    if not conn:
        return False
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT cert_path, key_path, csr_path
            FROM certificates WHERE created_by = %s
            UNION ALL
            SELECT cert_path, key_path, NULL
            FROM certificate_authorities WHERE created_by = %s
            UNION ALL
            SELECT crl.crl_path, NULL, NULL
            FROM certificate_revocation_list crl
            JOIN certificate_authorities ca ON crl.ca_id = ca.id
            WHERE ca.created_by = %s
        """, (user_id, user_id, user_id))
        rows = cursor.fetchall()
    except Error as e:
        app.logger.error("Database error: %s", e)
        return False
    finally:
        _close_db(conn, cursor)
    owned = {os.path.realpath(path) for row in rows for path in row if path}
    return file_path in owned


# ---------------------------------------------------------------------------
# Authentication support
# ---------------------------------------------------------------------------

@login_manager.user_loader
def load_user(user_id):
    """Load the active user with *user_id* for Flask-Login, or ``None``."""
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor(dictionary=True)
        # Deactivated accounts must not keep working through an old session.
        cursor.execute(
            "SELECT * FROM users WHERE id = %s AND is_active = TRUE", (user_id,)
        )
        user_data = cursor.fetchone()
        return _user_from_row(user_data) if user_data else None
    except Error as e:
        app.logger.error("Database error: %s", e)
        return None
    finally:
        _close_db(conn, cursor)


def get_db_connection():
    """Open a new MySQL connection; returns ``None`` if it cannot be established."""
    try:
        return mysql.connector.connect(**db_config)
    except Error as e:
        app.logger.error("Database connection error: %s", e)
        return None


def generate_captcha():
    """Create, store and return a new 6-character captcha code (``None`` on failure).

    NOTE(review): captchas are stored globally rather than per session, and the
    views hand the code to the template in clear text; both weaken the check.
    """
    captcha_code = ''.join(secrets.choice(CAPTCHA_ALPHABET) for _ in range(CAPTCHA_LENGTH))
    conn = get_db_connection()
    if not conn:
        return None
    cursor = None
    try:
        cursor = conn.cursor()
        # Purge expired captchas before adding the new one.
        cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE")
        cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,))
        conn.commit()
        return captcha_code
    except Error as e:
        app.logger.error("Database error: %s", e)
        return None
    finally:
        _close_db(conn, cursor)


def verify_captcha(user_input):
    """Return True if *user_input* matches the most recent unexpired captcha."""
    if not user_input:
        return False
    conn = get_db_connection()
    if not conn:
        return False
    cursor = None
    try:
        cursor = conn.cursor()
        # Expired codes are only purged when a new one is generated, so the
        # age limit has to be enforced here as well.
        cursor.execute("""
            SELECT code FROM captcha
            WHERE created_at >= NOW() - INTERVAL 10 MINUTE
            ORDER BY created_at DESC LIMIT 1
        """)
        result = cursor.fetchone()
        return bool(result) and user_input.upper() == result[0]
    except Error as e:
        app.logger.error("Database error: %s", e)
        return False
    finally:
        _close_db(conn, cursor)


# ---------------------------------------------------------------------------
# PKI operations
# ---------------------------------------------------------------------------

def create_ca(ca_name, common_name, organization, organizational_unit, country,
              state, locality, key_size, days_valid, created_by):
    """Create a self-signed CA on disk and record it in the database.

    Returns the new ``certificate_authorities`` row id, or ``None`` if the
    names are unsafe, openssl fails, or the database insert fails.
    """
    if not (_is_safe_filename_part(ca_name) and _is_safe_filename_part(common_name)):
        app.logger.error("Rejected CA with unsafe name: %r / %r", ca_name, common_name)
        return None
    if _has_control_chars(organization, organizational_unit, country, state, locality):
        app.logger.error("Rejected CA subject containing control characters")
        return None

    ca_dir = os.path.join(CERT_STORE, f"ca_{ca_name}")
    os.makedirs(ca_dir, exist_ok=True)
    key_path = os.path.join(ca_dir, f"{common_name}.key")
    cert_path = os.path.join(ca_dir, f"{common_name}.crt")
    subject = _build_subject(_dn_fields(
        common_name, organization, organizational_unit, country, state, locality))

    try:
        # CA private key.
        _run_openssl(['genrsa', '-out', key_path, str(key_size)])
        os.chmod(key_path, 0o600)
        # Self-signed CA certificate.
        _run_openssl([
            'req', '-new', '-x509',
            '-days', str(days_valid),
            '-key', key_path,
            '-out', cert_path,
            '-subj', subject,
        ])
    except OPENSSL_ERRORS as e:
        app.logger.error("openssl failed while creating CA: %s", e)
        return None

    conn = get_db_connection()
    if not conn:
        return None
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO certificate_authorities
            (name, common_name, organization, organizational_unit, country,
             state, locality, key_size, days_valid, cert_path, key_path, created_by)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (ca_name, common_name, organization, organizational_unit, country,
              state, locality, key_size, days_valid, cert_path, key_path, created_by))
        conn.commit()
        return cursor.lastrowid
    except Error as e:
        app.logger.error("Database error: %s", e)
        return None
    finally:
        _close_db(conn, cursor)


def create_certificate(ca_id, common_name, san_dns, san_ip, organization,
                       organizational_unit, country, state, locality, key_size,
                       days_valid, created_by):
    """Issue a certificate signed by CA *ca_id* and record it in the database.

    *san_dns* and *san_ip* are comma-separated lists; when either is non-empty
    the names are put in the CSR and copied into the signed certificate.
    Returns the new ``certificates`` row id, or ``None`` on any failure.
    """
    ca = get_ca_by_id(ca_id)
    if not ca:
        return None
    if not _is_safe_filename_part(common_name):
        app.logger.error("Rejected certificate with unsafe common name: %r", common_name)
        return None
    # Every value below is written into an openssl config file, where a
    # newline would let the caller inject arbitrary directives.
    if _has_control_chars(organization, organizational_unit, country, state,
                          locality, san_dns, san_ip):
        app.logger.error("Rejected certificate fields containing control characters")
        return None

    cert_dir = os.path.join(CERT_STORE, f"certs_{common_name}")
    os.makedirs(cert_dir, exist_ok=True)
    key_path = os.path.join(cert_dir, f"{common_name}.key")
    csr_path = os.path.join(cert_dir, f"{common_name}.csr")
    cert_path = os.path.join(cert_dir, f"{common_name}.crt")
    config_path = os.path.join(cert_dir, CSR_CONFIG_NAME)

    dns_names = _split_san(san_dns)
    ip_addresses = _split_san(san_ip)
    has_san = bool(dns_names or ip_addresses)
    csr_config = _build_csr_config(
        key_size,
        _dn_fields(common_name, organization, organizational_unit, country, state, locality),
        dns_names,
        ip_addresses,
    )

    try:
        # Private key.
        _run_openssl(['genrsa', '-out', key_path, str(key_size)])
        os.chmod(key_path, 0o600)
        # CSR config file, then the CSR itself.
        with open(config_path, 'w') as f:
            f.write(csr_config)
        _run_openssl([
            'req', '-new',
            '-key', key_path,
            '-out', csr_path,
            '-config', config_path,
        ])
        # Sign with the CA.
        _sign_csr(csr_path, ca, cert_path, days_valid,
                  extfile=config_path if has_san else None)
    except OPENSSL_ERRORS as e:
        app.logger.error("openssl failed while creating certificate: %s", e)
        return None

    expires_at = datetime.now() + timedelta(days=days_valid)

    conn = get_db_connection()
    if not conn:
        return None
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO certificates
            (common_name, san_dns, san_ip, organization, organizational_unit,
             country, state, locality, key_size, days_valid, cert_path,
             key_path, csr_path, ca_id, created_by, expires_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (common_name, san_dns, san_ip, organization, organizational_unit,
              country, state, locality, key_size, days_valid, cert_path,
              key_path, csr_path, ca_id, created_by, expires_at))
        conn.commit()
        return cursor.lastrowid
    except Error as e:
        app.logger.error("Database error: %s", e)
        return None
    finally:
        _close_db(conn, cursor)


def get_ca_by_id(ca_id):
    """Return the ``certificate_authorities`` row for *ca_id* as a dict, or ``None``."""
    return _fetch_one_dict(
        "SELECT * FROM certificate_authorities WHERE id = %s", (ca_id,))


def get_certificate_by_id(cert_id):
    """Return the ``certificates`` row for *cert_id* as a dict, or ``None``."""
    return _fetch_one_dict("SELECT * FROM certificates WHERE id = %s", (cert_id,))


def revoke_certificate(cert_id, reason):
    """Mark certificate *cert_id* as revoked with *reason*; returns success as bool."""
    conn = get_db_connection()
    if not conn:
        return False
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE certificates
            SET status = 'revoked', revoked_at = NOW(), revocation_reason = %s
            WHERE id = %s
        """, (reason, cert_id))
        conn.commit()
        return True
    except Error as e:
        app.logger.error("Database error: %s", e)
        return False
    finally:
        _close_db(conn, cursor)


def renew_certificate(cert_id, days_valid):
    """Re-sign the stored CSR of *cert_id* for *days_valid* days; returns success.

    The renewed certificate is written next to the old one and the database
    row is switched to it and reset to ``active``.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        return False
    ca = get_ca_by_id(cert['ca_id'])
    if not ca:
        return False

    new_cert_path = os.path.join(
        os.path.dirname(cert['cert_path']), f"renewed_{cert['common_name']}.crt")
    # Reuse the config written at issue time so the SANs survive the renewal.
    config_path = os.path.join(os.path.dirname(cert['csr_path']), CSR_CONFIG_NAME)
    has_san = bool(_split_san(cert.get('san_dns')) or _split_san(cert.get('san_ip')))
    extfile = config_path if has_san and os.path.exists(config_path) else None

    try:
        _sign_csr(cert['csr_path'], ca, new_cert_path, days_valid, extfile=extfile)
    except OPENSSL_ERRORS as e:
        app.logger.error("openssl failed while renewing certificate: %s", e)
        return False

    new_expires_at = datetime.now() + timedelta(days=days_valid)
    conn = get_db_connection()
    if not conn:
        return False
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE certificates
            SET cert_path = %s, days_valid = %s, expires_at = %s,
                status = 'active', revoked_at = NULL, revocation_reason = NULL
            WHERE id = %s
        """, (new_cert_path, days_valid, new_expires_at, cert_id))
        conn.commit()
        return True
    except Error as e:
        app.logger.error("Database error: %s", e)
        return False
    finally:
        _close_db(conn, cursor)


def generate_crl(ca_id):
    """(Re)generate the CRL of CA *ca_id* and record its path; returns success.

    Each certificate currently marked revoked in the database is recorded in
    the CA's ``index.txt`` with ``openssl ca -revoke`` before the CRL is built
    from that index.
    """
    ca = get_ca_by_id(ca_id)
    if not ca:
        return False
    ca_dir = os.path.dirname(ca['cert_path'])
    crl_path = os.path.join(ca_dir, f"crl_{ca['name']}.pem")

    # Collect every revoked certificate issued by this CA.
    conn = get_db_connection()
    if not conn:
        # Do not publish a CRL when the revocation list cannot be read.
        return False
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT cert_path FROM certificates
            WHERE ca_id = %s AND status = 'revoked'
        """, (ca_id,))
        revoked_certs = [row['cert_path'] for row in cursor.fetchall()]
    except Error as e:
        app.logger.error("Database error: %s", e)
        return False
    finally:
        _close_db(conn, cursor)

    index_file = os.path.join(ca_dir, 'index.txt')
    config_path = os.path.join(ca_dir, 'crl_openssl.cnf')
    ca_args = ['-config', config_path, '-keyfile', ca['key_path'], '-cert', ca['cert_path']]
    try:
        if not os.path.exists(index_file):
            open(index_file, 'w').close()
        # Without an explicit config, `openssl ca` falls back to the system
        # default one and never looks at this CA's index file.
        with open(config_path, 'w') as f:
            f.write(_build_crl_config(index_file))

        for revoked_path in revoked_certs:
            if not os.path.exists(revoked_path):
                app.logger.warning("Revoked certificate file missing: %s", revoked_path)
                continue
            result = subprocess.run(
                ['openssl', 'ca', *ca_args, '-revoke', revoked_path],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                universal_newlines=True,
            )
            # Certificates already in the index from an earlier run are fine.
            if result.returncode != 0 and 'Already revoked' not in result.stderr:
                app.logger.error("openssl -revoke failed: %s", result.stderr)
                return False

        _run_openssl([
            'ca', '-gencrl', *ca_args,
            '-out', crl_path,
            '-crldays', str(CRL_VALID_DAYS),
        ])
    except OPENSSL_ERRORS as e:
        app.logger.error("openssl failed while generating CRL: %s", e)
        return False

    next_update = datetime.now() + timedelta(days=CRL_VALID_DAYS)
    conn = get_db_connection()
    if not conn:
        return False
    cursor = None
    try:
        cursor = conn.cursor()
        # Update the existing CRL record for this CA, or create the first one.
        cursor.execute(
            "SELECT id FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
        if cursor.fetchone():
            cursor.execute("""
                UPDATE certificate_revocation_list
                SET crl_path = %s, last_updated = NOW(), next_update = %s
                WHERE ca_id = %s
            """, (crl_path, next_update, ca_id))
        else:
            cursor.execute("""
                INSERT INTO certificate_revocation_list (ca_id, crl_path, next_update)
                VALUES (%s, %s, %s)
            """, (ca_id, crl_path, next_update))
        conn.commit()
        return True
    except Error as e:
        app.logger.error("Database error: %s", e)
        return False
    finally:
        _close_db(conn, cursor)


def export_pkcs12(cert_id, password):
    """Bundle certificate *cert_id*, its key and CA cert into a PKCS#12 file.

    Returns the path of the ``.p12`` file protected with *password*, or
    ``None`` if the certificate/CA is unknown or openssl fails.
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        return None
    ca = get_ca_by_id(cert['ca_id'])
    if not ca:
        return None

    pkcs12_path = os.path.join(
        os.path.dirname(cert['cert_path']), f"{cert['common_name']}.p12")
    try:
        # The password goes through stdin so it never shows up in the
        # process list, as a `pass:` argument would.
        _run_openssl(
            [
                'pkcs12', '-export',
                '-out', pkcs12_path,
                '-inkey', cert['key_path'],
                '-in', cert['cert_path'],
                '-certfile', ca['cert_path'],
                '-passout', 'stdin',
            ],
            input=f'{password}\n',
            universal_newlines=True,
        )
    except OPENSSL_ERRORS as e:
        app.logger.error("openssl failed while exporting PKCS#12: %s", e)
        return None
    return pkcs12_path


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create a regular user account on POST."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        confirm_password = request.form['confirm_password']
        email = request.form.get('email', '')
        captcha = request.form['captcha']

        if not verify_captcha(captcha):
            flash('验证码错误', 'danger')
            return redirect(url_for('register'))

        if password != confirm_password:
            flash('两次输入的密码不匹配', 'danger')
            return redirect(url_for('register'))

        conn = get_db_connection()
        if conn:
            cursor = None
            try:
                cursor = conn.cursor()
                # Refuse duplicate user names.
                cursor.execute("SELECT id FROM users WHERE username = %s", (username,))
                if cursor.fetchone():
                    flash('用户名已存在', 'danger')
                    return redirect(url_for('register'))

                password_hash = generate_password_hash(password)
                cursor.execute("""
                    INSERT INTO users (username, password_hash, email, is_admin, is_active)
                    VALUES (%s, %s, %s, %s, %s)
                """, (username, password_hash, email, False, True))
                conn.commit()
                flash('注册成功,请登录', 'success')
                return redirect(url_for('login'))
            except Error as e:
                app.logger.error("Database error: %s", e)
                flash('注册失败,请稍后再试', 'danger')
            finally:
                _close_db(conn, cursor)

    captcha_code = generate_captcha()
    return render_template('register.html', captcha_code=captcha_code)


@app.route('/')
@login_required
def index():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate an active user on POST."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        captcha = request.form['captcha']

        if not verify_captcha(captcha):
            flash('验证码错误', 'danger')
            return redirect(url_for('login'))

        conn = get_db_connection()
        if conn:
            cursor = None
            try:
                cursor = conn.cursor(dictionary=True)
                cursor.execute("""
                    SELECT * FROM users
                    WHERE username = %s AND is_active = TRUE
                """, (username,))
                user_data = cursor.fetchone()
                if user_data and check_password_hash(user_data['password_hash'], password):
                    login_user(_user_from_row(user_data))
                    flash('登录成功', 'success')
                    return redirect(url_for('index'))
                flash('用户名或密码错误,或账户未激活', 'danger')
            except Error as e:
                app.logger.error("Database error: %s", e)
                flash('登录失败,请稍后再试', 'danger')
            finally:
                _close_db(conn, cursor)

    captcha_code = generate_captcha()
    return render_template('login.html', captcha_code=captcha_code)


@app.route('/logout')
@login_required
def logout():
    """Log the current user out and return to the login page."""
    logout_user()
    flash('您已成功登出', 'success')
    return redirect(url_for('login'))


@app.route('/cas')
@login_required
def ca_list():
    """List all CAs for admins, or only the caller's own CAs otherwise."""
    conn = get_db_connection()
    if not conn:
        return redirect(url_for('index'))
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        if current_user.is_admin:
            cursor.execute("SELECT * FROM certificate_authorities")
        else:
            cursor.execute(
                "SELECT * FROM certificate_authorities WHERE created_by = %s",
                (current_user.id,))
        cas = cursor.fetchall()
        return render_template('ca_list.html', cas=cas)
    except Error as e:
        app.logger.error("Database error: %s", e)
        flash('获取CA列表失败', 'danger')
        return redirect(url_for('index'))
    finally:
        _close_db(conn, cursor)


@app.route('/cas/create', methods=['GET', 'POST'])
@login_required
def create_ca_view():
    """Show the CA creation form and create the CA on POST."""
    if request.method == 'POST':
        form = request.form
        key_size = _parse_positive_int(form['key_size'])
        days_valid = _parse_positive_int(form['days_valid'])

        ca_id = None
        # Non-numeric sizes/durations are treated like any other failed creation.
        if key_size is not None and days_valid is not None:
            ca_id = create_ca(
                form['ca_name'], form['common_name'], form['organization'],
                form['organizational_unit'], form['country'], form['state'],
                form['locality'], key_size, days_valid, current_user.id)

        if ca_id:
            flash('CA创建成功', 'success')
            return redirect(url_for('ca_list'))
        flash('CA创建失败', 'danger')

    return render_template('create_ca.html')


@app.route('/cas/<int:ca_id>')
@login_required
def ca_detail(ca_id):
    """Show a CA together with the certificates it issued and its CRL record."""
    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA不存在', 'danger')
        return redirect(url_for('ca_list'))

    # Only admins and the CA's creator may view it.
    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('无权访问此CA', 'danger')
        return redirect(url_for('ca_list'))

    conn = get_db_connection()
    if not conn:
        return redirect(url_for('ca_list'))
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT * FROM certificates
            WHERE ca_id = %s
            ORDER BY created_at DESC
        """, (ca_id,))
        certificates = cursor.fetchall()

        cursor.execute("""
            SELECT * FROM certificate_revocation_list
            WHERE ca_id = %s
        """, (ca_id,))
        crl = cursor.fetchone()

        return render_template('ca_detail.html', ca=ca, certificates=certificates, crl=crl)
    except Error as e:
        app.logger.error("Database error: %s", e)
        flash('获取CA详情失败', 'danger')
        return redirect(url_for('ca_list'))
    finally:
        _close_db(conn, cursor)


@app.route('/cas/<int:ca_id>/generate_crl')
@login_required
def generate_crl_view(ca_id):
    """Regenerate the CRL of a CA the caller is allowed to manage."""
    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA不存在', 'danger')
        return redirect(url_for('ca_list'))

    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('无权操作此CA', 'danger')
        return redirect(url_for('ca_list'))

    if generate_crl(ca_id):
        flash('CRL生成成功', 'success')
    else:
        flash('CRL生成失败', 'danger')

    return redirect(url_for('ca_detail', ca_id=ca_id))


@app.route('/cas/<int:ca_id>/download_crl')
@login_required
def download_crl(ca_id):
    """Send the stored CRL file of a CA the caller is allowed to manage."""
    ca = get_ca_by_id(ca_id)
    if not ca:
        flash('CA不存在', 'danger')
        return redirect(url_for('ca_list'))

    if not current_user.is_admin and ca['created_by'] != current_user.id:
        flash('无权操作此CA', 'danger')
        return redirect(url_for('ca_list'))

    conn = get_db_connection()
    if not conn:
        return redirect(url_for('ca_detail', ca_id=ca_id))
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT crl_path FROM certificate_revocation_list
            WHERE ca_id = %s
        """, (ca_id,))
        crl = cursor.fetchone()

        if crl and os.path.exists(crl['crl_path']):
            return send_from_directory(
                os.path.dirname(crl['crl_path']),
                os.path.basename(crl['crl_path']),
                as_attachment=True,
            )
        flash('CRL文件不存在', 'danger')
        return redirect(url_for('ca_detail', ca_id=ca_id))
    except Error as e:
        app.logger.error("Database error: %s", e)
        flash('获取CRL失败', 'danger')
        return redirect(url_for('ca_detail', ca_id=ca_id))
    finally:
        _close_db(conn, cursor)


@app.route('/certificates')
@login_required
def certificate_list():
    """List all certificates for admins, or only the caller's own otherwise."""
    conn = get_db_connection()
    if not conn:
        return redirect(url_for('index'))
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        if current_user.is_admin:
            cursor.execute("""
                SELECT c.*, ca.name as ca_name
                FROM certificates c
                JOIN certificate_authorities ca ON c.ca_id = ca.id
                ORDER BY c.created_at DESC
            """)
        else:
            cursor.execute("""
                SELECT c.*, ca.name as ca_name
                FROM certificates c
                JOIN certificate_authorities ca ON c.ca_id = ca.id
                WHERE c.created_by = %s
                ORDER BY c.created_at DESC
            """, (current_user.id,))
        certificates = cursor.fetchall()
        return render_template('certificate_list.html', certificates=certificates)
    except Error as e:
        app.logger.error("Database error: %s", e)
        flash('获取证书列表失败', 'danger')
        return redirect(url_for('index'))
    finally:
        _close_db(conn, cursor)


@app.route('/certificates/create', methods=['GET', 'POST'])
@login_required
def create_certificate_view():
    """Show the certificate form and issue the certificate on POST."""
    if request.method == 'POST':
        form = request.form
        key_size = _parse_positive_int(form['key_size'])
        days_valid = _parse_positive_int(form['days_valid'])
        ca_id = _parse_positive_int(form['ca_id'])

        # The form only lists the caller's CAs, but the posted id is untrusted:
        # make sure the caller may actually sign with the chosen CA.
        ca = get_ca_by_id(ca_id) if ca_id is not None else None
        if ca and not current_user.is_admin and ca['created_by'] != current_user.id:
            flash('无权操作此CA', 'danger')
            return redirect(url_for('certificate_list'))

        cert_id = None
        if ca and key_size is not None and days_valid is not None:
            cert_id = create_certificate(
                ca_id, form['common_name'], form.get('san_dns', ''),
                form.get('san_ip', ''), form['organization'],
                form['organizational_unit'], form['country'], form['state'],
                form['locality'], key_size, days_valid, current_user.id)

        if cert_id:
            flash('证书创建成功', 'success')
            return redirect(url_for('certificate_detail', cert_id=cert_id))
        flash('证书创建失败', 'danger')

    # Load the CAs the caller may sign with.
    conn = get_db_connection()
    if not conn:
        return redirect(url_for('certificate_list'))
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        if current_user.is_admin:
            cursor.execute("SELECT id, name FROM certificate_authorities")
        else:
            cursor.execute("""
                SELECT id, name FROM certificate_authorities
                WHERE created_by = %s
            """, (current_user.id,))
        cas = cursor.fetchall()
        return render_template('create_certificate.html', cas=cas)
    except Error as e:
        app.logger.error("Database error: %s", e)
        flash('获取CA列表失败', 'danger')
        return redirect(url_for('certificate_list'))
    finally:
        _close_db(conn, cursor)


@app.route('/certificates/<int:cert_id>')
@login_required
def certificate_detail(cert_id):
    """Show one certificate and its issuing CA."""
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权访问此证书', 'danger')
        return redirect(url_for('certificate_list'))

    ca = get_ca_by_id(cert['ca_id'])
    return render_template('certificate_detail.html', cert=cert, ca=ca)


@app.route('/certificates/<int:cert_id>/revoke', methods=['GET', 'POST'])
@login_required
def revoke_certificate_view(cert_id):
    """Show the revocation form and revoke the certificate on POST."""
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权操作此证书', 'danger')
        return redirect(url_for('certificate_list'))

    if cert['status'] == 'revoked':
        flash('证书已被吊销', 'warning')
        return redirect(url_for('certificate_detail', cert_id=cert_id))

    if request.method == 'POST':
        reason = request.form['reason']
        if revoke_certificate(cert_id, reason):
            flash('证书吊销成功', 'success')
            # Refresh the CRL; the revocation stands even if this step fails.
            generate_crl(cert['ca_id'])
            return redirect(url_for('certificate_detail', cert_id=cert_id))
        flash('证书吊销失败', 'danger')

    return render_template('revoke_certificate.html', cert=cert)


@app.route('/certificates/<int:cert_id>/renew', methods=['GET', 'POST'])
@login_required
def renew_certificate_view(cert_id):
    """Show the renewal form and renew the certificate on POST."""
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权操作此证书', 'danger')
        return redirect(url_for('certificate_list'))

    if request.method == 'POST':
        days_valid = _parse_positive_int(request.form['days_valid'])
        if days_valid is not None and renew_certificate(cert_id, days_valid):
            flash('证书续期成功', 'success')
            return redirect(url_for('certificate_detail', cert_id=cert_id))
        flash('证书续期失败', 'danger')

    return render_template('renew_certificate.html', cert=cert)


@app.route('/certificates/<int:cert_id>/export', methods=['GET', 'POST'])
@login_required
def export_certificate_view(cert_id):
    """Show the export form and, on POST, send the certificate in the chosen format.

    Supported formats: ``pkcs12`` (password required), ``pem`` (certificate
    and key in one file) and ``pem_separate`` (a zip with both files).
    """
    cert = get_certificate_by_id(cert_id)
    if not cert:
        flash('证书不存在', 'danger')
        return redirect(url_for('certificate_list'))

    if not current_user.is_admin and cert['created_by'] != current_user.id:
        flash('无权操作此证书', 'danger')
        return redirect(url_for('certificate_list'))

    if request.method == 'POST':
        format_type = request.form['format']
        password = request.form.get('password', '')

        if format_type == 'pkcs12':
            if not password:
                flash('PKCS#12格式需要密码', 'danger')
                return redirect(url_for('export_certificate_view', cert_id=cert_id))

            pkcs12_path = export_pkcs12(cert_id, password)
            if pkcs12_path:
                return send_from_directory(
                    os.path.dirname(pkcs12_path),
                    os.path.basename(pkcs12_path),
                    as_attachment=True,
                )
            flash('导出PKCS#12失败', 'danger')

        elif format_type == 'pem':
            # Certificate followed by private key in a single PEM file.
            try:
                pem_content = _read_text(cert['cert_path']) + _read_text(cert['key_path'])
            except OSError as e:
                app.logger.error("Failed to read certificate files: %s", e)
                flash('导出PEM失败', 'danger')
            else:
                return Response(
                    pem_content,
                    mimetype="application/x-pem-file",
                    headers={
                        "Content-Disposition":
                            f"attachment; filename={cert['common_name']}.pem"
                    },
                )

        elif format_type == 'pem_separate':
            # Build the zip archive in memory.
            memory_file = BytesIO()
            try:
                with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
                    zf.writestr(f"{cert['common_name']}.crt", _read_text(cert['cert_path']))
                    zf.writestr(f"{cert['common_name']}.key", _read_text(cert['key_path']))

                return Response(
                    memory_file.getvalue(),
                    mimetype="application/zip",
                    headers={
                        "Content-Disposition":
                            f"attachment; filename={cert['common_name']}_separate.zip"
                    },
                )
            except Exception as e:
                flash(f'创建ZIP文件失败: {str(e)}', 'danger')
                return redirect(url_for('export_certificate_view', cert_id=cert_id))

        else:
            flash('不支持的导出格式', 'danger')

    return render_template('export_certificate.html', cert=cert)


@app.route('/download/<path:filename>')
@login_required
def download_file(filename):
    """Send a file from the certificate store.

    The resolved path must stay inside ``CERT_STORE``. Admins may download any
    file there; other users only files recorded as belonging to them.
    """
    store_root = os.path.realpath(CERT_STORE)
    file_path = os.path.realpath(os.path.join(store_root, filename))

    # Containment check: reject `..` segments and symlinks leaving the store.
    inside_store = file_path.startswith(store_root + os.sep)
    if not inside_store or not os.path.isfile(file_path):
        flash('文件不存在', 'danger')
        return redirect(url_for('index'))

    # Private keys live in the store, so non-admins get only their own files.
    if not current_user.is_admin and not _user_owns_store_file(current_user.id, file_path):
        flash('文件不存在', 'danger')
        return redirect(url_for('index'))

    return send_from_directory(
        os.path.dirname(file_path),
        os.path.basename(file_path),
        as_attachment=True,
    )


if __name__ == '__main__':
    # The interactive debugger allows remote code execution, so it is opt-in
    # (FLASK_DEBUG=1) rather than always on while listening on all interfaces.
    app.run(
        debug=os.environ.get('FLASK_DEBUG') == '1',
        ssl_context='adhoc',
        host='0.0.0.0',
        port=9875,
    )