automatic_mask_generator.py
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.

# Adapted from https://github.com/facebookresearch/segment-anything/blob/main/segment_anything/automatic_mask_generator.py

from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import torch
from torchvision.ops.boxes import batched_nms, box_area  # type: ignore

from sam2.modeling.sam2_base import SAM2Base
from sam2.sam2_image_predictor import SAM2ImagePredictor
from sam2.utils.amg import (
    area_from_rle,
    batch_iterator,
    batched_mask_to_box,
    box_xyxy_to_xywh,
    build_all_layer_point_grids,
    calculate_stability_score,
    coco_encode_rle,
    generate_crop_boxes,
    is_box_near_crop_edge,
    mask_to_rle_pytorch,
    MaskData,
    remove_small_regions,
    rle_to_mask,
    uncrop_boxes_xyxy,
    uncrop_masks,
    uncrop_points,
)


class SAM2AutomaticMaskGenerator:
    def __init__(
        self,
        model: SAM2Base,
        points_per_side: Optional[int] = 32,
        points_per_batch: int = 64,
        pred_iou_thresh: float = 0.8,
        stability_score_thresh: float = 0.95,
        stability_score_offset: float = 1.0,
        mask_threshold: float = 0.0,
        box_nms_thresh: float = 0.7,
        crop_n_layers: int = 0,
        crop_nms_thresh: float = 0.7,
        crop_overlap_ratio: float = 512 / 1500,
        crop_n_points_downscale_factor: int = 1,
        point_grids: Optional[List[np.ndarray]] = None,
        min_mask_region_area: int = 0,
        output_mode: str = "binary_mask",
        use_m2m: bool = False,
        multimask_output: bool = True,
        **kwargs,
    ) -> None:
  52. """
  53. Using a SAM 2 model, generates masks for the entire image.
  54. Generates a grid of point prompts over the image, then filters
  55. low quality and duplicate masks. The default settings are chosen
  56. for SAM 2 with a HieraL backbone.
  57. Arguments:
  58. model (Sam): The SAM 2 model to use for mask prediction.
  59. points_per_side (int or None): The number of points to be sampled
  60. along one side of the image. The total number of points is
  61. points_per_side**2. If None, 'point_grids' must provide explicit
  62. point sampling.
  63. points_per_batch (int): Sets the number of points run simultaneously
  64. by the model. Higher numbers may be faster but use more GPU memory.
  65. pred_iou_thresh (float): A filtering threshold in [0,1], using the
  66. model's predicted mask quality.
  67. stability_score_thresh (float): A filtering threshold in [0,1], using
  68. the stability of the mask under changes to the cutoff used to binarize
  69. the model's mask predictions.
  70. stability_score_offset (float): The amount to shift the cutoff when
  71. calculated the stability score.
  72. mask_threshold (float): Threshold for binarizing the mask logits
  73. box_nms_thresh (float): The box IoU cutoff used by non-maximal
  74. suppression to filter duplicate masks.
  75. crop_n_layers (int): If >0, mask prediction will be run again on
  76. crops of the image. Sets the number of layers to run, where each
  77. layer has 2**i_layer number of image crops.
  78. crop_nms_thresh (float): The box IoU cutoff used by non-maximal
  79. suppression to filter duplicate masks between different crops.
  80. crop_overlap_ratio (float): Sets the degree to which crops overlap.
  81. In the first crop layer, crops will overlap by this fraction of
  82. the image length. Later layers with more crops scale down this overlap.
  83. crop_n_points_downscale_factor (int): The number of points-per-side
  84. sampled in layer n is scaled down by crop_n_points_downscale_factor**n.
  85. point_grids (list(np.ndarray) or None): A list over explicit grids
  86. of points used for sampling, normalized to [0,1]. The nth grid in the
  87. list is used in the nth crop layer. Exclusive with points_per_side.
  88. min_mask_region_area (int): If >0, postprocessing will be applied
  89. to remove disconnected regions and holes in masks with area smaller
  90. than min_mask_region_area. Requires opencv.
  91. output_mode (str): The form masks are returned in. Can be 'binary_mask',
  92. 'uncompressed_rle', or 'coco_rle'. 'coco_rle' requires pycocotools.
  93. For large resolutions, 'binary_mask' may consume large amounts of
  94. memory.
  95. use_m2m (bool): Whether to add a one step refinement using previous mask predictions.
  96. multimask_output (bool): Whether to output multimask at each point of the grid.
  97. """
        assert (points_per_side is None) != (
            point_grids is None
        ), "Exactly one of points_per_side or point_grids must be provided."
        if points_per_side is not None:
            self.point_grids = build_all_layer_point_grids(
                points_per_side,
                crop_n_layers,
                crop_n_points_downscale_factor,
            )
        elif point_grids is not None:
            self.point_grids = point_grids
        else:
            raise ValueError("Can't have both points_per_side and point_grids be None.")

        assert output_mode in [
            "binary_mask",
            "uncompressed_rle",
            "coco_rle",
        ], f"Unknown output_mode {output_mode}."
        if output_mode == "coco_rle":
            try:
                from pycocotools import mask as mask_utils  # type: ignore # noqa: F401
            except ImportError as e:
                raise ImportError(
                    "pycocotools is required for output_mode='coco_rle'; "
                    "please install it (e.g. `pip install pycocotools`)."
                ) from e

        self.predictor = SAM2ImagePredictor(
            model,
            max_hole_area=min_mask_region_area,
            max_sprinkle_area=min_mask_region_area,
        )
        self.points_per_batch = points_per_batch
        self.pred_iou_thresh = pred_iou_thresh
        self.stability_score_thresh = stability_score_thresh
        self.stability_score_offset = stability_score_offset
        self.mask_threshold = mask_threshold
        self.box_nms_thresh = box_nms_thresh
        self.crop_n_layers = crop_n_layers
        self.crop_nms_thresh = crop_nms_thresh
        self.crop_overlap_ratio = crop_overlap_ratio
        self.crop_n_points_downscale_factor = crop_n_points_downscale_factor
        self.min_mask_region_area = min_mask_region_area
        self.output_mode = output_mode
        self.use_m2m = use_m2m
        self.multimask_output = multimask_output
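
    # A minimal usage sketch (not part of the upstream file): `sam2_model` is
    # assumed to be an already-built SAM2Base instance and `image` an HWC uint8
    # np.ndarray; the names are hypothetical.
    #
    #     generator = SAM2AutomaticMaskGenerator(sam2_model, points_per_side=32)
    #     masks = generator.generate(image)
    #     largest = max(masks, key=lambda m: m["area"])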

    @classmethod
    def from_pretrained(cls, model_id: str, **kwargs) -> "SAM2AutomaticMaskGenerator":
        """
        Load a pretrained model from the Hugging Face hub.

        Arguments:
          model_id (str): The Hugging Face repository ID.
          **kwargs: Additional arguments to pass to the model constructor.

        Returns:
          (SAM2AutomaticMaskGenerator): The loaded model.
        """
        from sam2.build_sam import build_sam2_hf

        sam_model = build_sam2_hf(model_id, **kwargs)
        return cls(sam_model, **kwargs)
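
    # Usage sketch, assuming "facebook/sam2-hiera-large" is a valid Hugging
    # Face repository ID for a SAM 2 checkpoint:
    #
    #     generator = SAM2AutomaticMaskGenerator.from_pretrained(
    #         "facebook/sam2-hiera-large"
    #     )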

    @torch.no_grad()
    def generate(self, image: np.ndarray) -> List[Dict[str, Any]]:
        """
        Generates masks for the given image.

        Arguments:
          image (np.ndarray): The image to generate masks for, in HWC uint8 format.

        Returns:
          list(dict(str, any)): A list over records for masks. Each record is
            a dict containing the following keys:
              segmentation (dict(str, any) or np.ndarray): The mask. If
                output_mode='binary_mask', is an array of shape HW. Otherwise,
                is a dictionary containing the RLE.
              bbox (list(float)): The box around the mask, in XYWH format.
              area (int): The area in pixels of the mask.
              predicted_iou (float): The model's own prediction of the mask's
                quality. This is filtered by the pred_iou_thresh parameter.
              point_coords (list(list(float))): The point coordinates input
                to the model to generate this mask.
              stability_score (float): A measure of the mask's quality. This
                is filtered using the stability_score_thresh parameter.
              crop_box (list(float)): The crop of the image used to generate
                the mask, given in XYWH format.
        """
        # Generate masks
        mask_data = self._generate_masks(image)

        # Encode masks
        if self.output_mode == "coco_rle":
            mask_data["segmentations"] = [
                coco_encode_rle(rle) for rle in mask_data["rles"]
            ]
        elif self.output_mode == "binary_mask":
            mask_data["segmentations"] = [rle_to_mask(rle) for rle in mask_data["rles"]]
        else:
            mask_data["segmentations"] = mask_data["rles"]

        # Write mask records
        curr_anns = []
        for idx in range(len(mask_data["segmentations"])):
            ann = {
                "segmentation": mask_data["segmentations"][idx],
                "area": area_from_rle(mask_data["rles"][idx]),
                "bbox": box_xyxy_to_xywh(mask_data["boxes"][idx]).tolist(),
                "predicted_iou": mask_data["iou_preds"][idx].item(),
                "point_coords": [mask_data["points"][idx].tolist()],
                "stability_score": mask_data["stability_score"][idx].item(),
                "crop_box": box_xyxy_to_xywh(mask_data["crop_boxes"][idx]).tolist(),
            }
            curr_anns.append(ann)
        return curr_anns
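
    # Output-consumption sketch (hypothetical, with output_mode="binary_mask"):
    #
    #     for ann in generator.generate(image):
    #         seg = ann["segmentation"]   # HxW bool np.ndarray
    #         x, y, w, h = ann["bbox"]    # XYWH box around the mask
    #         print(ann["area"], ann["predicted_iou"], ann["stability_score"])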

    def _generate_masks(self, image: np.ndarray) -> MaskData:
        orig_size = image.shape[:2]
        crop_boxes, layer_idxs = generate_crop_boxes(
            orig_size, self.crop_n_layers, self.crop_overlap_ratio
        )

        # Iterate over image crops
        data = MaskData()
        for crop_box, layer_idx in zip(crop_boxes, layer_idxs):
            crop_data = self._process_crop(image, crop_box, layer_idx, orig_size)
            data.cat(crop_data)

        # Remove duplicate masks between crops
        if len(crop_boxes) > 1:
            # Prefer masks from smaller crops
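            # Scores of 1 / box_area rank smaller crops higher in NMS; e.g. a
            # 512x512 crop scores ~3.8e-6 while a 1024x1024 full-image crop
            # scores ~9.5e-7, so overlapping duplicates keep the smaller-crop mask.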
            scores = 1 / box_area(data["crop_boxes"])
            scores = scores.to(data["boxes"].device)
            keep_by_nms = batched_nms(
                data["boxes"].float(),
                scores,
                torch.zeros_like(data["boxes"][:, 0]),  # categories
                iou_threshold=self.crop_nms_thresh,
            )
            data.filter(keep_by_nms)
        data.to_numpy()
        return data

    def _process_crop(
        self,
        image: np.ndarray,
        crop_box: List[int],
        crop_layer_idx: int,
        orig_size: Tuple[int, ...],
    ) -> MaskData:
        # Crop the image and calculate embeddings
        x0, y0, x1, y1 = crop_box
        cropped_im = image[y0:y1, x0:x1, :]
        cropped_im_size = cropped_im.shape[:2]
        self.predictor.set_image(cropped_im)

        # Get points for this crop
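        # cropped_im_size is (h, w); [None, ::-1] reverses it to a 1x2 (w, h)
        # row so the normalized [0,1] grid points scale to (x, y) pixel coords.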
        points_scale = np.array(cropped_im_size)[None, ::-1]
        points_for_image = self.point_grids[crop_layer_idx] * points_scale

        # Generate masks for this crop in batches
        data = MaskData()
        for (points,) in batch_iterator(self.points_per_batch, points_for_image):
            batch_data = self._process_batch(
                points, cropped_im_size, crop_box, orig_size, normalize=True
            )
            data.cat(batch_data)
            del batch_data
        self.predictor.reset_predictor()

        # Remove duplicates within this crop.
        keep_by_nms = batched_nms(
            data["boxes"].float(),
            data["iou_preds"],
            torch.zeros_like(data["boxes"][:, 0]),  # categories
            iou_threshold=self.box_nms_thresh,
        )
        data.filter(keep_by_nms)

        # Return to the original image frame
        data["boxes"] = uncrop_boxes_xyxy(data["boxes"], crop_box)
        data["points"] = uncrop_points(data["points"], crop_box)
        data["crop_boxes"] = torch.tensor([crop_box for _ in range(len(data["rles"]))])
        return data

    def _process_batch(
        self,
        points: np.ndarray,
        im_size: Tuple[int, ...],
        crop_box: List[int],
        orig_size: Tuple[int, ...],
        normalize: bool = False,
    ) -> MaskData:
        orig_h, orig_w = orig_size

        # Run model on this batch
        points = torch.as_tensor(
            points, dtype=torch.float32, device=self.predictor.device
        )
        in_points = self.predictor._transforms.transform_coords(
            points, normalize=normalize, orig_hw=im_size
        )
        in_labels = torch.ones(
            in_points.shape[0], dtype=torch.int, device=in_points.device
        )
        masks, iou_preds, low_res_masks = self.predictor._predict(
            in_points[:, None, :],
            in_labels[:, None],
            multimask_output=self.multimask_output,
            return_logits=True,
        )

        # Serialize predictions and store in MaskData
        data = MaskData(
            masks=masks.flatten(0, 1),
            iou_preds=iou_preds.flatten(0, 1),
            points=points.repeat_interleave(masks.shape[1], dim=0),
            low_res_masks=low_res_masks.flatten(0, 1),
        )
        del masks

        if not self.use_m2m:
            # Filter by predicted IoU
            if self.pred_iou_thresh > 0.0:
                keep_mask = data["iou_preds"] > self.pred_iou_thresh
                data.filter(keep_mask)

            # Calculate and filter by stability score
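            # The stability score is the IoU between the masks obtained by
            # thresholding the logits at mask_threshold +/- stability_score_offset;
            # a mask whose area barely changes under this shift scores near 1.0.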
  302. data["stability_score"] = calculate_stability_score(
  303. data["masks"], self.mask_threshold, self.stability_score_offset
  304. )
  305. if self.stability_score_thresh > 0.0:
  306. keep_mask = data["stability_score"] >= self.stability_score_thresh
  307. data.filter(keep_mask)
  308. else:
  309. # One step refinement using previous mask predictions
  310. in_points = self.predictor._transforms.transform_coords(
  311. data["points"], normalize=normalize, orig_hw=im_size
  312. )
  313. labels = torch.ones(
  314. in_points.shape[0], dtype=torch.int, device=in_points.device
  315. )
  316. masks, ious = self.refine_with_m2m(
  317. in_points, labels, data["low_res_masks"], self.points_per_batch
  318. )
  319. data["masks"] = masks.squeeze(1)
  320. data["iou_preds"] = ious.squeeze(1)
  321. if self.pred_iou_thresh > 0.0:
  322. keep_mask = data["iou_preds"] > self.pred_iou_thresh
  323. data.filter(keep_mask)
  324. data["stability_score"] = calculate_stability_score(
  325. data["masks"], self.mask_threshold, self.stability_score_offset
  326. )
  327. if self.stability_score_thresh > 0.0:
  328. keep_mask = data["stability_score"] >= self.stability_score_thresh
  329. data.filter(keep_mask)
  330. # Threshold masks and calculate boxes
  331. data["masks"] = data["masks"] > self.mask_threshold
  332. data["boxes"] = batched_mask_to_box(data["masks"])
  333. # Filter boxes that touch crop boundaries
  334. keep_mask = ~is_box_near_crop_edge(
  335. data["boxes"], crop_box, [0, 0, orig_w, orig_h]
  336. )
  337. if not torch.all(keep_mask):
  338. data.filter(keep_mask)
  339. # Compress to RLE
  340. data["masks"] = uncrop_masks(data["masks"], crop_box, orig_h, orig_w)
  341. data["rles"] = mask_to_rle_pytorch(data["masks"])
  342. del data["masks"]
  343. return data

    @staticmethod
    def postprocess_small_regions(
        mask_data: MaskData, min_area: int, nms_thresh: float
    ) -> MaskData:
        """
        Removes small disconnected regions and holes in masks, then reruns
        box NMS to remove any new duplicates.

        Edits mask_data in place.

        Requires OpenCV as a dependency.
        """
        if len(mask_data["rles"]) == 0:
            return mask_data

        # Filter small disconnected regions and holes
        new_masks = []
        scores = []
        for rle in mask_data["rles"]:
            mask = rle_to_mask(rle)

            mask, changed = remove_small_regions(mask, min_area, mode="holes")
            unchanged = not changed
            mask, changed = remove_small_regions(mask, min_area, mode="islands")
            unchanged = unchanged and not changed

            new_masks.append(torch.as_tensor(mask).unsqueeze(0))
            # Give score=0 to changed masks and score=1 to unchanged masks
            # so NMS will prefer ones that didn't need postprocessing
            scores.append(float(unchanged))

        # Recalculate boxes and remove any new duplicates
        masks = torch.cat(new_masks, dim=0)
        boxes = batched_mask_to_box(masks)
        keep_by_nms = batched_nms(
            boxes.float(),
            torch.as_tensor(scores),
            torch.zeros_like(boxes[:, 0]),  # categories
            iou_threshold=nms_thresh,
        )

        # Only recalculate RLEs for masks that have changed
        for i_mask in keep_by_nms:
            if scores[i_mask] == 0.0:
                mask_torch = masks[i_mask].unsqueeze(0)
                mask_data["rles"][i_mask] = mask_to_rle_pytorch(mask_torch)[0]
                mask_data["boxes"][i_mask] = boxes[i_mask]  # update res directly
        mask_data.filter(keep_by_nms)
        return mask_data
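
    # Note that generate() in this file never calls this helper; hole/island
    # removal is instead delegated to SAM2ImagePredictor via max_hole_area /
    # max_sprinkle_area in __init__. A hypothetical standalone-use sketch:
    #
    #     mask_data = SAM2AutomaticMaskGenerator.postprocess_small_regions(
    #         mask_data, min_area=100, nms_thresh=0.7
    #     )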

    def refine_with_m2m(self, points, point_labels, low_res_masks, points_per_batch):
        new_masks = []
        new_iou_preds = []

        for cur_points, cur_point_labels, low_res_mask in batch_iterator(
            points_per_batch, points, point_labels, low_res_masks
        ):
            best_masks, best_iou_preds, _ = self.predictor._predict(
                cur_points[:, None, :],
                cur_point_labels[:, None],
                mask_input=low_res_mask[:, None, :],
                multimask_output=False,
                return_logits=True,
            )
            new_masks.append(best_masks)
            new_iou_preds.append(best_iou_preds)
        masks = torch.cat(new_masks, dim=0)
        return masks, torch.cat(new_iou_preds, dim=0)
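
    # This refinement is opt-in via the constructor; a hedged sketch (again
    # with a hypothetical `sam2_model`):
    #
    #     generator = SAM2AutomaticMaskGenerator(sam2_model, use_m2m=True)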