Select Git revision
ServletMerge.java
-
Robert Butora authoredRobert Butora authored
model_maker.py 28.16 KiB
#!/bin/python3
# Copyright (C) 2024 INAF - Osservatorio Astronomico di Cagliari
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# A copy of the GNU General Public License is distributed along with
# this program in the COPYING file. If not, see: <https://www.gnu.org/licenses/>.
## @package model_maker
# \brief Script to build models from YAML configuration files.
#
# Script to assist in the creation of model input files starting
# from a YAML descriptor.
#
# The script requires python3.
import yaml
from pathlib import PurePath
from sys import argv
## \brief Main execution code
#
# `main()` is the function that handles the creation of the code configuration.
# It returns an integer value as exit code, using 0 to signal successful execution.
#
# \returns result: `int` Number of detected error-level inconsistencies.
def main():
result = 0
config = parse_arguments()
if (config['help_mode'] or config['yml_file_name'] == ""):
print_help()
else:
sconf, gconf = load_model(config['yml_file_name'])
if (sconf is not None) and (gconf is not None):
result = write_legacy_sconf(sconf)
if (result == 0): result += write_legacy_gconf(gconf)
else:
print("ERROR: could not create configuration.")
result = 1
return result
## \brief Populate the dielectric constant data via interpolation
#
# \param sconf: `dict` Scatterer configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def interpolate_constants(sconf):
result = 0
for i in range(sconf['configurations']):
for j in range(sconf['nshl'][i]):
file_idx = sconf['dielec_id'][i][j]
dielec_path = PurePath(sconf['dielec_path'], sconf['dielec_file'][int(file_idx) - 1])
file_name = str(dielec_path)
dielec_file = open(file_name, 'r')
wavelengths = []
rpart = []
ipart = []
str_line = dielec_file.readline()
while (str_line != ""):
if (not str_line.startswith('#')):
split_line = str_line.split(',')
if (len(split_line) == 3):
wavelengths.append(float(split_line[0]))
rpart.append(float(split_line[1]))
ipart.append(float(split_line[2]))
str_line = dielec_file.readline()
dielec_file.close()
wi = 0
x0 = 0.0
x1 = 0.0
ry0 = 0.0
iy0 = 0.0
ry1 = 0.0
iy1 = 0.0
for dci in range(sconf['nxi']):
w = sconf['vec_xi'][dci]
while (w > x1):
x0 = wavelengths[wi]
ry0 = rpart[wi]
iy0 = ipart[wi]
if (wi == len(wavelengths)):
print("ERROR: file %s does not cover requested wavelengths!"%file_name)
return 1
wi += 1
x1 = wavelengths[wi]
ry1 = rpart[wi]
iy1 = ipart[wi]
if (wi > 0):
x0 = wavelengths[wi - 1]
ry0 = rpart[wi - 1]
iy0 = ipart[wi - 1]
dx = w - x0
dry = (ry1 - ry0) / (x1 - x0) * dx
diy = (iy1 - iy0) / (x1 - x0) * dx
ry = ry0 + dry
iy = iy0 + diy
sconf['rdc0'][j][i][dci] = ry
sconf['idc0'][j][i][dci] = iy
else:
if (wavelengths[wi] == w):
sconf['rdc0'][j][i][dci] = rpart[0]
sconf['idc0'][j][i][dci] = ipart[0]
else:
print("ERROR: file %s does not cover requested wavelengths!"%file_name)
return 2
return result
## \brief Create tha calculation configuration structure from YAML input.
#
# \param model_file: `str` Full path to the YAML input file.
# \return sconf, gconf: `tuple` A dictionary tuple for scatterer and
# geometric configurations.
def load_model(model_file):
sconf = None
gconf = None
model = None
try:
with open(model_file, 'r') as stream:
model = yaml.safe_load(stream)
except yaml.YAMLError:
print("ERROR: " + model_file + " is not a valid YAML file!")
except FileNotFoundError:
print("ERROR: " + model_file + " was not found!")
if model is not None:
# Create the sconf dict
sconf = {
'out_file': PurePath(
model['input_settings']['input_folder'],
model['input_settings']['spheres_file']
)
}
sconf['nsph'] = int(model['particle_settings']['n_spheres'])
sconf['application'] = model['particle_settings']['application']
sconf['ies'] = 1 if sconf['application'] == "INCLUSION" else 0
sconf['exdc'] = float(model['material_settings']['extern_diel'])
sconf['wp'] = float(model['radiation_settings']['wp'])
sconf['xip'] = float(model['radiation_settings']['xip'])
sconf['idfc'] = int(model['material_settings']['diel_flag'])
sconf['instpc'] = int(model['radiation_settings']['step_flag'])
sconf['xi_start'] = float(model['radiation_settings']['scale_start'])
sconf['xi_end'] = float(model['radiation_settings']['scale_end'])
sconf['xi_step'] = float(model['radiation_settings']['scale_step'])
sconf['configurations'] = int(model['particle_settings']['n_types'])
sconf['dielec_path'] = model['material_settings']['dielec_path']
sconf['dielec_file'] = model['material_settings']['dielec_file']
num_dielec = 0 # len(model['particle_settings']['dielec_id'])
if (sconf['idfc'] == -1):
num_dielec = len(model['material_settings']['diel_const'])
elif (sconf['idfc'] == 0):
num_dielec = len(model['particle_settings']['dielec_id'])
if (len(model['particle_settings']['n_layers']) != sconf['configurations']):
print("ERROR: Declared number of layers does not match number of types!")
return (None, None)
else:
sconf['nshl'] = [0 for i in range(sconf['configurations'])]
for i in range(sconf['configurations']):
sconf['nshl'][i] = int(model['particle_settings']['n_layers'][i])
if (sconf['application'] == "INCLUSION"):
sconf['nshl'][0] += 1
max_layers = max(sconf['nshl'])
if (num_dielec != sconf['configurations']):
print("ERROR: declared array of optical constants does not match configurations!")
return (None, None)
else:
if (sconf['idfc'] == 0):
sconf['dielec_id'] = [
[ 0 for j in range(max_layers)] for i in range(sconf['configurations'])
]
for i in range(sconf['configurations']):
if (len(model['particle_settings']['dielec_id'][i]) != sconf['nshl'][i]):
print("ERROR: Declared materials in type %d do not match number of layers!"%i)
return (None, None)
else:
for j in range(sconf['nshl'][i]):
sconf['dielec_id'][i][j] = float(model['particle_settings']['dielec_id'][i][j])
if (model['radiation_settings']['scale_name'] == "XI"):
sconf['insn'] = 1
sconf['nxi'] = 1 + int((sconf['xi_end'] - sconf['xi_start']) / sconf['xi_step'])
sconf['vec_xi'] = [0.0 for i in range(sconf['nxi'])]
for i in range(sconf['nxi']):
sconf['vec_xi'][i] = sconf['xi_start'] + i * sconf['xi_step']
# Set up dielectric constants
if (num_dielec != sconf['configurations']):
print("ERROR: delcared dielectric constants do not match number of sphere types!")
return (None, None)
else:
if (sconf['idfc'] == -1):
sconf['rdc0'] = [
[
[0.0 for k in range(1)] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
sconf['idc0'] = [
[
[0.0 for k in range(1)] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
for di in range(max_layers):
for dj in range(sconf['configurations']):
if (len(model['material_settings']['diel_const'][dj]) / 2 != sconf['nshl'][dj]):
print("ERROR: dielectric constants for type %d do not match number of layers!"%(dj + 1))
return (None, None)
else:
sconf['rdc0'][di][dj][0] = float(model['material_settings']['diel_const'][dj][2 * di])
sconf['idc0'][di][dj][0] = float(model['material_settings']['diel_const'][dj][2 * di + 1])
else: # sconf[idfc] != -1 and scaling on XI
print("ERROR: for scaling on XI, optical constants must be defined!")
return (None, None)
elif (model['radiation_settings']['scale_name'] == "WAVELENGTH"):
sconf['insn'] = 3
if (model['material_settings']['match_mode'] == "INTERPOLATE"):
sconf['nxi'] = 1 + int((sconf['xi_end'] - sconf['xi_start']) / sconf['xi_step'])
sconf['vec_xi'] = [0.0 for i in range(sconf['nxi'])]
for i in range(sconf['nxi']):
sconf['vec_xi'][i] = sconf['xi_start'] + i * sconf['xi_step']
# Set up the dielectric constants
if (sconf['idfc'] == 0):
sconf['rdc0'] = [
[
[0.0 for k in range(sconf['nxi'])] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
sconf['idc0'] = [
[
[0.0 for k in range(sconf['nxi'])] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
interpolate_constants(sconf)
else: # sconf[idfc] != 0 and scaling on wavelength
print("ERROR: for wavelength scaling, optical constants must be tabulated!")
return (None, None)
elif (model['material_settings']['match_mode'] == "GRID"):
match_grid(sconf)
else:
print("ERROR: %s is not a recognized match mode!"%(model['material_settings']['match_mode']))
return (None, None)
else:
print("ERROR: %s is not a supported scaling mode!"%(model['radiation_settings']['scale_name']))
return (None, None)
sph_types = model['particle_settings']['sph_types']
if (len(sph_types) != sconf['nsph']):
print("ERROR: vector of sphere types does not match the declared number of spheres!")
return (None, None)
else:
sconf['vec_types'] = [int(str_typ) for str_typ in sph_types]
if (len(model['particle_settings']['radii']) != sconf['configurations']):
print("ERROR: Declared number of radii does not match number of types!")
return (None, None)
else:
sconf['ros'] = [0.0 for i in range(sconf['configurations'])]
for i in range(sconf['configurations']):
sconf['ros'][i] = float(model['particle_settings']['radii'][i])
if (len(model['particle_settings']['rad_frac']) != sconf['configurations']):
print("ERROR: Declared number of fractional radii does not match number of types!")
return (None, None)
else:
sconf['rcf'] = [
[0.0 for j in range(max_layers)] for i in range(sconf['configurations'])
]
for i in range(sconf['configurations']):
if (len(model['particle_settings']['rad_frac'][i]) != sconf['nshl'][i]):
print("ERROR: Declared transition radii in type %d do not match number of layers!"%i)
return (None, None)
else:
for j in range(sconf['nshl'][i]):
sconf['rcf'][i][j] = float(model['particle_settings']['rad_frac'][i][j])
# Create the gconf dict
str_polar = model['radiation_settings']['polarization']
if (str_polar not in ["LINEAR", "CIRCULAR"]):
print("ERROR: %s is not a recognized polarization state."%str_polar)
return (None, None)
gconf = {
'out_file': PurePath(
model['input_settings']['input_folder'],
model['input_settings']['geometry_file']
)
}
gconf['nsph'] = sconf['nsph']
gconf['application'] = model['particle_settings']['application']
gconf['li'] = int(model['geometry_settings']['li'])
gconf['le'] = int(
gconf['li'] if gconf['application'] == "SPHERE" else model['geometry_settings']['le']
)
gconf['inpol'] = 0 if str_polar == "LINEAR" else 1
gconf['npnt'] = int(model['geometry_settings']['npnt'])
gconf['npntts'] = int(model['geometry_settings']['npntts'])
if (gconf['application'] != "SPHERE"):
gconf['iavm'] = int(model['geometry_settings']['iavm'])
gconf['isam'] = int(model['geometry_settings']['isam'])
gconf['jwtm'] = int(model['output_settings']['jwtm'])
gconf['th'] = float(model['geometry_settings']['in_th_start'])
gconf['thstp'] = float(model['geometry_settings']['in_th_step'])
gconf['thlst'] = float(model['geometry_settings']['in_th_end'])
gconf['ph'] = float(model['geometry_settings']['in_ph_start'])
gconf['phstp'] = float(model['geometry_settings']['in_ph_step'])
gconf['phlst'] = float(model['geometry_settings']['in_ph_end'])
gconf['ths'] = float(model['geometry_settings']['sc_th_start'])
gconf['thsstp'] = float(model['geometry_settings']['sc_th_step'])
gconf['thslst'] = float(model['geometry_settings']['sc_th_end'])
gconf['phs'] = float(model['geometry_settings']['sc_ph_start'])
gconf['phsstp'] = float(model['geometry_settings']['sc_ph_step'])
gconf['phslst'] = float(model['geometry_settings']['sc_ph_end'])
gconf['vec_sph_x'] = [0.0 for i in range(gconf['nsph'])]
gconf['vec_sph_y'] = [0.0 for i in range(gconf['nsph'])]
gconf['vec_sph_z'] = [0.0 for i in range(gconf['nsph'])]
if (gconf['application'] != "SPHERE" or gconf['nsph'] != 1):
# TODO: put here a test to allow for empty vectors in case of random generation
if (len(model['geometry_settings']['x_coords']) != gconf['nsph']):
print("ERROR: X coordinates do not match the number of spheres!")
return (None, None)
if (len(model['geometry_settings']['y_coords']) != gconf['nsph']):
print("ERROR: Y coordinates do not match the number of spheres!")
return (None, None)
if (len(model['geometry_settings']['z_coords']) != gconf['nsph']):
print("ERROR: Z coordinates do not match the number of spheres!")
return (None, None)
for si in range(gconf['nsph']):
gconf['vec_sph_x'][si] = float(model['geometry_settings']['x_coords'][si])
gconf['vec_sph_y'][si] = float(model['geometry_settings']['y_coords'][si])
gconf['vec_sph_z'][si] = float(model['geometry_settings']['z_coords'][si])
else: # model is None
print("ERROR: could not parse " + model_file + "!")
return (sconf, gconf)
## \brief Populate the dielectric constant data matching a grid.
#
# Important note: if the configuration requests that more than one
# optical constants file should be used, all the files must provide
# their constants for the same vector of wavelengths.
#
# \param sconf: `dict` Scatterer configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def match_grid(sconf):
result = 0
max_layers = 0
nxi = 0
sconf['vec_xi'] = []
for i in range(sconf['configurations']):
for j in range(sconf['nshl'][i]):
file_idx = sconf['dielec_id'][i][j]
dielec_path = PurePath(sconf['dielec_path'], sconf['dielec_file'][int(file_idx) - 1])
file_name = str(dielec_path)
dielec_file = open(file_name, 'r')
wavelengths = []
rpart = []
ipart = []
str_line = dielec_file.readline()
while (str_line != ""):
if (not str_line.startswith('#')):
split_line = str_line.split(',')
if (len(split_line) == 3):
wavelengths.append(float(split_line[0]))
rpart.append(float(split_line[1]))
ipart.append(float(split_line[2]))
str_line = dielec_file.readline()
dielec_file.close()
if (max_layers == 0):
# This is executed only once
max_layers = max(sconf['nshl'])
w_start = sconf['xi_start']
w_end = sconf['xi_end']
for wi in range(len(wavelengths)):
w = wavelengths[wi]
if (w >= w_start and w <= w_end):
sconf['vec_xi'].append(w)
nxi += 1
sconf['rdc0'] = [
[
[
0.0 for dk in range(nxi)
] for dj in range(sconf['configurations'])
] for di in range(max_layers)
]
sconf['idc0'] = [
[
[
0.0 for dk in range(nxi)
] for dj in range(sconf['configurations'])
] for di in range(max_layers)
]
sconf['nxi'] = nxi
# This is executed for all layers in all configurations
wi = 0
x = wavelengths[wi]
ry = rpart[wi]
iy = ipart[wi]
for dci in range(sconf['nxi']):
w = sconf['vec_xi'][dci]
while (w > x):
x = wavelengths[wi]
ry = rpart[wi]
iy = ipart[wi]
if (wi == len(wavelengths)):
print("ERROR: file %s does not cover requested wavelengths!"%file_name)
return 1
wi += 1
sconf['rdc0'][j][i][dci] = ry
sconf['idc0'][j][i][dci] = iy
return result
## \brief Parse the command line arguments.
#
# The script behaviour can be modified through a set of optional arguments.
# The purpose of this function is to parse the command line in search for
# such arguments and prepare the execution accordingly.
#
# \returns config: `dict` A dictionary containing the script configuration.
def parse_arguments():
config = {
'yml_file_name': "",
'help_mode': False,
}
yml_set = False
for arg in argv[1:]:
if (arg.startswith("--help")):
config['help_mode'] = True
elif (not yml_set):
if (not arg.startswith("--")):
config['yml_file_name'] = arg
yml_set = True
else:
raise Exception("Unrecognized argument \'{0:s}\'".format(arg))
return config
## \brief Print a command-line help summary.
def print_help():
print("###############################################")
print("# #")
print("# NPtm_code MODEL_MAKER #")
print("# #")
print("###############################################")
print(" ")
print("Create input files for FORTRAN and C++ code. ")
print(" ")
print("Usage: \"./model_maker.py CONFIG [OPTIONS]\" ")
print(" ")
print("CONFIG must be a valid YAML configuration file.")
print(" ")
print("Valid options are: ")
print("--help Print this help and exit.")
print(" ")
## \brief Write the geometry configuration dictionary to legacy format.
#
# \param conf: `dict` Geometry configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def write_legacy_gconf(conf):
result = 0
out_file = str(conf['out_file'])
nsph = conf['nsph']
str_line = "INIT"
# Write legacy output
output = open(out_file, 'w')
if (conf['application'] == "SPHERE"):
str_line = " {0:4d} {1:4d} {2:4d} {3:4d} {4:4d} {5:4d}\n".format(
nsph, conf['li'], conf['inpol'], conf['npnt'], conf['npntts'], conf['isam']
)
output.write(str_line)
else:
mxndm = 2 * nsph * conf['li'] * (conf['li'] + 2)
str_line = " {0:4d} {1:4d} {2:4d} {3:4d} {4:4d} {5:4d} {6:4d} {7:4d} {8:4d}\n".format(
nsph, conf['li'], conf['le'], mxndm, conf['inpol'],
conf['npnt'], conf['npntts'], conf['iavm'], conf['isam']
)
output.write(str_line)
for si in range(nsph):
str_line = " {0:15.8E} {1:15.8E} {2:15.8E}\n".format(
conf['vec_sph_x'][si], conf['vec_sph_y'][si], conf['vec_sph_z'][si]
)
output.write(str_line)
str_line = " {0:7.2E} {1:7.2E} {2:7.2E} {3:7.2E} {4:7.2E} {5:7.2E}\n".format(
conf['th'], conf['thstp'], conf['thlst'],
conf['ph'], conf['phstp'], conf['phlst']
)
output.write(str_line)
str_line = " {0:7.2E} {1:7.2E} {2:7.2E} {3:7.2E} {4:7.2E} {5:7.2E}\n".format(
conf['ths'], conf['thsstp'], conf['thslst'],
conf['phs'], conf['phsstp'], conf['phslst']
)
output.write(str_line)
str_line = " {0:d}\n0\n".format(conf['jwtm'])
output.write(str_line)
output.close()
return result
## \brief Write the scatterer configuration dictionary to legacy format.
#
# \param conf: `dict` Scatterer configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def write_legacy_sconf(conf):
result = 0
out_file = str(conf['out_file'])
nsph = conf['nsph']
ies = conf['ies']
exdc = conf['exdc']
wp = conf['wp']
xip = conf['xip']
idfc = conf['idfc']
instpc = conf['instpc']
xi_flag = 3
nxi = conf['nxi']
# Write legacy output
output = open(out_file, 'w')
str_line = " {0:3d}{1:3d}\n".format(nsph, ies)
output.write(str_line)
str_line = " {0:12.7E} {1:12.7E} {2:12.7E} {3:2d} {4:4d} {5:4d} {6:3d}\n".format(
exdc, wp, xip, idfc, nxi, instpc, xi_flag
)
output.write(str_line)
if (instpc == 0):
for ixi in range(nxi):
str_line = "{0:.3E}\n".format(conf['vec_xi'][ixi])
output.write(str_line)
else:
str_line = "{0:.3E} {1:.3E}\n".format(conf['xi_start'], conf['xi_step'])
output.write(str_line)
sphere_line_count = 0
placed_spheres = 0
last_type = 0
dedfb_type = 0
for si in range(nsph):
if (conf['vec_types'][si] > last_type):
dedfb_type = placed_spheres + 1
last_type = conf['vec_types'][si]
str_line = "{0:5d}".format(dedfb_type)
output.write(str_line)
sphere_line_count += 1
placed_spheres += 1
if (sphere_line_count == 16):
output.write("\n")
sphere_line_count = 0
if (sphere_line_count != 0):
output.write("\n")
for ci in range(conf['configurations']):
layers = conf['nshl'][ci]
if (conf['application'] == "INCLUSION" and ci == 0):
layers -= 1
str_line = "{0:3d} {1:15.7E}\n".format(layers, conf['ros'][ci])
output.write(str_line)
for cj in range(conf['nshl'][ci]):
str_line = " {0:.7E}\n".format(conf['rcf'][ci][cj])
output.write(str_line)
if (conf['application'] != "INCLUSION"):
if (conf['idfc'] == 0):
# Write all wavelength dependent constants for each layer in each configuration
for xi in range(conf['configurations']):
for xj in range(conf['nshl'][xi]):
for xk in range(conf['nxi']):
for xii in range(conf['configurations']):
rdc0 = conf['rdc0'][xj][xii][xk]
idc0 = conf['idc0'][xj][xii][xk]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
elif (conf['idfc'] == -1):
# Write reference scale constants for each layer in each configuration
for xi in range(conf['configurations']):
for xj in range(conf['nshl'][xi]):
rdc0 = conf['rdc0'][xj][xi][0]
idc0 = conf['idc0'][xj][xi][0]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
else: # specialized output for INCLUSION
if (conf['idfc'] == 0):
# Write all wavelength dependent constants for each layer in each configuration
for xi in range(conf['configurations']):
layers = conf['nshl'][xi] - 1
for xj in range(layers):
for xk in range(conf['nxi']):
for xii in range(conf['configurations']):
rdc0 = conf['rdc0'][xj][xii][xk]
idc0 = conf['idc0'][xj][xii][xk]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
if (xii == conf['configurations'] - 1):
rdc0 = conf['rdc0'][layers][0][xk]
idc0 = conf['idc0'][layers][0][xk]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
elif (conf['idfc'] == -1):
# Write reference scale constants for each layer in each configuration
for xi in range(conf['configurations']):
layers = conf['nshl'][xi] - 1
for xj in range(layers):
rdc0 = conf['rdc0'][xj][xi][0]
idc0 = conf['idc0'][xj][xi][0]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
if (xi == conf['configurations'] - 1):
rdc0 = conf['rdc0'][layers][0][0]
idc0 = conf['idc0'][layers][0][0]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
output.write("0\n")
output.close()
return result
## \brief Exit code (0 for success)
exit_code = main()
exit(exit_code)