squid_ui/app.py

312 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, jsonify, abort, redirect, url_for, session
from functools import wraps
import os
import subprocess
import json
import uuid
from math import ceil
from flask import flash
app = Flask(__name__)
app.secret_key = str(uuid.uuid4())
# 配置文件路径
CONFIG_FILE = 'config/config.json'
SQUID_PASSWD_FILE = 'config/squid_passwd'
USERS_FILE = 'config/users.json' # 新增用户信息存储文件
# 加载配置
def load_config():
try:
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
# 默认配置
default_config = {
"admin_password": "admin123",
"proxy_address": "127.0.0.1",
"proxy_port": "3128"
}
os.makedirs('config', exist_ok=True)
with open(CONFIG_FILE, 'w') as f:
json.dump(default_config, f, indent=4)
return default_config
# 保存配置
def save_config(config):
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=4)
class User:
def __init__(self, name, password, is_active=True):
self.name = name
self.password = password # 这里存储明文密码
self.is_active = is_active
def basic_auth_required(f):
@wraps(f)
def decorated(*args, **kwargs):
config = load_config()
auth = request.authorization
if not auth or not (auth.username == 'admin' and auth.password == config['admin_password']):
return ('Unauthorized', 401,
{'WWW-Authenticate': 'Basic realm="Authorization Required"'})
return f(*args, **kwargs)
return decorated
def read_squid_file():
users = []
try:
with open(SQUID_PASSWD_FILE, 'r') as f:
for line in f:
line = line.strip()
if not line:
continue
is_active = not line.startswith('#')
if not is_active:
line = line[1:]
parts = line.split(':', 1)
if len(parts) == 2:
# 从users.json中获取明文密码
users_data = load_users_data()
plain_password = next((u['password'] for u in users_data if u['username'] == parts[0]), None)
users.append(User(parts[0], plain_password, is_active))
except FileNotFoundError:
pass
return users
def write_squid_file(users):
"""只更新squid_passwd文件的状态是否注释不修改密码内容"""
try:
# 读取现有加密密码
with open(SQUID_PASSWD_FILE, 'r') as f:
existing_lines = f.readlines()
# 创建用户名到密码的映射
existing_passwords = {}
for line in existing_lines:
line = line.strip()
if not line:
continue
active = not line.startswith('#')
if not active:
line = line[1:]
parts = line.split(':', 1)
if len(parts) == 2:
existing_passwords[parts[0]] = parts[1]
# 写入新文件,保持加密密码不变
with open(SQUID_PASSWD_FILE, 'w') as f:
for user in users:
encrypted_password = existing_passwords.get(user.name, user.password)
line = f"{'#' if not user.is_active else ''}{user.name}:{encrypted_password}\n"
f.write(line)
except FileNotFoundError:
# 如果文件不存在,创建新文件
with open(SQUID_PASSWD_FILE, 'w') as f:
for user in users:
# 首次创建时需要使用htpasswd命令生成加密密码
subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, user.name, user.password], check=True)
# 如果不是活跃用户,需要添加注释
if not user.is_active:
with open(SQUID_PASSWD_FILE, 'r') as f_read:
content = f_read.read()
with open(SQUID_PASSWD_FILE, 'w') as f_write:
f_write.write(f"#{content}")
def load_users_data():
"""加载用户明文数据"""
try:
with open(USERS_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return []
def save_users_to_json(users):
"""保存用户明文信息到JSON文件"""
users_data = [{'username': u.name, 'password': u.password, 'active': u.is_active} for u in users]
with open(USERS_FILE, 'w') as f:
json.dump(users_data, f, indent=4)
def create_user(username, password):
users = read_squid_file()
if any(u.name == username for u in users):
return False
try:
# 先保存明文密码到users.json
users.append(User(username, password, True))
save_users_to_json(users)
# 然后创建加密密码
subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, username, password], check=True)
return True
except subprocess.CalledProcessError:
return False
@app.route('/')
@basic_auth_required
def index():
config = load_config()
users = read_squid_file()
return render_template('index.html',
user_count=len(users),
proxy_address=config['proxy_address'],
proxy_port=config['proxy_port'])
@app.route('/clients')
@basic_auth_required
def clients():
page = request.args.get('page', 1, type=int)
per_page = 5
users = read_squid_file()
total_pages = ceil(len(users) / per_page)
start = (page - 1) * per_page
end = start + per_page
paginated_users = users[start:end]
config = load_config()
return render_template('clients.html',
users=paginated_users,
page=page,
total_pages=total_pages,
proxy_address=config['proxy_address'],
proxy_port=config['proxy_port'])
@app.route('/settings')
@basic_auth_required
def settings():
config = load_config()
return render_template('settings.html', config=config)
@app.route('/update_settings', methods=['POST'])
@basic_auth_required
def update_settings():
config = load_config()
new_password = request.form.get('admin_password')
proxy_address = request.form.get('proxy_address')
proxy_port = request.form.get('proxy_port')
if new_password:
config['admin_password'] = new_password
if proxy_address:
config['proxy_address'] = proxy_address
if proxy_port:
config['proxy_port'] = proxy_port
save_config(config)
flash('设置已成功保存!', 'success')
return redirect(url_for('settings'))
@app.route('/toggle/<username>', methods=['POST'])
@basic_auth_required
def toggle_user(username):
users = read_squid_file()
for user in users:
if user.name == username:
user.is_active = not user.is_active
break
write_squid_file(users)
save_users_to_json(users) # 更新users.json
return '', 200
@app.route('/delete/<username>', methods=['POST'])
@basic_auth_required
def delete_user(username):
users = [u for u in read_squid_file() if u.name != username]
write_squid_file(users)
save_users_to_json(users) # 更新users.json
return '', 200
@app.route('/save_user', methods=['POST'])
@basic_auth_required
def save_user():
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': 'Username and password required'}), 400
try:
# 更新明文密码
users = read_squid_file()
for user in users:
if user.name == username:
user.password = password
break
save_users_to_json(users)
# 更新加密密码
subprocess.run(['htpasswd', '-b', SQUID_PASSWD_FILE, username, password], check=True)
return '', 200
except subprocess.CalledProcessError:
return jsonify({'error': 'Failed to update password'}), 500
@app.route('/create_user', methods=['POST'])
@basic_auth_required
def handle_create_user():
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': 'Username and password required'}), 400
if create_user(username, password):
return '', 200
else:
return jsonify({'error': 'User already exists'}), 409
@app.route('/copy_proxy/<username>', methods=['POST'])
@basic_auth_required
def copy_proxy(username):
config = load_config()
users = read_squid_file()
user = next((u for u in users if u.name == username), None)
if not user:
return jsonify({'error': 'User not found'}), 404
proxy_url = f"http://{user.name}:{user.password}@{config['proxy_address']}:{config['proxy_port']}"
return jsonify({
'message': 'Proxy URL copied to clipboard (simulated)',
'proxy_url': proxy_url
})
@app.route('/logout')
def logout():
return ('', 401, {'WWW-Authenticate': 'Basic realm="Authorization Required"'})
if __name__ == '__main__':
if not os.path.exists('config'):
os.makedirs('config')
load_config() # 初始化配置文件
app.run(host='0.0.0.0', port=8080, debug=True)