1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
| import re import csv from datetime import datetime
class AccountAudit:
    """Audit user accounts loaded from a SQL dump.

    Two kinds of violations are detected per user:

    * "信息违规" - a personal-data field (name, phone, ID card number, bank
      card number, registration date) fails validation.
    * "操作违规" - the user performed an operation on a module that the
      user's role is not permitted to touch (per ``role_permissions``).
    """

    # Inclusive window in which a registration date is considered plausible.
    MIN_REGISTER_DATE = datetime(2015, 1, 1)
    MAX_REGISTER_DATE = datetime(2025, 10, 31)

    def __init__(self):
        self.users = []
        self.roles = []
        self.operations = []
        # Modules each role may operate on; unknown roles get no permissions.
        self.role_permissions = {
            "管理员": {"user_management", "product_management", "order_management", "system_logs"},
            "客服": {"user_management", "order_management"},
            "财务": {"order_management"},
            "商品经理": {"product_management"},
            "系统审计员": {"system_logs"},
        }
        # Per-position weights and the remainder -> check character table
        # used by the 18-character ID card checksum (weighted sum mod 11).
        self.id_card_weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
        self.id_card_check_codes = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------
    def parse_sql_file(self, sql_file_path):
        """Read a UTF-8 SQL dump and append its operations, users and roles.

        Parsed rows are appended to the existing lists, so calling this
        more than once accumulates records.
        """
        with open(sql_file_path, 'r', encoding='utf-8') as f:
            sql_content = f.read()
        self._parse_operations(sql_content)
        self._parse_users(sql_content)
        self._parse_roles(sql_content)

    def _parse_operations(self, sql_content):
        """Append every ``INSERT INTO `operations``` row found in *sql_content*."""
        op_pattern = r"INSERT INTO `operations` VALUES \((\d+), (\d+), '([^']*)', '([^']*)', '([^']*)'\);"
        self.operations.extend(
            {
                "操作ID": int(op_id),
                "用户ID": int(user_id),
                "操作类型": op_type,
                "操作模块": module,
                "时间戳": timestamp,
            }
            for op_id, user_id, op_type, module, timestamp in re.findall(op_pattern, sql_content)
        )

    def _parse_users(self, sql_content):
        """Append every ``INSERT INTO `users``` row found in *sql_content*."""
        user_pattern = r"INSERT INTO `users` VALUES \((\d+), '([^']*)', '([^']*)', '([^']*)', '([^']*)', '([^']*)', '([^']*)'\);"
        self.users.extend(
            {
                "用户ID": int(user_id),
                "姓名": name,
                "手机号": phone,
                "身份证号": id_card,
                "银行卡号": bank_card,
                "注册日期": register_date,
                "角色": role,
            }
            for user_id, name, phone, id_card, bank_card, register_date, role
            in re.findall(user_pattern, sql_content)
        )

    def _parse_roles(self, sql_content):
        """Append every ``INSERT INTO `roles``` row found in *sql_content*.

        The third column is split on commas into a set of permission names.
        """
        role_pattern = r"INSERT INTO `roles` VALUES \((\d+), '([^']*)', '([^']*)'\);"
        self.roles.extend(
            {
                "角色ID": int(role_id),
                "角色名称": role_name,
                "权限": set(permissions.split(',')),
            }
            for role_id, role_name, permissions in re.findall(role_pattern, sql_content)
        )

    # ------------------------------------------------------------------
    # Field validators
    # ------------------------------------------------------------------
    @staticmethod
    def _is_ascii_digits(text):
        """Return True when *text* is a non-empty str made only of 0-9.

        ``str.isdigit`` is deliberately avoided: it also accepts characters
        such as superscript digits, which ``int()`` cannot convert.
        """
        return isinstance(text, str) and bool(text) and all(ch in "0123456789" for ch in text)

    def validate_name(self, name):
        """Return True when *name* is 2-4 characters, all in U+4E00..U+9FFF."""
        if not isinstance(name, str) or len(name) < 2 or len(name) > 4:
            return False
        return all('\u4e00' <= char <= '\u9fff' for char in name)

    def validate_phone(self, phone):
        """Return True when *phone* is 11 ASCII digits starting with 1[3-9]."""
        if not self._is_ascii_digits(phone) or len(phone) != 11:
            return False
        return phone[0] == '1' and phone[1] in '3456789'

    def validate_id_card(self, id_card):
        """Return True when *id_card* has 18 characters and a correct check code.

        The first 17 characters must be ASCII digits; the last character is
        compared case-insensitively against the computed check code.
        Non-string input is rejected rather than raising.
        """
        if not isinstance(id_card, str) or len(id_card) != 18:
            return False
        body = id_card[:17]
        if not self._is_ascii_digits(body):
            return False
        total = sum(int(digit) * weight for digit, weight in zip(body, self.id_card_weights))
        return id_card[-1].upper() == self.id_card_check_codes[total % 11]

    def validate_bank_card(self, bank_card):
        """Return True when *bank_card* is 16-19 ASCII digits passing the Luhn check."""
        if not self._is_ascii_digits(bank_card) or not 16 <= len(bank_card) <= 19:
            return False
        total = 0
        # Walk from the rightmost digit; every second digit is doubled and,
        # when the result exceeds 9, reduced by 9 (same as summing its digits).
        for position, char in enumerate(reversed(bank_card)):
            digit = int(char)
            if position % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            total += digit
        return total % 10 == 0

    def validate_register_date(self, register_date_str, id_card):
        """Validate a registration date and cross-check it against the ID card.

        Args:
            register_date_str: Date text in ``YYYY/MM/DD`` format.
            id_card: ID card number; characters 7-14 are read as the
                birth date (``YYYYMMDD``).

        Returns:
            A ``(is_valid, reason)`` tuple. ``reason`` is ``""`` on success,
            otherwise one of "格式错误", "超出日期范围", "早于出生日期" or
            "身份证出生日期无效".

        The birth-date cross-check is skipped when *id_card* is not a string
        or has fewer than 8 characters. An ID card that is long enough to be
        checked but whose birth field is truncated, non-numeric or not a
        real calendar date yields "身份证出生日期无效" instead of raising.
        """
        try:
            register_date = datetime.strptime(register_date_str, '%Y/%m/%d')
        except (ValueError, TypeError):
            return False, "格式错误"

        if not (self.MIN_REGISTER_DATE <= register_date <= self.MAX_REGISTER_DATE):
            return False, "超出日期范围"

        if isinstance(id_card, str) and len(id_card) >= 8:
            birth_field = id_card[6:14]
            # A short ID yields a truncated slice; reject it up front so that
            # int('') can never be reached.
            if len(birth_field) != 8 or not self._is_ascii_digits(birth_field):
                return False, "身份证出生日期无效"
            try:
                birth_date = datetime(
                    int(birth_field[:4]), int(birth_field[4:6]), int(birth_field[6:])
                )
            except ValueError:
                return False, "身份证出生日期无效"
            if register_date < birth_date:
                return False, "早于出生日期"

        return True, ""

    # ------------------------------------------------------------------
    # Violation checks
    # ------------------------------------------------------------------
    def check_info_violation(self, user):
        """Return True when any personal-data field of *user* fails validation."""
        id_card = user["身份证号"]
        fields_ok = (
            self.validate_name(user["姓名"])
            and self.validate_phone(user["手机号"])
            and self.validate_id_card(id_card)
            and self.validate_bank_card(user["银行卡号"])
        )
        if not fields_ok:
            return True
        register_valid, _ = self.validate_register_date(user["注册日期"], id_card)
        return not register_valid

    def check_operation_violation(self, user):
        """Return True when *user* operated on a module outside the role's permissions.

        A role missing from ``role_permissions`` has no allowed modules, so
        any recorded operation by such a user counts as a violation.
        """
        user_id = user["用户ID"]
        allowed_modules = self.role_permissions.get(user["角色"], set())
        return any(
            op["用户ID"] == user_id and op["操作模块"] not in allowed_modules
            for op in self.operations
        )

    def audit_all_users(self):
        """Return violation records for all loaded users.

        Each record is ``{"姓名": ..., "违规类型": ...}``. Records are
        de-duplicated on the (name, violation type) pair, so users sharing a
        name contribute at most one record per violation type.
        """
        checks = (
            ("信息违规", self.check_info_violation),
            ("操作违规", self.check_operation_violation),
        )
        violation_records = []
        seen = set()
        for user in self.users:
            for violation_type, has_violation in checks:
                if not has_violation(user):
                    continue
                key = (user["姓名"], violation_type)
                if key in seen:
                    continue
                seen.add(key)
                violation_records.append({
                    "姓名": user["姓名"],
                    "违规类型": violation_type,
                })
        return violation_records

    def export_to_csv(self, violation_records, output_path):
        """Write *violation_records* to *output_path* as UTF-8 CSV with a header row."""
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            fieldnames = ["姓名", "违规类型"]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(violation_records)


def main():
    """Audit ``data.sql`` and write the violations to ``out.csv``."""
    auditor = AccountAudit()
    sql_file_path = "data.sql"
    auditor.parse_sql_file(sql_file_path)
    violation_records = auditor.audit_all_users()
    output_csv_path = "out.csv"
    auditor.export_to_csv(violation_records, output_csv_path)
    print(f"审计完成,共发现 {len(violation_records)} 条违规记录")
    print(f"结果已保存至: {output_csv_path}")


if __name__ == "__main__":
    main()
|