-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Howard Soh
committed
Nov 5, 2022
1 parent
b010b37
commit bb069d2
Showing
1 changed file
with
223 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
#!/usr/bin/env python3 | ||
|
||
''' | ||
Created on Oct 14, 2022 | ||
@author: hsoh | ||
The script reads the point observation NetCDF which is generated by MET tools and prints out the text. | ||
The header which begins with "#" is added. This can be diabled by adding "--hide-header" option | ||
''' | ||
|
||
from os import path | ||
from optparse import OptionParser | ||
|
||
import numpy as np | ||
from netCDF4 import Dataset | ||
|
||
def usage(): | ||
print(f'Usage: python3 print_nc2ascii.py MET_point_obs_nc <--hide-header> <--use-comma>') | ||
print(f' MET_point_obs_nc: NetCDF filename to read (required)') | ||
print(f' --hide-header: to hide the header (optional)') | ||
print(f' --use-comma: use the "comma" as separator for comma separated output (optional)"') | ||
print(f' Note: <> indicates optional arguments') | ||
|
||
def create_parser_options(parser): | ||
parser.add_option("--hide-header", "--hide_header", dest="hide_header", | ||
action="store_true", default=False, help=" Hide header (default: False)") | ||
parser.add_option("--use-comma", "--use_comma", dest="use_comma", | ||
action="store_true", default=False, help=" Use comma as separator (default: False)") | ||
return parser.parse_args() | ||
|
||
|
||
class nc_tools(): | ||
|
||
@staticmethod | ||
def get_num_array(nc_group, var_name): | ||
nc_var = nc_group.variables.get(var_name, None) | ||
return [] if nc_var is None else nc_var[:] | ||
|
||
@staticmethod | ||
def get_ncbyte_array_to_str(nc_var): | ||
nc_str_data = nc_var[:] | ||
if nc_var.datatype.name == 'bytes8': | ||
nc_str_data = [ str(s.compressed(),"utf-8") for s in nc_var[:] ] | ||
return nc_str_data | ||
|
||
@staticmethod | ||
def get_string_array(nc_group, var_name): | ||
nc_var = nc_group.variables.get(var_name, None) | ||
return [] if nc_var is None else nc_tools.get_ncbyte_array_to_str(nc_var) | ||
|
||
|
||
class met_nc_point_obs(): | ||
|
||
#EPSILON = 0.00001 | ||
EPSILON_L = 0.000000001 | ||
EPSILON_U = 0.000000009 | ||
|
||
EPSILON2_L = 0.0025 | ||
EPSILON2_U = 0.9996 | ||
|
||
def get_dim_size(self, nc_group, dim_name): | ||
nc_dim = nc_group.dimensions.get(dim_name, None) | ||
return 0 if nc_dim is None else nc_dim.size | ||
|
||
def get_nc_var_string_data(self, nc_group, var_name): | ||
nc_var = nc_group.variables.get(var_name, None) | ||
return ['NA'] if nc_var is None else nc_tools.get_string_array(nc_group, var_name) | ||
|
||
def is_integer(self, value): | ||
return (abs(value - int(value)) < self.EPSILON_L) or (abs(value - int(value)) < self.EPSILON_U) | ||
|
||
def read_data(self, met_nc_name): | ||
nc_group = Dataset(met_nc_name, "r") | ||
self.use_var_id = False | ||
if 'use_var_id' in nc_group.ncattrs(): | ||
self.use_var_id = nc_group.use_var_id.lower() == "true" | ||
|
||
self.nhdr = self.get_dim_size(nc_group, 'nhdr') | ||
self.hdr_zero = [ 0 for _ in range(0, self.nhdr)] | ||
self.nhdr_typ = self.get_dim_size(nc_group, 'nhdr_typ') # type table | ||
self.nhdr_sid = self.get_dim_size(nc_group, 'nhdr_sid') # station_id table | ||
self.nhdr_vld = self.get_dim_size(nc_group, 'nhdr_vld') # valid time strings | ||
|
||
self.hdr_typ = nc_tools.get_num_array(nc_group, 'hdr_typ') # (nhdr) integer | ||
self.hdr_sid = nc_tools.get_num_array(nc_group, 'hdr_sid') # (nhdr) integer | ||
self.hdr_vld = nc_tools.get_num_array(nc_group, 'hdr_vld') # (nhdr) integer | ||
self.hdr_lat = nc_tools.get_num_array(nc_group, 'hdr_lat') # (nhdr) float | ||
self.hdr_lon = nc_tools.get_num_array(nc_group, 'hdr_lon') # (nhdr) float | ||
self.hdr_elv = nc_tools.get_num_array(nc_group, 'hdr_elv') # (nhdr) float | ||
self.hdr_typ_table = self.get_nc_var_string_data(nc_group, 'hdr_typ_table') # (nhdr_typ, mxstr2) string | ||
self.hdr_sid_table = self.get_nc_var_string_data(nc_group, 'hdr_sid_table') # (nhdr_sid, mxstr2) string | ||
self.hdr_vld_table = self.get_nc_var_string_data(nc_group, 'hdr_vld_table') # (nhdr_vld, mxstr) string | ||
|
||
#Observation data | ||
self.nobs = self.get_dim_size(nc_group, 'nobs') | ||
self.nobs_qty = self.get_dim_size(nc_group, 'nobs_qty') | ||
self.nobs_var = self.get_dim_size(nc_group, 'nobs_var') | ||
|
||
self.obs_qty = nc_tools.get_num_array(nc_group, 'obs_qty') # (nobs_qty) integer, index of self.obs_qty_table | ||
self.obs_hid = nc_tools.get_num_array(nc_group, 'obs_hid') # (nobs) integer | ||
self.obs_vid = nc_tools.get_num_array(nc_group, 'obs_vid') # (nobs) integer, veriable index from self.obs_var_table or GRIB code | ||
if 0 == len(self.obs_vid): | ||
self.obs_vid = nc_tools.get_num_array(nc_group, 'obs_gc' ) # (nobs) integer, veriable index from self.obs_var_table or GRIB code | ||
self.obs_lvl = nc_tools.get_num_array(nc_group, 'obs_lvl') # (nobs) float | ||
self.obs_hgt = nc_tools.get_num_array(nc_group, 'obs_hgt') # (nobs) float | ||
self.obs_val = nc_tools.get_num_array(nc_group, 'obs_val') # (nobs) float | ||
self.obs_qty_table = self.get_nc_var_string_data(nc_group, 'obs_qty_table') # (nobs_qty, mxstr) string | ||
self.obs_var_table = self.get_nc_var_string_data(nc_group, 'obs_var') # (nobs_var, mxstr2) string, required if self.use_var_id is True | ||
|
||
def dump(self, show_header=True, use_comma=False): | ||
hdr_typ = self.hdr_zero if 0 == self.nhdr_typ else self.hdr_typ | ||
hdr_sid = self.hdr_zero if 0 == self.nhdr_sid else self.hdr_sid | ||
if use_comma: | ||
hdr_lat = [ f'{i:.0f}' if self.is_integer(i) else f'{i:.4f}' for i in self.hdr_lat ] | ||
hdr_lon = [ f'{i:.0f}' if self.is_integer(i) else f'{i:.4f}' for i in self.hdr_lon ] | ||
else: | ||
hdr_lat = self.formated_num_array(self.hdr_lat, use_comma) | ||
hdr_lon = self.formated_num_array(self.hdr_lon, use_comma) | ||
hdr_elv = self.formated_num_array(self.hdr_elv, use_comma) | ||
headers = [list(i) for i in zip(hdr_typ, hdr_sid, self.hdr_vld, hdr_lat, hdr_lon, hdr_elv)] | ||
|
||
obs_lvl = self.formated_num_array(self.obs_lvl, use_comma) | ||
obs_hgt = self.formated_num_array(self.obs_hgt, use_comma) | ||
if use_comma: | ||
obs_qty = [ '"NA"' if np.ma.is_masked(i) else f'"{self.obs_qty_table[i]}"' for i in self.obs_qty ] | ||
obs_vid = [ f'"{self.obs_var_table[i]}"' if self.use_var_id else f'"{i}"' for i in self.obs_vid ] | ||
else: | ||
obs_qty = [ 'NA' if np.ma.is_masked(i) else f'{self.obs_qty_table[i]}' for i in self.obs_qty ] | ||
obs_vid = [ f'{self.obs_var_table[i]}' if self.use_var_id else f'{i:3}' for i in self.obs_vid ] | ||
obs_val = [ f'{i:.2f}' if (i == 0) else | ||
f'{i:.8f}' if (abs(i) < 0.00001) else | ||
f'{i:.6f}' if (abs(i) < 0.001) else | ||
f'{i:.3f}' if (abs(i) < 0.01 and abs((i*1000)-int(i*1000)) < 0.0000001) else | ||
f'{i:.3f}' if (abs(i) < 0.01 and abs((i*1000)-int(i*1000)) > 0.999998) else | ||
f'{i:.5f}' if (abs(i) < 0.01) else | ||
f'{i:.2f}' if (abs(i) < 0.1 and abs((i*100)-int(i*100)) < 0.0000001) else | ||
f'{i:.2f}' if (abs(i) < 0.1 and abs((i*100)-int(i*100)) > 0.999998) else | ||
f'{i:.4f}' if (abs(i) < 0.1) else | ||
f'{i:.2f}' if (abs(i*100 - int(i*100)) < 0.00009) else | ||
f'{i:.2f}' if (abs(i*100 - int(i*100)) > 0.99877) else | ||
f'{i:.4f}' for i in self.obs_val ] | ||
|
||
if show_header: | ||
if use_comma: | ||
print(f'#msg_type,station_id,valid_time,lat,lon,elv,var_name/gc,level,height,qty,value') | ||
else: | ||
print(f'#msg_type s_id valid_time lat lon elv var_name/gc level height qty value') | ||
|
||
for obs_data in [list(i) for i in zip(obs_vid, obs_lvl, obs_hgt, obs_qty, obs_val, self.obs_hid)]: | ||
header_arr = headers[obs_data[-1]] | ||
if use_comma: | ||
print(f'"{self.hdr_typ_table[header_arr[0]]}",' | ||
f'"{self.hdr_sid_table[header_arr[1]]}",' | ||
f'"{self.hdr_vld_table[header_arr[2]]}",' | ||
f'{header_arr[3]},{header_arr[4]},{header_arr[5]},' | ||
f'{obs_data[0]},{obs_data[1]},{obs_data[2]},{obs_data[3]},{obs_data[4]}') | ||
else: | ||
print(f'{self.hdr_typ_table[header_arr[0]]} ' | ||
f'{self.hdr_sid_table[header_arr[1]]} ' | ||
f'{self.hdr_vld_table[header_arr[2]]} ' | ||
f'{header_arr[3]} {header_arr[4]} {header_arr[5]} ' | ||
f'{obs_data[0]:4} {obs_data[1]} {obs_data[2]} {obs_data[3]} {obs_data[4]}') | ||
|
||
def is_all_int(self, value_array): | ||
all_int = True | ||
for a_value in value_array: | ||
if np.ma.is_masked(a_value) or not self.is_integer(a_value): | ||
all_int = False | ||
break | ||
return all_int | ||
|
||
def is_2_digit_precision(self, value_array): | ||
result = True | ||
for a_value in value_array: | ||
if np.ma.is_masked(a_value): | ||
continue | ||
if self.EPSILON2_L < abs(a_value*100-int(a_value*100)) < self.EPSILON2_U: | ||
if abs(a_value*10-int(a_value*10)) < self.EPSILON2_U: # to filter x.x9996 | ||
result = False | ||
print(" ======================= is_2_digit_precision ", result , a_value, abs(a_value*100-int(a_value*100)) ) | ||
break | ||
return result | ||
|
||
def formated_num_array(self, value_array, use_comma): | ||
if self.is_all_int(value_array): | ||
str_array = [ f'{i:.0f}' for i in value_array ] if use_comma else [ f'{i:5.0f}' for i in value_array ] | ||
else: | ||
if self.is_2_digit_precision(value_array): | ||
if use_comma: | ||
str_array = [ '"NA"' if np.ma.is_masked(i) else f'{i:.2f}' for i in value_array ] | ||
else: | ||
str_array = [ 'NA' if np.ma.is_masked(i) else f'{i:8.2f}' for i in value_array ] | ||
else: | ||
if use_comma: | ||
str_array = [ '"NA"' if np.ma.is_masked(i) else f'{i:.4f}' for i in value_array ] | ||
else: | ||
str_array = [ 'NA' if np.ma.is_masked(i) else f'{i:8.4f}' for i in value_array ] | ||
return str_array | ||
|
||
if __name__ == '__main__': | ||
usage_str = "%prog [options]" | ||
parser = OptionParser(usage = usage_str) | ||
options, args = create_parser_options(parser) | ||
|
||
if 0 == len(args): | ||
print('" == INFO == Missing input NetCDF filename') | ||
usage() | ||
else: | ||
met_nc_data = met_nc_point_obs() | ||
for nc_name in args: | ||
if not path.exists(nc_name): | ||
if nc_name.startswith('#'): | ||
print(f' == INFO == ignore "{nc_name}" option') | ||
else: | ||
print(f' == INFO == The input file {nc_name} does not exist') | ||
continue | ||
|
||
met_nc_data.read_data(nc_name) | ||
met_nc_data.dump(not options.hide_header, options.use_comma) | ||
print() | ||
|