1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
""" YOLOv8-L + imgsz=896,且最大“dog”框面积占比 >= 0.25% -> 有狗 若检测无狗 -> ResNet50(ImageNet) Top-5 关键词兜底;若命中犬类 -> 有狗 任一阶段“有狗” => 保留;两者都“无狗” => 立即移动到 non_dog/ """
import os from pathlib import Path import shutil
import torch from ultralytics import YOLO from PIL import Image from torchvision.models import resnet50, ResNet50_Weights
# --- Threading / device setup ------------------------------------------------
# Limit the OpenMP / MKL thread pools and torch's intra-op threads to 12.
# NOTE(review): the environment variables are assigned after `import torch`
# above; libraries that read them at load time may not pick them up, in which
# case only torch.set_num_threads() takes effect — confirm if this matters.
os.environ["OMP_NUM_THREADS"] = "12"
os.environ["MKL_NUM_THREADS"] = "12"
torch.set_num_threads(12)
# Use the first CUDA device when available, otherwise fall back to the CPU.
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"
torch.backends.cudnn.benchmark = True

# --- Paths -------------------------------------------------------------------
# Both folders live next to this script.
ROOT = Path(__file__).resolve().parent
DOG_DIR = ROOT / "dog"          # input folder that is scanned and filtered
NON_DOG_DIR = ROOT / "non_dog"  # destination for images judged to hold no dog

# --- Detection / classification settings -------------------------------------
MODEL_WEIGHTS = "yolov8l.pt"    # weights passed to YOLO(...)
IMG_SIZE = 896                  # imgsz passed to the detector
CONF_DET = 0.25                 # confidence threshold passed to the detector
MIN_DOG_AREA_RATIO = 0.0025     # a dog box must cover >= 0.25% of the image
TOPK_CLS = 5                    # number of top classifier classes inspected
DET_BATCH = 2                   # images handed to the detector per predict() call
CLS_BATCH = 128                 # not referenced anywhere in the visible code
DET_WORKERS = 12                # `workers` value passed to predict()
# File suffixes (compared lower-cased) that count as images.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}
def iter_images(root: Path):
    """Collect every image file found anywhere below *root*.

    A path qualifies when it is a regular file whose lower-cased suffix is
    listed in ``IMG_EXTS``. The list keeps the order ``Path.rglob`` yields.
    """
    found = []
    for candidate in root.rglob("*"):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() in IMG_EXTS:
            found.append(candidate)
    return found
def unique_destination(dst: Path) -> Path:
    """Return a path derived from *dst* that does not exist yet.

    *dst* itself is returned when it is free. Otherwise ``__dup1``,
    ``__dup2``, ... is appended to the stem (the suffix is kept) until an
    unused name is found. Nothing is created on disk.
    """
    candidate = dst
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = dst.with_name(f"{dst.stem}__dup{attempt}{dst.suffix}")
    return candidate
def dog_class_ids_from_names(names) -> set:
    """Find the class id(s) labelled "dog" in a model's class-name table.

    *names* may be a dict mapping id -> label, or a list/tuple of labels
    indexed by id. Labels are compared case-insensitively after ``str()``.
    When nothing matches (or *names* has another type) the result falls back
    to ``{16}`` — presumably the COCO index of "dog"; confirm against the
    model in use.
    """
    if isinstance(names, dict):
        pairs = names.items()
    elif isinstance(names, (list, tuple)):
        pairs = enumerate(names)
    else:
        pairs = ()

    matched = {int(key) for key, label in pairs if str(label).lower() == "dog"}
    if matched:
        return matched
    return {16}
def has_dog_by_detection_result(res, min_area_ratio: float):
    """Decide from one detection result whether the image shows a dog.

    Every box whose class id is a "dog" id (resolved from ``res.names``) is
    measured as box area divided by image area, the image size coming from
    ``res.orig_shape`` unpacked as (height, width).

    Returns a 4-tuple ``(has_dog, max_ratio, valid_count, dog_count)``:
    whether any dog box reaches *min_area_ratio*, the largest dog-box area
    ratio (0.0 without dog boxes), how many dog boxes reach the threshold,
    and how many dog boxes were detected in total.
    """
    detections = res.boxes
    height, width = res.orig_shape
    # Clamp each side to 1 so the division below can never hit zero.
    image_area = float(max(1, height) * max(1, width))

    class_ids = [] if detections.cls is None else detections.cls.int().tolist()
    corners = [] if detections.xyxy is None else detections.xyxy.tolist()
    dog_ids = dog_class_ids_from_names(res.names)

    # Area ratio of every dog box; negative areas are floored at 0.
    ratios = [
        max(0.0, ((float(x2) - float(x1)) * (float(y2) - float(y1))) / image_area)
        for class_id, (x1, y1, x2, y2) in zip(class_ids, corners)
        if class_id in dog_ids
    ]

    total_dog = len(ratios)
    largest = max(ratios, default=0.0)
    valid = sum(1 for ratio in ratios if ratio >= min_area_ratio)
    return (valid > 0), largest, valid, total_dog
def main():
    """Filter the ``dog/`` folder, moving images without a dog to ``non_dog/``.

    Each image goes through up to two stages:

    1. YOLO detection: the image counts as a dog image when at least one
       "dog" box covers ``MIN_DOG_AREA_RATIO`` or more of the image area.
    2. Only when detection finds nothing: ResNet50 (ImageNet) classification;
       the image counts as a dog image when any of the top ``TOPK_CLS``
       classes is one of the ImageNet dog-breed classes.

    Images accepted by either stage stay where they are. All others are moved
    to ``NON_DOG_DIR`` right away, keeping their path relative to ``DOG_DIR``.
    One progress line is printed per image, followed by a summary.

    Raises:
        FileNotFoundError: if ``DOG_DIR`` is not a directory or contains no
            file with a supported image extension.
    """
    # Explicit raises instead of `assert`: assertions vanish under `python -O`.
    if not DOG_DIR.is_dir():
        raise FileNotFoundError(f"未找到 dog 目录:{DOG_DIR}")
    NON_DOG_DIR.mkdir(parents=True, exist_ok=True)

    img_paths = iter_images(DOG_DIR)
    total = len(img_paths)
    if total == 0:
        raise FileNotFoundError(
            f"{DOG_DIR} 下未发现图片。支持扩展名:{sorted(IMG_EXTS)}"
        )

    print(f"[信息] 设备: {DEVICE} | 模型: {MODEL_WEIGHTS} | imgsz={IMG_SIZE} | TTA=False | conf={CONF_DET}")
    print(f"[信息] 线程: OMP/MKL/torch={12} | Ultralytics workers={DET_WORKERS}")
    print(f"[信息] 待处理图片:{total}\n", flush=True)

    det_model = YOLO(MODEL_WEIGHTS)
    weights = ResNet50_Weights.DEFAULT
    clf_model = resnet50(weights=weights).to(DEVICE).eval()
    preprocess = weights.transforms()
    categories = [c.lower() for c in weights.meta["categories"]]

    # In the ImageNet-1k label order, indices 151..268 are the domestic dog
    # breeds. Deciding on the index avoids the pitfalls of substring keyword
    # matching on label text, which accepts labels such as "hotdog" or
    # "dogsled" and misses breeds whose label carries no generic keyword
    # (e.g. "dalmatian", "pembroke", "basset").
    imagenet_dog_classes = range(151, 269)

    def cls_is_dog_topk_labels(tensor_batch):
        """Return, per image, the first top-k dog-breed label or ""."""
        with torch.inference_mode():
            probs = clf_model(tensor_batch).softmax(dim=1)
            _, topk_idx = probs.topk(TOPK_CLS, dim=1)
        hits = []
        for row in topk_idx.tolist():
            hit_idx = next((idx for idx in row if idx in imagenet_dog_classes), None)
            hits.append("" if hit_idx is None else categories[hit_idx])
        return hits

    moved = 0
    kept = 0

    for s in range(0, total, DET_BATCH):
        batch = img_paths[s:s + DET_BATCH]

        # Stage 1: object detection on the whole batch.
        det_results = det_model.predict(
            source=[str(p) for p in batch],
            conf=CONF_DET,
            imgsz=IMG_SIZE,
            device=DEVICE,
            augment=False,
            verbose=False,
            workers=DET_WORKERS,
        )

        need_cls_indices = []
        det_info = {}
        for off, res in enumerate(det_results):
            has, max_ar, valid, tot = has_dog_by_detection_result(res, MIN_DOG_AREA_RATIO)
            det_info[off] = (has, max_ar, valid, tot)
            if not has:
                need_cls_indices.append(off)

        # Stage 2: classification fallback, only for images detection rejected.
        cls_hits = {}
        if need_cls_indices:
            imgs = []
            for off in need_cls_indices:
                with Image.open(batch[off]) as im:
                    imgs.append(preprocess(im.convert("RGB")))
            x = torch.stack(imgs, dim=0).to(DEVICE)
            labels = cls_is_dog_topk_labels(x)
            cls_hits = dict(zip(need_cls_indices, labels))

        # Act on each image: keep it in place or move it out immediately.
        for off, src in enumerate(batch):
            i_global = s + off
            has_det, max_ar, _valid_cnt, _total_cnt = det_info[off]
            lab = cls_hits.get(off, "")
            keep = has_det or (lab != "")

            if keep:
                decided_by = "det" if has_det else "cls"
                kept += 1
                print(f"[{i_global+1:>6}/{total}] KEEP | {src.relative_to(DOG_DIR)} "
                      f"| 决策:{decided_by:3s} | det_max_area:{max_ar:.4%} | cls_hit:{lab}", flush=True)
            else:
                # Mirror the sub-folder layout and never overwrite an existing file.
                dst = NON_DOG_DIR / src.relative_to(DOG_DIR)
                dst.parent.mkdir(parents=True, exist_ok=True)
                dst = unique_destination(dst)
                shutil.move(str(src), str(dst))
                moved += 1
                print(f"[{i_global+1:>6}/{total}] MOVE | {src.relative_to(DOG_DIR)} -> {dst.relative_to(NON_DOG_DIR)} "
                      f"| 决策:none | det_max_area:{max_ar:.4%}", flush=True)

    print("\n===== 完成 =====", flush=True)
    print(f"总计图片:{total}", flush=True)
    print(f"狗图片(保留在 dog/):{kept}", flush=True)
    print(f"非狗图片(已移动到 non_dog/):{moved}", flush=True)
    print(f"输出目录:{NON_DOG_DIR.resolve()}", flush=True)
# Run the filtering pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|