| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- # flake8: noqa
- # pyre-unsafe
- import os
- import numpy as np
- from scipy.optimize import linear_sum_assignment
- from .. import _timing
- from ._base_metric import _BaseMetric
class HOTA(_BaseMetric):
    """Class which implements the HOTA metrics.

    HOTA (Higher Order Tracking Accuracy) decomposes tracking performance into
    detection (DetA), association (AssA) and localisation (LocA) components,
    each evaluated over a range of localisation thresholds (alpha).
    See: https://link.springer.com/article/10.1007/s11263-020-01375-2
    """

    def __init__(self, config=None):
        # NOTE: `config` is accepted for interface compatibility with other
        # metrics but is not used by this class.
        super().__init__()
        self.plottable = True
        # 19 localisation thresholds: alpha in {0.05, 0.10, ..., 0.95}.
        # Every *_array field below holds one value per alpha.
        self.array_labels = np.arange(0.05, 0.99, 0.05)
        # Raw per-alpha counts accumulated during evaluation.
        self.integer_array_fields = ["HOTA_TP", "HOTA_FN", "HOTA_FP"]
        # Derived per-alpha scores computed from the counts.
        self.float_array_fields = [
            "HOTA",
            "DetA",
            "AssA",
            "DetRe",
            "DetPr",
            "AssRe",
            "AssPr",
            "LocA",
            "OWTA",
        ]
        # Scalar summary scores taken at the lowest alpha (array index 0).
        self.float_fields = ["HOTA(0)", "LocA(0)", "HOTALocA(0)"]
        self.fields = (
            self.float_array_fields + self.integer_array_fields + self.float_fields
        )
        self.summary_fields = self.float_array_fields + self.float_fields

    @_timing.time
    def eval_sequence(self, data):
        """Calculates the HOTA metrics for one sequence.

        Args:
            data: dict describing one sequence. Keys read here:
                'num_tracker_dets', 'num_gt_dets', 'num_gt_ids',
                'num_tracker_ids': int totals for the sequence;
                'gt_ids', 'tracker_ids': per-timestep arrays of integer ids
                    (used directly as row/column indices, so ids are assumed
                    to be contiguous and zero-based — TODO confirm against
                    the data preprocessor);
                'similarity_scores': per-timestep [num_gt, num_tracker]
                    similarity matrices (e.g. IoU).

        Returns:
            dict mapping each field in self.fields to its value(s) for this
            sequence (per-alpha arrays for array fields, floats otherwise).
        """
        # Initialise results
        res = {}
        for field in self.float_array_fields + self.integer_array_fields:
            res[field] = np.zeros((len(self.array_labels)), dtype=float)
        for field in self.float_fields:
            res[field] = 0

        # Return result quickly if tracker or gt sequence is empty.
        # With no tracker dets, every gt det is a FN at every alpha; LocA is
        # set to 1 by convention since there are no matches to localise.
        if data["num_tracker_dets"] == 0:
            res["HOTA_FN"] = data["num_gt_dets"] * np.ones(
                (len(self.array_labels)), dtype=float
            )
            res["LocA"] = np.ones((len(self.array_labels)), dtype=float)
            res["LocA(0)"] = 1.0
            return res
        # Symmetric case: no gt dets means every tracker det is a FP.
        if data["num_gt_dets"] == 0:
            res["HOTA_FP"] = data["num_tracker_dets"] * np.ones(
                (len(self.array_labels)), dtype=float
            )
            res["LocA"] = np.ones((len(self.array_labels)), dtype=float)
            res["LocA(0)"] = 1.0
            return res

        # Variables counting global association:
        # potential_matches_count[i, j] accumulates soft (similarity-weighted)
        # match evidence between gt id i and tracker id j over the whole
        # sequence; gt_id_count / tracker_id_count count detections per id
        # (shaped for broadcasting against the [gt, tracker] matrix).
        potential_matches_count = np.zeros(
            (data["num_gt_ids"], data["num_tracker_ids"])
        )
        gt_id_count = np.zeros((data["num_gt_ids"], 1))
        tracker_id_count = np.zeros((1, data["num_tracker_ids"]))

        # First loop through each timestep and accumulate global track information.
        for t, (gt_ids_t, tracker_ids_t) in enumerate(
            zip(data["gt_ids"], data["tracker_ids"])
        ):
            # Count the potential matches between ids in each timestep
            # These are normalised, weighted by the match similarity.
            similarity = data["similarity_scores"][t]
            # Jaccard-style normalisation: each entry is divided by
            # (row sum + column sum - entry), so a det matched strongly to
            # several candidates contributes less to each.
            sim_iou_denom = (
                similarity.sum(0)[np.newaxis, :]
                + similarity.sum(1)[:, np.newaxis]
                - similarity
            )
            sim_iou = np.zeros_like(similarity)
            # eps guard avoids division by (near-)zero denominators.
            sim_iou_mask = sim_iou_denom > 0 + np.finfo("float").eps
            sim_iou[sim_iou_mask] = (
                similarity[sim_iou_mask] / sim_iou_denom[sim_iou_mask]
            )
            # Scatter-add this timestep's soft matches into the global matrix
            # (outer indexing selects the [gt_ids_t x tracker_ids_t] submatrix).
            potential_matches_count[
                gt_ids_t[:, np.newaxis], tracker_ids_t[np.newaxis, :]
            ] += sim_iou

            # Calculate the total number of dets for each gt_id and tracker_id.
            gt_id_count[gt_ids_t] += 1
            tracker_id_count[0, tracker_ids_t] += 1

        # Calculate overall jaccard alignment score (before unique matching) between IDs
        global_alignment_score = potential_matches_count / (
            gt_id_count + tracker_id_count - potential_matches_count
        )
        # Per-alpha hard match counts between (gt_id, tracker_id) pairs,
        # filled in during the second pass below.
        matches_counts = [
            np.zeros_like(potential_matches_count) for _ in self.array_labels
        ]

        # Calculate scores for each timestep
        for t, (gt_ids_t, tracker_ids_t) in enumerate(
            zip(data["gt_ids"], data["tracker_ids"])
        ):
            # Deal with the case that there are no gt_det/tracker_det in a timestep.
            if len(gt_ids_t) == 0:
                for a, alpha in enumerate(self.array_labels):
                    res["HOTA_FP"][a] += len(tracker_ids_t)
                continue
            if len(tracker_ids_t) == 0:
                for a, alpha in enumerate(self.array_labels):
                    res["HOTA_FN"][a] += len(gt_ids_t)
                continue

            # Get matching scores between pairs of dets for optimizing HOTA.
            # Weighting the raw similarity by the global alignment score makes
            # the per-timestep matching prefer globally consistent id pairs.
            similarity = data["similarity_scores"][t]
            score_mat = (
                global_alignment_score[
                    gt_ids_t[:, np.newaxis], tracker_ids_t[np.newaxis, :]
                ]
                * similarity
            )

            # Hungarian algorithm to find best matches
            # (negated because linear_sum_assignment minimises cost).
            match_rows, match_cols = linear_sum_assignment(-score_mat)

            # Calculate and accumulate basic statistics
            for a, alpha in enumerate(self.array_labels):
                # A Hungarian pairing only counts as a TP at this alpha if its
                # raw similarity clears the threshold (eps for float safety).
                actually_matched_mask = (
                    similarity[match_rows, match_cols] >= alpha - np.finfo("float").eps
                )
                alpha_match_rows = match_rows[actually_matched_mask]
                alpha_match_cols = match_cols[actually_matched_mask]
                num_matches = len(alpha_match_rows)
                res["HOTA_TP"][a] += num_matches
                res["HOTA_FN"][a] += len(gt_ids_t) - num_matches
                res["HOTA_FP"][a] += len(tracker_ids_t) - num_matches
                if num_matches > 0:
                    # LocA accumulates total similarity of TPs here; it is
                    # normalised by TP count after both passes complete.
                    res["LocA"][a] += sum(
                        similarity[alpha_match_rows, alpha_match_cols]
                    )
                    matches_counts[a][
                        gt_ids_t[alpha_match_rows], tracker_ids_t[alpha_match_cols]
                    ] += 1

        # Calculate association scores (AssA, AssRe, AssPr) for the alpha value.
        # First calculate scores per gt_id/tracker_id combo and then average over the number of detections.
        for a, alpha in enumerate(self.array_labels):
            matches_count = matches_counts[a]
            # Jaccard over detections of each id pair; np.maximum(1, ...)
            # guards the division for pairs with no detections.
            ass_a = matches_count / np.maximum(
                1, gt_id_count + tracker_id_count - matches_count
            )
            # Average the pair score over TPs, weighting by match count.
            res["AssA"][a] = np.sum(matches_count * ass_a) / np.maximum(
                1, res["HOTA_TP"][a]
            )
            ass_re = matches_count / np.maximum(1, gt_id_count)
            res["AssRe"][a] = np.sum(matches_count * ass_re) / np.maximum(
                1, res["HOTA_TP"][a]
            )
            ass_pr = matches_count / np.maximum(1, tracker_id_count)
            res["AssPr"][a] = np.sum(matches_count * ass_pr) / np.maximum(
                1, res["HOTA_TP"][a]
            )

        # Calculate final scores.
        # LocA becomes the mean TP similarity; 1e-10 clamps keep the division
        # well-defined (and LocA strictly positive) when there are no TPs.
        res["LocA"] = np.maximum(1e-10, res["LocA"]) / np.maximum(1e-10, res["HOTA_TP"])
        res = self._compute_final_fields(res)
        return res

    def combine_sequences(self, all_res):
        """Combines metrics across all sequences.

        Args:
            all_res: dict mapping sequence name -> per-sequence result dict
                (as returned by eval_sequence).

        Returns:
            A single combined result dict: counts are summed, association and
            localisation scores are HOTA_TP-weighted averages, and the derived
            fields are recomputed from the combined values.
        """
        res = {}
        # Raw counts simply add across sequences.
        for field in self.integer_array_fields:
            res[field] = self._combine_sum(all_res, field)
        # Association scores average weighted by each sequence's TP count.
        for field in ["AssRe", "AssPr", "AssA"]:
            res[field] = self._combine_weighted_av(
                all_res, field, res, weight_field="HOTA_TP"
            )
        # LocA is also TP-weighted, with 1e-10 clamps against empty totals.
        loca_weighted_sum = sum(
            [all_res[k]["LocA"] * all_res[k]["HOTA_TP"] for k in all_res.keys()]
        )
        res["LocA"] = np.maximum(1e-10, loca_weighted_sum) / np.maximum(
            1e-10, res["HOTA_TP"]
        )
        res = self._compute_final_fields(res)
        return res

    def combine_classes_class_averaged(self, all_res, ignore_empty_classes=False):
        """Combines metrics across all classes by averaging over the class values.

        If 'ignore_empty_classes' is True, then it only sums over classes with at least one gt or predicted detection.

        Args:
            all_res: dict mapping class name -> per-class result dict.
            ignore_empty_classes: when True, classes whose TP+FN+FP is zero at
                every alpha are excluded from both the count sums and the
                unweighted score means.

        Returns:
            A combined result dict (float fields are plain means over classes).
        """
        res = {}
        for field in self.integer_array_fields:
            if ignore_empty_classes:
                # Keep only classes with any detections (eps-safe > 0 check
                # across the per-alpha count arrays).
                res[field] = self._combine_sum(
                    {
                        k: v
                        for k, v in all_res.items()
                        if (
                            v["HOTA_TP"] + v["HOTA_FN"] + v["HOTA_FP"]
                            > 0 + np.finfo("float").eps
                        ).any()
                    },
                    field,
                )
            else:
                res[field] = self._combine_sum(
                    {k: v for k, v in all_res.items()}, field
                )

        # Scores are averaged per class (unweighted), unlike the
        # detection-averaged combiner below.
        for field in self.float_fields + self.float_array_fields:
            if ignore_empty_classes:
                res[field] = np.mean(
                    [
                        v[field]
                        for v in all_res.values()
                        if (
                            v["HOTA_TP"] + v["HOTA_FN"] + v["HOTA_FP"]
                            > 0 + np.finfo("float").eps
                        ).any()
                    ],
                    axis=0,
                )
            else:
                res[field] = np.mean([v[field] for v in all_res.values()], axis=0)
        return res

    def combine_classes_det_averaged(self, all_res):
        """Combines metrics across all classes by averaging over the detection values.

        Same weighting scheme as combine_sequences, applied over classes:
        counts sum, Ass*/LocA are HOTA_TP-weighted, derived fields recomputed.
        """
        res = {}
        for field in self.integer_array_fields:
            res[field] = self._combine_sum(all_res, field)
        for field in ["AssRe", "AssPr", "AssA"]:
            res[field] = self._combine_weighted_av(
                all_res, field, res, weight_field="HOTA_TP"
            )
        loca_weighted_sum = sum(
            [all_res[k]["LocA"] * all_res[k]["HOTA_TP"] for k in all_res.keys()]
        )
        res["LocA"] = np.maximum(1e-10, loca_weighted_sum) / np.maximum(
            1e-10, res["HOTA_TP"]
        )
        res = self._compute_final_fields(res)
        return res

    @staticmethod
    def _compute_final_fields(res):
        """Calculate sub-metric ('field') values which only depend on other sub-metric values.

        This function is used both for both per-sequence calculation, and in combining values across sequences.

        Requires res to already contain HOTA_TP/FN/FP, AssA and LocA; adds the
        derived per-alpha scores and the scalar '(0)' summaries. Mutates and
        returns `res`. All divisions are guarded with np.maximum(1, ...).
        """
        res["DetRe"] = res["HOTA_TP"] / np.maximum(1, res["HOTA_TP"] + res["HOTA_FN"])
        res["DetPr"] = res["HOTA_TP"] / np.maximum(1, res["HOTA_TP"] + res["HOTA_FP"])
        res["DetA"] = res["HOTA_TP"] / np.maximum(
            1, res["HOTA_TP"] + res["HOTA_FN"] + res["HOTA_FP"]
        )
        # HOTA is the geometric mean of detection and association accuracy;
        # OWTA (open-world) uses detection recall instead of DetA.
        res["HOTA"] = np.sqrt(res["DetA"] * res["AssA"])
        res["OWTA"] = np.sqrt(res["DetRe"] * res["AssA"])
        # '(0)' summaries are taken at the lowest alpha (array_labels[0]).
        res["HOTA(0)"] = res["HOTA"][0]
        res["LocA(0)"] = res["LocA"][0]
        res["HOTALocA(0)"] = res["HOTA(0)"] * res["LocA(0)"]
        return res

    def plot_single_tracker_results(self, table_res, tracker, cls, output_folder):
        """Create plot of results.

        Plots each float array field against alpha for the combined-sequence
        results and saves the figure to '<output_folder>/<cls>_plot.pdf' and
        the matching '.png'.

        Args:
            table_res: dict of result dicts; only 'COMBINED_SEQ' is plotted.
            tracker: tracker name (used in the plot title).
            cls: class name (used in the title and output filename).
            output_folder: directory for the saved plots (created if missing).
        """
        # Only loaded when run to reduce minimum requirements
        from matplotlib import pyplot as plt

        res = table_res["COMBINED_SEQ"]
        # One matplotlib style string per field, paired positionally with
        # self.float_array_fields by zip below.
        styles_to_plot = ["r", "b", "g", "b--", "b:", "g--", "g:", "m"]
        for name, style in zip(self.float_array_fields, styles_to_plot):
            plt.plot(self.array_labels, res[name], style)
        plt.xlabel("alpha")
        plt.ylabel("score")
        plt.title(tracker + " - " + cls)
        plt.axis([0, 1, 0, 1])
        # Legend entries show each metric's mean over all alphas.
        legend = []
        for name in self.float_array_fields:
            legend += [name + " (" + str(np.round(np.mean(res[name]), 2)) + ")"]
        plt.legend(legend, loc="lower left")
        out_file = os.path.join(output_folder, cls + "_plot.pdf")
        os.makedirs(os.path.dirname(out_file), exist_ok=True)
        plt.savefig(out_file)
        plt.savefig(out_file.replace(".pdf", ".png"))
        # Clear the current figure so repeated calls don't overlay plots.
        plt.clf()
|