# hota.py
  1. # flake8: noqa
  2. # pyre-unsafe
  3. import os
  4. import numpy as np
  5. from scipy.optimize import linear_sum_assignment
  6. from .. import _timing
  7. from ._base_metric import _BaseMetric
  8. class HOTA(_BaseMetric):
  9. """Class which implements the HOTA metrics.
  10. See: https://link.springer.com/article/10.1007/s11263-020-01375-2
  11. """
  12. def __init__(self, config=None):
  13. super().__init__()
  14. self.plottable = True
  15. self.array_labels = np.arange(0.05, 0.99, 0.05)
  16. self.integer_array_fields = ["HOTA_TP", "HOTA_FN", "HOTA_FP"]
  17. self.float_array_fields = [
  18. "HOTA",
  19. "DetA",
  20. "AssA",
  21. "DetRe",
  22. "DetPr",
  23. "AssRe",
  24. "AssPr",
  25. "LocA",
  26. "OWTA",
  27. ]
  28. self.float_fields = ["HOTA(0)", "LocA(0)", "HOTALocA(0)"]
  29. self.fields = (
  30. self.float_array_fields + self.integer_array_fields + self.float_fields
  31. )
  32. self.summary_fields = self.float_array_fields + self.float_fields
  33. @_timing.time
  34. def eval_sequence(self, data):
  35. """Calculates the HOTA metrics for one sequence"""
  36. # Initialise results
  37. res = {}
  38. for field in self.float_array_fields + self.integer_array_fields:
  39. res[field] = np.zeros((len(self.array_labels)), dtype=float)
  40. for field in self.float_fields:
  41. res[field] = 0
  42. # Return result quickly if tracker or gt sequence is empty
  43. if data["num_tracker_dets"] == 0:
  44. res["HOTA_FN"] = data["num_gt_dets"] * np.ones(
  45. (len(self.array_labels)), dtype=float
  46. )
  47. res["LocA"] = np.ones((len(self.array_labels)), dtype=float)
  48. res["LocA(0)"] = 1.0
  49. return res
  50. if data["num_gt_dets"] == 0:
  51. res["HOTA_FP"] = data["num_tracker_dets"] * np.ones(
  52. (len(self.array_labels)), dtype=float
  53. )
  54. res["LocA"] = np.ones((len(self.array_labels)), dtype=float)
  55. res["LocA(0)"] = 1.0
  56. return res
  57. # Variables counting global association
  58. potential_matches_count = np.zeros(
  59. (data["num_gt_ids"], data["num_tracker_ids"])
  60. )
  61. gt_id_count = np.zeros((data["num_gt_ids"], 1))
  62. tracker_id_count = np.zeros((1, data["num_tracker_ids"]))
  63. # First loop through each timestep and accumulate global track information.
  64. for t, (gt_ids_t, tracker_ids_t) in enumerate(
  65. zip(data["gt_ids"], data["tracker_ids"])
  66. ):
  67. # Count the potential matches between ids in each timestep
  68. # These are normalised, weighted by the match similarity.
  69. similarity = data["similarity_scores"][t]
  70. sim_iou_denom = (
  71. similarity.sum(0)[np.newaxis, :]
  72. + similarity.sum(1)[:, np.newaxis]
  73. - similarity
  74. )
  75. sim_iou = np.zeros_like(similarity)
  76. sim_iou_mask = sim_iou_denom > 0 + np.finfo("float").eps
  77. sim_iou[sim_iou_mask] = (
  78. similarity[sim_iou_mask] / sim_iou_denom[sim_iou_mask]
  79. )
  80. potential_matches_count[
  81. gt_ids_t[:, np.newaxis], tracker_ids_t[np.newaxis, :]
  82. ] += sim_iou
  83. # Calculate the total number of dets for each gt_id and tracker_id.
  84. gt_id_count[gt_ids_t] += 1
  85. tracker_id_count[0, tracker_ids_t] += 1
  86. # Calculate overall jaccard alignment score (before unique matching) between IDs
  87. global_alignment_score = potential_matches_count / (
  88. gt_id_count + tracker_id_count - potential_matches_count
  89. )
  90. matches_counts = [
  91. np.zeros_like(potential_matches_count) for _ in self.array_labels
  92. ]
  93. # Calculate scores for each timestep
  94. for t, (gt_ids_t, tracker_ids_t) in enumerate(
  95. zip(data["gt_ids"], data["tracker_ids"])
  96. ):
  97. # Deal with the case that there are no gt_det/tracker_det in a timestep.
  98. if len(gt_ids_t) == 0:
  99. for a, alpha in enumerate(self.array_labels):
  100. res["HOTA_FP"][a] += len(tracker_ids_t)
  101. continue
  102. if len(tracker_ids_t) == 0:
  103. for a, alpha in enumerate(self.array_labels):
  104. res["HOTA_FN"][a] += len(gt_ids_t)
  105. continue
  106. # Get matching scores between pairs of dets for optimizing HOTA
  107. similarity = data["similarity_scores"][t]
  108. score_mat = (
  109. global_alignment_score[
  110. gt_ids_t[:, np.newaxis], tracker_ids_t[np.newaxis, :]
  111. ]
  112. * similarity
  113. )
  114. # Hungarian algorithm to find best matches
  115. match_rows, match_cols = linear_sum_assignment(-score_mat)
  116. # Calculate and accumulate basic statistics
  117. for a, alpha in enumerate(self.array_labels):
  118. actually_matched_mask = (
  119. similarity[match_rows, match_cols] >= alpha - np.finfo("float").eps
  120. )
  121. alpha_match_rows = match_rows[actually_matched_mask]
  122. alpha_match_cols = match_cols[actually_matched_mask]
  123. num_matches = len(alpha_match_rows)
  124. res["HOTA_TP"][a] += num_matches
  125. res["HOTA_FN"][a] += len(gt_ids_t) - num_matches
  126. res["HOTA_FP"][a] += len(tracker_ids_t) - num_matches
  127. if num_matches > 0:
  128. res["LocA"][a] += sum(
  129. similarity[alpha_match_rows, alpha_match_cols]
  130. )
  131. matches_counts[a][
  132. gt_ids_t[alpha_match_rows], tracker_ids_t[alpha_match_cols]
  133. ] += 1
  134. # Calculate association scores (AssA, AssRe, AssPr) for the alpha value.
  135. # First calculate scores per gt_id/tracker_id combo and then average over the number of detections.
  136. for a, alpha in enumerate(self.array_labels):
  137. matches_count = matches_counts[a]
  138. ass_a = matches_count / np.maximum(
  139. 1, gt_id_count + tracker_id_count - matches_count
  140. )
  141. res["AssA"][a] = np.sum(matches_count * ass_a) / np.maximum(
  142. 1, res["HOTA_TP"][a]
  143. )
  144. ass_re = matches_count / np.maximum(1, gt_id_count)
  145. res["AssRe"][a] = np.sum(matches_count * ass_re) / np.maximum(
  146. 1, res["HOTA_TP"][a]
  147. )
  148. ass_pr = matches_count / np.maximum(1, tracker_id_count)
  149. res["AssPr"][a] = np.sum(matches_count * ass_pr) / np.maximum(
  150. 1, res["HOTA_TP"][a]
  151. )
  152. # Calculate final scores
  153. res["LocA"] = np.maximum(1e-10, res["LocA"]) / np.maximum(1e-10, res["HOTA_TP"])
  154. res = self._compute_final_fields(res)
  155. return res
  156. def combine_sequences(self, all_res):
  157. """Combines metrics across all sequences"""
  158. res = {}
  159. for field in self.integer_array_fields:
  160. res[field] = self._combine_sum(all_res, field)
  161. for field in ["AssRe", "AssPr", "AssA"]:
  162. res[field] = self._combine_weighted_av(
  163. all_res, field, res, weight_field="HOTA_TP"
  164. )
  165. loca_weighted_sum = sum(
  166. [all_res[k]["LocA"] * all_res[k]["HOTA_TP"] for k in all_res.keys()]
  167. )
  168. res["LocA"] = np.maximum(1e-10, loca_weighted_sum) / np.maximum(
  169. 1e-10, res["HOTA_TP"]
  170. )
  171. res = self._compute_final_fields(res)
  172. return res
  173. def combine_classes_class_averaged(self, all_res, ignore_empty_classes=False):
  174. """Combines metrics across all classes by averaging over the class values.
  175. If 'ignore_empty_classes' is True, then it only sums over classes with at least one gt or predicted detection.
  176. """
  177. res = {}
  178. for field in self.integer_array_fields:
  179. if ignore_empty_classes:
  180. res[field] = self._combine_sum(
  181. {
  182. k: v
  183. for k, v in all_res.items()
  184. if (
  185. v["HOTA_TP"] + v["HOTA_FN"] + v["HOTA_FP"]
  186. > 0 + np.finfo("float").eps
  187. ).any()
  188. },
  189. field,
  190. )
  191. else:
  192. res[field] = self._combine_sum(
  193. {k: v for k, v in all_res.items()}, field
  194. )
  195. for field in self.float_fields + self.float_array_fields:
  196. if ignore_empty_classes:
  197. res[field] = np.mean(
  198. [
  199. v[field]
  200. for v in all_res.values()
  201. if (
  202. v["HOTA_TP"] + v["HOTA_FN"] + v["HOTA_FP"]
  203. > 0 + np.finfo("float").eps
  204. ).any()
  205. ],
  206. axis=0,
  207. )
  208. else:
  209. res[field] = np.mean([v[field] for v in all_res.values()], axis=0)
  210. return res
  211. def combine_classes_det_averaged(self, all_res):
  212. """Combines metrics across all classes by averaging over the detection values"""
  213. res = {}
  214. for field in self.integer_array_fields:
  215. res[field] = self._combine_sum(all_res, field)
  216. for field in ["AssRe", "AssPr", "AssA"]:
  217. res[field] = self._combine_weighted_av(
  218. all_res, field, res, weight_field="HOTA_TP"
  219. )
  220. loca_weighted_sum = sum(
  221. [all_res[k]["LocA"] * all_res[k]["HOTA_TP"] for k in all_res.keys()]
  222. )
  223. res["LocA"] = np.maximum(1e-10, loca_weighted_sum) / np.maximum(
  224. 1e-10, res["HOTA_TP"]
  225. )
  226. res = self._compute_final_fields(res)
  227. return res
  228. @staticmethod
  229. def _compute_final_fields(res):
  230. """Calculate sub-metric ('field') values which only depend on other sub-metric values.
  231. This function is used both for both per-sequence calculation, and in combining values across sequences.
  232. """
  233. res["DetRe"] = res["HOTA_TP"] / np.maximum(1, res["HOTA_TP"] + res["HOTA_FN"])
  234. res["DetPr"] = res["HOTA_TP"] / np.maximum(1, res["HOTA_TP"] + res["HOTA_FP"])
  235. res["DetA"] = res["HOTA_TP"] / np.maximum(
  236. 1, res["HOTA_TP"] + res["HOTA_FN"] + res["HOTA_FP"]
  237. )
  238. res["HOTA"] = np.sqrt(res["DetA"] * res["AssA"])
  239. res["OWTA"] = np.sqrt(res["DetRe"] * res["AssA"])
  240. res["HOTA(0)"] = res["HOTA"][0]
  241. res["LocA(0)"] = res["LocA"][0]
  242. res["HOTALocA(0)"] = res["HOTA(0)"] * res["LocA(0)"]
  243. return res
  244. def plot_single_tracker_results(self, table_res, tracker, cls, output_folder):
  245. """Create plot of results"""
  246. # Only loaded when run to reduce minimum requirements
  247. from matplotlib import pyplot as plt
  248. res = table_res["COMBINED_SEQ"]
  249. styles_to_plot = ["r", "b", "g", "b--", "b:", "g--", "g:", "m"]
  250. for name, style in zip(self.float_array_fields, styles_to_plot):
  251. plt.plot(self.array_labels, res[name], style)
  252. plt.xlabel("alpha")
  253. plt.ylabel("score")
  254. plt.title(tracker + " - " + cls)
  255. plt.axis([0, 1, 0, 1])
  256. legend = []
  257. for name in self.float_array_fields:
  258. legend += [name + " (" + str(np.round(np.mean(res[name]), 2)) + ")"]
  259. plt.legend(legend, loc="lower left")
  260. out_file = os.path.join(output_folder, cls + "_plot.pdf")
  261. os.makedirs(os.path.dirname(out_file), exist_ok=True)
  262. plt.savefig(out_file)
  263. plt.savefig(out_file.replace(".pdf", ".png"))
  264. plt.clf()