# Copyright (c) Meta Platforms, Inc. and affiliates. All Rights Reserved

# pyre-unsafe
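"""Training launcher for SAM3.

Composes a Hydra config and either runs training locally on a single node
(one process per GPU) or submits it to a SLURM cluster via submitit. The
composed config (plus a fully resolved copy) is written to the experiment
log directory before launching.
"""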
import logging
import os
import random
import sys
import traceback
from argparse import ArgumentParser
from copy import deepcopy

import submitit
import torch
from hydra import compose, initialize_config_module
from hydra.utils import instantiate
from iopath.common.file_io import g_pathmgr
from omegaconf import OmegaConf
from sam3.train.utils.train_utils import makedir, register_omegaconf_resolvers
from tqdm import tqdm

os.environ["HYDRA_FULL_ERROR"] = "1"

class SlurmEvent:
    QUEUED = "QUEUED"
    START = "START"
    FINISH = "FINISH"
    JOB_ERROR = "JOB_ERROR"
    SLURM_SIGNAL = "SLURM_SIGNAL"


def handle_custom_resolving(cfg):
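    """Copy ``cfg`` without triggering interpolation resolving."""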
    # We'll resolve the config here, so we can catch mistakes early.
    # However, we need to pass the un-resolved config to the launcher
    # (because DVC resolving needs to be done on the node it will run on).
    # First, do a copy without triggering resolving.
    cfg_resolved = OmegaConf.to_container(cfg, resolve=False)
    cfg_resolved = OmegaConf.create(cfg_resolved)
    return cfg_resolved

def single_proc_run(local_rank, main_port, cfg, world_size):
    """Single GPU process"""
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = str(main_port)
    os.environ["RANK"] = str(local_rank)
    os.environ["LOCAL_RANK"] = str(local_rank)
    os.environ["WORLD_SIZE"] = str(world_size)
    try:
        register_omegaconf_resolvers()
    except Exception as e:
        logging.info(e)
    trainer = instantiate(cfg.trainer, _recursive_=False)
    trainer.run()


def single_node_runner(cfg, main_port: int):
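    """Run training on the local node, launching one process per GPU."""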
    assert cfg.launcher.num_nodes == 1
    # assert cfg.launcher.gpus_per_node == 1
    num_proc = cfg.launcher.gpus_per_node
    torch.multiprocessing.set_start_method(
        "spawn"
    )  # CUDA runtime does not support `fork`
    if num_proc == 1:
        # Directly call single_proc_run so we can easily set breakpoints;
        # mp.spawn does not let us set breakpoints.
        single_proc_run(
            local_rank=0, main_port=main_port, cfg=cfg, world_size=num_proc
        )
    else:
        mp_runner = torch.multiprocessing.start_processes
        args = (main_port, cfg, num_proc)
        # Use the "spawn" start method here as well (consistent with the
        # set_start_method call above); the CUDA runtime does not support
        # forking after it has been initialized.
        mp_runner(single_proc_run, args=args, nprocs=num_proc, start_method="spawn")


def format_exception(e: Exception, limit=20):
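    """Format an exception as ``<type>: <message>`` followed by a truncated traceback."""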
    traceback_str = "".join(traceback.format_tb(e.__traceback__, limit=limit))
    return f"{type(e).__name__}: {e}\nTraceback:\n{traceback_str}"


class SubmititRunner(submitit.helpers.Checkpointable):
    """A callable which is passed to submitit to launch the jobs."""

    def __init__(self, port, cfg):
        self.cfg = cfg
        self.port = port
        self.has_setup = False

    def run_trainer(self):
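        """Set up distributed env vars from the submitit job env and run the trainer."""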
        job_env = submitit.JobEnvironment()
        # Need to add this again so the hydra.job.set_env PYTHONPATH
        # is also set when launching jobs.
        add_pythonpath_to_sys_path()
        os.environ["MASTER_ADDR"] = job_env.hostnames[0]
        os.environ["MASTER_PORT"] = str(self.port)
        os.environ["RANK"] = str(job_env.global_rank)
        os.environ["LOCAL_RANK"] = str(job_env.local_rank)
        os.environ["WORLD_SIZE"] = str(job_env.num_tasks)

        register_omegaconf_resolvers()
        cfg_resolved = OmegaConf.to_container(self.cfg, resolve=False)
        cfg_resolved = OmegaConf.create(cfg_resolved)

        trainer = instantiate(cfg_resolved.trainer, _recursive_=False)
        trainer.run()

    def __call__(self):
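        """Entry point executed by submitit for each task."""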
        job_env = submitit.JobEnvironment()
        self.setup_job_info(job_env.job_id, job_env.global_rank)
        try:
            self.run_trainer()
        except Exception as e:
            # Log the exception, then re-raise it so submitit marks the job as failed.
            message = format_exception(e)
            logging.error(message)
            raise e

    def setup_job_info(self, job_id, rank):
        """Set up slurm job info"""
        self.job_info = {
            "job_id": job_id,
            "rank": rank,
            "cluster": self.cfg.get("cluster", None),
            "experiment_log_dir": self.cfg.launcher.experiment_log_dir,
        }
        self.has_setup = True


def add_pythonpath_to_sys_path():
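    """Prepend the entries of ``$PYTHONPATH`` (if set) to ``sys.path``."""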
- if "PYTHONPATH" not in os.environ or not os.environ["PYTHONPATH"]:
- return
- sys.path = os.environ["PYTHONPATH"].split(":") + sys.path
- def main(args) -> None:
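    """Compose the config, dump it to the experiment log dir, and launch training
    on a SLURM cluster via submitit or locally on a single node."""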
    cfg = compose(config_name=args.config)
    if cfg.launcher.experiment_log_dir is None:
        cfg.launcher.experiment_log_dir = os.path.join(
            os.getcwd(), "sam3_logs", args.config
        )
    print("###################### Train App Config ####################")
    print(OmegaConf.to_yaml(cfg))
    print("############################################################")

    add_pythonpath_to_sys_path()
    makedir(cfg.launcher.experiment_log_dir)
    with g_pathmgr.open(
        os.path.join(cfg.launcher.experiment_log_dir, "config.yaml"), "w"
    ) as f:
        f.write(OmegaConf.to_yaml(cfg))

    cfg_resolved = OmegaConf.to_container(cfg, resolve=False)
    cfg_resolved = OmegaConf.create(cfg_resolved)

    with g_pathmgr.open(
        os.path.join(cfg.launcher.experiment_log_dir, "config_resolved.yaml"), "w"
    ) as f:
        f.write(OmegaConf.to_yaml(cfg_resolved, resolve=True))

    submitit_conf = cfg.get("submitit", None)
    assert submitit_conf is not None, "Missing submitit config"

    experiment_log_dir = cfg.launcher.experiment_log_dir
    print(f"Experiment Log Dir:\n{experiment_log_dir}")

    submitit_dir = os.path.join(experiment_log_dir, "submitit_logs")
    # Prioritize cmd line args
    cfg.launcher.gpus_per_node = (
        args.num_gpus if args.num_gpus is not None else cfg.launcher.gpus_per_node
    )
    cfg.launcher.num_nodes = (
        args.num_nodes if args.num_nodes is not None else cfg.launcher.num_nodes
    )
    submitit_conf.use_cluster = (
        args.use_cluster if args.use_cluster is not None else submitit_conf.use_cluster
    )
    if submitit_conf.use_cluster:
        executor = submitit.AutoExecutor(folder=submitit_dir)
        submitit_conf.partition = (
            args.partition
            if args.partition is not None
            else submitit_conf.get("partition", None)
        )
        submitit_conf.account = (
            args.account
            if args.account is not None
            else submitit_conf.get("account", None)
        )
        submitit_conf.qos = (
            args.qos if args.qos is not None else submitit_conf.get("qos", None)
        )
        job_kwargs = {
            "timeout_min": 60 * submitit_conf.timeout_hour,
            "name": (
                submitit_conf.name if hasattr(submitit_conf, "name") else args.config
            ),
            "slurm_partition": submitit_conf.partition,
            "gpus_per_node": cfg.launcher.gpus_per_node,
            "tasks_per_node": cfg.launcher.gpus_per_node,  # one task per GPU
            "cpus_per_task": submitit_conf.cpus_per_task,
            "nodes": cfg.launcher.num_nodes,
            "slurm_additional_parameters": {
                "exclude": " ".join(submitit_conf.get("exclude_nodes", [])),
            },
        }
- if "include_nodes" in submitit_conf:
- assert len(submitit_conf["include_nodes"]) >= cfg.launcher.num_nodes, (
- "Not enough nodes"
- )
- job_kwargs["slurm_additional_parameters"]["nodelist"] = " ".join(
- submitit_conf["include_nodes"]
- )
- if submitit_conf.account is not None:
- job_kwargs["slurm_additional_parameters"]["account"] = submitit_conf.account
- if submitit_conf.qos is not None:
- job_kwargs["slurm_additional_parameters"]["qos"] = submitit_conf.qos
- if submitit_conf.get("mem_gb", None) is not None:
- job_kwargs["mem_gb"] = submitit_conf.mem_gb
- elif submitit_conf.get("mem", None) is not None:
- job_kwargs["slurm_mem"] = submitit_conf.mem
- if submitit_conf.get("constraints", None) is not None:
- job_kwargs["slurm_constraint"] = submitit_conf.constraints
- if submitit_conf.get("comment", None) is not None:
- job_kwargs["slurm_comment"] = submitit_conf.comment
- # Supports only cpu-bind option within srun_args. New options can be added here
- if submitit_conf.get("srun_args", None) is not None:
- job_kwargs["slurm_srun_args"] = []
- if submitit_conf.srun_args.get("cpu_bind", None) is not None:
- job_kwargs["slurm_srun_args"].extend(
- ["--cpu-bind", submitit_conf.srun_args.cpu_bind]
- )
- print("###################### SLURM Config ####################")
- print(job_kwargs)
- print("##########################################")
- executor.update_parameters(**job_kwargs)
        if (
            "job_array" in submitit_conf
            and submitit_conf.job_array.get("num_tasks", -1) > 0
        ):
            num_tasks = submitit_conf.job_array.num_tasks
            job_array_config_dir = os.path.join(
                cfg.launcher.experiment_log_dir, "job_array_configs"
            )
            makedir(job_array_config_dir)
            job_indices = range(num_tasks)
            ports = random.sample(
                range(submitit_conf.port_range[0], submitit_conf.port_range[1] + 1),
                k=len(job_indices),
            )
            jobs_runners_configs = []
            with executor.batch():
                for task_index, main_port in tqdm(zip(job_indices, ports)):
                    curr_cfg = deepcopy(cfg)
                    curr_cfg.submitit.job_array["task_index"] = task_index
                    curr_cfg_resolved = handle_custom_resolving(curr_cfg)
                    runner = SubmititRunner(main_port, curr_cfg)
                    job = executor.submit(runner)
                    jobs_runners_configs.append(
                        (job, runner, curr_cfg, curr_cfg_resolved)
                    )
            for job, runner, job_cfg, job_cfg_resolved in jobs_runners_configs:
                print("Submitit Job ID:", job.job_id)
                # Save job specific config
                job_array_config_file = os.path.join(
                    job_array_config_dir, "{}.config.yaml".format(job.job_id)
                )
                with g_pathmgr.open(job_array_config_file, "w") as f:
                    f.write(OmegaConf.to_yaml(job_cfg))
                job_array_config_resolved_file = os.path.join(
                    job_array_config_dir, "{}.config_resolved.yaml".format(job.job_id)
                )
                with g_pathmgr.open(job_array_config_resolved_file, "w") as f:
                    f.write(OmegaConf.to_yaml(job_cfg_resolved, resolve=True))
                runner.setup_job_info(job.job_id, rank=0)
                # runner.log_event(event_type=SlurmEvent.QUEUED)
        else:
            main_port = random.randint(
                submitit_conf.port_range[0], submitit_conf.port_range[1]
            )
            runner = SubmititRunner(main_port, cfg)
            job = executor.submit(runner)
            print(f"Submitit Job ID: {job.job_id}")
            runner.setup_job_info(job.job_id, rank=0)
    else:
        cfg.launcher.num_nodes = 1
        main_port = random.randint(
            submitit_conf.port_range[0], submitit_conf.port_range[1]
        )
        single_node_runner(cfg, main_port)

if __name__ == "__main__":
    initialize_config_module("sam3.train", version_base="1.2")
    parser = ArgumentParser()
    parser.add_argument(
        "-c",
        "--config",
        required=True,
        type=str,
        help="path to config file (e.g. configs/roboflow_v100_full_ft_100_images.yaml)",
    )
    parser.add_argument(
        "--use-cluster",
        type=int,
        default=None,
        help="whether to launch on a cluster, 0: run locally, 1: run on a cluster",
    )
    parser.add_argument("--partition", type=str, default=None, help="SLURM partition")
    parser.add_argument("--account", type=str, default=None, help="SLURM account")
    parser.add_argument("--qos", type=str, default=None, help="SLURM qos")
    parser.add_argument(
        "--num-gpus", type=int, default=None, help="number of GPUs per node"
    )
    parser.add_argument("--num-nodes", type=int, default=None, help="number of nodes")
    args = parser.parse_args()
    args.use_cluster = bool(args.use_cluster) if args.use_cluster is not None else None
    register_omegaconf_resolvers()
    main(args)
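
# Example invocations (illustrative; assumes this file is saved as train.py):
#
#   Run locally on a single node with 8 GPUs:
#     python train.py -c configs/roboflow_v100_full_ft_100_images.yaml --use-cluster 0 --num-gpus 8
#
#   Submit to a SLURM cluster on 2 nodes:
#     python train.py -c configs/roboflow_v100_full_ft_100_images.yaml --use-cluster 1 \
#         --partition <partition> --account <account> --num-gpus 8 --num-nodes 2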