from flask import Flask, render_template, request, jsonify, abort, redirect, url_for, session from functools import wraps import os import subprocess import json import uuid app = Flask(__name__) app.secret_key = str(uuid.uuid4()) # 配置文件路径 CONFIG_FILE = 'config/config.json' SQUID_PASSWD_FILE = 'config/squid_passwd' # 加载配置 def load_config(): try: with open(CONFIG_FILE, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): # 默认配置 default_config = { "admin_password": "admin123", "proxy_address": "127.0.0.1", "proxy_port": "3128" } os.makedirs('config', exist_ok=True) with open(CONFIG_FILE, 'w') as f: json.dump(default_config, f, indent=4) return default_config # 保存配置 def save_config(config): with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=4) class User: def __init__(self, name, password, is_active=True): self.name = name self.password = password self.is_active = is_active def basic_auth_required(f): @wraps(f) def decorated(*args, **kwargs): config = load_config() auth = request.authorization if not auth or not (auth.username == 'admin' and auth.password == config['admin_password']): return ('Unauthorized', 401, {'WWW-Authenticate': 'Basic realm="Authorization Required"'}) return f(*args, **kwargs) return decorated def read_squid_file(): users = [] try: with open(SQUID_PASSWD_FILE, 'r') as f: for line in f: line = line.strip() if not line: continue is_active = not line.startswith('#') if not is_active: line = line[1:] parts = line.split(':', 1) if len(parts) == 2: users.append(User(parts[0], parts[1], is_active)) except FileNotFoundError: pass return users def write_squid_file(users): with open(SQUID_PASSWD_FILE, 'w') as f: for user in users: line = f"{'#' if not user.is_active else ''}{user.name}:{user.password}\n" f.write(line) def create_user(username, password): users = read_squid_file() if any(u.name == username for u in users): return False try: subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, username, password], check=True) return True except subprocess.CalledProcessError: return False @app.route('/') @basic_auth_required def index(): config = load_config() users = read_squid_file() return render_template('index.html', user_count=len(users), proxy_address=config['proxy_address'], proxy_port=config['proxy_port']) @app.route('/clients') @basic_auth_required def clients(): users = read_squid_file() return render_template('clients.html', users=users) @app.route('/settings') @basic_auth_required def settings(): config = load_config() return render_template('settings.html', config=config) @app.route('/update_settings', methods=['POST']) @basic_auth_required def update_settings(): config = load_config() new_password = request.form.get('admin_password') proxy_address = request.form.get('proxy_address') proxy_port = request.form.get('proxy_port') if new_password: config['admin_password'] = new_password if proxy_address: config['proxy_address'] = proxy_address if proxy_port: config['proxy_port'] = proxy_port save_config(config) return redirect(url_for('settings')) @app.route('/toggle/', methods=['POST']) @basic_auth_required def toggle_user(username): users = read_squid_file() for user in users: if user.name == username: user.is_active = not user.is_active break write_squid_file(users) return '', 200 @app.route('/delete/', methods=['POST']) @basic_auth_required def delete_user(username): users = [u for u in read_squid_file() if u.name != username] write_squid_file(users) return '', 200 @app.route('/save_user', methods=['POST']) @basic_auth_required def save_user(): data = request.get_json() username = data.get('username') password = data.get('password') if not username or not password: return jsonify({'error': 'Username and password required'}), 400 try: subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, username, password], check=True) return '', 200 except subprocess.CalledProcessError: return jsonify({'error': 'Failed to update password'}), 500 @app.route('/create_user', methods=['POST']) @basic_auth_required def handle_create_user(): data = request.get_json() username = data.get('username') password = data.get('password') if not username or not password: return jsonify({'error': 'Username and password required'}), 400 if create_user(username, password): return '', 200 else: return jsonify({'error': 'User already exists'}), 409 @app.route('/logout') def logout(): return ('', 401, {'WWW-Authenticate': 'Basic realm="Authorization Required"'}) if __name__ == '__main__': if not os.path.exists('config'): os.makedirs('config') load_config() # 初始化配置文件 app.run(host='0.0.0.0', port=8080, debug=True)