
| import re import csv from datetime import datetime
class AccountAudit:
    """Audit user accounts parsed from a SQL dump.

    Two kinds of violations are detected:

    * "信息违规" (information violation): a user's name, phone number,
      ID card number, bank card number or registration date fails
      validation.
    * "操作违规" (operation violation): a user performed an operation on a
      module that the user's role is not permitted to touch according to
      ``role_permissions``.
    """

    # Inclusive window of acceptable registration dates.
    MIN_REGISTER_DATE = datetime(2015, 1, 1)
    MAX_REGISTER_DATE = datetime(2025, 10, 31)

    # Row patterns for the single-row INSERT statements found in the dump.
    _OPERATION_ROW = re.compile(
        r"INSERT INTO `operations` VALUES \((\d+), (\d+), "
        r"'([^']*)', '([^']*)', '([^']*)'\);"
    )
    _USER_ROW = re.compile(
        r"INSERT INTO `users` VALUES \((\d+), "
        r"'([^']*)', '([^']*)', '([^']*)', "
        r"'([^']*)', '([^']*)', '([^']*)'\);"
    )
    _ROLE_ROW = re.compile(
        r"INSERT INTO `roles` VALUES \((\d+), '([^']*)', '([^']*)'\);"
    )

    # Explicit [0-9] classes: ``str.isdigit`` and ``\d`` also accept
    # non-ASCII digits (e.g. full-width digits or superscripts), some of
    # which ``int()`` cannot convert.
    _PHONE = re.compile(r"1[3-9][0-9]{9}")
    _ID_CARD = re.compile(r"[0-9]{17}[0-9Xx]")
    _BANK_CARD = re.compile(r"[0-9]{16,19}")
    _BIRTH_DATE = re.compile(r"[0-9]{8}")

    def __init__(self):
        self.users = []
        self.roles = []
        self.operations = []
        # Modules each role is allowed to operate on.
        self.role_permissions = {
            "管理员": {
                "user_management",
                "product_management",
                "order_management",
                "system_logs",
            },
            "客服": {"user_management", "order_management"},
            "财务": {"order_management"},
            "商品经理": {"product_management"},
            "系统审计员": {"system_logs"},
        }
        # Per-position weights and the check-character table (indexed by
        # weighted sum modulo 11) used by validate_id_card.
        self.id_card_weights = [
            7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2,
        ]
        self.id_card_check_codes = [
            '1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2',
        ]

    def parse_sql_file(self, sql_file_path):
        """Read a UTF-8 SQL dump and load its operations, users and roles.

        Parsed rows are appended to ``self.operations``, ``self.users`` and
        ``self.roles``; existing entries are kept.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        with open(sql_file_path, 'r', encoding='utf-8') as f:
            sql_content = f.read()
        self._parse_operations(sql_content)
        self._parse_users(sql_content)
        self._parse_roles(sql_content)

    def _parse_operations(self, sql_content):
        """Append every ``operations`` INSERT row in *sql_content*."""
        for op_id, user_id, op_type, module, timestamp in (
            self._OPERATION_ROW.findall(sql_content)
        ):
            self.operations.append({
                "操作ID": int(op_id),
                "用户ID": int(user_id),
                "操作类型": op_type,
                "操作模块": module,
                "时间戳": timestamp,
            })

    def _parse_users(self, sql_content):
        """Append every ``users`` INSERT row in *sql_content*."""
        for row in self._USER_ROW.findall(sql_content):
            user_id, name, phone, id_card, bank_card, reg_date, role = row
            self.users.append({
                "用户ID": int(user_id),
                "姓名": name,
                "手机号": phone,
                "身份证号": id_card,
                "银行卡号": bank_card,
                "注册日期": reg_date,
                "角色": role,
            })

    def _parse_roles(self, sql_content):
        """Append every ``roles`` INSERT row in *sql_content*.

        The comma-separated permission column becomes a set; surrounding
        whitespace is stripped and empty entries are dropped so that
        ``'a, b,'`` yields ``{'a', 'b'}``.
        """
        for role_id, role_name, raw_permissions in (
            self._ROLE_ROW.findall(sql_content)
        ):
            stripped = (item.strip() for item in raw_permissions.split(','))
            self.roles.append({
                "角色ID": int(role_id),
                "角色名称": role_name,
                "权限": {item for item in stripped if item},
            })

    def validate_name(self, name):
        """Return True if *name* is 2-4 characters, all in U+4E00..U+9FFF."""
        if not isinstance(name, str) or not 2 <= len(name) <= 4:
            return False
        return all('\u4e00' <= char <= '\u9fff' for char in name)

    def validate_phone(self, phone):
        """Return True if *phone* is 11 ASCII digits starting with 13-19."""
        return isinstance(phone, str) and bool(self._PHONE.fullmatch(phone))

    def validate_id_card(self, id_card):
        """Return True if *id_card* is 18 characters with a valid checksum.

        The first 17 characters must be ASCII digits. Their weighted sum
        modulo 11 indexes ``id_card_check_codes``; the result must equal
        the last character (compared case-insensitively, so ``x`` matches
        ``X``).
        """
        if not isinstance(id_card, str) or not self._ID_CARD.fullmatch(id_card):
            return False
        total = sum(
            int(digit) * weight
            for digit, weight in zip(id_card[:17], self.id_card_weights)
        )
        return id_card[-1].upper() == self.id_card_check_codes[total % 11]

    def validate_bank_card(self, bank_card):
        """Return True if *bank_card* is 16-19 ASCII digits passing Luhn."""
        if not isinstance(bank_card, str):
            return False
        if not self._BANK_CARD.fullmatch(bank_card):
            return False
        total = 0
        # Walk from the rightmost digit; every second digit is doubled and
        # reduced by 9 when the doubled value exceeds 9.
        for position, char in enumerate(reversed(bank_card)):
            digit = int(char)
            if position % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            total += digit
        return total % 10 == 0

    def validate_register_date(self, register_date_str, id_card):
        """Validate a registration date against range and birth date.

        Args:
            register_date_str: date text in ``YYYY/MM/DD`` form.
            id_card: ID card number; characters 7-14 are read as the
                holder's birth date (``YYYYMMDD``).

        Returns:
            ``(True, "")`` when valid, otherwise ``(False, reason)``.
            When *id_card* is not a string or is too short to contain a
            birth date, the birth-date comparison is skipped.
        """
        try:
            register_date = datetime.strptime(register_date_str, '%Y/%m/%d')
        except (ValueError, TypeError):
            return False, "格式错误"

        if not (
            self.MIN_REGISTER_DATE <= register_date <= self.MAX_REGISTER_DATE
        ):
            return False, "超出日期范围"

        # The birth date occupies indices 6..13, so at least 14 characters
        # are required before it can be extracted.
        if isinstance(id_card, str) and len(id_card) >= 14:
            birth_text = id_card[6:14]
            if not self._BIRTH_DATE.fullmatch(birth_text):
                return False, "身份证出生日期无效"
            try:
                birth_date = datetime(
                    int(birth_text[:4]),
                    int(birth_text[4:6]),
                    int(birth_text[6:8]),
                )
            except ValueError:
                return False, "身份证出生日期无效"
            if register_date < birth_date:
                return False, "早于出生日期"

        return True, ""

    def check_info_violation(self, user):
        """Return True if any personal-data field of *user* is invalid."""
        if not self.validate_name(user["姓名"]):
            return True
        if not self.validate_phone(user["手机号"]):
            return True
        if not self.validate_id_card(user["身份证号"]):
            return True
        if not self.validate_bank_card(user["银行卡号"]):
            return True
        register_valid, _ = self.validate_register_date(
            user["注册日期"], user["身份证号"]
        )
        return not register_valid

    def check_operation_violation(self, user):
        """Return True if *user* operated on a module its role disallows.

        A role missing from ``role_permissions`` has no allowed modules,
        so any operation by such a user counts as a violation.
        """
        user_id = user["用户ID"]
        allowed_modules = self.role_permissions.get(user["角色"], set())
        return any(
            op["用户ID"] == user_id and op["操作模块"] not in allowed_modules
            for op in self.operations
        )

    def audit_all_users(self):
        """Return violation records for all loaded users.

        Each record is ``{"姓名": name, "违规类型": kind}``. Records are
        de-duplicated on the (name, kind) pair, keeping first-seen order.
        """
        violation_records = []
        seen = set()
        checks = (
            (self.check_info_violation, "信息违规"),
            (self.check_operation_violation, "操作违规"),
        )
        for user in self.users:
            for has_violation, violation_type in checks:
                if not has_violation(user):
                    continue
                key = (user["姓名"], violation_type)
                if key in seen:
                    continue
                seen.add(key)
                violation_records.append({
                    "姓名": user["姓名"],
                    "违规类型": violation_type,
                })
        return violation_records

    def export_to_csv(self, violation_records, output_path):
        """Write *violation_records* to a UTF-8 CSV file with a header row."""
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=["姓名", "违规类型"])
            writer.writeheader()
            writer.writerows(violation_records)


def main():
    """Audit ``data.sql`` and write the violations found to ``out.csv``."""
    auditor = AccountAudit()
    sql_file_path = "data.sql"
    auditor.parse_sql_file(sql_file_path)
    violation_records = auditor.audit_all_users()
    output_csv_path = "out.csv"
    auditor.export_to_csv(violation_records, output_csv_path)
    print(f"审计完成,共发现 {len(violation_records)} 条违规记录")
    print(f"结果已保存至: {output_csv_path}")


if __name__ == "__main__":
    main()
|