"""Flask admin panel for managing proxy users kept in an htpasswd-style file.

Hashed credentials live in ``SQUID_PASSWD_FILE`` (a leading ``#`` marks a
disabled user); the matching plaintext passwords are mirrored in
``USERS_FILE`` so that proxy URLs can be shown to the administrator.
"""

import hmac
import json
import os
import subprocess
import uuid
from functools import wraps
from math import ceil
from urllib.parse import quote

from flask import (
    Flask,
    abort,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)

app = Flask(__name__)
# Regenerated on every start, so sessions (and pending flash messages) do not
# survive a restart of the process.
app.secret_key = str(uuid.uuid4())

# Configuration file paths
CONFIG_FILE = 'config/config.json'
SQUID_PASSWD_FILE = 'config/squid_passwd'
USERS_FILE = 'config/users.json'  # Stores the users' plaintext information

# Values used when the configuration file is missing, unreadable or incomplete.
# NOTE(review): the default admin password is well known and is stored in
# plaintext -- it should be changed right after the first start.
_DEFAULT_CONFIG = {
    "admin_password": "admin123",
    "proxy_address": "127.0.0.1",
    "proxy_port": "3128",
}


def _ensure_parent_dir(path):
    """Create the directory that contains *path* if it does not exist yet."""
    os.makedirs(os.path.dirname(path) or '.', exist_ok=True)


def load_config():
    """Load the configuration, falling back to (and persisting) the defaults.

    Returns:
        dict: The stored settings merged over the defaults, so every default
        key is always present. If the file is missing, is not valid JSON or
        does not contain a JSON object, the defaults are written to
        ``CONFIG_FILE`` and returned.
    """
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            stored = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        stored = None

    if isinstance(stored, dict):
        # Merge so that a config file lacking a key cannot cause a KeyError
        # in the views or in the auth decorator.
        return {**_DEFAULT_CONFIG, **stored}

    default_config = dict(_DEFAULT_CONFIG)
    save_config(default_config)
    return default_config


def save_config(config):
    """Write *config* to ``CONFIG_FILE`` as indented JSON."""
    _ensure_parent_dir(CONFIG_FILE)
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        json.dump(config, f, indent=4)


class User:
    """A proxy user: name, plaintext password and enabled/disabled flag."""

    def __init__(self, name, password, is_active=True):
        self.name = name
        self.password = password  # The plaintext password is stored here
        self.is_active = is_active


def _is_admin(auth, expected_password):
    """Return True when *auth* carries the admin user name and password.

    Both comparisons use ``hmac.compare_digest`` so the time taken does not
    depend on how many leading characters of the guess are correct.
    """
    if not auth or auth.username is None or auth.password is None:
        return False
    if not isinstance(expected_password, str):
        return False
    username_ok = hmac.compare_digest(auth.username.encode('utf-8'), b'admin')
    password_ok = hmac.compare_digest(
        auth.password.encode('utf-8'), expected_password.encode('utf-8')
    )
    return username_ok and password_ok


def basic_auth_required(f):
    """Decorator that protects a view with HTTP Basic authentication.

    The only accepted account is ``admin`` with the password stored under
    ``admin_password`` in the configuration; anything else gets a 401
    response carrying a ``WWW-Authenticate`` challenge.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        config = load_config()
        if not _is_admin(request.authorization, config.get('admin_password')):
            return ('Unauthorized', 401,
                    {'WWW-Authenticate': 'Basic realm="Authorization Required"'})
        return f(*args, **kwargs)
    return decorated


def _parse_passwd_line(line):
    """Split one password-file line into ``(name, hash, is_active)``.

    Returns None for blank lines and for lines without a ``:`` separator.
    A leading ``#`` marks the entry as inactive and is stripped.
    """
    line = line.strip()
    if not line:
        return None
    is_active = not line.startswith('#')
    if not is_active:
        line = line[1:]
    name, separator, hashed = line.partition(':')
    if not separator:
        return None
    return name, hashed, is_active


def _read_password_hashes():
    """Return ``{username: hash}`` from the password file ({} if missing).

    When a name occurs more than once, the last occurrence wins.
    """
    hashes = {}
    try:
        with open(SQUID_PASSWD_FILE, 'r', encoding='utf-8') as f:
            for raw_line in f:
                entry = _parse_passwd_line(raw_line)
                if entry is not None:
                    hashes[entry[0]] = entry[1]
    except FileNotFoundError:
        pass
    return hashes


def _run_htpasswd(username, password):
    """Add or update *username* in the password file with ``htpasswd -b``.

    ``-c`` is added when the file does not exist yet, because ``htpasswd``
    refuses to modify a missing file without it.

    NOTE(review): ``-b`` passes the password on the command line, where other
    local users may be able to see it in the process list.

    Raises:
        subprocess.CalledProcessError: If ``htpasswd`` exits with an error.
    """
    command = ['htpasswd', '-b']
    if not os.path.exists(SQUID_PASSWD_FILE):
        _ensure_parent_dir(SQUID_PASSWD_FILE)
        command.append('-c')
    command += [SQUID_PASSWD_FILE, username, password]
    subprocess.run(command, check=True)


def read_squid_file():
    """Return the users listed in the password file as ``User`` objects.

    Each user's plaintext password is looked up in ``USERS_FILE`` (None when
    there is no record). A missing password file yields an empty list.
    """
    # Load the plaintext passwords once instead of once per line; the first
    # record for a name wins.
    plain_passwords = {}
    for record in load_users_data():
        if isinstance(record, dict) and isinstance(record.get('username'), str):
            plain_passwords.setdefault(record['username'], record.get('password'))

    users = []
    try:
        with open(SQUID_PASSWD_FILE, 'r', encoding='utf-8') as f:
            for raw_line in f:
                entry = _parse_passwd_line(raw_line)
                if entry is None:
                    continue
                name, _hashed, is_active = entry
                users.append(User(name, plain_passwords.get(name), is_active))
    except FileNotFoundError:
        pass
    return users


def write_squid_file(users):
    """Rewrite the password file so it contains exactly *users*.

    Only the state (commented out or not) of existing entries is updated;
    their stored password hashes are kept unchanged. A user that has no hash
    yet is hashed with ``htpasswd`` from its plaintext password. A user with
    neither a hash nor a plaintext password is left out, since no valid entry
    can be produced for it.

    Raises:
        subprocess.CalledProcessError: If ``htpasswd`` fails for a new user.
    """
    hashes = _read_password_hashes()

    # Let htpasswd generate hashes for users that are not in the file yet.
    created_any = False
    for user in users:
        if user.name not in hashes and user.password is not None:
            _run_htpasswd(user.name, user.password)
            created_any = True
    if created_any:
        hashes = _read_password_hashes()

    _ensure_parent_dir(SQUID_PASSWD_FILE)
    with open(SQUID_PASSWD_FILE, 'w', encoding='utf-8') as f:
        for user in users:
            hashed = hashes.get(user.name)
            if hashed is None:
                continue
            prefix = '' if user.is_active else '#'
            f.write(f"{prefix}{user.name}:{hashed}\n")


def load_users_data():
    """Load the users' plaintext data from ``USERS_FILE``.

    Returns the parsed JSON content, or an empty list when the file is
    missing or is not valid JSON.
    """
    try:
        with open(USERS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return []


def save_users_to_json(users):
    """Save the users' plaintext information to ``USERS_FILE`` as JSON."""
    users_data = [
        {'username': u.name, 'password': u.password, 'active': u.is_active}
        for u in users
    ]
    _ensure_parent_dir(USERS_FILE)
    with open(USERS_FILE, 'w', encoding='utf-8') as f:
        json.dump(users_data, f, indent=4)


def create_user(username, password):
    """Create a new active user.

    The hashed entry is written first; the plaintext record is only saved
    once ``htpasswd`` has succeeded, so a failure leaves both files untouched.

    Returns:
        bool: True on success; False when the user already exists or when
        ``htpasswd`` reports an error.
    """
    users = read_squid_file()
    if any(u.name == username for u in users):
        return False
    try:
        _run_htpasswd(username, password)
    except subprocess.CalledProcessError:
        return False
    users.append(User(username, password, True))
    save_users_to_json(users)
    return True


def _is_valid_username(username):
    """Return True if *username* can be stored safely in the password file.

    Rejects non-strings, empty names, a leading ``#`` (it would be read back
    as a disabled entry), ``:`` (the field separator), whitespace and
    non-printable characters (entries are one stripped line each).
    """
    return (
        isinstance(username, str)
        and username != ''
        and not username.startswith('#')
        and ':' not in username
        and username.isprintable()
        and not any(ch.isspace() for ch in username)
    )


def _read_credentials():
    """Extract ``username`` and ``password`` from the JSON request body.

    Returns:
        tuple: ``(username, password, None)`` on success, otherwise
        ``(None, None, error_response)`` where ``error_response`` is a
        ready-to-return 400 response.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing, malformed or non-object bodies are handled like an empty one.
        data = {}
    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return None, None, (jsonify({'error': 'Username and password required'}), 400)
    if not _is_valid_username(username) or not isinstance(password, str):
        return None, None, (jsonify({'error': 'Invalid username or password'}), 400)
    return username, password, None


@app.route('/')
@basic_auth_required
def index():
    """Render the dashboard with the user count and the proxy endpoint."""
    config = load_config()
    users = read_squid_file()
    return render_template('index.html',
                           user_count=len(users),
                           proxy_address=config['proxy_address'],
                           proxy_port=config['proxy_port'])


@app.route('/clients')
@basic_auth_required
def clients():
    """Render one page (five users) of the user list.

    The ``page`` query parameter is clamped to the valid range so that zero,
    negative or too-large values cannot produce a wrong slice.
    """
    per_page = 5
    users = read_squid_file()
    total_pages = ceil(len(users) / per_page)
    page = request.args.get('page', 1, type=int)
    page = min(max(page, 1), max(total_pages, 1))
    start = (page - 1) * per_page
    paginated_users = users[start:start + per_page]
    config = load_config()
    return render_template('clients.html',
                           users=paginated_users,
                           page=page,
                           total_pages=total_pages,
                           proxy_address=config['proxy_address'],
                           proxy_port=config['proxy_port'])


@app.route('/settings')
@basic_auth_required
def settings():
    """Render the settings page with the current configuration."""
    config = load_config()
    return render_template('settings.html', config=config)


@app.route('/update_settings', methods=['POST'])
@basic_auth_required
def update_settings():
    """Update the submitted, non-empty settings and redirect to the settings page."""
    config = load_config()
    new_password = request.form.get('admin_password')
    proxy_address = request.form.get('proxy_address')
    proxy_port = request.form.get('proxy_port')
    if new_password:
        config['admin_password'] = new_password
    if proxy_address:
        config['proxy_address'] = proxy_address
    if proxy_port:
        config['proxy_port'] = proxy_port
    save_config(config)
    flash('设置已成功保存!', 'success')
    return redirect(url_for('settings'))


# The rule must declare <username>: the view takes it as an argument.
@app.route('/toggle/<username>', methods=['POST'])
@basic_auth_required
def toggle_user(username):
    """Enable a disabled user or disable an enabled one; 404 if unknown."""
    users = read_squid_file()
    target = next((u for u in users if u.name == username), None)
    if target is None:
        return jsonify({'error': 'User not found'}), 404
    target.is_active = not target.is_active
    write_squid_file(users)
    save_users_to_json(users)  # Keep users.json in sync
    return '', 200


# The rule must declare <username>: the view takes it as an argument.
@app.route('/delete/<username>', methods=['POST'])
@basic_auth_required
def delete_user(username):
    """Remove *username* from both files; deleting an unknown user is a no-op."""
    users = [u for u in read_squid_file() if u.name != username]
    write_squid_file(users)
    save_users_to_json(users)  # Keep users.json in sync
    return '', 200


@app.route('/save_user', methods=['POST'])
@basic_auth_required
def save_user():
    """Set the password of a user, creating the user if it does not exist.

    Expects a JSON body with ``username`` and ``password``. Responds 400 for
    missing/invalid input and 500 when ``htpasswd`` fails.
    """
    username, password, error = _read_credentials()
    if error is not None:
        return error

    users = read_squid_file()
    target = next((u for u in users if u.name == username), None)
    if target is None:
        # Record new users too, so users.json never lacks their password.
        target = User(username, password, True)
        users.append(target)
    else:
        target.password = password

    try:
        # Update the hashed password
        _run_htpasswd(username, password)
    except subprocess.CalledProcessError:
        return jsonify({'error': 'Failed to update password'}), 500

    # htpasswd leaves commented-out lines alone, so for a disabled user it
    # appends a second, active entry. Rewriting the file from `users`
    # collapses that into one entry with the new hash and the old state.
    write_squid_file(users)
    # Update the plaintext password only after the hash was written.
    save_users_to_json(users)
    return '', 200


@app.route('/create_user', methods=['POST'])
@basic_auth_required
def handle_create_user():
    """Create a user from a JSON body with ``username`` and ``password``.

    Responds 400 for missing/invalid input and 409 when ``create_user``
    returns False (the user exists, or ``htpasswd`` reported an error).
    """
    username, password, error = _read_credentials()
    if error is not None:
        return error
    if create_user(username, password):
        return '', 200
    return jsonify({'error': 'User already exists'}), 409


# The rule must declare <username>: the view takes it as an argument.
@app.route('/copy_proxy/<username>', methods=['POST'])
@basic_auth_required
def copy_proxy(username):
    """Return the proxy URL for *username* as JSON; 404 if unknown.

    User name and password are percent-encoded so characters such as ``@``,
    ``:`` or ``/`` cannot break the URL. When no plaintext password is on
    record, the URL carries the user name only.
    """
    config = load_config()
    users = read_squid_file()
    user = next((u for u in users if u.name == username), None)
    if not user:
        return jsonify({'error': 'User not found'}), 404

    credentials = quote(user.name, safe='')
    if user.password is not None:
        credentials += ':' + quote(str(user.password), safe='')
    proxy_url = f"http://{credentials}@{config['proxy_address']}:{config['proxy_port']}"
    return jsonify({
        'message': 'Proxy URL copied to clipboard (simulated)',
        'proxy_url': proxy_url
    })


@app.route('/logout')
def logout():
    """Answer with a 401 challenge so the browser asks for credentials again."""
    return ('', 401, {'WWW-Authenticate': 'Basic realm="Authorization Required"'})


if __name__ == '__main__':
    os.makedirs('config', exist_ok=True)
    load_config()  # Initialise the configuration file
    # The interactive debugger allows arbitrary code execution, so it must not
    # be enabled unconditionally on a server listening on all interfaces.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=8080, debug=debug_enabled)