Skip to content
Extraits de code Groupes Projets
extract_output_variable.py 6,9 ko
Newer Older
  • Learn to ignore specific revisions
  • import pathlib
    import os.path
    import argparse
    import subprocess
    
    import utilities.module_utilities
    
    import utilities.machine_utilities
    
    import utilities.generic_utilities
    
    import models.nemo.utilities.generic_utilities
    import models.nemo.utilities.namelist_utilities
    import models.nemo.utilities.experiment_utilities
    import models.nemo.utilities.experiment_plan_utilities
    import models.nemo.utilities.argument_parser_utilities
    
    
    
    # Environment modules to load on each supported machine before the "cdo" command is called.
    # NOTE(review): no release is given for LUCIA, unlike the other machines - confirm that this is
    # intended.
    REQUIRED_ENVIRONMENT_MODULES = {
      utilities.machine_utilities.LEMAITRE3: {
        utilities.module_utilities.KEY_MODULES: ['CDO/1.9.10-iimpi-2021b'],
        utilities.module_utilities.KEY_RELEASE: 'releases/2021b',
      },
      utilities.machine_utilities.MANNEBACK: {
        utilities.module_utilities.KEY_MODULES: ['CDO/1.9.10-iimpi-2021b'],
        utilities.module_utilities.KEY_RELEASE: 'releases/2021b',
      },
      utilities.machine_utilities.LUCIA: {
        utilities.module_utilities.KEY_MODULES: ['CDO/2.0.6-gompi-2022a'],
      },
    }
    
    def _run_cdo(operator_arguments, working_directory=None):

      '''
      Run the "cdo" command with the given arguments and raise a ClimateToolboxException if it exits
      with a non-zero status.
      '''

      command = ['cdo'] + list(operator_arguments)
      process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        cwd=working_directory)

      if process.returncode != 0:
        # stderr is redirected to stdout, so this is the complete output of the command
        command_output = process.stdout.decode(errors='replace')
        raise utilities.generic_utilities.ClimateToolboxException(
          'The following command failed with exit code %d:\n  %s\n%s'
          %(process.returncode, ' '.join(command), command_output))


    # Arguments of extract_output_variable (documented here rather than in the docstring, because
    # the docstring is reused verbatim as the description of the command line parser):
    #   version, configuration, name: identify the experiment; "name" is also the prefix of the
    #     output file names
    #   output_name: last component of the names of the output files to read
    #   output_frequency: second component of the names of the output files to read
    #   output_variable: name of the variable selected with "cdo select,name=..."
    #   start_date, end_date: date strings; every leg overlapping this range is processed
    # Raises a ClimateToolboxException if no leg overlaps the date range, if the target file already
    # exists, or if a "cdo" command fails.
    def extract_output_variable(version, configuration, name, output_name, output_frequency,
      output_variable, start_date, end_date):

      '''
      Extract a variable from the output files of an experiment, between two dates, and concatenate the
      extracted data in a single file.
      '''

      # imported locally because shutil is not imported at the top of the file
      import shutil

      print('Extracting a variable from the outputs of experiment '
        + '"%s", in configuration "%s", in version "%s"...\n'%(name, configuration, version))

      # create the experiment object
      experiment = models.nemo.utilities.experiment_utilities.Experiment(version=version,
        configuration=configuration, name=name)
      experiment_plan = experiment.get_experiment_plan()
      post_processing_path = experiment.get_scratch_post_processing_path(check_existence=False)

      # select the legs that overlap the requested date range
      start_date = models.nemo.utilities.generic_utilities.parse_date_string(date=start_date)
      end_date = models.nemo.utilities.generic_utilities.parse_date_string(date=end_date)

      relevant_legs = [leg for leg in experiment_plan.legs
        if (leg.end_date >= start_date) and (leg.start_date <= end_date)]

      if not relevant_legs:
        raise utilities.generic_utilities.ClimateToolboxException(
          'No legs were found in the requested date range.')

      # the extracted data covers whole legs, from the first relevant one to the last one
      # (presumably the legs of the plan are in chronological order - verify)
      extracted_data_actual_start_date = relevant_legs[0].start_date
      extracted_data_actual_end_date = relevant_legs[-1].end_date

      # check that the target concatenated file does not exist yet, before doing any work
      # NOTE(review): the dates are formatted with "%s" here, whereas the output files of the legs
      # use strftime('%Y%m%d') - confirm that this file name format is intended.
      concatenated_file_name = '%s_%s_%s_%s_%s.nc'%(name, output_frequency,
        extracted_data_actual_start_date, extracted_data_actual_end_date, output_variable)
      concatenated_file_path = os.path.join(post_processing_path, concatenated_file_name)

      if os.path.isfile(concatenated_file_path):
        raise utilities.generic_utilities.ClimateToolboxException(
          'The target file already exists:\n  %s'%concatenated_file_path)

      # create the post processing and temporary directories
      pathlib.Path(post_processing_path).mkdir(parents=True, exist_ok=True)

      post_processing_temporary_path = os.path.join(post_processing_path, 'temporary')
      utilities.generic_utilities.create_clean_directory(post_processing_temporary_path)

      # load the required modules
      utilities.module_utilities.load_environment_modules(modules=REQUIRED_ENVIRONMENT_MODULES)

      try:

        # extract the output variable from the relevant legs
        extracted_file_names = []

        for leg in relevant_legs:

          # check that the output file exists
          leg_outputs_path = leg.get_scratch_outputs_path(check_existence=True)
          output_file_path = os.path.join(leg_outputs_path, '%s_%s_%s_%s_%s.nc'%(name,
            output_frequency, leg.start_date.strftime('%Y%m%d'), leg.end_date.strftime('%Y%m%d'),
            output_name))

          utilities.generic_utilities.check_file_existence(path=output_file_path,
            file_identifier='following output file')

          # extract the variable from the output file
          extracted_file_name = leg.format_index() + '.nc'
          extracted_file_names.append(extracted_file_name)
          extracted_file_path = os.path.join(post_processing_temporary_path, extracted_file_name)

          _run_cdo(['select,name=%s'%output_variable, output_file_path, extracted_file_path])

          print('Data extracted between %s and %s'%(leg.start_date, leg.end_date))

        print('')

        print('Data has been extracted between %s and %s. Now concatenating...\n'
          %(extracted_data_actual_start_date, extracted_data_actual_end_date))

        # concatenate the extracted files (their names are relative to the temporary directory)
        _run_cdo(['mergetime'] + extracted_file_names + [concatenated_file_path],
          working_directory=post_processing_temporary_path)

      finally:

        # remove the temporary directory, whether the extraction succeeded or not
        shutil.rmtree(post_processing_temporary_path)
    
    
    
    #################
    # main function #
    #################
    
    def main():

      '''
      Parse the command line arguments, resolve the version and the configuration, and extract the
      requested output variable. Errors raised as ClimateToolboxException are displayed instead of
      being propagated.
      '''

      # imported explicitly because this module is used below but is not imported at the top of the
      # file
      import models.nemo.utilities.global_information_utilities

      print('')

      parser = argparse.ArgumentParser(description=extract_output_variable.__doc__,
        formatter_class=argparse.RawTextHelpFormatter)
      models.nemo.utilities.argument_parser_utilities.add_argument_version(parser=parser,
        required=False)
      models.nemo.utilities.argument_parser_utilities.add_argument_configuration(parser=parser,
        required=False)
      models.nemo.utilities.argument_parser_utilities.add_argument_name(parser=parser)

      # NOTE(review): the lists of choices below were not closed; only the visible values are kept -
      # confirm that no other value must be accepted.
      parser.add_argument(
        '-on', '--output_name',
        required=True,
        choices=[
          'grid_T',
        ],
        help='name of the output files from which the variable must be extracted',
      )
      parser.add_argument(
        '-of', '--output_frequency',
        required=True,
        choices=[
          '1d',
        ],
        help='frequency of the output files from which the variable must be extracted',
      )
      parser.add_argument(
        '-ov', '--output_variable',
        required=True,
        help='variable that must be extracted',
      )
      parser.add_argument(
        '-sd', '--start_date',
        required=True,
        help='date from which the variable must be extracted',
      )
      parser.add_argument(
        '-ed', '--end_date',
        required=True,
        help='date up to which the variable must be extracted',
      )

      # the keys of this dictionary are the parameter names of extract_output_variable
      arguments = vars(parser.parse_args())

      # the version and the configuration are optional on the command line: resolve them from the
      # tentative values
      try:
        arguments['version'], arguments['configuration'] = (
          models.nemo.utilities.global_information_utilities.get_version_and_configuration(
          tentative_version=arguments['version'], tentative_configuration=arguments['configuration']))
      except utilities.generic_utilities.ClimateToolboxException as e:
        e.display()
        return

      try:
        extract_output_variable(**arguments)
      except utilities.generic_utilities.ClimateToolboxException as e:
        e.display()
        return
    
    # run the command line interface only when this file is executed as a script
    if __name__ == '__main__':
      main()