diff --git a/app.py b/app.py index bc7854a..1c520f0 100644 --- a/app.py +++ b/app.py @@ -14,6 +14,9 @@ import zipfile import shutil import re from pypinyin import pinyin, Style +from flask import send_file +from PIL import Image, ImageDraw, ImageFont + from flask_migrate import Migrate # 从配置文件中导入配置 @@ -22,6 +25,10 @@ from config import Config app = Flask(__name__) app.config.from_object(Config) +@app.context_processor +def inject_now(): + return {'now': datetime.now()} + # 确保证书存储目录存在 os.makedirs(Config.CERT_STORE, exist_ok=True) @@ -37,6 +44,89 @@ login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' +from io import BytesIO +from flask import send_file +import random +import string +from PIL import Image, ImageDraw, ImageFont + + +def generate_captcha_image(): + # 生成4位随机验证码(字母和数字) + captcha_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) + + # 创建图片(120x40像素,白色背景) + image = Image.new('RGB', (120, 40), color=(255, 255, 255)) + draw = ImageDraw.Draw(image) + + # 尝试加载字体,失败则使用默认字体 + try: + font = ImageFont.truetype("arial.ttf", 24) + except: + font = ImageFont.load_default() + + # 绘制验证码文本(每个字符随机颜色) + for i, char in enumerate(captcha_code): + text_color = ( + random.randint(0, 150), # R + random.randint(0, 150), # G + random.randint(0, 150) # B + ) + draw.text( + (10 + i * 25, 5), # 位置 + char, # 文本 + font=font, # 字体 + fill=text_color # 颜色 + ) + + # 添加5条干扰线(随机位置和颜色) + for _ in range(5): + line_color = ( + random.randint(0, 255), # R + random.randint(0, 255), # G + random.randint(0, 255) # B + ) + draw.line( + [ + random.randint(0, 120), # x1 + random.randint(0, 40), # y1 + random.randint(0, 120), # x2 + random.randint(0, 40) # y2 + ], + fill=line_color, + width=1 + ) + + # 保存验证码到数据库 + conn = get_db_connection() + if conn: + try: + cursor = conn.cursor() + # 清除10分钟前的旧验证码 + cursor.execute("DELETE FROM captcha WHERE created_at < NOW() - INTERVAL 10 MINUTE") + # 插入新验证码 + cursor.execute("INSERT INTO captcha (code) VALUES (%s)", (captcha_code,)) + conn.commit() + except Error as e: + print(f"Database error: {e}") + finally: + if conn.is_connected(): + cursor.close() + conn.close() + + # 返回图片字节流 + img_io = BytesIO() + image.save(img_io, 'PNG') + img_io.seek(0) + return img_io, captcha_code + + +@app.route('/captcha') +def captcha(): + img_io, _ = generate_captcha_image() + return send_file(img_io, mimetype='image/png') + + def to_pinyin(text): """将中文转换为拼音""" if not text: @@ -608,9 +698,120 @@ def register(): @app.route('/') @login_required def index(): - return render_template('index.html') + conn = get_db_connection() + if conn: + try: + cursor = conn.cursor(dictionary=True) + # 获取CA数量 + if current_user.is_admin: + cursor.execute("SELECT COUNT(*) as count FROM certificate_authorities") + ca_count = cursor.fetchone()['count'] + cursor.fetchall() # 确保清空结果集 + cursor.execute(""" + SELECT * FROM certificate_authorities + ORDER BY created_at DESC LIMIT 5 + """) + else: + cursor.execute(""" + SELECT COUNT(*) as count FROM certificate_authorities + WHERE created_by = %s + """, (current_user.id,)) + ca_count = cursor.fetchone()['count'] + cursor.fetchall() # 确保清空结果集 + + cursor.execute(""" + SELECT * FROM certificate_authorities + WHERE created_by = %s + ORDER BY created_at DESC LIMIT 5 + """, (current_user.id,)) + recent_cas = cursor.fetchall() + + # 获取证书数量 + if current_user.is_admin: + cursor.execute("SELECT COUNT(*) as count FROM certificates") + cert_count = cursor.fetchone()['count'] + cursor.fetchall() # 确保清空结果集 + + cursor.execute(""" + SELECT * FROM certificates + ORDER BY created_at DESC LIMIT 5 + """) + else: + cursor.execute(""" + SELECT COUNT(*) as count FROM certificates + WHERE created_by = %s + """, (current_user.id,)) + cert_count = cursor.fetchone()['count'] + cursor.fetchall() # 确保清空结果集 + + cursor.execute(""" + SELECT * FROM certificates + WHERE created_by = %s + ORDER BY created_at DESC LIMIT 5 + """, (current_user.id,)) + recent_certs = cursor.fetchall() + + # 获取即将过期证书数量(30天内) + if current_user.is_admin: + cursor.execute(""" + SELECT COUNT(*) as count FROM certificates + WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY) + AND status = 'active' + """) + else: + cursor.execute(""" + SELECT COUNT(*) as count FROM certificates + WHERE expires_at BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 30 DAY) + AND status = 'active' AND created_by = %s + """, (current_user.id,)) + expiring_soon_count = cursor.fetchone()['count'] + cursor.fetchall() # 确保清空结果集 + + # 获取活跃证书数量 + if current_user.is_admin: + cursor.execute(""" + SELECT COUNT(*) as count FROM certificates + WHERE status = 'active' + """) + else: + cursor.execute(""" + SELECT COUNT(*) as count FROM certificates + WHERE status = 'active' AND created_by = %s + """, (current_user.id,)) + active_count = cursor.fetchone()['count'] + cursor.fetchall() # 确保清空结果集 + + return render_template('index.html', + ca_count=ca_count, + cert_count=cert_count, + expiring_soon_count=expiring_soon_count, + active_count=active_count, + recent_cas=recent_cas, + recent_certs=recent_certs) + + except Error as e: + print(f"Database error: {e}") + flash('获取系统统计信息失败', 'danger') + return render_template('index.html', + ca_count=0, + cert_count=0, + expiring_soon_count=0, + active_count=0, + recent_cas=[], + recent_certs=[]) + finally: + if conn.is_connected(): + cursor.close() + conn.close() + return render_template('index.html', + ca_count=0, + cert_count=0, + expiring_soon_count=0, + active_count=0, + recent_cas=[], + recent_certs=[]) @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: @@ -677,7 +878,7 @@ def ca_list(): else: cursor.execute("SELECT * FROM certificate_authorities WHERE created_by = %s", (current_user.id,)) cas = cursor.fetchall() - return render_template('ca_list.html', cas=cas) + return render_template('ca_list.html', cas=cas, get_username=get_username) except Error as e: print(f"Database error: {e}") flash('获取CA列表失败', 'danger') @@ -899,7 +1100,7 @@ def certificate_list(): ORDER BY c.created_at DESC """, (current_user.id,)) certificates = cursor.fetchall() - return render_template('certificate_list.html', certificates=certificates) + return render_template('certificate_list.html', certificates=certificates, get_username=get_username) except Error as e: print(f"Database error: {e}") flash('获取证书列表失败', 'danger') @@ -1372,4 +1573,4 @@ def download_file(filename): if __name__ == '__main__': - app.run(debug=Config.DEBUG, ssl_context='adhoc', host=Config.APP_HOST, port=Config.APP_PORT) \ No newline at end of file + app.run(debug=Config.DEBUG, host=Config.APP_HOST, port=Config.APP_PORT) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index a9ab10b..da28dbc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,7 @@ Flask==2.0.1 Flask-Login==0.5.0 mysql-connector-python==8.0.26 python-dotenv==0.19.0 -cryptography==2.0 \ No newline at end of file +cryptography==2.0 +pypinyin +shutil +flask_migrate \ No newline at end of file diff --git a/templates/base.html b/templates/base.html index bead1ac..bf235b3 100644 --- a/templates/base.html +++ b/templates/base.html @@ -5,51 +5,147 @@