支持批量删除

This commit is contained in:
wzj 2025-06-15 07:56:35 +08:00
parent 25add10b35
commit 5c2381c3a8
3 changed files with 458 additions and 18 deletions

235
app.py
View File

@ -1093,19 +1093,50 @@ def logout():
return redirect(url_for('login'))
from math import ceil
# 每页显示的数量
PER_PAGE = 10
@app.route('/cas')
@login_required
def ca_list():
page = request.args.get('page', 1, type=int)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
# 获取总数
if current_user.is_admin:
cursor.execute("SELECT * FROM certificate_authorities")
cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities")
else:
cursor.execute("SELECT * FROM certificate_authorities WHERE created_by = %s", (current_user.id,))
cursor.execute("SELECT COUNT(*) as total FROM certificate_authorities WHERE created_by = %s",
(current_user.id,))
total = cursor.fetchone()['total']
total_pages = ceil(total / PER_PAGE)
# 获取分页数据
offset = (page - 1) * PER_PAGE
if current_user.is_admin:
cursor.execute("SELECT * FROM certificate_authorities ORDER BY created_at DESC LIMIT %s OFFSET %s",
(PER_PAGE, offset))
else:
cursor.execute("""
SELECT * FROM certificate_authorities
WHERE created_by = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (current_user.id, PER_PAGE, offset))
cas = cursor.fetchall()
return render_template('ca_list.html', cas=cas, get_username=get_username)
return render_template('ca_list.html',
cas=cas,
page=page,
total_pages=total_pages,
total=total,
get_username=get_username)
except Error as e:
print(f"Database error: {e}")
flash('获取CA列表失败', 'danger')
@ -1117,6 +1148,204 @@ def ca_list():
return redirect(url_for('index'))
@app.route('/cas/batch_delete', methods=['POST'])
@login_required
def batch_delete_cas():
if not request.is_json:
return jsonify({'success': False, 'message': 'Invalid request'}), 400
data = request.get_json()
ca_ids = data.get('ids', [])
if not ca_ids:
return jsonify({'success': False, 'message': 'No CAs selected'}), 400
conn = get_db_connection()
if not conn:
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
try:
cursor = conn.cursor()
# 检查权限并删除
for ca_id in ca_ids:
# 验证CA存在且用户有权限
cursor.execute("SELECT created_by FROM certificate_authorities WHERE id = %s", (ca_id,))
ca = cursor.fetchone()
if not ca:
continue
if not current_user.is_admin and ca['created_by'] != current_user.id:
continue
# 检查是否有关联证书
cursor.execute("SELECT COUNT(*) as count FROM certificates WHERE ca_id = %s", (ca_id,))
result = cursor.fetchone()
if result['count'] > 0:
continue
# 获取CA信息以便删除文件
cursor.execute("SELECT cert_path, key_path FROM certificate_authorities WHERE id = %s", (ca_id,))
ca_info = cursor.fetchone()
if ca_info:
# 删除文件
try:
if os.path.exists(ca_info['cert_path']):
os.remove(ca_info['cert_path'])
if os.path.exists(ca_info['key_path']):
os.remove(ca_info['key_path'])
# 删除CA目录
ca_dir = os.path.dirname(ca_info['cert_path'])
if os.path.exists(ca_dir):
shutil.rmtree(ca_dir) # 递归删除目录
except OSError as e:
print(f"文件删除错误: {e}")
continue
# 删除数据库记录
cursor.execute("DELETE FROM certificate_revocation_list WHERE ca_id = %s", (ca_id,))
cursor.execute("DELETE FROM certificate_authorities WHERE id = %s", (ca_id,))
conn.commit()
return jsonify({'success': True, 'message': '批量删除成功'})
except Error as e:
conn.rollback()
print(f"Database error: {e}")
return jsonify({'success': False, 'message': '数据库操作失败'}), 500
finally:
if conn.is_connected():
cursor.close()
conn.close()
@app.route('/certificates')
@login_required
def certificate_list():
page = request.args.get('page', 1, type=int)
conn = get_db_connection()
if conn:
try:
cursor = conn.cursor(dictionary=True)
# 获取总数
if current_user.is_admin:
cursor.execute("SELECT COUNT(*) as total FROM certificates")
else:
cursor.execute("SELECT COUNT(*) as total FROM certificates WHERE created_by = %s", (current_user.id,))
total = cursor.fetchone()['total']
total_pages = ceil(total / PER_PAGE)
# 获取分页数据
offset = (page - 1) * PER_PAGE
if current_user.is_admin:
cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
ORDER BY c.created_at DESC
LIMIT %s OFFSET %s
""", (PER_PAGE, offset))
else:
cursor.execute("""
SELECT c.*, ca.name as ca_name
FROM certificates c
JOIN certificate_authorities ca ON c.ca_id = ca.id
WHERE c.created_by = %s
ORDER BY c.created_at DESC
LIMIT %s OFFSET %s
""", (current_user.id, PER_PAGE, offset))
certificates = cursor.fetchall()
return render_template('certificate_list.html',
certificates=certificates,
page=page,
total_pages=total_pages,
total=total,
get_username=get_username)
except Error as e:
print(f"Database error: {e}")
flash('获取证书列表失败', 'danger')
return redirect(url_for('index'))
finally:
if conn.is_connected():
cursor.close()
conn.close()
return redirect(url_for('index'))
@app.route('/certificates/batch_delete', methods=['POST'])
@login_required
def batch_delete_certificates():
if not request.is_json:
return jsonify({'success': False, 'message': 'Invalid request'}), 400
data = request.get_json()
cert_ids = data.get('ids', [])
if not cert_ids:
return jsonify({'success': False, 'message': 'No certificates selected'}), 400
conn = get_db_connection()
if not conn:
return jsonify({'success': False, 'message': 'Database connection failed'}), 500
try:
cursor = conn.cursor(dictionary=True)
# 检查权限并删除
for cert_id in cert_ids:
# 验证证书存在且用户有权限
cursor.execute("SELECT created_by, cert_path, key_path, csr_path FROM certificates WHERE id = %s",
(cert_id,))
cert = cursor.fetchone()
if not cert:
continue
if not current_user.is_admin and cert['created_by'] != current_user.id:
continue
# 删除文件
try:
files_to_delete = [
cert['cert_path'],
cert['key_path'],
cert['csr_path']
]
# 删除所有指定文件
for file_path in files_to_delete:
if file_path and os.path.exists(file_path):
os.remove(file_path)
# 删除证书目录
cert_dir = os.path.dirname(cert['cert_path'])
if os.path.exists(cert_dir):
shutil.rmtree(cert_dir) # 递归删除目录
except OSError as e:
print(f"文件删除错误: {e}")
continue
# 删除数据库记录
cursor.execute("DELETE FROM certificates WHERE id = %s", (cert_id,))
conn.commit()
return jsonify({'success': True, 'message': '批量删除成功'})
except Error as e:
conn.rollback()
print(f"Database error: {e}")
return jsonify({'success': False, 'message': '数据库操作失败'}), 500
finally:
if conn.is_connected():
cursor.close()
conn.close()
@app.route('/cas/create', methods=['GET', 'POST'])
@login_required
def create_ca_view():

View File

@ -7,12 +7,17 @@
<div>
<h2 class="mb-1">CA机构列表</h2>
<div class="text-muted fs-6">
<i class="fas fa-shield-alt me-1"></i>共 {{ cas|length }} 个CA机构
<i class="fas fa-shield-alt me-1"></i>共 {{ total }} 个CA机构
</div>
</div>
<a href="{{ url_for('create_ca_view') }}" class="btn btn-primary btn-sm text-nowrap">
<i class="fas fa-plus me-1"></i> 创建CA机构
</a>
<div>
<button id="batchDeleteBtn" class="btn btn-danger btn-sm me-2" disabled>
<i class="fas fa-trash-alt me-1"></i> 批量删除
</button>
<a href="{{ url_for('create_ca_view') }}" class="btn btn-primary btn-sm text-nowrap">
<i class="fas fa-plus me-1"></i> 创建CA机构
</a>
</div>
</div>
<div class="card border-0 shadow-sm">
@ -21,7 +26,10 @@
<table class="table table-hover mb-0">
<thead class="table-light">
<tr>
<th class="ps-4">ID</th>
<th width="40" class="ps-4">
<input type="checkbox" id="selectAll" class="form-check-input">
</th>
<th>ID</th>
<th>名称</th>
<th>通用名</th>
<th>组织</th>
@ -34,7 +42,10 @@
<tbody>
{% for ca in cas %}
<tr>
<td class="ps-4">{{ ca.id }}</td>
<td class="ps-4">
<input type="checkbox" class="form-check-input ca-checkbox" value="{{ ca.id }}">
</td>
<td>{{ ca.id }}</td>
<td>
<a href="{{ url_for('ca_detail', ca_id=ca.id) }}" class="text-decoration-none">
{{ ca.name }}
@ -56,7 +67,7 @@
</tr>
{% else %}
<tr>
<td colspan="8" class="text-center py-4">
<td colspan="9" class="text-center py-4">
<i class="fas fa-shield-alt fa-3x text-muted mb-3"></i>
<p class="text-muted">暂无CA机构记录</p>
<a href="{{ url_for('create_ca_view') }}" class="btn btn-primary btn-sm">
@ -68,6 +79,107 @@
</tbody>
</table>
</div>
{% if total_pages > 1 %}
<div class="card-footer bg-white">
<nav aria-label="Page navigation">
<ul class="pagination justify-content-center mb-0">
<li class="page-item {% if page == 1 %}disabled{% endif %}">
<a class="page-link" href="{{ url_for('ca_list', page=page-1) }}" aria-label="Previous">
<span aria-hidden="true">&laquo;</span>
</a>
</li>
{% for p in range(1, total_pages + 1) %}
<li class="page-item {% if p == page %}active{% endif %}">
<a class="page-link" href="{{ url_for('ca_list', page=p) }}">{{ p }}</a>
</li>
{% endfor %}
<li class="page-item {% if page == total_pages %}disabled{% endif %}">
<a class="page-link" href="{{ url_for('ca_list', page=page+1) }}" aria-label="Next">
<span aria-hidden="true">&raquo;</span>
</a>
</li>
</ul>
</nav>
</div>
{% endif %}
</div>
</div>
{% endblock %}
{% block scripts %}
{{ super() }}
<script>
document.addEventListener('DOMContentLoaded', function() {
// 启用工具提示
var tooltipTriggerList = [].slice.call(document.querySelectorAll('[data-bs-toggle="tooltip"]'))
var tooltipList = tooltipTriggerList.map(function (tooltipTriggerEl) {
return new bootstrap.Tooltip(tooltipTriggerEl)
})
// 全选/取消全选
const selectAll = document.getElementById('selectAll')
const checkboxes = document.querySelectorAll('.ca-checkbox')
const batchDeleteBtn = document.getElementById('batchDeleteBtn')
selectAll.addEventListener('change', function() {
checkboxes.forEach(checkbox => {
checkbox.checked = selectAll.checked
})
updateBatchDeleteBtn()
})
// 单个复选框变化时更新全选状态
checkboxes.forEach(checkbox => {
checkbox.addEventListener('change', function() {
selectAll.checked = [...checkboxes].every(cb => cb.checked)
updateBatchDeleteBtn()
})
})
// 更新批量删除按钮状态
function updateBatchDeleteBtn() {
const checkedCount = document.querySelectorAll('.ca-checkbox:checked').length
batchDeleteBtn.disabled = checkedCount === 0
}
// 批量删除
batchDeleteBtn.addEventListener('click', function() {
const checkedBoxes = document.querySelectorAll('.ca-checkbox:checked')
const ids = Array.from(checkedBoxes).map(checkbox => checkbox.value)
if (ids.length === 0) {
return
}
if (!confirm(`确定要删除选中的 ${ids.length} 个CA机构吗`)) {
return
}
fetch("{{ url_for('batch_delete_cas') }}", {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Requested-With': 'XMLHttpRequest'
},
body: JSON.stringify({ ids: ids })
})
.then(response => response.json())
.then(data => {
if (data.success) {
alert(data.message)
window.location.reload()
} else {
alert(data.message)
}
})
.catch(error => {
console.error('Error:', error)
alert('操作失败,请稍后再试')
})
})
})
</script>
{% endblock %}

View File

@ -7,12 +7,17 @@
<div>
<h2 class="mb-1">证书列表</h2>
<div class="text-muted fs-6">
<i class="fas fa-certificate me-1"></i>共 {{ certificates|length }} 个证书
<i class="fas fa-certificate me-1"></i>共 {{ total }} 个证书
</div>
</div>
<a href="{{ url_for('create_certificate_view') }}" class="btn btn-primary btn-sm text-nowrap">
<i class="fas fa-plus me-1"></i> 创建证书
</a>
<div>
<button id="batchDeleteBtn" class="btn btn-danger btn-sm me-2" disabled>
<i class="fas fa-trash-alt me-1"></i> 批量删除
</button>
<a href="{{ url_for('create_certificate_view') }}" class="btn btn-primary btn-sm text-nowrap">
<i class="fas fa-plus me-1"></i> 创建证书
</a>
</div>
</div>
<div class="card border-0 shadow-sm">
@ -21,7 +26,10 @@
<table class="table table-hover mb-0">
<thead class="table-light">
<tr>
<th class="ps-4">ID</th>
<th width="40" class="ps-4">
<input type="checkbox" id="selectAll" class="form-check-input">
</th>
<th>ID</th>
<th>通用名</th>
<th>CA机构</th>
<th>状态</th>
@ -33,7 +41,10 @@
<tbody>
{% for cert in certificates %}
<tr>
<td class="ps-4">{{ cert.id }}</td>
<td class="ps-4">
<input type="checkbox" class="form-check-input cert-checkbox" value="{{ cert.id }}">
</td>
<td>{{ cert.id }}</td>
<td>
<a href="{{ url_for('certificate_detail', cert_id=cert.id) }}" class="text-decoration-none">
{{ cert.common_name }}
@ -84,7 +95,7 @@
</tr>
{% else %}
<tr>
<td colspan="7" class="text-center py-4">
<td colspan="8" class="text-center py-4">
<i class="fas fa-certificate fa-3x text-muted mb-3"></i>
<p class="text-muted">暂无证书记录</p>
<a href="{{ url_for('create_certificate_view') }}" class="btn btn-primary btn-sm">
@ -96,6 +107,32 @@
</tbody>
</table>
</div>
{% if total_pages > 1 %}
<div class="card-footer bg-white">
<nav aria-label="Page navigation">
<ul class="pagination justify-content-center mb-0">
<li class="page-item {% if page == 1 %}disabled{% endif %}">
<a class="page-link" href="{{ url_for('certificate_list', page=page-1) }}" aria-label="Previous">
<span aria-hidden="true">&laquo;</span>
</a>
</li>
{% for p in range(1, total_pages + 1) %}
<li class="page-item {% if p == page %}active{% endif %}">
<a class="page-link" href="{{ url_for('certificate_list', page=p) }}">{{ p }}</a>
</li>
{% endfor %}
<li class="page-item {% if page == total_pages %}disabled{% endif %}">
<a class="page-link" href="{{ url_for('certificate_list', page=page+1) }}" aria-label="Next">
<span aria-hidden="true">&raquo;</span>
</a>
</li>
</ul>
</nav>
</div>
{% endif %}
</div>
</div>
{% endblock %}
@ -103,12 +140,74 @@
{% block scripts %}
{{ super() }}
<script>
// 启用工具提示
document.addEventListener('DOMContentLoaded', function() {
// 启用工具提示
var tooltipTriggerList = [].slice.call(document.querySelectorAll('[data-bs-toggle="tooltip"]'))
var tooltipList = tooltipTriggerList.map(function (tooltipTriggerEl) {
return new bootstrap.Tooltip(tooltipTriggerEl)
})
// 全选/取消全选
const selectAll = document.getElementById('selectAll')
const checkboxes = document.querySelectorAll('.cert-checkbox')
const batchDeleteBtn = document.getElementById('batchDeleteBtn')
selectAll.addEventListener('change', function() {
checkboxes.forEach(checkbox => {
checkbox.checked = selectAll.checked
})
updateBatchDeleteBtn()
})
// 单个复选框变化时更新全选状态
checkboxes.forEach(checkbox => {
checkbox.addEventListener('change', function() {
selectAll.checked = [...checkboxes].every(cb => cb.checked)
updateBatchDeleteBtn()
})
})
// 更新批量删除按钮状态
function updateBatchDeleteBtn() {
const checkedCount = document.querySelectorAll('.cert-checkbox:checked').length
batchDeleteBtn.disabled = checkedCount === 0
}
// 批量删除
batchDeleteBtn.addEventListener('click', function() {
const checkedBoxes = document.querySelectorAll('.cert-checkbox:checked')
const ids = Array.from(checkedBoxes).map(checkbox => checkbox.value)
if (ids.length === 0) {
return
}
if (!confirm(`确定要删除选中的 ${ids.length} 个证书吗?`)) {
return
}
fetch("{{ url_for('batch_delete_certificates') }}", {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Requested-With': 'XMLHttpRequest'
},
body: JSON.stringify({ ids: ids })
})
.then(response => response.json())
.then(data => {
if (data.success) {
alert(data.message)
window.location.reload()
} else {
alert(data.message)
}
})
.catch(error => {
console.error('Error:', error)
alert('操作失败,请稍后再试')
})
})
})
</script>
{% endblock %}