cluster.py
"""
Cluster is meant to encapsulate all differences between cluster systems for submission and
resubmission of jobs.
"""
import subprocess
import re
import os
from .submitscripts.sge import sge, vulcan_programs
from .submitscripts.pbs import pbs, sapelo_old_programs
from .submitscripts.slurm import slurm, sapelo_programs
class Cluster:
""" To add a new cluster one must add the following strings to cluster_attributes
add a submission command for a submission sciprt.
a queue_info command as a list of strings with None at the last index (to be replaced by
the job's_id. Should the name and state of a single job.
a regex_string for the job's 'state' or None (if None must add code in job_finished
as explicit as conditional)
a regex_string for the jobs's name to which pulls out the singlepoint number (number must be
in second capture group 2 see below examples) ex. STEP--00-1
# wait time is a more general pause than resub_delay. resub_delay minimizes
# the up-time of optavc. 'wait_time' makes sure the cluster has enough time
# to see jobs
"""
def __init__(self, cluster_name):
self.cluster_name = cluster_name
attributes = self.cluster_attributes(self.cluster_name)
self.submit_command = attributes.get('submit')
self.queue_info = attributes.get('queue_info')
self.job_state = attributes.get('job_state')
self.job_name = attributes.get('job_name')
self.job_id = attributes.get('job_id')
self.resub_delay = attributes.get('resub_delay')
self.wait_time = attributes.get('wait_time')
@staticmethod
def cluster_attributes(cluster_name):
        # Please add new clusters here. If optavc can't find your jobs on a cluster,
        # please adjust 'resub_delay' and 'wait_time'
attributes = {'VULCAN': {'submit': 'qsub',
'queue_info': ['qacct', '-j', None],
'job_state': None,
'job_name': r'jobname\s+.*(--\d*)?-(\d*)',
'job_id': r'Your\s*job(?:-array)?\s*(\d*)',
'resub_delay': lambda sleep: max(10, sleep),
'wait_time': 5},
'SAPELO_OLD': {'submit': 'qsub',
'queue_info': ['qstat', '-f', None],
'job_state': r'\s*job_state\s=\sC',
'job_name': r'\s*Job_Name\s=\s*.*(\-+\d*)?\-(\d*)',
'job_id': r'(\d*)\.sapelo2',
'resub_delay': lambda sleep: max(60, sleep),
'wait_time': 5},
'SAPELO': {'submit': 'sbatch',
'queue_info': ['sacct', '-X', '--format=JobID,JobName%60,STATE',
'-j', None],
'job_state': None,
'job_name': r'\s.+(--\d*)?-(\d+)',
'job_id': r'\.*(\d+)',
'resub_delay': lambda sleep: max(60, sleep),
'wait_time': 30}}
return attributes.get(cluster_name)
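    # Illustrative sketch (not part of the class API): the 'resub_delay' entries are callables
    # that clamp the requested sleep time to a per-cluster minimum, e.g.
    #   attrs = Cluster.cluster_attributes('SAPELO')
    #   attrs['submit']            # 'sbatch'
    #   attrs['resub_delay'](10)   # 60  -- the lambda enforces a 60 second minimum
    #   attrs['resub_delay'](120)  # 120 -- larger values pass through unchanged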
def submit(self, options):
""" This is a very limited submit function. A template sh file
is taken from the submitscripts dir, formatted with make_sub_script(), and
then run using the cluster's submit_command
"""
out = self.make_sub_script(options)
        if len(options.name) > 60 and self.cluster_name == 'SAPELO':
            raise ValueError(f"Your job's name is too long. Please shorten it by "
                             f"{len(options.name) - 60} characters or increase the formatting "
                             f"space in `cluster_attributes()` for SAPELO")
with open('optstep.sh', 'w+') as f:
f.write(out)
submit_command = self.submit_command
pipe = subprocess.PIPE
process = subprocess.run([submit_command, './optstep.sh'], stdout=pipe, stderr=pipe,
encoding='UTF-8')
        # subprocess.PIPE collects all output.
        # run() will wait for an array process to finish. Individual submissions return
        # immediately from qsub, so the lack of parallel execution for run() is inconsequential
if process.stderr:
print(process.stderr)
print(os.getcwd())
raise RuntimeError(f"\n{submit_command} command has FAILED. see {submit_command} STDERR"
" above\n")
text_output = process.stdout
return self.get_job_id(text_output, options.job_array)
def query_cluster(self, job_id, job_array=False):
""" use subprocess to get detailed information on a single job. output must be interpreted
in cluster by cluster basis.
Parameters
----------
job_id
Returns
-------
str: Output from cluster's command sacct, qstat -f, qacct -j. Can be None for qacct -j if
job has not finished.
"""
pipe = subprocess.PIPE
self.queue_info[-1] = job_id # last spot is held as None until job_id is ready
# encoding UTF-8 will make stderr and stdout be of type str (not bytes)
process = subprocess.run(self.queue_info, stdout=pipe, stderr=pipe, encoding='UTF-8')
if process.stderr:
if self.cluster_name == 'VULCAN':
                # qacct is the only command which can see finished jobs; however, querying a
                # running job produces stderr
return False, 0
print(process.stderr)
raise RuntimeError(f"Error encountered trying to run {' '.join(self.queue_info)}")
output = process.stdout
job_state = self.job_finished(output)
if not job_array:
job_number = self.get_job_number(output)
else:
job_number = None
return job_state, job_number
def get_job_id(self, submit_output, job_array=False):
""" Use regex to grab job_id. Regexes should be written for the stdout for the cluster's
submit command
Parameters
----------
submit_output: str
output from qsub, sbatch or similar command
job_array: bool
If True we don't need to lookup the job id
Returns
-------
str
"""
if not submit_output and job_array:
raise RuntimeError(f"No output from {self.submit_command}. Cannot find job_id")
if self.job_id:
return re.search(self.job_id, submit_output).group(1)
else:
raise NotImplementedError("Cannot identify job_id on this cluster")
def get_job_number(self, output):
""" Parse output of the `job_info` command for job number number. All cluster's
currently use regex to simply fetch the number
Parameters
----------
output: str
unmodified output from queue_info command
Returns
-------
int: number corresponding to the singlepoint run in 1 based indexing. Singlepoint.disp_num
is 0 indexed
"""
if self.job_name:
try:
job_number = int(re.search(self.job_name, output).group(2))
except (TypeError, AttributeError):
print("Can not find the singlepoint's disp_num check the regex and output below")
print(self.job_name)
print(output)
raise
else:
return job_number
else:
raise NotImplementedError("Encountered cluster with no job_name regex")
def job_finished(self, output):
""" Determine whether a job is no longer queued or running on the cluster
Parameters
----------
output : str
unmodified output from queue_info
Returns
-------
bool : True if job is done
Notes
-----
        Either use a regex to look for a pattern indicating the job has finished, or supply a
        cluster-specific check in this method.
"""
job_state = False
if self.job_state:
            # the regex must match ANY finished job and match nothing otherwise
job_state = re.search(self.job_state, output)
else:
if self.cluster_name == 'VULCAN':
if output:
job_state = True
elif self.cluster_name == 'SAPELO':
# simple check for substring in output
if not ("PENDING" in output or "RUNNING" in output):
job_state = True
return job_state
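    # Illustrative sketch (the sacct line is hypothetical): on SAPELO the check above is a plain
    # substring test, so a line such as
    #   '1234567    STEP--00-1    RUNNING'
    # keeps job_state False, while output showing COMPLETED, FAILED, etc. (or no output at all)
    # marks the job as finished.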
def get_template(self, job_array=False, email=None, parallel='serial'):
""" Only vulcan currently uses array submission. All other programs only submit
singlepoints individually
Parameters
----------
job_array: bool, optional
Returns
-------
str: ready to be dumped to file
"""
if self.cluster_name == "VULCAN":
if job_array:
return sge.sge_array
else:
return sge.sge_basic
elif self.cluster_name == "SAPELO_OLD":
if email:
return pbs.pbs_email
return pbs.pbs_basic
elif self.cluster_name == "SAPELO":
if parallel in ['mpi']:
if email:
return slurm.slurm_mpi_email
return slurm.slurm_mpi
else:
if email:
return slurm.slurm_email
return slurm.slurm_basic
else:
raise NotImplementedError("Please add a new template or label new cluster as SGE, PBS, "
"or SLURM")
def make_sub_script(self, options):
""" Fill a template file with needed options for each cluster
Parameters
----------
options: options.Options
"""
template = self.get_template(options.job_array, options.email, options.parallel)
progname = options.program
scratch = 'scratch'
job_num = int(options.job_array_range[1])
# cluster = self.options.cluster
odict = {
'q': options.queue,
'nslots': options.nslots,
'jarray': '1-{}'.format(job_num),
'progname': progname,
'name': options.name,
'threads': options.threads
}
        # Handle the various programs' ability to set input file names.
        # cfour only allows ZMAT as the input name. We copy input.dat to ZMAT to adjust for this.
        # fermi does its own thing with output files; output_name needs to match (up to the user).
if progname == 'cfour':
in_out = {'output_name': options.output_name}
elif progname == 'fermi':
in_out = {'input_name': options.input_name}
else:
in_out = {'input_name': options.input_name, 'output_name': options.output_name}
if self.cluster_name in ['SAPELO', "SAPELO_OLD"]:
scratch = options.scratch.lower()
odict.update({'memory': options.memory,
'time': options.time_limit})
if options.email:
odict.update({'email_opts': options.email_opts})
if self.cluster_name == 'SAPELO':
if options.constraint == '':
constraint = 'EPYC|Intel'
else:
constraint = options.constraint
# choose program string
prog = sapelo_programs.progdict.get(options.parallel).get(scratch).get(progname)
prog = prog.format(**in_out)
odict.update({'prog': prog, 'constraint': constraint})
else:
prog = sapelo_old_programs.progdict.get(options.parallel).get(scratch).get(progname)
prog = prog.format(**in_out)
odict.update({'prog': prog})
elif self.cluster_name == 'VULCAN':
prog = vulcan_programs.progdict.get(options.parallel).get(scratch).get(progname)
prog = prog.format(**in_out)
odict.update({'prog': prog})
if options.job_array:
odict.update({'tc': str(job_num)})
return template.format(**odict)
def enforce_job_array(self, original):
""" Check qconf in case -sync processes have all been used. Otherwise job_array should be
False
Returns
-------
bool: value for job_array
"""
pipe = subprocess.PIPE
if self.cluster_name == 'VULCAN':
process = subprocess.Popen(['qconf', '-secl'], stdout=pipe, stderr=pipe,
encoding='UTF-8')
            # Two lines are formatting; four lines are processes, one of which is the scheduler
if len(process.stdout.readlines()) >= 6:
return False
return original
return False
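# Minimal usage sketch (illustrative only; optavc normally constructs Cluster internally from
# its Options object). Because of the relative imports above, run this via
# `python -m optavc.cluster`, assuming the package is importable under that name.
if __name__ == '__main__':
    cluster = Cluster('SAPELO')
    print(cluster.submit_command)    # 'sbatch'
    print(cluster.wait_time)         # 30
    print(cluster.resub_delay(10))   # 60 -- minimum delay enforced by the lambda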