222 lines
5.9 KiB
Python
222 lines
5.9 KiB
Python
from flask import Flask, render_template, request, jsonify, abort, redirect, url_for, session
|
|
from functools import wraps
|
|
import os
|
|
import subprocess
|
|
import json
|
|
import uuid
|
|
from math import ceil
|
|
|
|
app = Flask(__name__)
# Use a stable signing key when FLASK_SECRET_KEY is set in the environment.
# A key generated at startup is kept only as a fallback: it changes on every
# restart and differs between worker processes, which invalidates sessions.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or str(uuid.uuid4())

# Paths of the files this application reads and writes (relative to the
# current working directory).
CONFIG_FILE = 'config/config.json'
SQUID_PASSWD_FILE = 'config/squid_passwd'
|
|
|
|
|
|
# Load the configuration
def load_config():
    """Return the application configuration as a dict.

    The configuration is read from CONFIG_FILE. Keys missing from the file
    are filled in from the built-in defaults, so callers can always index
    'admin_password', 'proxy_address' and 'proxy_port'.

    If the file is missing, is not valid JSON, or does not contain a JSON
    object, it is (re)written with the default configuration and the
    defaults are returned.
    """
    # Default configuration
    default_config = {
        "admin_password": "admin123",
        "proxy_address": "127.0.0.1",
        "proxy_port": "3128"
    }

    try:
        with open(CONFIG_FILE, 'r') as f:
            loaded = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        loaded = None

    if isinstance(loaded, dict):
        # Values from the file win; defaults only fill the gaps.
        return {**default_config, **loaded}

    # Create the directory that holds CONFIG_FILE (if it has one) before
    # writing the defaults.
    config_dir = os.path.dirname(CONFIG_FILE)
    if config_dir:
        os.makedirs(config_dir, exist_ok=True)
    with open(CONFIG_FILE, 'w') as f:
        json.dump(default_config, f, indent=4)
    return default_config
|
|
|
|
|
|
# Save the configuration
def save_config(config):
    """Write *config* to CONFIG_FILE as indented JSON.

    Args:
        config: JSON-serializable configuration mapping.

    Raises:
        TypeError: If *config* contains a value that JSON cannot encode.
            The existing file is left untouched in that case.
        OSError: If the file cannot be written.
    """
    # Serialize before opening the file: opening with 'w' truncates it, so a
    # serialization error afterwards would leave a corrupt configuration.
    payload = json.dumps(config, indent=4)

    config_dir = os.path.dirname(CONFIG_FILE)
    if config_dir:
        os.makedirs(config_dir, exist_ok=True)

    with open(CONFIG_FILE, 'w') as f:
        f.write(payload)
|
|
|
|
|
|
class User:
    """A single account entry of the password file.

    Attributes:
        name: Account name.
        password: Password field of the entry, stored as given.
        is_active: Whether the account is enabled (defaults to True).
    """

    def __init__(self, name, password, is_active=True):
        self.name = name
        self.password = password
        self.is_active = is_active

    def __repr__(self):
        # The password field is deliberately left out so that logging or
        # debugging a User never prints it.
        return (
            f"{type(self).__name__}"
            f"(name={self.name!r}, is_active={self.is_active!r})"
        )
|
|
|
|
|
|
def basic_auth_required(f):
    """Decorator that protects a view with HTTP Basic authentication.

    The wrapped view runs only when the request carries Basic credentials
    with username 'admin' and the password stored under 'admin_password' in
    the configuration. Otherwise a 401 response with a WWW-Authenticate
    challenge is returned.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view function.
    """
    # Imported here so the block does not depend on a file-level import.
    import hmac

    def _same(supplied, expected):
        # Only strings can match; a missing credential or a missing/non-string
        # configured password never authenticates.
        if not isinstance(supplied, str) or not isinstance(expected, str):
            return False
        # compare_digest runs in constant time; bytes are used because the
        # str form only accepts ASCII.
        return hmac.compare_digest(
            supplied.encode('utf-8'), expected.encode('utf-8')
        )

    @wraps(f)
    def decorated(*args, **kwargs):
        config = load_config()
        auth = request.authorization

        authorized = False
        if auth:
            # Evaluate both comparisons unconditionally.
            user_ok = _same(auth.username, 'admin')
            password_ok = _same(auth.password, config.get('admin_password'))
            authorized = user_ok and password_ok

        if not authorized:
            return ('Unauthorized', 401,
                    {'WWW-Authenticate': 'Basic realm="Authorization Required"'})
        return f(*args, **kwargs)

    return decorated
|
|
|
|
|
|
def read_squid_file():
    """Parse SQUID_PASSWD_FILE into a list of User objects.

    Blank lines are skipped. A line starting with '#' is treated as a
    disabled account: the '#' is removed and the user is marked inactive.
    Each remaining line is split at the first ':' into name and password;
    lines without a ':' are ignored.

    Returns:
        The parsed users in file order, or an empty list if the file does
        not exist.
    """
    accounts = []
    try:
        with open(SQUID_PASSWD_FILE, 'r') as passwd_file:
            for raw_line in passwd_file:
                entry = raw_line.strip()
                if not entry:
                    continue

                disabled = entry.startswith('#')
                if disabled:
                    # Drop the marker so the rest parses like an active entry
                    entry = entry[1:]

                name, colon, secret = entry.partition(':')
                if colon:
                    accounts.append(User(name, secret, not disabled))
    except FileNotFoundError:
        pass
    return accounts
|
|
|
|
|
|
def write_squid_file(users):
    """Overwrite SQUID_PASSWD_FILE with one 'name:password' line per user.

    Inactive users are written with a leading '#'.

    Args:
        users: Iterable of objects exposing name, password and is_active.
    """
    with open(SQUID_PASSWD_FILE, 'w') as passwd_file:
        for account in users:
            marker = '' if account.is_active else '#'
            passwd_file.write(f"{marker}{account.name}:{account.password}\n")
|
|
|
|
|
|
def create_user(username, password):
    """Add a new account to SQUID_PASSWD_FILE using the htpasswd tool.

    Args:
        username: Name of the account to create.
        password: Plain-text password passed to htpasswd.

    Returns:
        True if htpasswd added the account. False if the arguments are not
        usable, a user with that name already exists, or htpasswd could not
        be run or reported an error.
    """
    if not isinstance(username, str) or not isinstance(password, str):
        return False
    # A name starting with '#' would be read back as a disabled account
    # with a different name.
    if username.startswith('#'):
        return False

    if any(u.name == username for u in read_squid_file()):
        return False

    command = ['htpasswd', '-b']
    if not os.path.exists(SQUID_PASSWD_FILE):
        # htpasswd only creates the password file when -c is given.
        passwd_dir = os.path.dirname(SQUID_PASSWD_FILE)
        if passwd_dir:
            os.makedirs(passwd_dir, exist_ok=True)
        command.append('-c')
    command += [SQUID_PASSWD_FILE, username, password]

    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable htpasswd binary.
        return False
    return True
|
|
|
|
|
|
@app.route('/')
@basic_auth_required
def index():
    """Render the index page with the user count and the proxy endpoint."""
    current_config = load_config()
    account_total = len(read_squid_file())
    return render_template(
        'index.html',
        user_count=account_total,
        proxy_address=current_config['proxy_address'],
        proxy_port=current_config['proxy_port'],
    )
|
|
|
|
# Paginated listing of the users in the password file
@app.route('/clients')
@basic_auth_required
def clients():
    """Render one page of users.

    The page number comes from the 'page' query parameter (default 1) and is
    clamped to the valid range, so zero, negative or too-large values show
    the nearest existing page instead of a wrong or empty slice.
    """
    per_page = 5

    users = read_squid_file()
    total_pages = ceil(len(users) / per_page)

    page = request.args.get('page', 1, type=int)
    # With no users total_pages is 0; page 1 is still the only valid page.
    page = max(1, min(page, max(total_pages, 1)))

    start = (page - 1) * per_page
    paginated_users = users[start:start + per_page]

    return render_template('clients.html',
                           users=paginated_users,
                           page=page,
                           total_pages=total_pages)
|
|
|
|
|
|
# The rest of the code is unchanged...
|
|
|
|
|
|
@app.route('/settings')
@basic_auth_required
def settings():
    """Render the settings page with the current configuration."""
    return render_template('settings.html', config=load_config())
|
|
|
|
|
|
@app.route('/update_settings', methods=['POST'])
@basic_auth_required
def update_settings():
    """Update the configuration from the submitted form.

    Only non-empty form fields ('admin_password', 'proxy_address',
    'proxy_port') replace the stored values. The port must be an integer
    between 1 and 65535; otherwise nothing is saved and a 400 response is
    returned. On success the client is redirected to the settings page.
    """
    config = load_config()
    new_password = request.form.get('admin_password')
    proxy_address = (request.form.get('proxy_address') or '').strip()
    proxy_port = (request.form.get('proxy_port') or '').strip()

    # Validate before touching the configuration so a bad port never
    # results in a partially applied update.
    port_number = None
    if proxy_port:
        try:
            port_number = int(proxy_port)
        except ValueError:
            port_number = 0
        if not 1 <= port_number <= 65535:
            return jsonify({'error': 'Invalid proxy port'}), 400

    if new_password:
        config['admin_password'] = new_password
    if proxy_address:
        config['proxy_address'] = proxy_address
    if port_number is not None:
        # Stored as a string, like the default configuration value.
        config['proxy_port'] = str(port_number)

    save_config(config)
    return redirect(url_for('settings'))
|
|
|
|
|
|
@app.route('/toggle/<username>', methods=['POST'])
@basic_auth_required
def toggle_user(username):
    """Flip the active flag of the first user named *username*.

    Returns an empty 200 response on success, or a 404 JSON error (without
    rewriting the password file) when no such user exists.
    """
    users = read_squid_file()
    target = next((u for u in users if u.name == username), None)
    if target is None:
        return jsonify({'error': 'User not found'}), 404

    target.is_active = not target.is_active
    write_squid_file(users)
    return '', 200
|
|
|
|
|
|
@app.route('/delete/<username>', methods=['POST'])
@basic_auth_required
def delete_user(username):
    """Remove every entry named *username* from the password file.

    Returns an empty 200 response on success, or a 404 JSON error (without
    rewriting the password file) when no such user exists.
    """
    users = read_squid_file()
    remaining = [u for u in users if u.name != username]
    if len(remaining) == len(users):
        return jsonify({'error': 'User not found'}), 404

    write_squid_file(remaining)
    return '', 200
|
|
|
|
|
|
@app.route('/save_user', methods=['POST'])
@basic_auth_required
def save_user():
    """Set the password of a user from a JSON body.

    Expects a JSON object with 'username' and 'password'. The password hash
    is produced by ``htpasswd -n -b`` and stored through the same
    read/write helpers the other routes use, so a disabled user keeps its
    disabled state instead of gaining a duplicate active entry. A user that
    does not exist yet is added.

    Returns an empty 200 response on success, 400 for a missing or invalid
    body, and 500 when htpasswd cannot be run or fails.
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Username and password required'}), 400

    username = data.get('username')
    password = data.get('password')

    if not username or not password:
        return jsonify({'error': 'Username and password required'}), 400

    # With -n the username is the first positional argument, so a leading
    # '-' must not reach htpasswd; '#' and ':' would corrupt the file format.
    if (not isinstance(username, str) or not isinstance(password, str)
            or username.startswith(('-', '#')) or ':' in username):
        return jsonify({'error': 'Invalid username or password'}), 400

    try:
        result = subprocess.run(
            ['htpasswd', '-n', '-b', username, password],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except (subprocess.CalledProcessError, OSError):
        return jsonify({'error': 'Failed to update password'}), 500

    # htpasswd -n prints "username:hash" on stdout.
    _, colon, password_hash = result.stdout.strip().partition(':')
    if not colon or not password_hash:
        return jsonify({'error': 'Failed to update password'}), 500

    users = read_squid_file()
    matching = [u for u in users if u.name == username]
    if matching:
        for user in matching:
            user.password = password_hash
    else:
        users.append(User(username, password_hash))
    write_squid_file(users)
    return '', 200
|
|
|
|
|
|
@app.route('/create_user', methods=['POST'])
@basic_auth_required
def handle_create_user():
    """Create a user from a JSON body with 'username' and 'password'.

    Returns an empty 200 response on success, 400 when the body is missing,
    is not a JSON object, or lacks either field, and 409 when create_user
    reports failure.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so the request is answered with the same 400 as an incomplete one.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Username and password required'}), 400

    username = data.get('username')
    password = data.get('password')

    if not username or not password:
        return jsonify({'error': 'Username and password required'}), 400

    if create_user(username, password):
        return '', 200
    return jsonify({'error': 'User already exists'}), 409
|
|
|
|
|
|
@app.route('/logout')
def logout():
    """Answer with an empty 401 and a Basic authentication challenge."""
    challenge_headers = {
        'WWW-Authenticate': 'Basic realm="Authorization Required"',
    }
    status_code = 401
    return ('', status_code, challenge_headers)
|
|
|
|
|
|
if __name__ == '__main__':
    os.makedirs('config', exist_ok=True)
    load_config()  # Initialize the configuration file

    # The server listens on all interfaces, and the interactive debugger
    # lets anyone who can reach it execute code. Debug mode is therefore
    # off unless FLASK_DEBUG is explicitly set to a truthy value.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on'
    )
    app.run(host='0.0.0.0', port=8080, debug=debug_enabled)