| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840 |
- # Copyright (c) Meta Platforms, Inc. and affiliates. All Rights Reserved
- # pyre-unsafe
- import json
- import os
- import tempfile
- from collections import defaultdict
- from typing import Dict, Optional, Sequence, Tuple
- import numpy as np
- import pycocotools.mask
- from sam3.eval.cgf1_eval import CGF1_METRICS
- from sam3.eval.conversion_util import (
- convert_ytbvis_to_cocovid_gt,
- convert_ytbvis_to_cocovid_pred,
- )
- from sam3.eval.hota_eval_toolkit.run_ytvis_eval import run_ytvis_eval
- from sam3.eval.teta_eval_toolkit import config, Evaluator, metrics
- from sam3.eval.teta_eval_toolkit.datasets import COCO, TAO
- from sam3.eval.ytvis_coco_wrapper import YTVIS
- from sam3.eval.ytvis_eval import VideoDemoF1Eval, YTVISeval
- from sam3.train.nms_helper import process_frame_level_nms, process_track_level_nms
def _get_metric_index(metric_name: str, iou_threshold: Optional[float] = None) -> int:
    """
    Look up the position of a metric inside CGF1_METRICS.

    An entry matches when both its `name` and its `iou_threshold` are equal to
    the requested values; the first matching entry wins.

    Args:
        metric_name: Name of the metric (e.g., "cgF1", "precision", "recall")
        iou_threshold: IoU threshold of the metric (None for the average over
            0.5:0.95, or a specific value like 0.5, 0.75)

    Returns:
        Index of the metric in CGF1_METRICS

    Raises:
        ValueError: If no entry matches
    """
    matching_positions = (
        position
        for position, candidate in enumerate(CGF1_METRICS)
        if candidate.name == metric_name and candidate.iou_threshold == iou_threshold
    )
    found = next(matching_positions, None)
    # `found` may legitimately be 0, so compare against None explicitly
    if found is None:
        raise ValueError(
            f"Metric '{metric_name}' with IoU threshold {iou_threshold} not found in CGF1_METRICS"
        )
    return found
class BasePredFileEvaluator:
    """A base class for evaluating a prediction file.

    Concrete evaluators override `evaluate`, which takes the path of a
    prediction file and returns a pair `(results, video_np_level_results)`.
    """

    def evaluate(self, pred_file: str) -> Tuple[Dict[str, float], Dict]:
        """
        Evaluate the predictions stored in `pred_file`.

        Args:
            pred_file: Path to the prediction file

        Returns:
            A tuple `(results, video_np_level_results)`: a dict of metric name to
            value, and a dict of per-sample results (may be empty).

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement evaluate(pred_file)"
        )
class YTVISPredFileEvaluator(BasePredFileEvaluator):
    """Evaluate class mAP for YT-VIS prediction files."""

    def __init__(
        self,
        gt_ann_file: str,
        dataset_name: str = "video",
        iou_types: Optional[Sequence[str]] = None,
    ):
        """
        Args:
            gt_ann_file: Path to the YT-VIS GT annotation file
            dataset_name: Prefix of the metric names in the returned results
            iou_types: IoU types to evaluate; each must be "bbox" or "segm"
                (both are evaluated when None)
        """
        self.gt_ann_file = gt_ann_file
        self.dataset_name = dataset_name
        self.iou_types = list(iou_types) if iou_types is not None else ["bbox", "segm"]
        assert all(iou_type in ["bbox", "segm"] for iou_type in self.iou_types)

    def evaluate(self, pred_file: str) -> Tuple[Dict[str, float], Dict]:
        """
        Compute category-aware mAP (IoU=0.5:0.95) for each configured IoU type.

        Args:
            pred_file: Path to a JSON prediction file (a list of predictions,
                each carrying a "video_id")

        Returns:
            A tuple `(results, video_np_level_results)`. `results` maps
            "{dataset_name}_{mask|bbox}_mAP_50_95" to the first entry of the
            evaluator's summary stats; `video_np_level_results` is always empty
            for this evaluator.
        """
        # use our internal video evaluation toolkit for YT-VIS pred file
        # (i.e. the same one we're using for video phrase AP)
        results = {}
        use_cats = True  # YT-VIS mAP evaluation uses categories
        ytvisGT = YTVIS(self.gt_ann_file, ignore_gt_cats=not use_cats)
        # the original YT-VIS GT annotations have uncompressed RLEs ("counts" is an integer list)
        # rather than compressed RLEs ("counts" is a string), so we first convert them here.
        if "segm" in self.iou_types:
            for ann in ytvisGT.dataset["annotations"]:
                ann["segmentations"] = [
                    _compress_rle(rle) for rle in ann["segmentations"]
                ]

        with open(pred_file) as f:
            dt = json.load(f)
        # Our prediction file saves "video_id" and absolute (unnormalized) boxes.
        # Note that we should use the official (original) YT-VIS annotations (i.e. the one
        # saved via "scripts/datasets/training/ytvis_split.py", instead of the one saved
        # via "scripts/api_db_to_ytvis_json.py") in this evaluator, which contain absolute
        # boxes coordinates in its GT annotations.
        for d in dt:
            d["image_id"] = d["video_id"]
        ytvisDT = ytvisGT.loadRes(dt)

        for iou_type in self.iou_types:
            ytvisEval = YTVISeval(ytvisGT, ytvisDT, iou_type)

            # set the area ranges for small, medium, and large objects (using
            # absolute pixel areas) as in the official YT-VIS evaluation toolkit:
            # https://github.com/achalddave/ytvosapi/blob/eca601117c9f86bad084cb91f1d918e9ab665a75/PythonAPI/ytvostools/ytvoseval.py#L538
            ytvisEval.params.areaRng = [
                [0**2, 1e5**2],
                [0**2, 128**2],
                [128**2, 256**2],
                [256**2, 1e5**2],
            ]
            ytvisEval.params.areaRngLbl = ["all", "small", "medium", "large"]
            ytvisEval.params.useCats = use_cats

            ytvisEval.evaluate()
            ytvisEval.accumulate()
            ytvisEval.summarize()
            result_key = f"{self.dataset_name}_{'mask' if iou_type == 'segm' else 'bbox'}_mAP_50_95"
            results[result_key] = ytvisEval.stats[0]

        # video-NP level results not supported for `YTVISPredFileEvaluator` yet
        video_np_level_results = {}
        return results, video_np_level_results
class VideoPhraseApEvaluator(BasePredFileEvaluator):
    """Evaluate Video Phrase AP with YT-VIS format prediction and GT files."""

    def __init__(
        self,
        gt_ann_file: str,
        dataset_name: str = "video",
        iou_types: Optional[Sequence[str]] = None,
    ):
        """
        Args:
            gt_ann_file: Path to the YT-VIS format GT annotation JSON file
            dataset_name: Prefix of the metric names in the returned results
            iou_types: IoU types to evaluate; each must be "bbox" or "segm"
                (both are evaluated when None)
        """
        self.gt_ann_file = gt_ann_file
        self.dataset_name = dataset_name
        self.iou_types = list(iou_types) if iou_types is not None else ["bbox", "segm"]
        assert all(iou_type in ["bbox", "segm"] for iou_type in self.iou_types)

    def evaluate(self, pred_file: str) -> Tuple[Dict[str, float], Dict]:
        """
        Compute category-agnostic phrase AP for each configured IoU type.

        Args:
            pred_file: Path to a JSON prediction file (a list of predictions,
                each carrying "video_id" and "category_id")

        Returns:
            A tuple `(results, video_np_level_results)`. `results` holds, per IoU
            type, the keys "{dataset_name}_{mask|bbox}_phrase_ap_50_95", "..._50"
            and "..._75"; `video_np_level_results` is always empty for this
            evaluator.
        """
        with open(self.gt_ann_file) as f:
            gt = json.load(f)
        with open(pred_file) as f:
            dt = json.load(f)

        # For phrase AP and demo F1 evaluation, we need to remap each pair of (video_id, category_id) to
        # a new unique video_id, so that we don't mix detections from different categories under `useCat=False`
        gt, dt = remap_video_category_pairs_to_unique_video_ids(gt, dt)
        if "segm" in self.iou_types:
            for ann in gt["annotations"]:
                ann["segmentations"] = [
                    _compress_rle(rle) for rle in ann["segmentations"]
                ]
        for d in dt:
            d["image_id"] = d["video_id"]

        results = {}
        use_cats = False  # Phrase AP evaluation does not use categories
        # build the GT index from the remapped in-memory dict rather than from a file
        ytvisGT = YTVIS(annotation_file=None, ignore_gt_cats=not use_cats)
        ytvisGT.dataset = gt
        ytvisGT.createIndex()
        ytvisDT = ytvisGT.loadRes(dt)

        for iou_type in self.iou_types:
            phraseApEval = YTVISeval(ytvisGT, ytvisDT, iou_type)

            # set the area ranges for small, medium, and large objects (using
            # absolute pixel areas) as in the official YT-VIS evaluation toolkit:
            # https://github.com/achalddave/ytvosapi/blob/eca601117c9f86bad084cb91f1d918e9ab665a75/PythonAPI/ytvostools/ytvoseval.py#L538
            phraseApEval.params.areaRng = [
                [0**2, 1e5**2],
                [0**2, 128**2],
                [128**2, 256**2],
                [256**2, 1e5**2],
            ]
            phraseApEval.params.areaRngLbl = ["all", "small", "medium", "large"]
            phraseApEval.params.useCats = use_cats

            phraseApEval.evaluate()
            phraseApEval.accumulate()
            phraseApEval.summarize()
            result_prefix = f"{self.dataset_name}"
            result_prefix += f"_{'mask' if iou_type == 'segm' else 'bbox'}_phrase_ap"
            # fetch Phrase AP results from the corresponding indices in `phraseApEval.stats`
            # (see `_summarizeDets` in https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocotools/cocoeval.py)
            results[result_prefix + "_50_95"] = phraseApEval.stats[0]  # IoU=0.5:0.95
            results[result_prefix + "_50"] = phraseApEval.stats[1]  # IoU=0.5
            results[result_prefix + "_75"] = phraseApEval.stats[2]  # IoU=0.75

        # video-NP level results not supported for `VideoPhraseApEvaluator` yet
        video_np_level_results = {}
        return results, video_np_level_results
class VideoCGF1Evaluator(BasePredFileEvaluator):
    """Evaluate Video Demo F1 with YT-VIS format prediction and GT files."""

    def __init__(
        self,
        gt_ann_file: str,
        dataset_name: str = "video",
        prob_thresh: float = 0.5,
        iou_types: Optional[Sequence[str]] = None,
    ):
        """
        Args:
            gt_ann_file: Path to the YT-VIS format GT annotation JSON file
            dataset_name: Prefix of the metric names in the returned results
            prob_thresh: Probability threshold forwarded to `VideoDemoF1Eval`
            iou_types: IoU types to evaluate; each must be "bbox" or "segm"
                (both are evaluated when None)
        """
        self.gt_ann_file = gt_ann_file
        self.dataset_name = dataset_name
        self.prob_thresh = prob_thresh
        self.iou_types = list(iou_types) if iou_types is not None else ["bbox", "segm"]
        assert all(iou_type in ["bbox", "segm"] for iou_type in self.iou_types)

    def evaluate(self, pred_file: str) -> Tuple[Dict[str, float], Dict]:
        """
        Run the demo F1 evaluation for each configured IoU type.

        Args:
            pred_file: Path to a JSON prediction file (a list of predictions,
                each carrying "video_id" and "category_id")

        Returns:
            A tuple `(results, video_np_level_results)`. `results` is only
            populated (CG-F1, IL_MCC and positive micro F1 at IoU 0.5:0.95, 0.5
            and 0.75) when the GT JSON has a "video_np_pairs" key;
            `video_np_level_results` maps `(orig_video_id, orig_category_id)` to
            per-sample TP/FP/FN/F1 statistics.
        """
        with open(self.gt_ann_file) as f:
            gt = json.load(f)
        with open(pred_file) as f:
            dt = json.load(f)

        # IL_MCC and CG-F1 can only be computed if we have "video_np_pairs" keys in the GT JSON
        compute_ilmcc_and_cgf1 = "video_np_pairs" in gt
        if not compute_ilmcc_and_cgf1:
            print(
                f"Warning: IL_MCC and CG-F1 are not computed for {pred_file=} as it does not have 'video_np_pairs' keys in the GT JSON"
            )
        # For phrase AP and demo F1 evaluation, we need to remap each pair of (video_id, category_id) to
        # a new unique video_id, so that we don't mix detections from different categories under `useCat=False`
        gt, dt = remap_video_category_pairs_to_unique_video_ids(
            gt, dt, add_negative_np_pairs=compute_ilmcc_and_cgf1
        )
        if "segm" in self.iou_types:
            for ann in gt["annotations"]:
                ann["segmentations"] = [
                    _compress_rle(rle) for rle in ann["segmentations"]
                ]
        for d in dt:
            d["image_id"] = d["video_id"]

        results = {}
        use_cats = False  # Demo F1 evaluation does not use categories
        # build the GT index from the remapped in-memory dict rather than from a file
        ytvisGT = YTVIS(annotation_file=None, ignore_gt_cats=not use_cats)
        ytvisGT.dataset = gt
        ytvisGT.createIndex()
        ytvisDT = ytvisGT.loadRes(dt)

        video_np_level_results = {}
        for iou_type in self.iou_types:
            demoF1Eval = VideoDemoF1Eval(ytvisGT, ytvisDT, iou_type, self.prob_thresh)

            demoF1Eval.params.useCats = use_cats
            demoF1Eval.params.areaRng = [[0**2, 1e5**2]]
            demoF1Eval.params.areaRngLbl = ["all"]
            demoF1Eval.params.maxDets = [100000]

            demoF1Eval.evaluate()
            demoF1Eval.accumulate()
            demoF1Eval.summarize()
            result_prefix = f"{self.dataset_name}"
            result_prefix += f"_{'mask' if iou_type == 'segm' else 'bbox'}_demo"

            stats = demoF1Eval.stats

            if compute_ilmcc_and_cgf1:
                # Average IoU threshold (0.5:0.95)
                cgf1_micro_avg_idx = _get_metric_index("cgF1", None)
                positive_micro_f1_avg_idx = _get_metric_index("positive_micro_F1", None)
                ilmcc_avg_idx = _get_metric_index("IL_MCC", None)
                results[result_prefix + "_cgf1_micro_50_95"] = stats[cgf1_micro_avg_idx]
                results[result_prefix + "_ilmcc_50_95"] = stats[ilmcc_avg_idx]
                results[result_prefix + "_positive_micro_f1_50_95"] = stats[
                    positive_micro_f1_avg_idx
                ]

                # Single IoU thresholds (0.5 and 0.75)
                for iou_thr, tag in ((0.5, "50"), (0.75, "75")):
                    cgf1_idx = _get_metric_index("cgF1", iou_thr)
                    positive_f1_idx = _get_metric_index("positive_micro_F1", iou_thr)
                    results[result_prefix + f"_cgf1_micro_{tag}"] = stats[cgf1_idx]
                    # IL_MCC at a single threshold is derived as cgF1 / positive_micro_F1
                    # (numpy division: a zero denominator gives inf/nan, not an exception)
                    results[result_prefix + f"_ilmcc_{tag}"] = float(
                        np.array(stats[cgf1_idx]) / np.array(stats[positive_f1_idx])
                    )
                    results[result_prefix + f"_positive_micro_f1_{tag}"] = stats[
                        positive_f1_idx
                    ]

            self.extract_video_np_level_results(demoF1Eval, video_np_level_results)

        return results, video_np_level_results

    @staticmethod
    def _find_iou_index(iou_thrs, target: float) -> int:
        """Return the position of `target` in `iou_thrs`, compared with a float tolerance."""
        matches = np.flatnonzero(np.isclose(np.asarray(iou_thrs, dtype=float), target))
        if matches.size != 1:
            raise ValueError(
                f"Expected exactly one IoU threshold equal to {target}, found {matches.size}"
            )
        return int(matches[0])

    def extract_video_np_level_results(self, demoF1Eval, video_np_level_results):
        """
        Aggregate statistics for video-level metrics.

        For every entry of the evaluator's GT "images" list, the per-threshold
        TP/FP/FN counts of the matching `evalImgs` entry are turned into
        TP/FP/FN/F1 values (mean over all IoU thresholds, IoU=0.5, IoU=0.75).

        Args:
            demoF1Eval: An evaluator on which `evaluate()` has already been run
            video_np_level_results: Dict updated in place; keyed by
                `(orig_video_id, orig_category_id)`
        """
        iou_thrs = demoF1Eval.params.iouThrs
        num_iou_thrs = len(iou_thrs)
        # tolerance-based lookup: avoids exact float equality and the deprecated
        # conversion of a 1-D array to a Python scalar
        iou_50_index = self._find_iou_index(iou_thrs, 0.5)
        iou_75_index = self._find_iou_index(iou_thrs, 0.75)

        result_prefix = "mask" if demoF1Eval.params.iouType == "segm" else "bbox"

        assert len(demoF1Eval.evalImgs) == len(demoF1Eval.cocoGt.dataset["images"])
        for i, video in enumerate(demoF1Eval.cocoGt.dataset["images"]):
            # the original video id and category id before remapping
            video_id = video["orig_video_id"]
            category_id = video["orig_category_id"]
            eval_img_dict = demoF1Eval.evalImgs[i]

            # missing counts are treated as all-zero
            TPs = eval_img_dict.get("TPs", np.zeros(num_iou_thrs, dtype=np.int64))
            FPs = eval_img_dict.get("FPs", np.zeros(num_iou_thrs, dtype=np.int64))
            FNs = eval_img_dict.get("FNs", np.zeros(num_iou_thrs, dtype=np.int64))
            assert len(TPs) == len(FPs) == len(FNs) == num_iou_thrs
            # F1 = 2*TP / (2*TP + FP + FN), and we set F1 to 1.0 if denominator is 0
            denominator = 2 * TPs + FPs + FNs
            F1s = np.where(denominator > 0, 2 * TPs / np.maximum(denominator, 1), 1.0)
            local_results = {
                f"{result_prefix}_TP_50_95": float(TPs.mean()),
                f"{result_prefix}_FP_50_95": float(FPs.mean()),
                f"{result_prefix}_FN_50_95": float(FNs.mean()),
                f"{result_prefix}_F1_50_95": float(F1s.mean()),
                f"{result_prefix}_TP_50": float(TPs[iou_50_index]),
                f"{result_prefix}_FP_50": float(FPs[iou_50_index]),
                f"{result_prefix}_FN_50": float(FNs[iou_50_index]),
                f"{result_prefix}_F1_50": float(F1s[iou_50_index]),
                f"{result_prefix}_TP_75": float(TPs[iou_75_index]),
                f"{result_prefix}_FP_75": float(FPs[iou_75_index]),
                f"{result_prefix}_FN_75": float(FNs[iou_75_index]),
                f"{result_prefix}_F1_75": float(F1s[iou_75_index]),
            }
            video_np_level_results.setdefault((video_id, category_id), {}).update(
                local_results
            )
class VideoTetaEvaluator(BasePredFileEvaluator):
    """Evaluate TETA metric using YouTubeVIS format prediction and GT files."""

    def __init__(
        self,
        gt_ann_file: str,
        dataset_name: str = "video",
        tracker_name: str = "Sam3",
        nms_threshold: float = 0.5,
        nms_strategy: str = "none",  # "track", "frame", or "none"
        prob_thresh: float = 0.5,
        is_exhaustive: bool = False,
        use_mask: bool = False,
        num_parallel_cores: int = 8,
    ):
        """
        Store the evaluation settings and validate the NMS strategy.

        Raises:
            ValueError: If `nms_strategy` (case-insensitive) is not one of
                "track", "frame" or "none"
        """
        self.gt_ann_file = gt_ann_file
        self.dataset_name = dataset_name
        self.tracker_name = tracker_name
        self.nms_threshold = nms_threshold
        # strategies are compared in lowercase
        self.nms_strategy = nms_strategy.lower()
        self.prob_thresh = prob_thresh
        self.metric_prefix = "TETA"
        self.is_exhaustive = is_exhaustive
        self.use_mask = use_mask
        self.num_parallel_cores = num_parallel_cores

        # Reject unknown NMS strategies up front
        valid_strategies = ["track", "frame", "none"]
        print("current nms_strategy:", self.nms_strategy)
        if self.nms_strategy not in valid_strategies:
            raise ValueError(
                f"Invalid NMS strategy: {self.nms_strategy}. Must be one of {valid_strategies}"
            )

        print(f"Initialized VideoTetaEvaluator with NMS strategy: {self.nms_strategy}")
        print(f"Probability threshold set to: {self.prob_thresh}")
        print(f"Dataset exhaustivity set to: {self.is_exhaustive}")
        print(f"Tracker name set to: {self.tracker_name}")
        print(f"Dataset name set to: {self.dataset_name}")
        print(f"Use mask set to: {self.use_mask}")

    def process_predictions(self, pred_file: str, tmp_dir: str) -> str:
        """
        Filter predictions by score, apply the selected NMS strategy and save them.

        Returns:
            Path of the written "processed_preds.json" inside `tmp_dir`
        """
        with open(pred_file, "r") as f:
            predictions = json.load(f)
        print(f"Processing predictions with {self.nms_strategy} NMS strategy")

        # Drop low-score predictions (skipped when the threshold is not positive)
        if self.prob_thresh > 0:
            predictions = [
                pred for pred in predictions if pred["score"] >= self.prob_thresh
            ]
            print(
                f"Filtered to {len(predictions)} predictions with score >= {self.prob_thresh}"
            )

        # Bucket the predictions per video
        preds_per_video = defaultdict(list)
        for pred in predictions:
            preds_per_video[pred["video_id"]].append(pred)

        # Apply the configured NMS variant; its return value is not used
        strategy = self.nms_strategy
        if strategy == "none":
            print("Skipping NMS processing as strategy is set to 'none'")
        elif strategy == "track":
            process_track_level_nms(preds_per_video, nms_threshold=self.nms_threshold)
        elif strategy == "frame":
            process_frame_level_nms(preds_per_video, nms_threshold=self.nms_threshold)

        # Flatten back to a single list and write it out
        flattened_preds = []
        for tracks in preds_per_video.values():
            flattened_preds.extend(tracks)
        output_path = os.path.join(tmp_dir, "processed_preds.json")
        with open(output_path, "w") as f:
            json.dump(flattened_preds, f)

        print(f"Saved processed predictions to {output_path}")
        return output_path

    def evaluate(self, pred_file: str) -> Tuple[Dict[str, float], Dict]:
        """
        Convert GT and predictions to COCO-vid format and run the TETA evaluation.

        Returns:
            A tuple `(results, video_np_level_results)`; the second element is
            always empty for this evaluator.
        """
        print(f"Evaluating TETA Metric with {self.nms_strategy.upper()} NMS strategy")
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Score filtering + NMS
            processed_pred_file = self.process_predictions(pred_file, tmp_dir)

            # GT -> COCO-vid
            gt_dir = os.path.join(tmp_dir, "gt")
            os.makedirs(gt_dir, exist_ok=True)
            gt_coco_path = os.path.join(gt_dir, "annotations.json")
            convert_ytbvis_to_cocovid_gt(self.gt_ann_file, gt_coco_path)

            # Processed predictions -> COCO-vid
            pred_dir = os.path.join(tmp_dir, "predictions")
            tracker_dir = os.path.join(pred_dir, self.tracker_name)
            os.makedirs(tracker_dir, exist_ok=True)
            pred_coco_path = os.path.join(tracker_dir, "track_results_cocofmt.json")
            convert_ytbvis_to_cocovid_pred(
                youtubevis_pred_path=processed_pred_file,
                converted_dataset_path=gt_coco_path,
                output_path=pred_coco_path,
            )

            # TETA evaluator settings
            eval_config = config.get_default_eval_config()
            eval_config["PRINT_ONLY_COMBINED"] = True
            eval_config["DISPLAY_LESS_PROGRESS"] = True
            eval_config["OUTPUT_TEMP_RAW_DATA"] = True
            eval_config["NUM_PARALLEL_CORES"] = self.num_parallel_cores

            dataset_config = config.get_default_dataset_config()
            dataset_config["TRACKERS_TO_EVAL"] = [self.tracker_name]
            dataset_config["GT_FOLDER"] = gt_dir
            dataset_config["OUTPUT_FOLDER"] = pred_dir
            dataset_config["TRACKER_SUB_FOLDER"] = tracker_dir
            dataset_config["USE_MASK"] = self.use_mask

            evaluator = Evaluator(eval_config)
            # the dataset wrapper (and the key of its results) depends on exhaustivity
            if self.is_exhaustive:
                dataset_list = [COCO(dataset_config)]
                dataset_parsing_key = "COCO"
            else:
                dataset_list = [TAO(dataset_config)]
                dataset_parsing_key = "TAO"

            # Run evaluation
            eval_results, _ = evaluator.evaluate(
                dataset_list, [metrics.TETA(exhaustive=self.is_exhaustive)]
            )

            # The i-th entry of the "TETA" result is reported under the i-th suffix
            metric_suffixes = (
                "teta",
                "loc_a",
                "assoc_a",
                "cls_a",
                "loc_re",
                "loc_pr",
                "assoc_re",
                "assoc_pr",
                "cls_re",
                "cls_pr",
            )
            key_prefix = f"{self.dataset_name}_{'mask' if self.use_mask else 'bbox'}"
            results = {}
            for position, suffix in enumerate(metric_suffixes):
                results[f"{key_prefix}_{suffix}"] = float(
                    eval_results[dataset_parsing_key]["TETA"][position]
                )

        # video-NP level results not supported for `VideoTetaEvaluator` yet
        video_np_level_results = {}
        return results, video_np_level_results
class VideoPhraseHotaEvaluator(BasePredFileEvaluator):
    """Evaluate Video Phrase HOTA with YT-VIS format prediction and GT files."""

    def __init__(
        self,
        gt_ann_file: str,
        dataset_name: str = "video",
        prob_thresh: float = 0.5,
        iou_types: Optional[Sequence[str]] = None,
        compute_video_mot_hota: bool = False,
    ):
        """
        Args:
            gt_ann_file: Path to the YT-VIS format GT annotation JSON file
            dataset_name: Prefix of the metric names in the returned results
            prob_thresh: Predictions are kept only if their score is strictly
                above this threshold
            iou_types: IoU types to evaluate; each must be "bbox" or "segm"
                (both are evaluated when None)
            compute_video_mot_hota: If True, compute video MOT HOTA, aggregating
                predictions/GT from all categories
        """
        self.gt_ann_file = gt_ann_file
        self.dataset_name = dataset_name
        self.prob_thresh = prob_thresh
        self.metric_prefix = "phrase"
        # the list of metrics to collect from the HOTA evaluation results
        self.metric_to_collect = [
            "HOTA",
            "DetA",
            "AssA",
            "DetRe",
            "DetPr",
            "AssRe",
            "AssPr",
            "LocA",
            "OWTA",
        ]
        self.iou_types = list(iou_types) if iou_types is not None else ["bbox", "segm"]
        assert all(iou_type in ["bbox", "segm"] for iou_type in self.iou_types)

        # If True, compute video MOT HOTA, aggregating predictions/GT from all categories.
        self.compute_video_mot_hota = compute_video_mot_hota

    def evaluate(self, pred_file: str) -> Tuple[Dict[str, float], Dict]:
        """
        Run the HOTA evaluation for each configured IoU type.

        Args:
            pred_file: Path to a JSON prediction file (a list of track
                predictions with "score", "bboxes", "segmentations", "areas")

        Returns:
            A tuple `(out_dict, video_np_level_results)`: the summary metrics
            keyed by "{dataset_name}_{mask|bbox}_{all|challenging}_{metric_prefix}_{metric}",
            and whatever `extract_video_np_level_results` collected.
        """
        # use the YT-VIS evaluation toolkit in TrackEval
        with open(self.gt_ann_file) as f:
            gt = json.load(f)
        with open(pred_file) as f:
            dt = json.load(f)
        dt = self._filter_and_clean_predictions(dt)

        # fill in missing height/width in the GT annotations from their video info
        gt = _fill_in_ann_height_width(gt)
        if not self.compute_video_mot_hota:
            # remap the GT and DT annotations for phrase HOTA evaluation
            gt, dt = self._remap_gt_dt(gt, dt)
        else:
            # Compute video-level MOT HOTA
            # Apply track-level NMS
            video_groups = defaultdict(list)
            for pred in dt:
                video_groups[pred["video_id"]].append(pred)
            process_track_level_nms(video_groups, nms_threshold=0.5)
            dt = [track for tracks in video_groups.values() for track in tracks]

            # Remap GT track ids for class-agnostic HOTA
            gt, dt = remap_gt_dt_class_agnostic(gt, dt)

        # run the HOTA evaluation using TrackEval on the remapped GT and predictions
        out_dict = {}
        video_np_level_results = {}
        for iou_type in self.iou_types:
            output_res, _ = run_ytvis_eval(
                args=[
                    "--METRICS",
                    "HOTA",
                    "--IOU_TYPE",
                    iou_type,
                    "--DATASET_NAME",
                    self.dataset_name,
                    "--USE_PARALLEL",
                    "True",
                    "--NUM_PARALLEL_CORES",
                    "8",
                    "--PLOT_CURVES",
                    "False",
                    "--LOG_ON_ERROR",
                    "None",
                    "--PRINT_ONLY_COMBINED",
                    "True",
                    "--OUTPUT_SUMMARY",
                    "False",
                    "--OUTPUT_DETAILED",
                    "False",
                    "--TIME_PROGRESS",
                    "False",
                    "--PRINT_CONFIG",
                    "False",
                ],
                gt_json=gt,
                dt_json=dt,
            )
            tracker_results = output_res[self.dataset_name]["tracker"]
            self.extract_video_np_level_results(
                iou_type=iou_type,
                remapped_gt=gt,
                raw_results=tracker_results,
                video_np_level_results=video_np_level_results,
            )

            self._collect_hota_summary(
                tracker_results, iou_type, "COMBINED_SEQ", "all", out_dict
            )
            # the "challenging" summary is only reported when the toolkit produced it
            if "COMBINED_SEQ_CHALLENGING" in tracker_results:
                self._collect_hota_summary(
                    tracker_results,
                    iou_type,
                    "COMBINED_SEQ_CHALLENGING",
                    "challenging",
                    out_dict,
                )

        return out_dict, video_np_level_results

    def _filter_and_clean_predictions(self, dt):
        """Keep predictions scoring above `prob_thresh`, blank out empty frames and set scores to 1.0."""
        # keep only predictions with score above the probability threshold
        dt = [d for d in dt if d["score"] > self.prob_thresh]
        for d in dt:
            assert len(d["areas"]) == len(d["bboxes"])
            assert len(d["areas"]) == len(d["segmentations"])
            # remove empty boxes (otherwise they will count as false positives during
            # per-frame detection accuracy in HOTA evaluation)
            for t, bbox in enumerate(d["bboxes"]):
                if d["areas"][t] == 0 or bbox is None or all(x == 0 for x in bbox):
                    d["segmentations"][t] = None
                    d["bboxes"][t] = None
                    d["areas"][t] = None
            # check that box occurrence and mask occurrence are consistent
            for bbox, mask, area in zip(d["bboxes"], d["segmentations"], d["areas"]):
                assert (area is None) == (bbox is None)
                assert (area is None) == (mask is None)
            # set all scores to 1.0 for HOTA evaluation (just like Demo F1, the exact score
            # value is not used in HOTA metrics; it will be treated as a detection prediction
            # as long as its score is above the threshold)
            d["score"] = 1.0
        return dt

    def _collect_hota_summary(self, tracker_results, iou_type, field, suffix, out_dict):
        """Write the mean of each collected HOTA metric of `tracker_results[field]` into `out_dict`."""
        eval_res_hota = tracker_results[field]["cls_comb_cls_av"]["HOTA"]
        result_prefix = f"{self.dataset_name}_{'mask' if iou_type == 'segm' else 'bbox'}_{suffix}"
        for metric_name in self.metric_to_collect:
            result_key = f"{result_prefix}_{self.metric_prefix}_{metric_name}"
            out_dict[result_key] = float(np.mean(eval_res_hota[metric_name]))

    def _remap_gt_dt(self, gt, dt):
        """Remap GT and predictions so each (video, category) pair becomes its own single-category video."""
        # For phrase HOTA evaluation, we need to remap each pair of (video_id, category_id) to
        # a new unique video_id, so that we don't mix detections from different categories
        gt, dt = remap_video_category_pairs_to_unique_video_ids(gt, dt)
        # We further map all the categories to category_id=1 in HOTA evaluation toolkit
        # for phrase HOTA (similar to "useCat=False" for video phrase AP)
        remapped_category_id = 1
        gt["categories"] = [
            {
                "supercategory": "object",
                "id": remapped_category_id,
                "name": "_REMAPPED_FOR_PHRASE_METRICS_",
            }
        ]
        for ann in gt["annotations"]:
            ann["category_id"] = remapped_category_id
        for d in dt:
            d["category_id"] = remapped_category_id
        # To be compatible with the TrackEval YT-VIS evaluation toolkit, we need to give
        # unique filenames to each remapped video, so we add remapped video_id as prefix.
        for video in gt["videos"]:
            new_video_id = video["id"]
            video["file_names"] = [
                f"remapped_vid_{new_video_id:012d}/{name}"
                for name in video["file_names"]
            ]
        return gt, dt

    def extract_video_np_level_results(
        self, iou_type, remapped_gt, raw_results, video_np_level_results
    ):
        """
        Aggregate statistics for video-level metrics.

        For every video of `remapped_gt`, the mean of each collected HOTA metric
        is stored in `video_np_level_results` (updated in place) under the key
        `(orig_video_id, orig_category_id)`.
        """
        result_prefix = "mask" if iou_type == "segm" else "bbox"
        for video in remapped_gt["videos"]:
            # the original video id and category id before remapping
            video_id = video["orig_video_id"]
            category_id = video["orig_category_id"]
            # per-video results are keyed by the file-name prefix given to the remapped video
            video_key = f"remapped_vid_{video['id']:012d}"
            results = raw_results[video_key]["_REMAPPED_FOR_PHRASE_METRICS_"]["HOTA"]

            local_results = {}
            for metric_name in self.metric_to_collect:
                result_key = f"{result_prefix}_{metric_name}"
                local_results[result_key] = float(results[metric_name].mean())
            video_np_level_results.setdefault((video_id, category_id), {}).update(
                local_results
            )
class VideoClassBasedHotaEvaluator(VideoPhraseHotaEvaluator):
    """Evaluate class-based HOTA: same pipeline as phrase HOTA, but without remapping GT/predictions."""

    def __init__(
        self,
        gt_ann_file: str,
        dataset_name: str = "video",
        prob_thresh: float = 0.5,
        iou_types: Optional[Sequence[str]] = None,
    ):
        """
        Args:
            gt_ann_file: Path to the YT-VIS format GT annotation JSON file
            dataset_name: Prefix of the metric names in the returned results
            prob_thresh: Score threshold forwarded to the parent evaluator
            iou_types: IoU types to evaluate, forwarded to the parent evaluator
                (None keeps the parent's default)
        """
        super().__init__(gt_ann_file, dataset_name, prob_thresh, iou_types=iou_types)
        # result keys use "class" instead of the parent's "phrase"
        self.metric_prefix = "class"

    def _remap_gt_dt(self, gt, dt):
        """Return `gt` and `dt` unchanged."""
        return gt, dt  # no remapping needed for class-based HOTA evaluation

    def extract_video_np_level_results(self, *args, **kwargs):
        """Do nothing; accepts and ignores any arguments."""
        pass  # no video-NP level results for class-based HOTA evaluation
- def _compress_rle(rle):
- """Convert RLEs from uncompressed (integer list) to compressed (string) format."""
- if rle is None:
- return None
- if isinstance(rle["counts"], list):
- rle = pycocotools.mask.frPyObjects(rle, rle["size"][0], rle["size"][1])
- rle["counts"] = rle["counts"].decode()
- return rle
def remap_video_category_pairs_to_unique_video_ids(
    gt_json, dt_json, add_negative_np_pairs=False
):
    """
    Give every (video_id, category_id) pair its own new unique video_id.

    This is useful for phrase AP and demo F1 evaluation on videos, where we have
    `useCat=False` and rely on separating different NPs (from the same video) into
    different new video ids, so that we don't mix detections from different
    categories in computeIoU under `useCat=False`.

    This is consistent with how we do phrase AP and demo F1 evaluation on images, where
    we use a remapped unique coco_image_id for each image-NP pair (based on its query["id"]
    in CustomCocoDetectionAPI.load_queries in modulated_detection_api.py)

    Both inputs are modified in place and returned. New ids start at 1 and follow
    the sorted order of the pairs found in predictions and GT annotations; with
    `add_negative_np_pairs=True`, unseen pairs from gt_json["video_np_pairs"] are
    appended after them. Each remapped video entry also records "orig_video_id"
    and "orig_category_id".
    """
    videos_by_id = {video["id"]: video for video in gt_json["videos"]}

    # every pair that occurs in the predictions or in the GT annotations
    seen_pairs = {(pred["video_id"], pred["category_id"]) for pred in dt_json}
    seen_pairs.update(
        (ann["video_id"], ann["category_id"]) for ann in gt_json["annotations"]
    )

    # new ids are 1-based and follow the sorted pair order
    pair_to_new_id = {
        pair: new_id for new_id, pair in enumerate(sorted(seen_pairs), start=1)
    }

    # also map the negative NP pairs -- this is needed for IL_MCC and CG-F1 evaluation
    if add_negative_np_pairs:
        for np_entry in gt_json["video_np_pairs"]:
            key = (np_entry["video_id"], np_entry["category_id"])
            if key not in pair_to_new_id:
                pair_to_new_id[key] = len(pair_to_new_id) + 1

    # rewrite "video_id" in the predictions first, then in the GT annotations
    for records in (dt_json, gt_json["annotations"]):
        for record in records:
            record["video_id"] = pair_to_new_id[
                (record["video_id"], record["category_id"])
            ]

    # duplicate the video entries (shallow copies), one per pair, keeping the original
    # ids so that sample-level eval metrics can be associated with the original video-NP pairs
    gt_json["videos"] = [
        {
            **videos_by_id[orig_video_id],
            "id": new_id,
            "orig_video_id": orig_video_id,
            "orig_category_id": orig_category_id,
        }
        for (orig_video_id, orig_category_id), new_id in pair_to_new_id.items()
    ]

    return gt_json, dt_json
def remap_gt_dt_class_agnostic(gt, dt):
    """
    For class-agnostic HOTA, merge all GT tracks for each video (across NPs),
    ensure unique track_ids, and set all category_id to 1.
    Also, add orig_video_id and orig_category_id for compatibility.

    Both inputs are modified in place and returned. For each video,
    "orig_category_id" is the category_id its first GT annotation had *before*
    being overwritten with 1 (None if the video has no annotation), and every
    file name gets a "remapped_vid_{video_id:012d}/" prefix.
    """
    class_agnostic_category_id = 1

    # Group the GT annotations by video
    gt_anns_by_video = defaultdict(list)
    for ann in gt["annotations"]:
        gt_anns_by_video[ann["video_id"]].append(ann)

    # Record each video's original category before the loop below overwrites it
    first_orig_category = {
        video_id: anns[0]["category_id"] for video_id, anns in gt_anns_by_video.items()
    }

    # Ensure unique track ids across tracks of all videos
    next_tid = 1
    for anns in gt_anns_by_video.values():
        # Map old track_ids to new unique ones (the counter is shared by all videos)
        old_to_new_tid = {}
        for ann in anns:
            old_tid = ann["id"]
            if old_tid not in old_to_new_tid:
                old_to_new_tid[old_tid] = next_tid
                next_tid += 1
            ann["id"] = old_to_new_tid[old_tid]
            # Set category_id to 1 for class-agnostic
            ann["category_id"] = class_agnostic_category_id

    # Set all GT categories to a single category
    gt["categories"] = [
        {
            "supercategory": "object",
            "id": class_agnostic_category_id,
            "name": "_REMAPPED_FOR_PHRASE_METRICS_",
        }
    ]

    # Add orig_video_id and orig_category_id to each video for compatibility
    for video in gt["videos"]:
        video["orig_video_id"] = video["id"]
        # The first annotation's original category_id if available, else None
        video["orig_category_id"] = first_orig_category.get(video["id"])
        video["file_names"] = [
            f"remapped_vid_{video['id']:012d}/{name}" for name in video["file_names"]
        ]

    # Set all DT category_id to 1
    for d in dt:
        d["category_id"] = class_agnostic_category_id
    return gt, dt
- def _fill_in_ann_height_width(gt_json):
- """Fill in missing height/width in GT annotations from its video info."""
- video_id_to_video = {v["id"]: v for v in gt_json["videos"]}
- for ann in gt_json["annotations"]:
- if "height" not in ann or "width" not in ann:
- video = video_id_to_video[ann["video_id"]]
- if "height" not in ann:
- ann["height"] = video["height"]
- if "width" not in ann:
- ann["width"] = video["width"]
- return gt_json
|