From f9929aecd10c7914860409231d40c168244afc31 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Wed, 25 Jul 2018 13:55:03 -0600 Subject: [PATCH 01/35] added boltons back to dependencies --- setup.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index c38399cb..3b09a5fe 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='wrfhydropy', - version='0.0.5dev0', + version='0.0.5dev1', packages=find_packages(), package_data={'wrfhydropy': ['core/data/*']}, url='https://github.com/NCAR/wrf_hydro_py', @@ -14,7 +14,8 @@ 'xarray', 'datetime', 'pytest', - 'pytest-datadir-ng'], + 'pytest-datadir-ng', + 'boltons'], author='Joe Mills', author_email='jmills@ucar.edu', description='Crude API for the WRF-Hydro model', From 2410c38649a703ce27f1c40f1d6fc67392e33d05 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Thu, 26 Jul 2018 16:46:19 -0600 Subject: [PATCH 02/35] Changed number of processors to 64 and 2 nodes in default pbscheyenne args --- wrfhydropy/core/schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index cc0f2095..0a71da69 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -24,7 +24,7 @@ def __init__( account: str, email_who: str = None, email_when: str = 'abe', - nproc: int = 360, + nproc: int = 64, nnodes: int = None, ppn: int = 36, queue: str = 'regular', From 8055c1cdeb08d072c552e1fc9c0cccc68a21f3a4 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Thu, 26 Jul 2018 16:46:33 -0600 Subject: [PATCH 03/35] Changed number of processors to 64 and 2 nodes in default pbscheyenne args --- wrfhydropy/core/schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 0a71da69..70391dd2 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -24,7 +24,7 @@ def __init__( account: str, email_who: str = None, email_when: str = 'abe', - nproc: int = 64, + nproc: int = 72, nnodes: int = None, ppn: int = 36, queue: str = 'regular', From f907d1781c4450328d36fcbbfab9c9a24720ecac Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Fri, 27 Jul 2018 13:15:31 -0600 Subject: [PATCH 04/35] Add job pickle method and start work on collect job update to output collect method --- wrfhydropy/core/job.py | 11 +++++++++++ wrfhydropy/core/simulation.py | 3 +++ 2 files changed, 14 insertions(+) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index 791e9577..0b52f51f 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -142,6 +142,15 @@ def clone(self, N) -> list: clones.append(copy.deepcopy(self)) return(clones) + def pickle(self,path: str): + """Pickle sim object to specified file path + Args: + path: The file path for pickle + """ + path = pathlib.Path(path) + with path.open(mode='wb') as f: + pickle.dump(self, f, 2) + def _run(self): """Private method to run a job""" @@ -218,6 +227,8 @@ def _run(self): current_dir.joinpath('hydro.namelist').unlink() current_dir.joinpath('namelist.hrldas').unlink() + self.pickle(str(self.job_dir.joinpath('WrfHydroJob.pkl'))) + def _write_namelists(self): """Private method to write namelist dicts to FORTRAN namelist files""" self.hydro_namelist.write(str(self.job_dir.joinpath('hydro.namelist'))) diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index 0e304143..f8cf3a46 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -266,6 +266,9 @@ def collect(self,sim_dir: Union[str,pathlib.Path]): current_dir = pathlib.Path(os.curdir).absolute() + # Grab job objects and update jobs list + + # Grab outputs as WrfHydroXX classes of file paths # Get channel files if len(list(current_dir.glob('*CHRTOUT*'))) > 0: From 7672cda0bbe0f5e4644920e79d34a844bb5be847 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Fri, 27 Jul 2018 19:34:45 -0600 Subject: [PATCH 05/35] Updated jobs exit statuses and logs as part of Simulation.collect method --- wrfhydropy/core/simulation.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index f8cf3a46..bfb64312 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -129,6 +129,22 @@ def run(self): def collect(self): """Collect simulation output after a run""" + + current_dir = pathlib.Path(os.curdir).absolute() + + # Overwrite sim job objects with collected objects matched on job id + ## Create dict of index/ids so that globbed jobs match the original list order + id_index = dict() + for index, item in enumerate(self.jobs): + id_index[item.job_id] = index + + ## Insert collect jobs into sim job list + job_objs = current_dir.rglob('WrfHydroJob.pkl') + for job_obj in job_objs: + collect_job = pickle.load(open(job_obj,mode='rb')) + original_idx = id_index[collect_job.job_id] + self.jobs[original_idx] = collect_job + self.output = SimulationOutput() self.output.collect(sim_dir=os.getcwd()) @@ -266,9 +282,6 @@ def collect(self,sim_dir: Union[str,pathlib.Path]): current_dir = pathlib.Path(os.curdir).absolute() - # Grab job objects and update jobs list - - # Grab outputs as WrfHydroXX classes of file paths # Get channel files if len(list(current_dir.glob('*CHRTOUT*'))) > 0: From 7d60c1cb6c1ea4d17cd99f84b832d21e30aeb2af Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Fri, 27 Jul 2018 19:35:49 -0600 Subject: [PATCH 06/35] Renamed collect methods to distinguish between outputs and sim collect --- wrfhydropy/core/simulation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index bfb64312..d1f849b8 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -146,7 +146,7 @@ def collect(self): self.jobs[original_idx] = collect_job self.output = SimulationOutput() - self.output.collect(sim_dir=os.getcwd()) + self.output.collect_output(sim_dir=os.getcwd()) def pickle(self,path: str): """Pickle sim object to specified file path @@ -274,7 +274,7 @@ def __init__(self): self.restart_nudging = None """list: List of nudgingLastObs WrfHydroStatic objects""" - def collect(self,sim_dir: Union[str,pathlib.Path]): + def collect_output(self,sim_dir: Union[str,pathlib.Path]): """Collect simulation output after a run Args: sim_dir: The simulation directory From 73134c5747346f48b1bb2c7a67fb7cbed1353b13 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 11:03:22 -0600 Subject: [PATCH 07/35] started adding unit test for wrfhydropy --- wrfhydropy/__init__.py | 3 +- wrfhydropy/core/test_namelist.py | 29 +++++++++++++++++++ .../tests/test_WrfHydroClasses_public.py | 1 - wrfhydropy/tests/test_domain.py | 0 4 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 wrfhydropy/core/test_namelist.py create mode 100644 wrfhydropy/tests/test_domain.py diff --git a/wrfhydropy/__init__.py b/wrfhydropy/__init__.py index 79a5c189..8ea2df29 100644 --- a/wrfhydropy/__init__.py +++ b/wrfhydropy/__init__.py @@ -4,4 +4,5 @@ from .core.simulation import * from .core import schedulers from .core import outputdiffs -from .core import ioutils \ No newline at end of file +from .core import ioutils +from .core import namelist diff --git a/wrfhydropy/core/test_namelist.py b/wrfhydropy/core/test_namelist.py new file mode 100644 index 00000000..df82a732 --- /dev/null +++ b/wrfhydropy/core/test_namelist.py @@ -0,0 +1,29 @@ +from wrfhydropy import namelist + +def test_dict_merge(): + main_dict = {'key_1':'value_1', + 'key_2': 1, + 'sub_dict1': { + 'subdict1_key1':'sub_value1', + 'subdict1_key2':2, + }, + 'sub_dict2': { + 'subdict2_key1':1} + } + + patch_dict = { + 'sub_dict1':{ + 'subdict1_key1':'patched_value' + }, + 'key_2':'patched_value' + } + + patched_dict = namelist.dict_merge(main_dict,patch_dict) + assert patched_dict == {'key_1': 'value_1', + 'key_2': 'patched_value', + 'sub_dict1': + {'subdict1_key1': 'patched_value', 'subdict1_key2': 2}, + 'sub_dict2': {'subdict2_key1': 1} + } + + diff --git a/wrfhydropy/tests/test_WrfHydroClasses_public.py b/wrfhydropy/tests/test_WrfHydroClasses_public.py index 2603e376..eab601b1 100644 --- a/wrfhydropy/tests/test_WrfHydroClasses_public.py +++ b/wrfhydropy/tests/test_WrfHydroClasses_public.py @@ -31,7 +31,6 @@ def tmp_data_dir(tmpdir_factory): ################################## # Domain object tests - # # Make expected data # domain_top_dir= test_data_dir / 'domain' # domain_object = wrfhydropy.WrfHydroDomain( diff --git a/wrfhydropy/tests/test_domain.py b/wrfhydropy/tests/test_domain.py new file mode 100644 index 00000000..e69de29b From 3a0e492f88f14974d3260beb9271e8f31880c48e Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 16:20:46 -0600 Subject: [PATCH 08/35] Made job a ref in scheduler and changed to private method --- wrfhydropy/core/schedulers.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 70391dd2..59723063 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -10,7 +10,7 @@ def __init__(self): super().__init__() @abstractmethod - def add_job(self, job: Job): + def _add_job(self, job: Job): pass @abstractmethod @@ -64,9 +64,8 @@ def __init__( 'queue':queue, 'walltime':walltime} - def add_job(self,job: Job): + def _add_job(self,job: Job): """Add a job to the scheduler""" - job = copy.deepcopy(job) #Override job exe cmd with scheduler exe cmd job._exe_cmd = self._exe_cmd @@ -110,7 +109,7 @@ def schedule(self): qsub_str += pbs_jids[job_num] + "=`qsub -W depend=afterok:$" + pbs_jids[ job_num-1] + " " + pbs_scripts[job_num] + "`;" - qsub_str += 'qrls $' + pbs_jids[0] + ";" + #qsub_str += 'qrls $' + pbs_jids[0] + ";" qsub_str += '"' # This stacks up dependent jobs in PBS in the same order as the job list From d0c4b583c8ad7a08943de7677a8b264913988800 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 16:30:55 -0600 Subject: [PATCH 09/35] Added different names to completed job pickle object --- wrfhydropy/core/job.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index 0b52f51f..ad44428a 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -300,7 +300,7 @@ def _write_run_script(self): """Private method to write a python script to run the job. This is used primarily for compatibility with job schedulers on HPC systems""" - self._pickle() + self.pickle(str(self.job_dir.joinpath('WrfHydroJob.pkl'))) pystr = "" pystr += "# import modules\n" @@ -325,7 +325,7 @@ def _write_run_script(self): pystr += "job = pickle.load(open(job_dir,mode='rb'))\n" pystr += "#Run the job\n" pystr += "job._run()\n" - pystr += "job._pickle()\n" + pystr += "job.pickle(str(self.job_dir.joinpath('WrfHydroJob_complete.pkl')))\n" pystr_file = 'run_job.py' with open(pystr_file,mode='w') as f: @@ -352,10 +352,16 @@ def _solve_model_start_end_times(self): return model_start_time, model_end_time - def _pickle(self): - with self.job_dir.joinpath('WrfHydroJob.pkl').open(mode='wb') as f: + def pickle(self,path: str): + """Pickle sim object to specified file path + Args: + path: The file path for pickle + """ + path = pathlib.Path(path) + with path.open(mode='wb') as f: pickle.dump(self, f, 2) + @property def job_dir(self): """Path: Path to the run directory""" From 15e91ea0170a5843ca4141046fb584c0e0755755 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 16:37:55 -0600 Subject: [PATCH 10/35] Messing with where in the run/schedule/write_script process a job is saved --- wrfhydropy/core/job.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index ad44428a..4d8bb072 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -227,7 +227,7 @@ def _run(self): current_dir.joinpath('hydro.namelist').unlink() current_dir.joinpath('namelist.hrldas').unlink() - self.pickle(str(self.job_dir.joinpath('WrfHydroJob.pkl'))) + self.pickle(str(self.job_dir.joinpath('WrfHydroJob_complete.pkl'))) def _write_namelists(self): """Private method to write namelist dicts to FORTRAN namelist files""" @@ -325,7 +325,6 @@ def _write_run_script(self): pystr += "job = pickle.load(open(job_dir,mode='rb'))\n" pystr += "#Run the job\n" pystr += "job._run()\n" - pystr += "job.pickle(str(self.job_dir.joinpath('WrfHydroJob_complete.pkl')))\n" pystr_file = 'run_job.py' with open(pystr_file,mode='w') as f: From 076754c00d910114749810132103dd9b4fda312d Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 16:49:57 -0600 Subject: [PATCH 11/35] Messing with where in the run/schedule/write_script process a job is saved --- wrfhydropy/core/job.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index 4d8bb072..5151d7fe 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -227,7 +227,7 @@ def _run(self): current_dir.joinpath('hydro.namelist').unlink() current_dir.joinpath('namelist.hrldas').unlink() - self.pickle(str(self.job_dir.joinpath('WrfHydroJob_complete.pkl'))) + self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl'))) def _write_namelists(self): """Private method to write namelist dicts to FORTRAN namelist files""" @@ -300,7 +300,7 @@ def _write_run_script(self): """Private method to write a python script to run the job. This is used primarily for compatibility with job schedulers on HPC systems""" - self.pickle(str(self.job_dir.joinpath('WrfHydroJob.pkl'))) + self.pickle(str(self.job_dir.joinpath('WrfHydroJob_prerun.pkl'))) pystr = "" pystr += "# import modules\n" @@ -321,8 +321,8 @@ def _write_run_script(self): pystr += "\n" pystr += "#load job object\n" - pystr += "job_dir = 'job_' + args.job_id + '/WrfHydroJob.pkl'\n" - pystr += "job = pickle.load(open(job_dir,mode='rb'))\n" + pystr += "job_file = 'job_' + args.job_id + '/WrfHydroJob_prerun.pkl'\n" + pystr += "job = pickle.load(open(job_file,mode='rb'))\n" pystr += "#Run the job\n" pystr += "job._run()\n" @@ -352,7 +352,7 @@ def _solve_model_start_end_times(self): return model_start_time, model_end_time def pickle(self,path: str): - """Pickle sim object to specified file path + """Pickle job object to specified file path Args: path: The file path for pickle """ From 2e558dd3ab8b1ac6a664d6c2ae2701a650f8871a Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 16:54:11 -0600 Subject: [PATCH 12/35] incremented version dev number --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3b09a5fe..0aa3d98d 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='wrfhydropy', - version='0.0.5dev1', + version='0.0.5dev2', packages=find_packages(), package_data={'wrfhydropy': ['core/data/*']}, url='https://github.com/NCAR/wrf_hydro_py', From c5ec069d9d215d7a3e55dd03c30b48853045ece7 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 16:59:30 -0600 Subject: [PATCH 13/35] Added phony forced exit to job _run method to check package versioning on cheyenne --- wrfhydropy/core/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index 5151d7fe..55d1fcd2 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -156,7 +156,7 @@ def _run(self): # Create curent dir path to use for all operations. Needed so that everything can be run # relative to the simulation directory - + exit(255) current_dir = pathlib.Path(os.curdir) # Print some basic info about the run From 672ffd4fc381b3112c36b0b21226133ff7765657 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 17:02:14 -0600 Subject: [PATCH 14/35] removed phony forced exit to job _run method to check package versioning on cheyenne --- wrfhydropy/core/job.py | 1 - 1 file changed, 1 deletion(-) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index 55d1fcd2..582dc46a 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -156,7 +156,6 @@ def _run(self): # Create curent dir path to use for all operations. Needed so that everything can be run # relative to the simulation directory - exit(255) current_dir = pathlib.Path(os.curdir) # Print some basic info about the run From 7a213df30254a57a9fb86cf3444bd4b3d763880d Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 17:08:35 -0600 Subject: [PATCH 15/35] using _ method for scheduler addd job --- wrfhydropy/core/simulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index d1f849b8..16f99742 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -93,7 +93,7 @@ def compose(self, symlink_domain: bool = True, force: bool = False): if self.scheduler is not None: print('Adding jobs to scheduler...') for job in self.jobs: - self.scheduler.add_job(job) + self.scheduler._add_job(job) # Compile model or copy files if self.model.compile_log is not None: From 89d11e0574ea1071308fbf8a0553d719c476f978 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 17:12:31 -0600 Subject: [PATCH 16/35] release hold in pbsscheduler --- wrfhydropy/core/schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 59723063..cbf24cba 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -109,7 +109,7 @@ def schedule(self): qsub_str += pbs_jids[job_num] + "=`qsub -W depend=afterok:$" + pbs_jids[ job_num-1] + " " + pbs_scripts[job_num] + "`;" - #qsub_str += 'qrls $' + pbs_jids[0] + ";" + qsub_str += 'qrls $' + pbs_jids[0] + ";" qsub_str += '"' # This stacks up dependent jobs in PBS in the same order as the job list From 21ab99d8709807f2cc14c1e8da4f4596e1618a40 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Mon, 30 Jul 2018 20:07:15 -0600 Subject: [PATCH 17/35] Add entry command to scheduler to allow for loading modules, envs, etc. --- wrfhydropy/core/schedulers.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index cbf24cba..47725236 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -21,6 +21,7 @@ class PBSCheyenne(Scheduler): """A Scheduler object compatible with PBS on the NCAR Cheyenne system.""" def __init__( self, + entry_cmd: str, account: str, email_who: str = None, email_when: str = 'abe', @@ -31,6 +32,8 @@ def __init__( walltime: str = "12:00:00"): """Initialize an PBSCheyenne object. Args: + entry_cmd: A shell command to execute prior to submitting the job, e.g. loading a + virtual environment. account: The account string email_who: Email address for PBS notifications email_when: PBS email frequency options. Options include 'a' for on abort, @@ -42,6 +45,8 @@ def __init__( walltime: The wall clock time in HH:MM:SS format, max time is 12:00:00 """ + self.entry_cmd = entry_cmd + # Declare attributes. ## property construction self._sim_dir = None @@ -147,7 +152,7 @@ def _write_job_pbs(self): jobstr += "\n" jobstr += "# Not using PBS standard error and out files to capture model output\n" - jobstr += "# but these hidden files might catch output and errors from the scheduler.\n" + jobstr += "# but these files might catch output and errors from the scheduler.\n" jobstr += "#PBS -o {0}\n".format(job.job_dir) jobstr += "#PBS -e {0}\n".format(job.job_dir) jobstr += "\n" @@ -167,6 +172,7 @@ def _write_job_pbs(self): if self.scheduler_opts['queue'] == 'share': jobstr += "export MPI_USE_ARRAY=false\n" + jobstr += self.entry_cmd + "\n" jobstr += 'python run_job.py --job_id {0}\n'.format(job.job_id) pbs_file = job.job_dir.joinpath('job_' + job.job_id + '.pbs') From d18d2443d1cda7647ba7d00b1b9af08f34d2ca96 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 08:09:54 -0600 Subject: [PATCH 18/35] added default empty tring to entry command for scheduler --- wrfhydropy/core/schedulers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 47725236..7993aa70 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -21,8 +21,8 @@ class PBSCheyenne(Scheduler): """A Scheduler object compatible with PBS on the NCAR Cheyenne system.""" def __init__( self, - entry_cmd: str, account: str, + entry_cmd: str = '', email_who: str = None, email_when: str = 'abe', nproc: int = 72, From 43364800241701c0ba7bab5e338b89ea1c976f76 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 08:20:46 -0600 Subject: [PATCH 19/35] removed entry command from scheduler, must be a better way --- wrfhydropy/core/schedulers.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 7993aa70..fb57c60f 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -22,7 +22,6 @@ class PBSCheyenne(Scheduler): def __init__( self, account: str, - entry_cmd: str = '', email_who: str = None, email_when: str = 'abe', nproc: int = 72, @@ -32,8 +31,6 @@ def __init__( walltime: str = "12:00:00"): """Initialize an PBSCheyenne object. Args: - entry_cmd: A shell command to execute prior to submitting the job, e.g. loading a - virtual environment. account: The account string email_who: Email address for PBS notifications email_when: PBS email frequency options. Options include 'a' for on abort, @@ -45,8 +42,6 @@ def __init__( walltime: The wall clock time in HH:MM:SS format, max time is 12:00:00 """ - self.entry_cmd = entry_cmd - # Declare attributes. ## property construction self._sim_dir = None @@ -172,7 +167,6 @@ def _write_job_pbs(self): if self.scheduler_opts['queue'] == 'share': jobstr += "export MPI_USE_ARRAY=false\n" - jobstr += self.entry_cmd + "\n" jobstr += 'python run_job.py --job_id {0}\n'.format(job.job_id) pbs_file = job.job_dir.joinpath('job_' + job.job_id + '.pbs') From c1859905b6e56fe3659f393eb1a69f9afd48dbeb Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 08:31:27 -0600 Subject: [PATCH 20/35] removed entry command from scheduler, must be a better way --- wrfhydropy/core/simulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index 16f99742..9734bac1 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -139,7 +139,7 @@ def collect(self): id_index[item.job_id] = index ## Insert collect jobs into sim job list - job_objs = current_dir.rglob('WrfHydroJob.pkl') + job_objs = current_dir.rglob('WrfHydroJob_postrun.pkl') for job_obj in job_objs: collect_job = pickle.load(open(job_obj,mode='rb')) original_idx = id_index[collect_job.job_id] From 49e4ec7ebde19c94f9e16641dc9cfbe6a567d2ff Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 08:47:07 -0600 Subject: [PATCH 21/35] bug fix in collect method --- wrfhydropy/core/simulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index 9734bac1..ac991610 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -141,7 +141,7 @@ def collect(self): ## Insert collect jobs into sim job list job_objs = current_dir.rglob('WrfHydroJob_postrun.pkl') for job_obj in job_objs: - collect_job = pickle.load(open(job_obj,mode='rb')) + collect_job = pickle.load(job_obj.open(mode='rb')) original_idx = id_index[collect_job.job_id] self.jobs[original_idx] = collect_job From 0ad4743956232db2c897a084d1f1f7cdeedadd63 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 09:00:24 -0600 Subject: [PATCH 22/35] Removed jobs from scheduler object and supplied to schedule method as argument instead --- wrfhydropy/core/schedulers.py | 39 ++++++++++++++--------------------- wrfhydropy/core/simulation.py | 2 +- 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index fb57c60f..3fe96f71 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -9,12 +9,12 @@ class Scheduler(ABC): def __init__(self): super().__init__() - @abstractmethod - def _add_job(self, job: Job): - pass + # @abstractmethod + # def _add_job(self, job: Job): + # pass @abstractmethod - def schedule(self): + def schedule(self,jobs): pass class PBSCheyenne(Scheduler): @@ -49,10 +49,6 @@ def __init__( self._nnodes = nnodes self._ppn = ppn - # Attribute - self.jobs = [] - self.scheduled_jobs = [] - # Setup exe cmd, will overwrite job exe cmd self._exe_cmd = 'mpiexec_mpt ./wrf_hydro.exe' @@ -64,16 +60,11 @@ def __init__( 'queue':queue, 'walltime':walltime} - def _add_job(self,job: Job): - """Add a job to the scheduler""" - - #Override job exe cmd with scheduler exe cmd - job._exe_cmd = self._exe_cmd - - self.jobs.append(job) - - def schedule(self): - """Schedule jobs that have been added to the scheduler""" + def schedule(self,jobs: list): + """Schedule one or more jobs using the scheduler scheduler + Args: + jobs: list of jobs to schedule + """ import subprocess import shlex import pathlib @@ -85,20 +76,20 @@ def schedule(self): # they can't change the order, may not be an issue except for if scheduling fails # somewhere - self._write_job_pbs() + self._write_job_pbs(jobs=jobs) # Make lists to store pbs scripts and pbs job ids to get previous dependency pbs_jids = [] pbs_scripts = [] qsub_str = '/bin/bash -c "' - for job_num, option in enumerate(self.jobs): + for job_num, option in enumerate(jobs): # This gets the pbs script name and pbs jid for submission # the obs jid is stored in a list so that the previous jid can be retrieved for # dependency - job_id = self.jobs[job_num].job_id - pbs_scripts.append(str(self.jobs[job_num].job_dir) + '/job_' + job_id + '.pbs') + job_id = jobs[job_num].job_id + pbs_scripts.append(str(jobs[job_num].job_dir) + '/job_' + job_id + '.pbs') pbs_jids.append('job_' + job_id) # If first job, schedule using hold @@ -120,9 +111,9 @@ def schedule(self): subprocess.run(shlex.split('qrls $' + pbs_jids[0]), cwd=str(current_dir)) - def _write_job_pbs(self): + def _write_job_pbs(self,jobs): """Private method to write bash PBS scripts for submitting each job """ - for job in self.jobs: + for job in jobs: # Write PBS script jobstr = "" diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index ac991610..e60afe47 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -121,7 +121,7 @@ def run(self): for job in self.jobs: job._run() else: - self.scheduler.schedule() + self.scheduler.schedule(jobs=self.jobs) # Overwrite the object after run if successfull path = current_dir.joinpath('WrfHydroSim.pkl') From f7a4cbb9d54511c7ae7e5283870806a6eb7bcc21 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 09:02:40 -0600 Subject: [PATCH 23/35] Removed jobs from scheduler object and supplied to schedule method as argument instead --- wrfhydropy/core/simulation.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/wrfhydropy/core/simulation.py b/wrfhydropy/core/simulation.py index e60afe47..13a10217 100644 --- a/wrfhydropy/core/simulation.py +++ b/wrfhydropy/core/simulation.py @@ -89,12 +89,6 @@ def compose(self, symlink_domain: bool = True, force: bool = False): print('Validating job input files') self._validate_jobs() - # Add jobs to scheduler - if self.scheduler is not None: - print('Adding jobs to scheduler...') - for job in self.jobs: - self.scheduler._add_job(job) - # Compile model or copy files if self.model.compile_log is not None: if self.model.compile_log.returncode == 0: From 3307d86cb9a2850cfdaf99782f020a9f69c62480 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 11:01:23 -0600 Subject: [PATCH 24/35] Changed job run method to handle model runtime errors more gracefully --- wrfhydropy/core/job.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index 582dc46a..2712180b 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -210,23 +210,28 @@ def _run(self): self.job_end_time = str(datetime.datetime.now()) # String match diag files for successfull run - self.exit_status = 1 - with current_dir.joinpath('diag_hydro.00000').open() as f: - diag_file = f.read() - if 'The model finished successfully.......' in diag_file: - self.exit_status = 0 - - # cleanup job-specific run files - diag_files = current_dir.glob('*diag*') - for file in diag_files: - shutil.move(str(file), str(self.job_dir)) - - shutil.move(str(self.stdout_file),str(self.job_dir)) - shutil.move(str(self.stderr_file),str(self.job_dir)) - current_dir.joinpath('hydro.namelist').unlink() - current_dir.joinpath('namelist.hrldas').unlink() - - self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl'))) + diag_file = current_dir.joinpath('diag_hydro.00000') + if diag_file.exists(): + with diag_file.open() as f: + diag_file = f.read() + if 'The model finished successfully.......' in diag_file: + self.exit_status = 0 + + # cleanup job-specific run files + diag_files = current_dir.glob('*diag*') + for file in diag_files: + shutil.move(str(file), str(self.job_dir)) + + shutil.move(str(self.stdout_file),str(self.job_dir)) + shutil.move(str(self.stderr_file),str(self.job_dir)) + current_dir.joinpath('hydro.namelist').unlink() + current_dir.joinpath('namelist.hrldas').unlink() + + self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl'))) + else: + self.exit_status = 1 + self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl'))) + raise RuntimeError('Model did not finish successfully') def _write_namelists(self): """Private method to write namelist dicts to FORTRAN namelist files""" From 644f41b5940ac4b28dbb6c913f38723a3a91bf90 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 11:02:46 -0600 Subject: [PATCH 25/35] Incremented dev version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0aa3d98d..150c7f79 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='wrfhydropy', - version='0.0.5dev2', + version='0.0.5dev3', packages=find_packages(), package_data={'wrfhydropy': ['core/data/*']}, url='https://github.com/NCAR/wrf_hydro_py', From 3cbe49b7756a01962711e16d64ab31371c7dc217 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 11:18:20 -0600 Subject: [PATCH 26/35] moved job pickle to end of run --- setup.py | 2 +- wrfhydropy/core/job.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 150c7f79..4fd91216 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='wrfhydropy', - version='0.0.5dev3', + version='0.0.5dev4', packages=find_packages(), package_data={'wrfhydropy': ['core/data/*']}, url='https://github.com/NCAR/wrf_hydro_py', diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index 2712180b..d010ccb6 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -226,13 +226,13 @@ def _run(self): shutil.move(str(self.stderr_file),str(self.job_dir)) current_dir.joinpath('hydro.namelist').unlink() current_dir.joinpath('namelist.hrldas').unlink() - - self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl'))) else: self.exit_status = 1 self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl'))) raise RuntimeError('Model did not finish successfully') + self.pickle(str(self.job_dir.joinpath('WrfHydroJob_postrun.pkl'))) + def _write_namelists(self): """Private method to write namelist dicts to FORTRAN namelist files""" self.hydro_namelist.write(str(self.job_dir.joinpath('hydro.namelist'))) From e17db9438d87cd0ede8803901f7d2497b23d18ae Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 11:37:05 -0600 Subject: [PATCH 27/35] Overwrite job exe cmd with scheduler exe cmd --- wrfhydropy/core/schedulers.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 3fe96f71..59e6dfa8 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -50,7 +50,10 @@ def __init__( self._ppn = ppn # Setup exe cmd, will overwrite job exe cmd - self._exe_cmd = 'mpiexec_mpt ./wrf_hydro.exe' + if self.scheduler_opts['queue'] == 'shared': + self._exe_cmd = 'mpirun -np {0} ./wrf_hydro.exe'.format(self.nproc) + else: + self._exe_cmd = 'mpiexec_mpt ./wrf_hydro.exe' ## Scheduler options dict ## TODO: Make this more elegant than hard coding for maintenance sake @@ -114,6 +117,8 @@ def schedule(self,jobs: list): def _write_job_pbs(self,jobs): """Private method to write bash PBS scripts for submitting each job """ for job in jobs: + # Copy the job because the exe cmd is edited below + job = copy.deepcopy(job) # Write PBS script jobstr = "" @@ -165,6 +170,8 @@ def _write_job_pbs(self,jobs): f.write(jobstr) # Write the python run script for the job + ## Overwrite job exe cmd with scheduler exe cmd + job._exe_cmd = self._exe_cmd job._write_run_script() def _solve_nodes_cores(self): From 022ed609a4ff76fecc5605c8e719b11f30b65637 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 11:40:19 -0600 Subject: [PATCH 28/35] Overwrite job exe cmd with scheduler exe cmd --- wrfhydropy/core/schedulers.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 59e6dfa8..04095f03 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -49,12 +49,6 @@ def __init__( self._nnodes = nnodes self._ppn = ppn - # Setup exe cmd, will overwrite job exe cmd - if self.scheduler_opts['queue'] == 'shared': - self._exe_cmd = 'mpirun -np {0} ./wrf_hydro.exe'.format(self.nproc) - else: - self._exe_cmd = 'mpiexec_mpt ./wrf_hydro.exe' - ## Scheduler options dict ## TODO: Make this more elegant than hard coding for maintenance sake self.scheduler_opts = {'account':account, @@ -63,6 +57,12 @@ def __init__( 'queue':queue, 'walltime':walltime} + # Setup exe cmd, will overwrite job exe cmd + if self.scheduler_opts['queue'] == 'shared': + self._exe_cmd = 'mpirun -np {0} ./wrf_hydro.exe'.format(self.nproc) + else: + self._exe_cmd = 'mpiexec_mpt ./wrf_hydro.exe' + def schedule(self,jobs: list): """Schedule one or more jobs using the scheduler scheduler Args: From 7931272f241f7175cadbd464c64eb9a6dad76410 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Tue, 31 Jul 2018 14:44:58 -0600 Subject: [PATCH 29/35] Added python path the pbsscheduler to handle virtual environments --- wrfhydropy/core/schedulers.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 04095f03..7df0eee1 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -1,6 +1,5 @@ # Note: All other imports for individual schedulers should be done in the respective scheduler # class functions so that imports can be isolated to relevant schedulers -import copy from abc import ABC, abstractmethod from .job import Job @@ -18,6 +17,7 @@ def schedule(self,jobs): pass class PBSCheyenne(Scheduler): + """A Scheduler object compatible with PBS on the NCAR Cheyenne system.""" def __init__( self, @@ -116,6 +116,12 @@ def schedule(self,jobs: list): def _write_job_pbs(self,jobs): """Private method to write bash PBS scripts for submitting each job """ + import copy + import sys + + # Get the current pytohn executable to handle virtual environments in the scheduler + python_path = sys.executable + for job in jobs: # Copy the job because the exe cmd is edited below job = copy.deepcopy(job) @@ -163,7 +169,7 @@ def _write_job_pbs(self,jobs): if self.scheduler_opts['queue'] == 'share': jobstr += "export MPI_USE_ARRAY=false\n" - jobstr += 'python run_job.py --job_id {0}\n'.format(job.job_id) + jobstr += '{0} run_job.py --job_id {1}\n'.format(python_path, job.job_id) pbs_file = job.job_dir.joinpath('job_' + job.job_id + '.pbs') with pbs_file.open(mode='w') as f: From 35a984797758ddcd038702933f005ae233de86a4 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Wed, 1 Aug 2018 06:59:37 -0600 Subject: [PATCH 30/35] added netcdf4 to requirements in setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 4fd91216..f7701c62 100644 --- a/setup.py +++ b/setup.py @@ -9,6 +9,7 @@ license='MIT', install_requires=['pandas', 'f90nml', + 'netcdf4', 'deepdiff', 'pathlib', 'xarray', From 78d3618a52a77a9a067f1a3b8fbc3b27ddacc949 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Wed, 1 Aug 2018 09:29:55 -0600 Subject: [PATCH 31/35] Removed old commented code --- wrfhydropy/core/job.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/wrfhydropy/core/job.py b/wrfhydropy/core/job.py index d010ccb6..ad5f9aa4 100644 --- a/wrfhydropy/core/job.py +++ b/wrfhydropy/core/job.py @@ -83,26 +83,6 @@ def __init__( self._hydro_namelist = None self._hrldas_namelist = None - # # Attributes set by class methods - # self.hrldas_times = {'noahlsm_offline': - # {'kday':None, - # 'khour':None, - # 'start_year':None, - # 'start_month':None, - # 'start_day': None, - # 'start_hour':None, - # 'start_min':None, - # 'restart_filename_requested': None} - # } - # """dict: the HRLDAS namelist used for this job.""" - # - # self.hydro_times = {'hydro_nlist': - # {'restart_file':None}, - # 'nudging_nlist': - # {'nudginglastobsfile':None} - # } - # """dict: the hydro namelist used for this job.""" - self.exit_status = None """int: The exit status of the model job parsed from WRF-Hydro diag files""" From 4b23ee41ebf0c90fc1f4eca823bc961d04589fe6 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Wed, 1 Aug 2018 12:15:38 -0600 Subject: [PATCH 32/35] Removed old commented code --- wrfhydropy/core/ioutils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wrfhydropy/core/ioutils.py b/wrfhydropy/core/ioutils.py index a9ee96b8..90e2dcd4 100644 --- a/wrfhydropy/core/ioutils.py +++ b/wrfhydropy/core/ioutils.py @@ -3,6 +3,7 @@ import os import warnings import pathlib +import netCDF4 from boltons import iterutils From 515bbc97727c0ded2df5bc8ff6a6b9c571697d41 Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Wed, 1 Aug 2018 14:38:17 -0600 Subject: [PATCH 33/35] added exit status to pbs bash script --- wrfhydropy/core/schedulers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index 7df0eee1..abc0a8ec 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -170,6 +170,7 @@ def _write_job_pbs(self,jobs): jobstr += "export MPI_USE_ARRAY=false\n" jobstr += '{0} run_job.py --job_id {1}\n'.format(python_path, job.job_id) + jobstr += 'exit $?\n' pbs_file = job.job_dir.joinpath('job_' + job.job_id + '.pbs') with pbs_file.open(mode='w') as f: From 22c07434ef9a590359f1a1f7c74fa7e0540f786a Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Wed, 1 Aug 2018 16:00:19 -0600 Subject: [PATCH 34/35] updated version to stable and ready to push to pypi --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f7701c62..0f8b712a 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='wrfhydropy', - version='0.0.5dev4', + version='0.0.5', packages=find_packages(), package_data={'wrfhydropy': ['core/data/*']}, url='https://github.com/NCAR/wrf_hydro_py', From c3a444fa9d82893540508c4bd5718d3788b0107f Mon Sep 17 00:00:00 2001 From: jmills-ncar Date: Wed, 1 Aug 2018 16:02:59 -0600 Subject: [PATCH 35/35] removed commented code block from scheduler --- wrfhydropy/core/schedulers.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/wrfhydropy/core/schedulers.py b/wrfhydropy/core/schedulers.py index abc0a8ec..463b2d75 100644 --- a/wrfhydropy/core/schedulers.py +++ b/wrfhydropy/core/schedulers.py @@ -8,10 +8,6 @@ class Scheduler(ABC): def __init__(self): super().__init__() - # @abstractmethod - # def _add_job(self, job: Job): - # pass - @abstractmethod def schedule(self,jobs): pass