# Squid proxy manager: Flask web UI for admin login, Squid user accounts and proxy settings.
from flask import Flask, render_template, request, jsonify, abort, redirect, url_for, session, flash
|
|
from functools import wraps
|
|
import subprocess
|
|
import os
|
|
import sqlite3
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
# Flask application object that every route in this module is registered on.
app = Flask(__name__)
# Prefer a stable key from the environment so signed session cookies survive
# restarts, debug reloads and multiple worker processes. When SECRET_KEY is
# unset or empty, fall back to a random per-process key (the old behavior).
app.secret_key = os.environ.get('SECRET_KEY') or os.urandom(24)

# Database configuration
DATABASE = 'config/squid_manager.db'
# Password file that update_squid_passwd() rebuilds with the htpasswd tool.
SQUID_PASSWD_FILE = 'config/squid_passwd'
|
|
|
|
|
|
# 初始化数据库
|
|
def init_db():
    """Create the SQLite schema and seed default rows when they are missing.

    Creates the ``admin_users``, ``squid_users`` and single-row ``settings``
    tables if they do not exist, inserts a default ``admin`` account when no
    administrator is present, and inserts default proxy settings when the
    settings table is empty. Calling it again on an initialized database
    changes nothing.
    """
    # This function is invoked at import time, which is earlier than the
    # ``__main__`` guard that creates the config directory, so the parent
    # directory of the database has to be ensured here.
    db_dir = os.path.dirname(DATABASE)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DATABASE)
    try:
        # ``with conn`` commits on success and rolls back on an exception,
        # but it does not close the connection; the ``finally`` does.
        with conn:
            cursor = conn.cursor()

            # Administrator accounts for the web UI (password is a hash).
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS admin_users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL
                )
            ''')

            # Proxy accounts that end up in the Squid password file.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS squid_users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL,
                    is_active INTEGER DEFAULT 1
                )
            ''')

            # Single-row settings table; the CHECK constraint pins id to 1.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS settings (
                    id INTEGER PRIMARY KEY DEFAULT 1,
                    proxy_address TEXT DEFAULT 'proxy.example.com',
                    proxy_port TEXT DEFAULT '3128',
                    CONSTRAINT singleton CHECK (id = 1)
                )
            ''')

            # Seed the default administrator only when none exists.
            # NOTE(review): these are well-known default credentials
            # (admin / admin123); change the password after first login.
            cursor.execute("SELECT COUNT(*) FROM admin_users")
            if cursor.fetchone()[0] == 0:
                cursor.execute(
                    "INSERT INTO admin_users (username, password) VALUES (?, ?)",
                    ('admin', generate_password_hash('admin123'))
                )

            # Seed the default proxy settings only when the table is empty.
            cursor.execute("SELECT COUNT(*) FROM settings")
            if cursor.fetchone()[0] == 0:
                cursor.execute(
                    "INSERT INTO settings (proxy_address, proxy_port) VALUES (?, ?)",
                    ('proxy.example.com', '3128')
                )
    finally:
        conn.close()
|
|
|
|
# 数据库连接
|
|
|
|
|
|
def get_db():
    """Open a new SQLite connection to ``DATABASE``.

    The connection uses ``sqlite3.Row`` as its row factory, so fetched rows
    can be indexed by column name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
# 更新squid_passwd文件
|
|
def update_squid_passwd():
    """Rebuild the Squid password file from the active users in the database.

    Every active row of ``squid_users`` is added to a temporary file with the
    ``htpasswd`` tool; the temporary file then replaces ``SQUID_PASSWD_FILE``
    through ``os.replace``. Users for which ``htpasswd`` exits with an error
    are reported on stdout and left out of the file.
    """
    db = get_db()
    try:
        users = db.execute("SELECT * FROM squid_users").fetchall()
    finally:
        db.close()

    temp_file = SQUID_PASSWD_FILE + '.tmp'

    # Start from an empty file so that users deleted or disabled since the
    # previous rebuild do not linger in the result.
    with open(temp_file, 'w'):
        pass

    for user in users:
        if not user['is_active']:
            continue  # disabled users are left out of the file

        try:
            # ``-i`` makes htpasswd read the password from stdin, which keeps
            # it out of the process argument list (``-b`` exposes it there).
            # Requires htpasswd from Apache httpd 2.4.4 or later.
            subprocess.run(
                ['htpasswd', '-i', temp_file, user['username']],
                input=user['password'].encode('utf-8'),
                check=True,
            )
        except subprocess.CalledProcessError as e:
            print(f"Failed to add user {user['username']}: {e}")

    # Swap the freshly built file into place in a single step.
    os.replace(temp_file, SQUID_PASSWD_FILE)
|
|
|
|
# 登录装饰器
|
|
def login_required(f):
    """Decorator that only runs the view ``f`` for a logged-in session.

    When the session carries a ``logged_in`` key the wrapped view is called
    with its original arguments; otherwise the client is redirected to the
    ``login`` endpoint.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'logged_in' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded_view
|
|
|
|
|
|
# 初始化应用
|
|
# Module-level call: the schema and default rows are created as soon as this
# module is imported, not only when it is run as a script.
init_db()
|
|
|
|
|
|
@app.route('/')
@login_required
def index():
    """Render the dashboard.

    Returns:
        The rendered ``index.html`` template, given ``user_count`` (number of
        rows in ``squid_users``) and ``settings`` (the row with id 1).
    """
    db = get_db()
    try:
        # Count in SQL instead of fetching every row (passwords included)
        # only to take len() of the result.
        user_count = db.execute("SELECT COUNT(*) FROM squid_users").fetchone()[0]
        settings = db.execute("SELECT * FROM settings WHERE id = 1").fetchone()
    finally:
        # Close the connection even when a query raises.
        db.close()

    return render_template('index.html', user_count=user_count, settings=settings)
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate administrators on POST.

    A successful login marks the session as logged in, stores the username
    and redirects to the index page. A failed login flashes an error and
    shows the form again.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']

    connection = get_db()
    lookup = connection.execute(
        "SELECT * FROM admin_users WHERE username = ?", (username,)
    )
    user = lookup.fetchone()
    connection.close()

    # Unknown username and wrong password produce the same message.
    if not user or not check_password_hash(user['password'], password):
        flash('用户名或密码错误', 'error')
        return render_template('login.html')

    session['logged_in'] = True
    session['username'] = username
    return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/logout')
def logout():
    """Discard all session data and redirect to the login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)
|
|
|
|
|
|
@app.route('/clients')
@login_required
def clients():
    """Render the clients page with every row of ``squid_users``."""
    connection = get_db()
    rows = connection.execute("SELECT * FROM squid_users").fetchall()
    connection.close()
    return render_template('clients.html', users=rows)
|
|
|
|
|
|
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
    """Show the settings page and handle its two POST actions.

    The form field ``action`` selects the operation:

    * ``change_password`` verifies the current admin password and, when the
      new password matches its confirmation, stores the new hash.
    * ``update_proxy`` saves the proxy address and port in the settings row.

    Returns:
        A redirect back to this page after a successful action, otherwise
        the rendered ``settings.html`` template with the current settings.
    """
    db = get_db()
    # try/finally closes the connection on every exit path, including the
    # early redirects below.
    try:
        if request.method == 'POST':
            action = request.form.get('action')

            if action == 'change_password':
                current_password = request.form['current_password']
                new_password = request.form['new_password']
                confirm_password = request.form['confirm_password']

                user = db.execute(
                    "SELECT * FROM admin_users WHERE username = ?",
                    (session['username'],)
                ).fetchone()

                # A missing row (account no longer present for this session)
                # is handled like a wrong password instead of raising.
                if user is None or not check_password_hash(user['password'], current_password):
                    flash('当前密码不正确', 'error')
                elif new_password != confirm_password:
                    flash('新密码不匹配', 'error')
                else:
                    db.execute(
                        "UPDATE admin_users SET password = ? WHERE username = ?",
                        (generate_password_hash(new_password), session['username'])
                    )
                    db.commit()
                    flash('密码修改成功', 'success')
                    return redirect(url_for('settings'))

            elif action == 'update_proxy':
                proxy_address = request.form['proxy_address']
                proxy_port = request.form['proxy_port']

                db.execute(
                    "UPDATE settings SET proxy_address = ?, proxy_port = ? WHERE id = 1",
                    (proxy_address, proxy_port)
                )
                db.commit()
                flash('代理设置更新成功', 'success')
                return redirect(url_for('settings'))

        # Named differently from the view function to avoid shadowing it.
        current_settings = db.execute("SELECT * FROM settings WHERE id = 1").fetchone()
    finally:
        db.close()

    return render_template('settings.html', settings=current_settings)
|
|
|
|
|
|
# API路由
|
|
@app.route('/api/toggle_user/<int:user_id>', methods=['POST'])
@login_required
def toggle_user(user_id):
    """Flip the ``is_active`` flag of a Squid user and rebuild the password file.

    Args:
        user_id: Primary key of the row in ``squid_users``.

    Returns:
        ``{'success': True}`` after the flag was flipped, or a 404 response
        with ``{'success': False, 'error': ...}`` when no such user exists.
    """
    db = get_db()
    try:
        user = db.execute(
            "SELECT is_active FROM squid_users WHERE id = ?", (user_id,)
        ).fetchone()

        # Report an unknown id instead of claiming success.
        if user is None:
            return jsonify({'success': False, 'error': '用户不存在'}), 404

        new_status = 0 if user['is_active'] else 1
        db.execute(
            "UPDATE squid_users SET is_active = ? WHERE id = ?",
            (new_status, user_id)
        )
        db.commit()
    finally:
        db.close()

    # Rebuild only after the change is committed and the connection released.
    update_squid_passwd()
    return jsonify({'success': True})
|
|
|
|
|
|
@app.route('/api/delete_user/<int:user_id>', methods=['POST'])
@login_required
def delete_user(user_id):
    """Delete a Squid user and rebuild the password file.

    Args:
        user_id: Primary key of the row in ``squid_users``.

    Returns:
        ``{'success': True}`` after a row was deleted, or a 404 response with
        ``{'success': False, 'error': ...}`` when no row had that id.
    """
    db = get_db()
    try:
        deleted = db.execute(
            "DELETE FROM squid_users WHERE id = ?", (user_id,)
        ).rowcount
        db.commit()
    finally:
        # Release the connection even if the statement or the commit raises.
        db.close()

    # Nothing was removed, so there is nothing to rebuild.
    if not deleted:
        return jsonify({'success': False, 'error': '用户不存在'}), 404

    update_squid_passwd()
    return jsonify({'success': True})
|
|
|
|
|
|
@app.route('/api/create_user', methods=['POST'])
@login_required
def create_user():
    """Create a Squid user from a JSON body and rebuild the password file.

    Expects a JSON object with string fields ``username`` and ``password``.

    Returns:
        ``{'success': True}`` on success, or a 400 response with
        ``{'success': False, 'error': ...}`` when a field is missing, is not
        a string, the username has characters the password file cannot hold,
        or the username already exists.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising; anything that is not a JSON object is treated as empty.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    username = payload.get('username')
    password = payload.get('password')

    if not username or not password:
        return jsonify({'success': False, 'error': '用户名和密码不能为空'}), 400

    # Both values are later handed to the htpasswd process, which only
    # accepts strings; a stored non-string would break every later rebuild.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'success': False, 'error': '用户名和密码必须为字符串'}), 400

    # ':' separates the fields of a password-file line and line breaks end
    # the line, so neither can be part of a username.
    if any(ch in username for ch in ':\r\n'):
        return jsonify({'success': False, 'error': '用户名包含非法字符'}), 400

    db = get_db()
    try:
        # NOTE(review): the password is stored in plaintext because
        # update_squid_passwd() feeds it to htpasswd again on every rebuild.
        # Storing only a hash would need a change to that flow.
        db.execute(
            "INSERT INTO squid_users (username, password) VALUES (?, ?)",
            (username, password)
        )
        db.commit()
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on username was violated.
        return jsonify({'success': False, 'error': '用户名已存在'}), 400
    finally:
        db.close()

    # Rebuild the password file so the new user takes effect.
    update_squid_passwd()
    return jsonify({'success': True})
|
|
|
|
|
|
@app.route('/api/update_user_password', methods=['POST'])
@login_required
def update_user_password():
    """Change a Squid user's password and rebuild the password file.

    Expects a JSON object with ``user_id`` and a string ``password``.

    Returns:
        ``{'success': True}`` on success; a 400 response when a field is
        missing or the password is not a string; a 404 response when no user
        has the given id.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising; anything that is not a JSON object is treated as empty.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    user_id = payload.get('user_id')
    password = payload.get('password')

    if not user_id or not password:
        return jsonify({'success': False, 'error': '参数不完整'}), 400

    # The password is later handed to the htpasswd process, which only
    # accepts strings; a stored non-string would break every later rebuild.
    if not isinstance(password, str):
        return jsonify({'success': False, 'error': '密码必须为字符串'}), 400

    db = get_db()
    try:
        updated = db.execute(
            "UPDATE squid_users SET password = ? WHERE id = ?",
            (password, user_id)
        ).rowcount
        db.commit()
    finally:
        db.close()

    # Report an unknown id instead of claiming success.
    if not updated:
        return jsonify({'success': False, 'error': '用户不存在'}), 404

    # Rebuild the password file so the new password takes effect.
    update_squid_passwd()
    return jsonify({'success': True})
|
|
|
|
|
|
if __name__ == '__main__':
    # Make sure the directory for the database and password file exists;
    # exist_ok avoids a race between the check and the creation.
    os.makedirs('config', exist_ok=True)

    # The interactive debugger lets anyone who can reach it run arbitrary
    # code, and this server listens on all interfaces. Debug mode is
    # therefore off unless FLASK_DEBUG is set to a truthy value.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=8080, debug=debug_enabled)