Something went wrong on our end
Select Git revision
model_maker.py
-
Giovanni La Mura authored
Ensure that DEDFB type index for groups of spheres is larger than the index of the first sphere in the group
Giovanni La Mura authoredEnsure that DEDFB type index for groups of spheres is larger than the index of the first sphere in the group
model_maker.py 28.16 KiB
#!/bin/python3
# Copyright (C) 2024 INAF - Osservatorio Astronomico di Cagliari
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# A copy of the GNU General Public License is distributed along with
# this program in the COPYING file. If not, see: <https://www.gnu.org/licenses/>.
## @package model_maker
# \brief Script to build models from YAML configuration files.
#
# Script to assist in the creation of model input files starting
# from a YAML descriptor.
#
# The script requires python3.
import yaml
from pathlib import PurePath
from sys import argv
## \brief Main execution code
#
# `main()` is the function that handles the creation of the code configuration.
# It returns an integer value as exit code, using 0 to signal successful execution.
#
# \returns result: `int` Number of detected error-level inconsistencies.
def main():
result = 0
config = parse_arguments()
if (config['help_mode'] or config['yml_file_name'] == ""):
print_help()
else:
sconf, gconf = load_model(config['yml_file_name'])
if (sconf is not None) and (gconf is not None):
result = write_legacy_sconf(sconf)
if (result == 0): result += write_legacy_gconf(gconf)
else:
print("ERROR: could not create configuration.")
result = 1
return result
## \brief Populate the dielectric constant data via interpolation
#
# \param sconf: `dict` Scatterer configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def interpolate_constants(sconf):
result = 0
for i in range(sconf['configurations']):
for j in range(sconf['nshl'][i]):
file_idx = sconf['dielec_id'][i][j]
dielec_path = PurePath(sconf['dielec_path'], sconf['dielec_file'][int(file_idx) - 1])
file_name = str(dielec_path)
dielec_file = open(file_name, 'r')
wavelengths = []
rpart = []
ipart = []
str_line = dielec_file.readline()
while (str_line != ""):
if (not str_line.startswith('#')):
split_line = str_line.split(',')
if (len(split_line) == 3):
wavelengths.append(float(split_line[0]))
rpart.append(float(split_line[1]))
ipart.append(float(split_line[2]))
str_line = dielec_file.readline()
dielec_file.close()
wi = 0
x0 = 0.0
x1 = 0.0
ry0 = 0.0
iy0 = 0.0
ry1 = 0.0
iy1 = 0.0
for dci in range(sconf['nxi']):
w = sconf['vec_xi'][dci]
while (w > x1):
x0 = wavelengths[wi]
ry0 = rpart[wi]
iy0 = ipart[wi]
if (wi == len(wavelengths)):
print("ERROR: file %s does not cover requested wavelengths!"%file_name)
return 1
wi += 1
x1 = wavelengths[wi]
ry1 = rpart[wi]
iy1 = ipart[wi]
if (wi > 0):
x0 = wavelengths[wi - 1]
ry0 = rpart[wi - 1]
iy0 = ipart[wi - 1]
dx = w - x0
dry = (ry1 - ry0) / (x1 - x0) * dx
diy = (iy1 - iy0) / (x1 - x0) * dx
ry = ry0 + dry
iy = iy0 + diy
sconf['rdc0'][j][i][dci] = ry
sconf['idc0'][j][i][dci] = iy
else:
if (wavelengths[wi] == w):
sconf['rdc0'][j][i][dci] = rpart[0]
sconf['idc0'][j][i][dci] = ipart[0]
else:
print("ERROR: file %s does not cover requested wavelengths!"%file_name)
return 2
return result
## \brief Create tha calculation configuration structure from YAML input.
#
# \param model_file: `str` Full path to the YAML input file.
# \return sconf, gconf: `tuple` A dictionary tuple for scatterer and
# geometric configurations.
def load_model(model_file):
sconf = None
gconf = None
model = None
try:
with open(model_file, 'r') as stream:
model = yaml.safe_load(stream)
except yaml.YAMLError:
print("ERROR: " + model_file + " is not a valid YAML file!")
except FileNotFoundError:
print("ERROR: " + model_file + " was not found!")
if model is not None:
# Create the sconf dict
sconf = {
'out_file': PurePath(
model['input_settings']['input_folder'],
model['input_settings']['spheres_file']
)
}
sconf['nsph'] = int(model['particle_settings']['n_spheres'])
sconf['application'] = model['particle_settings']['application']
sconf['ies'] = 1 if sconf['application'] == "INCLUSION" else 0
sconf['exdc'] = float(model['material_settings']['extern_diel'])
sconf['wp'] = float(model['radiation_settings']['wp'])
sconf['xip'] = float(model['radiation_settings']['xip'])
sconf['idfc'] = int(model['material_settings']['diel_flag'])
sconf['instpc'] = int(model['radiation_settings']['step_flag'])
sconf['xi_start'] = float(model['radiation_settings']['scale_start'])
sconf['xi_end'] = float(model['radiation_settings']['scale_end'])
sconf['xi_step'] = float(model['radiation_settings']['scale_step'])
sconf['configurations'] = int(model['particle_settings']['n_types'])
sconf['dielec_path'] = model['material_settings']['dielec_path']
sconf['dielec_file'] = model['material_settings']['dielec_file']
num_dielec = 0 # len(model['particle_settings']['dielec_id'])
if (sconf['idfc'] == -1):
num_dielec = len(model['material_settings']['diel_const'])
elif (sconf['idfc'] == 0):
num_dielec = len(model['particle_settings']['dielec_id'])
if (len(model['particle_settings']['n_layers']) != sconf['configurations']):
print("ERROR: Declared number of layers does not match number of types!")
return (None, None)
else:
sconf['nshl'] = [0 for i in range(sconf['configurations'])]
for i in range(sconf['configurations']):
sconf['nshl'][i] = int(model['particle_settings']['n_layers'][i])
if (sconf['application'] == "INCLUSION"):
sconf['nshl'][0] += 1
max_layers = max(sconf['nshl'])
if (num_dielec != sconf['configurations']):
print("ERROR: declared array of optical constants does not match configurations!")
return (None, None)
else:
if (sconf['idfc'] == 0):
sconf['dielec_id'] = [
[ 0 for j in range(max_layers)] for i in range(sconf['configurations'])
]
for i in range(sconf['configurations']):
if (len(model['particle_settings']['dielec_id'][i]) != sconf['nshl'][i]):
print("ERROR: Declared materials in type %d do not match number of layers!"%i)
return (None, None)
else:
for j in range(sconf['nshl'][i]):
sconf['dielec_id'][i][j] = float(model['particle_settings']['dielec_id'][i][j])
if (model['radiation_settings']['scale_name'] == "XI"):
sconf['insn'] = 1
sconf['nxi'] = 1 + int((sconf['xi_end'] - sconf['xi_start']) / sconf['xi_step'])
sconf['vec_xi'] = [0.0 for i in range(sconf['nxi'])]
for i in range(sconf['nxi']):
sconf['vec_xi'][i] = sconf['xi_start'] + i * sconf['xi_step']
# Set up dielectric constants
if (num_dielec != sconf['configurations']):
print("ERROR: delcared dielectric constants do not match number of sphere types!")
return (None, None)
else:
if (sconf['idfc'] == -1):
sconf['rdc0'] = [
[
[0.0 for k in range(1)] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
sconf['idc0'] = [
[
[0.0 for k in range(1)] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
for di in range(max_layers):
for dj in range(sconf['configurations']):
if (len(model['material_settings']['diel_const'][dj]) / 2 != sconf['nshl'][dj]):
print("ERROR: dielectric constants for type %d do not match number of layers!"%(dj + 1))
return (None, None)
else:
sconf['rdc0'][di][dj][0] = float(model['material_settings']['diel_const'][dj][2 * di])
sconf['idc0'][di][dj][0] = float(model['material_settings']['diel_const'][dj][2 * di + 1])
else: # sconf[idfc] != -1 and scaling on XI
print("ERROR: for scaling on XI, optical constants must be defined!")
return (None, None)
elif (model['radiation_settings']['scale_name'] == "WAVELENGTH"):
sconf['insn'] = 3
if (model['material_settings']['match_mode'] == "INTERPOLATE"):
sconf['nxi'] = 1 + int((sconf['xi_end'] - sconf['xi_start']) / sconf['xi_step'])
sconf['vec_xi'] = [0.0 for i in range(sconf['nxi'])]
for i in range(sconf['nxi']):
sconf['vec_xi'][i] = sconf['xi_start'] + i * sconf['xi_step']
# Set up the dielectric constants
if (sconf['idfc'] == 0):
sconf['rdc0'] = [
[
[0.0 for k in range(sconf['nxi'])] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
sconf['idc0'] = [
[
[0.0 for k in range(sconf['nxi'])] for j in range(sconf['configurations'])
] for i in range(max_layers)
]
interpolate_constants(sconf)
else: # sconf[idfc] != 0 and scaling on wavelength
print("ERROR: for wavelength scaling, optical constants must be tabulated!")
return (None, None)
elif (model['material_settings']['match_mode'] == "GRID"):
match_grid(sconf)
else:
print("ERROR: %s is not a recognized match mode!"%(model['material_settings']['match_mode']))
return (None, None)
else:
print("ERROR: %s is not a supported scaling mode!"%(model['radiation_settings']['scale_name']))
return (None, None)
sph_types = model['particle_settings']['sph_types']
if (len(sph_types) != sconf['nsph']):
print("ERROR: vector of sphere types does not match the declared number of spheres!")
return (None, None)
else:
sconf['vec_types'] = [int(str_typ) for str_typ in sph_types]
if (len(model['particle_settings']['radii']) != sconf['configurations']):
print("ERROR: Declared number of radii does not match number of types!")
return (None, None)
else:
sconf['ros'] = [0.0 for i in range(sconf['configurations'])]
for i in range(sconf['configurations']):
sconf['ros'][i] = float(model['particle_settings']['radii'][i])
if (len(model['particle_settings']['rad_frac']) != sconf['configurations']):
print("ERROR: Declared number of fractional radii does not match number of types!")
return (None, None)
else:
sconf['rcf'] = [
[0.0 for j in range(max_layers)] for i in range(sconf['configurations'])
]
for i in range(sconf['configurations']):
if (len(model['particle_settings']['rad_frac'][i]) != sconf['nshl'][i]):
print("ERROR: Declared transition radii in type %d do not match number of layers!"%i)
return (None, None)
else:
for j in range(sconf['nshl'][i]):
sconf['rcf'][i][j] = float(model['particle_settings']['rad_frac'][i][j])
# Create the gconf dict
str_polar = model['radiation_settings']['polarization']
if (str_polar not in ["LINEAR", "CIRCULAR"]):
print("ERROR: %s is not a recognized polarization state."%str_polar)
return (None, None)
gconf = {
'out_file': PurePath(
model['input_settings']['input_folder'],
model['input_settings']['geometry_file']
)
}
gconf['nsph'] = sconf['nsph']
gconf['application'] = model['particle_settings']['application']
gconf['li'] = int(model['geometry_settings']['li'])
gconf['le'] = int(
gconf['li'] if gconf['application'] == "SPHERE" else model['geometry_settings']['le']
)
gconf['inpol'] = 0 if str_polar == "LINEAR" else 1
gconf['npnt'] = int(model['geometry_settings']['npnt'])
gconf['npntts'] = int(model['geometry_settings']['npntts'])
if (gconf['application'] != "SPHERE"):
gconf['iavm'] = int(model['geometry_settings']['iavm'])
gconf['isam'] = int(model['geometry_settings']['isam'])
gconf['jwtm'] = int(model['output_settings']['jwtm'])
gconf['th'] = float(model['geometry_settings']['in_th_start'])
gconf['thstp'] = float(model['geometry_settings']['in_th_step'])
gconf['thlst'] = float(model['geometry_settings']['in_th_end'])
gconf['ph'] = float(model['geometry_settings']['in_ph_start'])
gconf['phstp'] = float(model['geometry_settings']['in_ph_step'])
gconf['phlst'] = float(model['geometry_settings']['in_ph_end'])
gconf['ths'] = float(model['geometry_settings']['sc_th_start'])
gconf['thsstp'] = float(model['geometry_settings']['sc_th_step'])
gconf['thslst'] = float(model['geometry_settings']['sc_th_end'])
gconf['phs'] = float(model['geometry_settings']['sc_ph_start'])
gconf['phsstp'] = float(model['geometry_settings']['sc_ph_step'])
gconf['phslst'] = float(model['geometry_settings']['sc_ph_end'])
gconf['vec_sph_x'] = [0.0 for i in range(gconf['nsph'])]
gconf['vec_sph_y'] = [0.0 for i in range(gconf['nsph'])]
gconf['vec_sph_z'] = [0.0 for i in range(gconf['nsph'])]
if (gconf['application'] != "SPHERE" or gconf['nsph'] != 1):
# TODO: put here a test to allow for empty vectors in case of random generation
if (len(model['geometry_settings']['x_coords']) != gconf['nsph']):
print("ERROR: X coordinates do not match the number of spheres!")
return (None, None)
if (len(model['geometry_settings']['y_coords']) != gconf['nsph']):
print("ERROR: Y coordinates do not match the number of spheres!")
return (None, None)
if (len(model['geometry_settings']['z_coords']) != gconf['nsph']):
print("ERROR: Z coordinates do not match the number of spheres!")
return (None, None)
for si in range(gconf['nsph']):
gconf['vec_sph_x'][si] = float(model['geometry_settings']['x_coords'][si])
gconf['vec_sph_y'][si] = float(model['geometry_settings']['y_coords'][si])
gconf['vec_sph_z'][si] = float(model['geometry_settings']['z_coords'][si])
else: # model is None
print("ERROR: could not parse " + model_file + "!")
return (sconf, gconf)
## \brief Populate the dielectric constant data matching a grid.
#
# Important note: if the configuration requests that more than one
# optical constants file should be used, all the files must provide
# their constants for the same vector of wavelengths.
#
# \param sconf: `dict` Scatterer configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def match_grid(sconf):
result = 0
max_layers = 0
nxi = 0
sconf['vec_xi'] = []
for i in range(sconf['configurations']):
for j in range(sconf['nshl'][i]):
file_idx = sconf['dielec_id'][i][j]
dielec_path = PurePath(sconf['dielec_path'], sconf['dielec_file'][int(file_idx) - 1])
file_name = str(dielec_path)
dielec_file = open(file_name, 'r')
wavelengths = []
rpart = []
ipart = []
str_line = dielec_file.readline()
while (str_line != ""):
if (not str_line.startswith('#')):
split_line = str_line.split(',')
if (len(split_line) == 3):
wavelengths.append(float(split_line[0]))
rpart.append(float(split_line[1]))
ipart.append(float(split_line[2]))
str_line = dielec_file.readline()
dielec_file.close()
if (max_layers == 0):
# This is executed only once
max_layers = max(sconf['nshl'])
w_start = sconf['xi_start']
w_end = sconf['xi_end']
for wi in range(len(wavelengths)):
w = wavelengths[wi]
if (w >= w_start and w <= w_end):
sconf['vec_xi'].append(w)
nxi += 1
sconf['rdc0'] = [
[
[
0.0 for dk in range(nxi)
] for dj in range(sconf['configurations'])
] for di in range(max_layers)
]
sconf['idc0'] = [
[
[
0.0 for dk in range(nxi)
] for dj in range(sconf['configurations'])
] for di in range(max_layers)
]
sconf['nxi'] = nxi
# This is executed for all layers in all configurations
wi = 0
x = wavelengths[wi]
ry = rpart[wi]
iy = ipart[wi]
for dci in range(sconf['nxi']):
w = sconf['vec_xi'][dci]
while (w > x):
x = wavelengths[wi]
ry = rpart[wi]
iy = ipart[wi]
if (wi == len(wavelengths)):
print("ERROR: file %s does not cover requested wavelengths!"%file_name)
return 1
wi += 1
sconf['rdc0'][j][i][dci] = ry
sconf['idc0'][j][i][dci] = iy
return result
## \brief Parse the command line arguments.
#
# The script behaviour can be modified through a set of optional arguments.
# The purpose of this function is to parse the command line in search for
# such arguments and prepare the execution accordingly.
#
# \returns config: `dict` A dictionary containing the script configuration.
def parse_arguments():
config = {
'yml_file_name': "",
'help_mode': False,
}
yml_set = False
for arg in argv[1:]:
if (arg.startswith("--help")):
config['help_mode'] = True
elif (not yml_set):
if (not arg.startswith("--")):
config['yml_file_name'] = arg
yml_set = True
else:
raise Exception("Unrecognized argument \'{0:s}\'".format(arg))
return config
## \brief Print a command-line help summary.
def print_help():
print("###############################################")
print("# #")
print("# NPtm_code MODEL_MAKER #")
print("# #")
print("###############################################")
print(" ")
print("Create input files for FORTRAN and C++ code. ")
print(" ")
print("Usage: \"./model_maker.py CONFIG [OPTIONS]\" ")
print(" ")
print("CONFIG must be a valid YAML configuration file.")
print(" ")
print("Valid options are: ")
print("--help Print this help and exit.")
print(" ")
## \brief Write the geometry configuration dictionary to legacy format.
#
# \param conf: `dict` Geometry configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def write_legacy_gconf(conf):
result = 0
out_file = str(conf['out_file'])
nsph = conf['nsph']
str_line = "INIT"
# Write legacy output
output = open(out_file, 'w')
if (conf['application'] == "SPHERE"):
str_line = " {0:4d} {1:4d} {2:4d} {3:4d} {4:4d} {5:4d}\n".format(
nsph, conf['li'], conf['inpol'], conf['npnt'], conf['npntts'], conf['isam']
)
output.write(str_line)
else:
mxndm = 2 * nsph * conf['li'] * (conf['li'] + 2)
str_line = " {0:4d} {1:4d} {2:4d} {3:4d} {4:4d} {5:4d} {6:4d} {7:4d} {8:4d}\n".format(
nsph, conf['li'], conf['le'], mxndm, conf['inpol'],
conf['npnt'], conf['npntts'], conf['iavm'], conf['isam']
)
output.write(str_line)
for si in range(nsph):
str_line = " {0:15.8E} {1:15.8E} {2:15.8E}\n".format(
conf['vec_sph_x'][si], conf['vec_sph_y'][si], conf['vec_sph_z'][si]
)
output.write(str_line)
str_line = " {0:7.2E} {1:7.2E} {2:7.2E} {3:7.2E} {4:7.2E} {5:7.2E}\n".format(
conf['th'], conf['thstp'], conf['thlst'],
conf['ph'], conf['phstp'], conf['phlst']
)
output.write(str_line)
str_line = " {0:7.2E} {1:7.2E} {2:7.2E} {3:7.2E} {4:7.2E} {5:7.2E}\n".format(
conf['ths'], conf['thsstp'], conf['thslst'],
conf['phs'], conf['phsstp'], conf['phslst']
)
output.write(str_line)
str_line = " {0:d}\n0\n".format(conf['jwtm'])
output.write(str_line)
output.close()
return result
## \brief Write the scatterer configuration dictionary to legacy format.
#
# \param conf: `dict` Scatterer configuration dictionary.
# \return result: `int` An exit code (0 if successful).
def write_legacy_sconf(conf):
result = 0
out_file = str(conf['out_file'])
nsph = conf['nsph']
ies = conf['ies']
exdc = conf['exdc']
wp = conf['wp']
xip = conf['xip']
idfc = conf['idfc']
instpc = conf['instpc']
xi_flag = 3
nxi = conf['nxi']
# Write legacy output
output = open(out_file, 'w')
str_line = " {0:3d}{1:3d}\n".format(nsph, ies)
output.write(str_line)
str_line = " {0:12.7E} {1:12.7E} {2:12.7E} {3:2d} {4:4d} {5:4d} {6:3d}\n".format(
exdc, wp, xip, idfc, nxi, instpc, xi_flag
)
output.write(str_line)
if (instpc == 0):
for ixi in range(nxi):
str_line = "{0:.3E}\n".format(conf['vec_xi'][ixi])
output.write(str_line)
else:
str_line = "{0:.3E} {1:.3E}\n".format(conf['xi_start'], conf['xi_step'])
output.write(str_line)
sphere_line_count = 0
placed_spheres = 0
last_type = 0
dedfb_type = 0
for si in range(nsph):
if (conf['vec_types'][si] > last_type):
dedfb_type = placed_spheres + 1
last_type = conf['vec_types'][si]
str_line = "{0:5d}".format(dedfb_type)
output.write(str_line)
sphere_line_count += 1
placed_spheres += 1
if (sphere_line_count == 16):
output.write("\n")
sphere_line_count = 0
if (sphere_line_count != 0):
output.write("\n")
for ci in range(conf['configurations']):
layers = conf['nshl'][ci]
if (conf['application'] == "INCLUSION" and ci == 0):
layers -= 1
str_line = "{0:3d} {1:15.7E}\n".format(layers, conf['ros'][ci])
output.write(str_line)
for cj in range(conf['nshl'][ci]):
str_line = " {0:.7E}\n".format(conf['rcf'][ci][cj])
output.write(str_line)
if (conf['application'] != "INCLUSION"):
if (conf['idfc'] == 0):
# Write all wavelength dependent constants for each layer in each configuration
for xi in range(conf['configurations']):
for xj in range(conf['nshl'][xi]):
for xk in range(conf['nxi']):
for xii in range(conf['configurations']):
rdc0 = conf['rdc0'][xj][xii][xk]
idc0 = conf['idc0'][xj][xii][xk]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
elif (conf['idfc'] == -1):
# Write reference scale constants for each layer in each configuration
for xi in range(conf['configurations']):
for xj in range(conf['nshl'][xi]):
rdc0 = conf['rdc0'][xj][xi][0]
idc0 = conf['idc0'][xj][xi][0]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
else: # specialized output for INCLUSION
if (conf['idfc'] == 0):
# Write all wavelength dependent constants for each layer in each configuration
for xi in range(conf['configurations']):
layers = conf['nshl'][xi] - 1
for xj in range(layers):
for xk in range(conf['nxi']):
for xii in range(conf['configurations']):
rdc0 = conf['rdc0'][xj][xii][xk]
idc0 = conf['idc0'][xj][xii][xk]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
if (xii == conf['configurations'] - 1):
rdc0 = conf['rdc0'][layers][0][xk]
idc0 = conf['idc0'][layers][0][xk]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
elif (conf['idfc'] == -1):
# Write reference scale constants for each layer in each configuration
for xi in range(conf['configurations']):
layers = conf['nshl'][xi] - 1
for xj in range(layers):
rdc0 = conf['rdc0'][xj][xi][0]
idc0 = conf['idc0'][xj][xi][0]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
if (xi == conf['configurations'] - 1):
rdc0 = conf['rdc0'][layers][0][0]
idc0 = conf['idc0'][layers][0][0]
if (rdc0 != 0.0 or idc0 != 0.0):
str_line = "({0:11.5E},{1:11.5E})\n".format(rdc0, idc0)
output.write(str_line)
output.write("0\n")
output.close()
return result
## \brief Exit code (0 for success)
exit_code = main()
exit(exit_code)