312 lines
9.4 KiB
Python
312 lines
9.4 KiB
Python
from flask import Flask, render_template, request, jsonify, abort, redirect, url_for, session
|
||
from functools import wraps
|
||
import os
|
||
import subprocess
|
||
import json
|
||
import uuid
|
||
from math import ceil
|
||
from flask import flash
|
||
|
||
app = Flask(__name__)
|
||
app.secret_key = str(uuid.uuid4())
|
||
|
||
# 配置文件路径
|
||
CONFIG_FILE = 'config/config.json'
|
||
SQUID_PASSWD_FILE = 'config/squid_passwd'
|
||
USERS_FILE = 'config/users.json' # 新增用户信息存储文件
|
||
|
||
|
||
# 加载配置
|
||
def load_config():
|
||
try:
|
||
with open(CONFIG_FILE, 'r') as f:
|
||
return json.load(f)
|
||
except (FileNotFoundError, json.JSONDecodeError):
|
||
# 默认配置
|
||
default_config = {
|
||
"admin_password": "admin123",
|
||
"proxy_address": "127.0.0.1",
|
||
"proxy_port": "3128"
|
||
}
|
||
os.makedirs('config', exist_ok=True)
|
||
with open(CONFIG_FILE, 'w') as f:
|
||
json.dump(default_config, f, indent=4)
|
||
return default_config
|
||
|
||
|
||
# 保存配置
|
||
def save_config(config):
|
||
with open(CONFIG_FILE, 'w') as f:
|
||
json.dump(config, f, indent=4)
|
||
|
||
|
||
class User:
|
||
def __init__(self, name, password, is_active=True):
|
||
self.name = name
|
||
self.password = password # 这里存储明文密码
|
||
self.is_active = is_active
|
||
|
||
|
||
def basic_auth_required(f):
|
||
@wraps(f)
|
||
def decorated(*args, **kwargs):
|
||
config = load_config()
|
||
auth = request.authorization
|
||
if not auth or not (auth.username == 'admin' and auth.password == config['admin_password']):
|
||
return ('Unauthorized', 401,
|
||
{'WWW-Authenticate': 'Basic realm="Authorization Required"'})
|
||
return f(*args, **kwargs)
|
||
|
||
return decorated
|
||
|
||
|
||
def read_squid_file():
|
||
users = []
|
||
try:
|
||
with open(SQUID_PASSWD_FILE, 'r') as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
is_active = not line.startswith('#')
|
||
if not is_active:
|
||
line = line[1:]
|
||
|
||
parts = line.split(':', 1)
|
||
if len(parts) == 2:
|
||
# 从users.json中获取明文密码
|
||
users_data = load_users_data()
|
||
plain_password = next((u['password'] for u in users_data if u['username'] == parts[0]), None)
|
||
users.append(User(parts[0], plain_password, is_active))
|
||
except FileNotFoundError:
|
||
pass
|
||
return users
|
||
|
||
|
||
def write_squid_file(users):
|
||
"""只更新squid_passwd文件的状态(是否注释),不修改密码内容"""
|
||
try:
|
||
# 读取现有加密密码
|
||
with open(SQUID_PASSWD_FILE, 'r') as f:
|
||
existing_lines = f.readlines()
|
||
|
||
# 创建用户名到密码的映射
|
||
existing_passwords = {}
|
||
for line in existing_lines:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
active = not line.startswith('#')
|
||
if not active:
|
||
line = line[1:]
|
||
parts = line.split(':', 1)
|
||
if len(parts) == 2:
|
||
existing_passwords[parts[0]] = parts[1]
|
||
|
||
# 写入新文件,保持加密密码不变
|
||
with open(SQUID_PASSWD_FILE, 'w') as f:
|
||
for user in users:
|
||
encrypted_password = existing_passwords.get(user.name, user.password)
|
||
line = f"{'#' if not user.is_active else ''}{user.name}:{encrypted_password}\n"
|
||
f.write(line)
|
||
except FileNotFoundError:
|
||
# 如果文件不存在,创建新文件
|
||
with open(SQUID_PASSWD_FILE, 'w') as f:
|
||
for user in users:
|
||
# 首次创建时需要使用htpasswd命令生成加密密码
|
||
subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, user.name, user.password], check=True)
|
||
# 如果不是活跃用户,需要添加注释
|
||
if not user.is_active:
|
||
with open(SQUID_PASSWD_FILE, 'r') as f_read:
|
||
content = f_read.read()
|
||
with open(SQUID_PASSWD_FILE, 'w') as f_write:
|
||
f_write.write(f"#{content}")
|
||
|
||
|
||
def load_users_data():
|
||
"""加载用户明文数据"""
|
||
try:
|
||
with open(USERS_FILE, 'r') as f:
|
||
return json.load(f)
|
||
except (FileNotFoundError, json.JSONDecodeError):
|
||
return []
|
||
|
||
|
||
def save_users_to_json(users):
|
||
"""保存用户明文信息到JSON文件"""
|
||
users_data = [{'username': u.name, 'password': u.password, 'active': u.is_active} for u in users]
|
||
with open(USERS_FILE, 'w') as f:
|
||
json.dump(users_data, f, indent=4)
|
||
|
||
|
||
def create_user(username, password):
|
||
users = read_squid_file()
|
||
if any(u.name == username for u in users):
|
||
return False
|
||
|
||
try:
|
||
# 先保存明文密码到users.json
|
||
users.append(User(username, password, True))
|
||
save_users_to_json(users)
|
||
|
||
# 然后创建加密密码
|
||
subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, username, password], check=True)
|
||
return True
|
||
except subprocess.CalledProcessError:
|
||
return False
|
||
|
||
|
||
@app.route('/')
|
||
@basic_auth_required
|
||
def index():
|
||
config = load_config()
|
||
users = read_squid_file()
|
||
return render_template('index.html',
|
||
user_count=len(users),
|
||
proxy_address=config['proxy_address'],
|
||
proxy_port=config['proxy_port'])
|
||
|
||
|
||
@app.route('/clients')
|
||
@basic_auth_required
|
||
def clients():
|
||
page = request.args.get('page', 1, type=int)
|
||
per_page = 5
|
||
|
||
users = read_squid_file()
|
||
total_pages = ceil(len(users) / per_page)
|
||
|
||
start = (page - 1) * per_page
|
||
end = start + per_page
|
||
paginated_users = users[start:end]
|
||
|
||
config = load_config()
|
||
|
||
return render_template('clients.html',
|
||
users=paginated_users,
|
||
page=page,
|
||
total_pages=total_pages,
|
||
proxy_address=config['proxy_address'],
|
||
proxy_port=config['proxy_port'])
|
||
|
||
|
||
@app.route('/settings')
|
||
@basic_auth_required
|
||
def settings():
|
||
config = load_config()
|
||
return render_template('settings.html', config=config)
|
||
|
||
|
||
@app.route('/update_settings', methods=['POST'])
|
||
@basic_auth_required
|
||
def update_settings():
|
||
config = load_config()
|
||
new_password = request.form.get('admin_password')
|
||
proxy_address = request.form.get('proxy_address')
|
||
proxy_port = request.form.get('proxy_port')
|
||
|
||
if new_password:
|
||
config['admin_password'] = new_password
|
||
if proxy_address:
|
||
config['proxy_address'] = proxy_address
|
||
if proxy_port:
|
||
config['proxy_port'] = proxy_port
|
||
|
||
save_config(config)
|
||
flash('设置已成功保存!', 'success')
|
||
return redirect(url_for('settings'))
|
||
|
||
|
||
@app.route('/toggle/<username>', methods=['POST'])
|
||
@basic_auth_required
|
||
def toggle_user(username):
|
||
users = read_squid_file()
|
||
for user in users:
|
||
if user.name == username:
|
||
user.is_active = not user.is_active
|
||
break
|
||
write_squid_file(users)
|
||
save_users_to_json(users) # 更新users.json
|
||
return '', 200
|
||
|
||
|
||
@app.route('/delete/<username>', methods=['POST'])
|
||
@basic_auth_required
|
||
def delete_user(username):
|
||
users = [u for u in read_squid_file() if u.name != username]
|
||
write_squid_file(users)
|
||
save_users_to_json(users) # 更新users.json
|
||
return '', 200
|
||
|
||
|
||
@app.route('/save_user', methods=['POST'])
|
||
@basic_auth_required
|
||
def save_user():
|
||
data = request.get_json()
|
||
username = data.get('username')
|
||
password = data.get('password')
|
||
|
||
if not username or not password:
|
||
return jsonify({'error': 'Username and password required'}), 400
|
||
|
||
try:
|
||
# 更新明文密码
|
||
users = read_squid_file()
|
||
for user in users:
|
||
if user.name == username:
|
||
user.password = password
|
||
break
|
||
save_users_to_json(users)
|
||
|
||
# 更新加密密码
|
||
subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, username, password], check=True)
|
||
return '', 200
|
||
except subprocess.CalledProcessError:
|
||
return jsonify({'error': 'Failed to update password'}), 500
|
||
|
||
|
||
@app.route('/create_user', methods=['POST'])
|
||
@basic_auth_required
|
||
def handle_create_user():
|
||
data = request.get_json()
|
||
username = data.get('username')
|
||
password = data.get('password')
|
||
|
||
if not username or not password:
|
||
return jsonify({'error': 'Username and password required'}), 400
|
||
|
||
if create_user(username, password):
|
||
return '', 200
|
||
else:
|
||
return jsonify({'error': 'User already exists'}), 409
|
||
|
||
|
||
@app.route('/copy_proxy/<username>', methods=['POST'])
|
||
@basic_auth_required
|
||
def copy_proxy(username):
|
||
config = load_config()
|
||
users = read_squid_file()
|
||
user = next((u for u in users if u.name == username), None)
|
||
|
||
if not user:
|
||
return jsonify({'error': 'User not found'}), 404
|
||
|
||
proxy_url = f"http://{user.name}:{user.password}@{config['proxy_address']}:{config['proxy_port']}"
|
||
return jsonify({
|
||
'message': 'Proxy URL copied to clipboard (simulated)',
|
||
'proxy_url': proxy_url
|
||
})
|
||
|
||
|
||
@app.route('/logout')
|
||
def logout():
|
||
return ('', 401, {'WWW-Authenticate': 'Basic realm="Authorization Required"'})
|
||
|
||
|
||
if __name__ == '__main__':
|
||
if not os.path.exists('config'):
|
||
os.makedirs('config')
|
||
load_config() # 初始化配置文件
|
||
app.run(host='0.0.0.0', port=8080, debug=True)
|