
|
""" YOLOv8-L + imgsz=896,且最大“dog”框面积占比 >= 0.25% -> 有狗 若检测无狗 -> ResNet50(ImageNet) Top-5 关键词兜底;若命中犬类 -> 有狗 任一阶段“有狗” => 保留;两者都“无狗” => 立即移动到 non_dog/ """
import os from pathlib import Path import shutil
import torch from ultralytics import YOLO from PIL import Image from torchvision.models import resnet50, ResNet50_Weights
os.environ["OMP_NUM_THREADS"] = "12" os.environ["MKL_NUM_THREADS"] = "12" torch.set_num_threads(12) DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" torch.backends.cudnn.benchmark = True
ROOT = Path(__file__).resolve().parent DOG_DIR = ROOT / "dog" NON_DOG_DIR = ROOT / "non_dog" MODEL_WEIGHTS = "yolov8l.pt" IMG_SIZE = 896 CONF_DET = 0.25 MIN_DOG_AREA_RATIO = 0.0025 TOPK_CLS = 5 DET_BATCH = 2 CLS_BATCH = 128 DET_WORKERS = 12 IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff", ".webp"}
def iter_images(root: Path): return [p for p in root.rglob("*") if p.is_file() and p.suffix.lower() in IMG_EXTS]
def unique_destination(dst: Path) -> Path: if not dst.exists(): return dst i = 1 while True: cand = dst.with_name(f"{dst.stem}__dup{i}{dst.suffix}") if not cand.exists(): return cand i += 1
def dog_class_ids_from_names(names) -> set: ids = set() if isinstance(names, dict): for k, v in names.items(): if str(v).lower() == "dog": ids.add(int(k)) elif isinstance(names, (list, tuple)): for i, v in enumerate(names): if str(v).lower() == "dog": ids.add(int(i)) return ids or {16}
def has_dog_by_detection_result(res, min_area_ratio: float): boxes = res.boxes h, w = res.orig_shape img_area = float(max(1, h) * max(1, w)) cls_list = boxes.cls.int().tolist() if boxes.cls is not None else [] xyxy_list = boxes.xyxy.tolist() if boxes.xyxy is not None else [] max_ar = 0.0 total_dog = 0 valid = 0 names_map = res.names dog_ids = dog_class_ids_from_names(names_map) for ci, (x1, y1, x2, y2) in zip(cls_list, xyxy_list): if ci in dog_ids: total_dog += 1 bw = float(x2) - float(x1) bh = float(y2) - float(y1) ar = max(0.0, (bw * bh) / img_area) if ar > max_ar: max_ar = ar if ar >= min_area_ratio: valid += 1 return (valid > 0), max_ar, valid, total_dog
def main(): assert DOG_DIR.is_dir(), f"未找到 dog 目录:{DOG_DIR}" NON_DOG_DIR.mkdir(parents=True, exist_ok=True)
img_paths = iter_images(DOG_DIR) total = len(img_paths) assert total > 0, f"{DOG_DIR} 下未发现图片。支持扩展名:{sorted(IMG_EXTS)}"
print(f"[信息] 设备: {DEVICE} | 模型: {MODEL_WEIGHTS} | imgsz={IMG_SIZE} | TTA=False | conf={CONF_DET}") print(f"[信息] 线程: OMP/MKL/torch={12} | Ultralytics workers={DET_WORKERS}") print(f"[信息] 待处理图片:{total}\n", flush=True)
det_model = YOLO(MODEL_WEIGHTS) weights = ResNet50_Weights.DEFAULT clf_model = resnet50(weights=weights).to(DEVICE).eval() preprocess = weights.transforms() categories = [c.lower() for c in weights.meta["categories"]] dog_keywords = { "dog","hound","retriever","terrier","spaniel","poodle","shepherd","greyhound","whippet", "bulldog","mastiff","pinscher","pug","boxer","beagle","chihuahua","papillon","malamute", "husky","samoyed","eskimo","corgi","dachshund","doberman","rottweiler","collie","borzoi", "basenji","pomeranian","newfoundland","kuvasz","komondor","pyrenees","akita","setter", "pointer","lhasa","weimaraner","vizsla","affenpinscher","briard","bouvier","elkhound", "otterhound","bloodhound","foxhound","wolfhound","saluki","afghan hound","malinois", "groenendael","keeshond","schipperke","kelpie","pyrenean","sheepdog","mudi","puli", "tibetan mastiff","tibetan terrier","irish terrier","scottish deerhound","irish wolfhound", "great dane","great pyrenees","shih-tzu","shihtzu","toy terrier","springer","cocker", "schnauzer","spitz","ridgeback","rhodesian ridgeback","airedale","cairn","yorkshire", "norfolk","norwich","silky","border collie","old english sheepdog" }
def cls_is_dog_topk_labels(tensor_batch): with torch.inference_mode(): probs = clf_model(tensor_batch).softmax(dim=1) _, topk_idx = probs.topk(TOPK_CLS, dim=1) hits = [] for row in topk_idx: hit = "" for idx in row.tolist(): lbl = categories[idx] if any(kw in lbl for kw in dog_keywords): hit = lbl break hits.append(hit) return hits
moved = 0 kept = 0 processed = 0
for s in range(0, total, DET_BATCH): batch = img_paths[s:s + DET_BATCH]
det_results = det_model.predict( source=[str(p) for p in batch], conf=CONF_DET, imgsz=IMG_SIZE, device=DEVICE, augment=False, verbose=False, workers=DET_WORKERS )
need_cls_indices = [] det_info = {} for off, res in enumerate(det_results): has, max_ar, valid, tot = has_dog_by_detection_result(res, MIN_DOG_AREA_RATIO) det_info[off] = (has, max_ar, valid, tot) if not has: need_cls_indices.append(off)
cls_hits = {} if need_cls_indices: imgs = [] for off in need_cls_indices: p = batch[off] with Image.open(p) as im: imgs.append(preprocess(im.convert("RGB"))) x = torch.stack(imgs, dim=0).to(DEVICE) labels = cls_is_dog_topk_labels(x) for off, lab in zip(need_cls_indices, labels): cls_hits[off] = lab
for off, src in enumerate(batch): i_global = s + off has_det, max_ar, valid_cnt, total_cnt = det_info[off] lab = cls_hits.get(off, "") keep = has_det or (lab != "")
if keep: decided_by = "det" if has_det else "cls" kept += 1 print(f"[{i_global+1:>6}/{total}] KEEP | {src.relative_to(DOG_DIR)} " f"| 决策:{decided_by:3s} | det_max_area:{max_ar:.4%} | cls_hit:{lab}", flush=True) else: dst = NON_DOG_DIR / src.relative_to(DOG_DIR) dst.parent.mkdir(parents=True, exist_ok=True) dst = unique_destination(dst) shutil.move(str(src), str(dst)) moved += 1 print(f"[{i_global+1:>6}/{total}] MOVE | {src.relative_to(DOG_DIR)} -> {dst.relative_to(NON_DOG_DIR)} " f"| 决策:none | det_max_area:{max_ar:.4%}", flush=True)
processed += 1
print("\n===== 完成 =====", flush=True) print(f"总计图片:{total}", flush=True) print(f"狗图片(保留在 dog/):{kept}", flush=True) print(f"非狗图片(已移动到 non_dog/):{moved}", flush=True) print(f"输出目录:{NON_DOG_DIR.resolve()}", flush=True)
if __name__ == "__main__": main()
|