submit_cluster_job.py
(forked from EnsemblGSOC/Ensembl-Repeat-Identification)
"""
Submit a cluster LSF job to train a neural network.
e.g.:
python submit_cluster_job.py --pipeline train.py --configuration configuration.yaml
python submit_cluster_job.py --gpu V100 --pipeline train.py --configuration configuration.yaml
"""
# standard library imports
import argparse
import datetime as dt
import pathlib
import shutil
import subprocess
import sys

# third party imports
import yaml
from pytorch_lightning.utilities import AttributeDict


def main():
    """
    main function
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("--pipeline", type=str, help="pipeline script path")
    argument_parser.add_argument(
        "--configuration", type=str, help="experiment configuration file path"
    )
    argument_parser.add_argument(
        "--gpu",
        default="A100",
        choices=["A100", "V100"],
        type=str,
        help="GPU node queue to submit the job to",
    )
    argument_parser.add_argument(
        "--num_gpus", default=1, type=int, help="number of GPUs to use"
    )
    argument_parser.add_argument(
        "--mem_limit", default=65536, type=int, help="RAM memory limit for the job"
    )

    args = argument_parser.parse_args()

    # submit new training job
    if args.pipeline and args.configuration:
        datetime = dt.datetime.now().isoformat(sep="_", timespec="seconds")

        pipeline_path = pathlib.Path(args.pipeline)

        with open(args.configuration) as file:
            configuration = yaml.safe_load(file)
        configuration = AttributeDict(configuration)

        experiment_name = (
            f"{configuration.experiment_prefix}_{configuration.dataset_id}_{datetime}"
        )

        experiments_directory = configuration.save_directory
        experiment_directory = pathlib.Path(
            f"{experiments_directory}/{experiment_name}"
        )
        experiment_directory.mkdir(parents=True, exist_ok=True)

        # save configuration file to experiment directory
        configuration_copy = shutil.copy(args.configuration, experiment_directory)

        pipeline_command_elements = [
            f"python {pipeline_path}",
            f"--datetime {datetime}",
            f"--configuration {configuration_copy}",
        ]

    # no task specified
    else:
        print(__doc__)
        argument_parser.print_help()
        sys.exit()

    pipeline_command = " ".join(pipeline_command_elements)

    # common job arguments
    # (the -M memory limit is added further down, once mem_limit has been set for
    # the selected GPU queue, so that bsub receives a single, consistent -M value)
    bsub_command_elements = [
        "bsub",
        f"-o {experiment_directory}/stdout.log",
        f"-e {experiment_directory}/stderr.log",
    ]

    # run training on an NVIDIA A100 GPU node
    if args.gpu == "A100":
        gpu_memory = 81000  # ~80 GiBs, NVIDIA A100 memory with safety margin
        # gpu_memory = 81920  # 80 GiBs, total NVIDIA A100 memory
        mem_limit = args.mem_limit
        bsub_command_elements.append("-q gpu-a100")
    # run training on an NVIDIA V100 GPU node
    elif args.gpu == "V100":
        gpu_memory = 32256  # 31.5 GiBs, NVIDIA V100 memory with safety margin
        # gpu_memory = 32510  # ~32 GiBs, total NVIDIA V100 memory
        mem_limit = 32768
        bsub_command_elements.append("-q gpu")

    bsub_command_elements.extend(
        [
            f'-gpu "num={args.num_gpus}:gmem={gpu_memory}:j_exclusive=yes"',
            f"-M {mem_limit}",
            f'-R"select[mem>{mem_limit}] rusage[mem={mem_limit}] span[hosts=1]"',
            pipeline_command,
        ]
    )
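
    # A sketch of the assembled command with the default A100 settings; the
    # experiment directory, datetime, and configuration path are illustrative
    # placeholders, not values produced here:
    #
    #   bsub -o <experiment_directory>/stdout.log -e <experiment_directory>/stderr.log \
    #       -q gpu-a100 -gpu "num=1:gmem=81000:j_exclusive=yes" -M 65536 \
    #       -R"select[mem>65536] rusage[mem=65536] span[hosts=1]" \
    #       python train.py --datetime <datetime> --configuration <experiment_directory>/configuration.yaml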

    bsub_command = " ".join(bsub_command_elements)

    print(f"running command:\n{bsub_command}")
    subprocess.run(bsub_command, shell=True)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Interrupted with CTRL-C, exiting...")