squid_ui/app.py
2025-06-24 11:37:20 +08:00

339 lines
9.5 KiB
Python

from flask import Flask, render_template, request, jsonify, abort, redirect, url_for, session, flash
from functools import wraps
import subprocess
import os
import sqlite3
from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(__name__)
app.secret_key = os.urandom(24)
# 数据库配置
DATABASE = 'config/squid_manager.db'
SQUID_PASSWD_FILE = 'config/squid_passwd'
# 初始化数据库
def init_db():
with sqlite3.connect(DATABASE) as conn:
cursor = conn.cursor()
# 创建admin_users表
cursor.execute('''
CREATE TABLE IF NOT EXISTS admin_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
)
''')
# 创建squid_users表
cursor.execute('''
CREATE TABLE IF NOT EXISTS squid_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
is_active INTEGER DEFAULT 1
)
''')
# 创建settings表
cursor.execute('''
CREATE TABLE IF NOT EXISTS settings (
id INTEGER PRIMARY KEY DEFAULT 1,
proxy_address TEXT DEFAULT 'proxy.example.com',
proxy_port TEXT DEFAULT '3128',
CONSTRAINT singleton CHECK (id = 1)
)
''')
# 检查并初始化管理员用户
cursor.execute("SELECT COUNT(*) FROM admin_users")
if cursor.fetchone()[0] == 0:
cursor.execute(
"INSERT INTO admin_users (username, password) VALUES (?, ?)",
('admin', generate_password_hash('admin123'))
)
# 检查并初始化设置
cursor.execute("SELECT COUNT(*) FROM settings")
if cursor.fetchone()[0] == 0:
cursor.execute(
"INSERT INTO settings (proxy_address, proxy_port) VALUES (?, ?)",
('proxy.example.com', '3128')
)
# 初始化squid_passwd文件
if not os.path.exists(SQUID_PASSWD_FILE):
with open(SQUID_PASSWD_FILE, 'w') as f:
f.write('')
conn.commit()
# 数据库连接
def get_db():
db = sqlite3.connect(DATABASE)
db.row_factory = sqlite3.Row
return db
# 更新squid_passwd文件
def update_squid_passwd():
db = get_db()
users = db.execute("SELECT * FROM squid_users").fetchall()
db.close()
# 先创建临时文件
temp_file = SQUID_PASSWD_FILE + '.tmp'
# 清空或创建文件
with open(temp_file, 'w') as f:
f.write('')
# 使用htpasswd命令添加每个用户
for user in users:
if not user['is_active']:
continue # 跳过禁用的用户
try:
subprocess.run([
'htpasswd', '-b',
temp_file,
user['username'],
user['password']
], check=True)
except subprocess.CalledProcessError as e:
print(f"Failed to add user {user['username']}: {e}")
continue
# 替换原文件
os.replace(temp_file, SQUID_PASSWD_FILE)
# 登录装饰器
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'logged_in' not in session:
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
# 初始化应用
init_db()
@app.route('/')
@login_required
def index():
db = get_db()
squid_users = db.execute("SELECT * FROM squid_users").fetchall()
settings = db.execute("SELECT * FROM settings WHERE id = 1").fetchone()
db.close()
return render_template('index.html',
user_count=len(squid_users),
settings=settings)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
db = get_db()
user = db.execute(
"SELECT * FROM admin_users WHERE username = ?", (username,)
).fetchone()
db.close()
if user and check_password_hash(user['password'], password):
session['logged_in'] = True
session['username'] = username
return redirect(url_for('index'))
else:
flash('用户名或密码错误', 'error')
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))
@app.route('/clients')
@login_required
def clients():
db = get_db()
users = db.execute("SELECT * FROM squid_users").fetchall()
db.close()
return render_template('clients.html', users=users)
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
db = get_db()
if request.method == 'POST':
action = request.form.get('action')
if action == 'change_password':
current_password = request.form['current_password']
new_password = request.form['new_password']
confirm_password = request.form['confirm_password']
user = db.execute(
"SELECT * FROM admin_users WHERE username = ?", (session['username'],)
).fetchone()
if not check_password_hash(user['password'], current_password):
flash('当前密码不正确', 'error')
elif new_password != confirm_password:
flash('新密码不匹配', 'error')
else:
db.execute(
"UPDATE admin_users SET password = ? WHERE username = ?",
(generate_password_hash(new_password), session['username'])
)
db.commit()
flash('密码修改成功', 'success')
return redirect(url_for('settings'))
elif action == 'update_proxy':
proxy_address = request.form['proxy_address']
proxy_port = request.form['proxy_port']
db.execute(
"UPDATE settings SET proxy_address = ?, proxy_port = ? WHERE id = 1",
(proxy_address, proxy_port)
)
db.commit()
flash('代理设置更新成功', 'success')
return redirect(url_for('settings'))
settings = db.execute("SELECT * FROM settings WHERE id = 1").fetchone()
db.close()
return render_template('settings.html', settings=settings)
# API路由
@app.route('/api/toggle_user/<int:user_id>', methods=['POST'])
@login_required
def toggle_user(user_id):
db = get_db()
user = db.execute("SELECT is_active FROM squid_users WHERE id = ?", (user_id,)).fetchone()
if user:
new_status = 0 if user['is_active'] else 1
db.execute(
"UPDATE squid_users SET is_active = ? WHERE id = ?",
(new_status, user_id)
)
db.commit()
update_squid_passwd()
db.close()
return jsonify({'success': True})
@app.route('/api/delete_user/<int:user_id>', methods=['POST'])
@login_required
def delete_user(user_id):
db = get_db()
db.execute("DELETE FROM squid_users WHERE id = ?", (user_id,))
db.commit()
update_squid_passwd()
db.close()
return jsonify({'success': True})
@app.route('/api/create_user', methods=['POST'])
@login_required
def create_user():
username = request.json.get('username')
password = request.json.get('password')
if not username or not password:
return jsonify({'success': False, 'error': '用户名和密码不能为空'}), 400
db = get_db()
try:
# 先存入明文密码到数据库
db.execute(
"INSERT INTO squid_users (username, password) VALUES (?, ?)",
(username, password)
)
db.commit()
# 更新squid_passwd文件
update_squid_passwd()
return jsonify({'success': True})
except sqlite3.IntegrityError:
return jsonify({'success': False, 'error': '用户名已存在'}), 400
finally:
db.close()
@app.route('/api/create_user', methods=['POST'])
@login_required
def create_user():
username = request.json.get('username')
password = request.json.get('password')
if not username or not password:
return jsonify({'success': False, 'error': '用户名和密码不能为空'}), 400
db = get_db()
try:
# 先存入明文密码到数据库
db.execute(
"INSERT INTO squid_users (username, password) VALUES (?, ?)",
(username, password)
)
db.commit()
# 更新squid_passwd文件
update_squid_passwd()
return jsonify({'success': True})
except sqlite3.IntegrityError:
return jsonify({'success': False, 'error': '用户名已存在'}), 400
finally:
db.close()
@app.route('/api/update_user_password', methods=['POST'])
@login_required
def update_user_password():
user_id = request.json.get('user_id')
password = request.json.get('password')
if not user_id or not password:
return jsonify({'success': False, 'error': '参数不完整'}), 400
db = get_db()
db.execute(
"UPDATE squid_users SET password = ? WHERE id = ?",
(password, user_id)
)
db.commit()
# 更新squid_passwd文件
update_squid_passwd()
db.close()
return jsonify({'success': True})
if __name__ == '__main__':
if not os.path.exists('config'):
os.makedirs('config')
app.run(host='0.0.0.0', port=8080, debug=True)