import pandas as pd
import yaml
import os
df = pd.read_csv(os.path.join('assets', 'data', 'ch3oh_data_top35.csv'))
config_dict = {
'simulated_ratio_realizations': 1000,
'recompute_kde': True,
'probability_threshold': 0.37,
'best_bandwidths': {
'87-86': 0.1,
'88-86': 0.1,
'88-87': 0.1,
'257-256': 0.4,
'381-380': 0.2,
},
'ratio_limits': {
'87-86': [0, 1.7],
'88-86': [0, 1.7],
'88-87': [0, 1.7],
'257-256': [0, 15],
'381-380': [0, 4],
},
'use_model_for_inference': 'PLACEHOLDER',
'ratios_to_include': [
"87-86",
"88-86",
"88-87",
"257-256",
"381-380"
]
}
intensities_dict = {}
uncertainties_dict = {}
data_dict = df.to_dict(orient='records')
for line_id in ['86', '87', '88', '257', '256', '381', '380']:
intensities_dict[line_id] = {}
uncertainties_dict[line_id] = {}
for row in data_dict:
intensities_dict[line_id][str(row['source_name'])] = row[f'area_{line_id}']
uncertainties_dict[line_id][str(row['source_name'])] = row[f'area_unc_{line_id}']
config_dict['measured_integrated_intensities'] = intensities_dict
config_dict['integrated_intensities_uncertainties'] = uncertainties_dict
with open('config/density_inference_input.yml', 'w') as outfile:
yaml.dump(config_dict, outfile)
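# For reference, a minimal sketch of the YAML written above (keys taken from config_dict;
# source names and values are illustrative placeholders):
#
# simulated_ratio_realizations: 1000
# recompute_kde: true
# probability_threshold: 0.37
# best_bandwidths: {'87-86': 0.1, ...}
# ratio_limits: {'87-86': [0, 1.7], ...}
# use_model_for_inference: PLACEHOLDER
# ratios_to_include: ['87-86', '88-86', '88-87', '257-256', '381-380']
# measured_integrated_intensities: {'86': {<source_name>: <area_86>, ...}, ...}
# integrated_intensities_uncertainties: {'86': {<source_name>: <area_unc_86>, ...}, ...}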
import numpy as np
import os
import matplotlib.pyplot as plt
import pickle
from scipy.interpolate import splrep, BSpline, RegularGridInterpolator
from scipy.optimize import curve_fit
from assets.commons import (get_data,
load_config_file,
setup_logger,
validate_parameter)
from typing import Union, List
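# The helper below is used to overlay analytic curves on the ratio vs. <n(H2)> scatter:
# approx(x) = |arctanh(2 * (x - x_shift) / scale) + a| ** exponent + b,
# where x is the line ratio and the parameters come from the hand-tuned x0 values defined in main().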
def approx(x, x_shift, a, b, exponent, scale=1):
return (np.abs(np.arctanh(2 * (x - x_shift) / scale) + a)) ** exponent + b
def main(ratios_to_fit: Union[List[str], None] = None):
config = load_config_file(config_file_path=os.path.join('config', 'config.yml'))
_ratio_to_fit = validate_parameter(
param_to_validate=ratios_to_fit,
default=['-'.join(lines_ratio) for lines_ratio in config['overrides']['lines_to_process']])
data = get_data(limit_rows=None,
use_model_for_inference=config['run_type'])
logger.info(f'Using {config["run_type"]} to fit data...')
logger.info(f'Data contains {data.shape[0]} rows')
bw = {
'87-86': 0.1,
'88-87': 0.1,
'88-86': 0.1,
'257-256': 0.4,
'381-380': 0.2}
x0 = {
'87-86': ((0.55, 2.3, 3.75, 0.9),),
'88-87': ((0.59, 2.3, 3.9, 1.05, 0.96),),
'88-86': ((0.55, 2.5, 4.3, 0.83, 1.08),),
'257-256': ((5, 2.5, 2.45, 0.75, 8), (5, 2.5, 3.33, 0.75, -8)),
'381-380': ((2.04, 2.5, 3.1, 0.6, 2.1), (2.18, 2.5, 3.7, 0.79, -2.4)),
}
min_ratio = {
'87-86': 0.08,
'88-87': 0.08,
'88-86': 0.03,
'257-256': 1,
'381-380': 1}
max_ratio = {
'87-86': 1.04,
'88-87': 1.02,
'88-86': 1.07,
'257-256': 7.5,
'381-380': 2.9}
for ratio_string in _ratio_to_fit:
_column_to_fit = f'ratio_{ratio_string}'
data.sort_values(by=_column_to_fit, inplace=True)
plt.clf()
plt.scatter(data[_column_to_fit], data['avg_nh2'], marker='+', alpha=0.1)
x_reg = np.linspace(min_ratio[ratio_string], max_ratio[ratio_string], 100)
data_clean = data[[_column_to_fit, 'avg_nh2']].loc[data[_column_to_fit] <= max_ratio[ratio_string]].copy()
kde = pickle.load(
open(f'prs/output/run_type/{config["run_type"]}/trained_model/ratio_density_kde_{ratio_string}.pickle',
'rb'))
rgi = RegularGridInterpolator((kde['y'] * bw[ratio_string], kde['x'] * 0.2), kde['values'].reshape(200, 200).T)
density = rgi(data_clean[[_column_to_fit, 'avg_nh2']])
data_clean = data_clean.loc[(density > density.mean() - density.std())]
plt.scatter(data_clean[_column_to_fit], data_clean['avg_nh2'], marker='+', alpha=0.1, color='red')
for components in x0[ratio_string]:
plt.plot(x_reg, approx(x_reg, *components), color='cyan')
plt.show()
if __name__ == '__main__':
logger = setup_logger(name='POLYFIT')
main()
from assets.utils import get_poc_results
from scipy.stats import anderson_ksamp
from assets.commons import setup_logger
import numpy as np
logger = setup_logger(name='PRS - AD test')
df_full = get_poc_results(line_fit_filename='ch3oh_data_top35.csv')
df_full['best_fit'] /= 1e5
df_full['best_fit'] = df_full['best_fit'].round(1)
df_full['mass'] = (df_full['mass'] / 100).round(1)
df_full['distance'] = (df_full['distance']).round(1)
df_full['diameter'] = (df_full['diameter']).round(1)
df_full = df_full[(df_full['mass'] < 100) & (df_full['mass'] > 3)]
# df_full = df_full[df_full['class'] != 'HII']
# df_full = df_full[df_full['class'].isin(['IRb', 'IRw'])]
ad_results = {}
for column in ['mass', 'distance', 'best_fit', 'diameter']:
array_list = [d[column].values for _, d in df_full.groupby(['class'])]
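# anderson_ksamp returns (statistic, critical_values, significance_level); the third element
# is the approximate p-value, which scipy caps/floors (roughly to the 0.1%-25% range).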
ad_results[column] = anderson_ksamp(array_list)[2]
logger.info(f'The p values of the AD test are: {ad_results}')
import os
import sqlalchemy
import numpy as np
import astropy.units as u
from datetime import datetime
from astropy.io import fits
from astropy import constants
from typing import Union, List
from assets.commons import (validate_parameter,
get_value_if_specified,
load_config_file,
setup_logger,
compute_unique_hash_filename)
from assets.commons.db_utils import upsert, get_pg_engine
from assets.constants import aggregation_function_mapping
from stg.stg_build_db_structure import (MomentZeroMaps,
RatioMaps)
logger = setup_logger(name='PRS')
def populate_mom_zero_table(config_prs: dict,
fits_cube_name: str,
moment_zero_map_fits: str,
aggregated_moment_zero: float,
executed_on: datetime.timestamp,
engine: sqlalchemy.engine,
run_id: str):
"""
Insert record in DB for the moment zero table
:param config_prs: configuration of the presentation layer
:param fits_cube_name: name of the fits cube
:param moment_zero_map_fits: filename of the moment zero map
:param aggregated_moment_zero: the aggregated value of the moment zero
:param executed_on: timestamp of execution
:param engine: SQLAlchemy engine to use
:param run_id: the run unique identifier
"""
_integration_limit_low = get_value_if_specified(config_prs, 'integration_limits')[0] if get_value_if_specified(
config_prs, 'integration_limits') != 'all' else None
_integration_limit_high = get_value_if_specified(config_prs, 'integration_limits')[1] if get_value_if_specified(
config_prs, 'integration_limits') != 'all' else None
moment_zero_dict = {
'mom_zero_name': f'{moment_zero_map_fits}',
'fits_cube_name': f'{fits_cube_name}',
'integration_limit_low': _integration_limit_low,
'integration_limit_high': _integration_limit_high,
'aggregated_moment_zero': aggregated_moment_zero,
'aggregation_function': config_prs['moment_zero_aggregation_function'],
'created_on': executed_on,
'run_id': run_id,
}
upsert(
table_object=MomentZeroMaps,
row_dict=moment_zero_dict,
conflict_keys=[MomentZeroMaps.mom_zero_name,
MomentZeroMaps.run_id],
engine=engine
)
def populate_line_ratios_table(config_prs: dict,
moment_zero_map_fits_list: List[str],
ratio_map_name: str,
aggregated_ratio: float,
engine: sqlalchemy.engine,
executed_on: datetime.timestamp,
run_id: str):
"""
Insert record in DB for the line ratios table
:param config_prs: configuration of the presentation layer
:param moment_zero_map_fits_list: list of the moment 0 maps to use to compute the ratio
:param ratio_map_name: filename of the ratio map
:param aggregated_ratio: aggregated value of the ratio
:param engine: SQLAlchemy engine to use
:param executed_on: timestamp of execution
:param run_id: the run unique identifier
"""
_integration_limit_low = get_value_if_specified(config_prs, 'integration_limits')[0] if get_value_if_specified(
config_prs, 'integration_limits') != 'all' else None
_integration_limit_high = get_value_if_specified(config_prs, 'integration_limits')[1] if get_value_if_specified(
config_prs, 'integration_limits') != 'all' else None
assert len(moment_zero_map_fits_list) == 2
line_ratio_dict = {
'ratio_map_name': f'{ratio_map_name}',
'mom_zero_name_1': f'{moment_zero_map_fits_list[0]}',
'mom_zero_name_2': f'{moment_zero_map_fits_list[1]}',
'aggregated_ratio': aggregated_ratio,
'aggregation_function': config_prs['aggregation_function'],
'created_on': executed_on,
'run_id': run_id,
}
upsert(
table_object=RatioMaps,
row_dict=line_ratio_dict,
conflict_keys=[RatioMaps.ratio_map_name,
RatioMaps.run_id],
engine=engine
)
def compute_moment_zero(cube: str,
config: dict,
cube_path: Union[str, None] = None,
moment_zero_path: Union[str, None] = None,
moment_zero_fits_name: Union[str, None] = None,
hdu_idx: int = 0) -> float:
"""
Compute moment zero map, given a cube
:param cube: filename of the fits cube
:param config: configuration of the presentation layer
:param cube_path: path where the fits cube is to be found
:param moment_zero_path: path where the moment 0 map is to be saved
:param moment_zero_fits_name: filename for the moment zero map
:param hdu_idx: index of the HDU to extract the data and the header
:return: the aggregated value of the moment zero, according to the function specified in the configuration
"""
_cube_path = validate_parameter(cube_path, default=os.path.join('prs', 'fits', 'cubes'))
_moment_zero_path = validate_parameter(moment_zero_path, default=os.path.join('prs', 'fits', 'moments'))
_moment_zero_fits_name = validate_parameter(moment_zero_fits_name, default='test_mom0.fits')
fitsfile = open_fits_file_duck_typing(fitsfile=cube, fits_path=_cube_path)
header = fitsfile[hdu_idx].header.copy()
data = fitsfile[hdu_idx].data
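# If the spectral axis is in frequency, the channel width is converted to a velocity width via
# the radio convention, dv = -c * dnu / nu_ref, and the moment zero is the channel sum times |dv|;
# otherwise the channels are simply summed as-is.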
conversion = header['CUNIT3'].strip().upper() == 'HZ'
spectral_unit = 'km/s' if conversion is True else 'Hz'
conversion_factor = 1 if conversion is False else (-constants.c * header['CDELT3'] / header['CRVAL3']).to('km/s').value
mom0 = (data.sum(axis=0) * abs(conversion_factor))
keywords_to_delete = ['NAXIS3', 'CRPIX3', 'CDELT3', 'CRVAL3', 'CUNIT3', 'CTYPE3']
header['BUNIT'] += f' {spectral_unit}'
header['BTYPE'] = 'MOMENT0'
header['NAXIS'] = 2
for key in keywords_to_delete:
del header[key]
fits.writeto(os.path.join(_moment_zero_path, _moment_zero_fits_name), data=mom0, header=header, overwrite=True)
try:
return aggregation_function_mapping[config['moment_zero_aggregation_function']](mom0)
except KeyError:
logger.warning(
'Moment zero aggregation function not set or misconfigured. Trying to use the one for the ratio...')
return aggregation_function_mapping[config['aggregation_function']](mom0)
def open_fits_file_duck_typing(fitsfile: Union[str, fits.PrimaryHDU],
fits_path: str = None) -> fits.PrimaryHDU:
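"""
Open a FITS file if a filename is given; if an already-opened HDU(-like) object is passed
instead, return it unchanged (duck typing on the input).
:param fitsfile: filename of the FITS file, or an already-opened HDU list
:param fits_path: path where the FITS file is to be found; defaults to the current directory
:return: the opened HDU list (or the object passed in)
"""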
_fits_path = validate_parameter(fits_path, default='.')
try:
hdu = fits.open(os.path.join(_fits_path, fitsfile))
except TypeError:
hdu = fitsfile
return hdu
def compute_image_ratios(fits1: str,
fits2: str,
config: dict,
fits_path: Union[str, None] = None,
ratio_fits_path: Union[str, None] = None,
ratio_fits_name: Union[str, None] = None,
hdu1_idx: int = 0,
hdu2_idx: int = 0):
"""
Compute the ratio of the images and save it to fits. The ratio is computed as fits1/fits2
:param fits1: filename of the first moment 0 map
:param fits2: filename of the second moment 0 map
:param config: configuration of the presentation layer
:param fits_path: path where the fits files are to be found
:param ratio_fits_path: path where the ratio map is to be saved
:param ratio_fits_name: filename for the ratio map
:param hdu1_idx: index of the HDU to extract the data and the header, for the first fits file
:param hdu2_idx: index of the HDU to extract the data and the header, for the second fits file
:return: the aggregated value of the ratio map
"""
_fits_path = validate_parameter(fits_path, default=os.path.join('prs', 'fits', 'moments'))
_ratio_fits_path = validate_parameter(ratio_fits_path, default=os.path.join('prs', 'fits', 'ratios'))
_ratio_fits_name = validate_parameter(ratio_fits_name, default='ratio.fits')
hdu1 = open_fits_file_duck_typing(fitsfile=fits1,
fits_path=_fits_path)
hdu2 = open_fits_file_duck_typing(fitsfile=fits2,
fits_path=_fits_path)
ratio_image_data = hdu1[hdu1_idx].data / hdu2[hdu2_idx].data
if config['aggregation_function'] in ('mean', 'sum'):
aggregated_ratio = aggregation_function_mapping[config['aggregation_function']](ratio_image_data)
elif config['aggregation_function'] == 'weighted_mean':
aggregated_ratio = np.nansum(hdu1[hdu1_idx].data) / np.nansum(hdu2[hdu2_idx].data)
else:
raise NotImplementedError('Aggregation function not configured')
output_header = hdu1[hdu1_idx].header
output_header['BUNIT'] = ''
output_header['BTYPE'] = 'INT.RATIO'
fits.writeto(os.path.join(_ratio_fits_path, _ratio_fits_name),
data=ratio_image_data,
header=output_header,
overwrite=True)
return aggregated_ratio
def main(cube_fits_list: List[str],
run_id: str,
mom0_out_cube1: Union[str, None] = None,
mom0_out_cube2: Union[str, None] = None,
engine: sqlalchemy.engine = None) -> str:
assert len(cube_fits_list) == 2
_mom0_out_cube1 = validate_parameter(mom0_out_cube1, default=cube_fits_list[0].replace('.fits', '_mom0.fits'))
_mom0_out_cube2 = validate_parameter(mom0_out_cube2, default=cube_fits_list[1].replace('.fits', '_mom0.fits'))
config_prs = load_config_file(os.path.join('prs', 'config', 'config.yml'))['flux_computation']
executed_on = datetime.now()
if engine is None:
engine = get_pg_engine(logger=logger)
config_prs.update({
'cube_fits_list': cube_fits_list,
'mom0_image_list': [_mom0_out_cube1, _mom0_out_cube2]
})
ratio_fits_name = f'{compute_unique_hash_filename(config=config_prs)}.fits'
aggregated_mom0_1 = compute_moment_zero(cube=cube_fits_list[0],
config=config_prs,
moment_zero_fits_name=_mom0_out_cube1)
aggregated_mom0_2 = compute_moment_zero(cube=cube_fits_list[1],
config=config_prs,
moment_zero_fits_name=_mom0_out_cube2)
aggregated_image_ratio = compute_image_ratios(fits1=_mom0_out_cube1,
fits2=_mom0_out_cube2,
config=config_prs,
ratio_fits_name=ratio_fits_name)
populate_mom_zero_table(config_prs=config_prs,
fits_cube_name=cube_fits_list[0],
moment_zero_map_fits=_mom0_out_cube1,
aggregated_moment_zero=aggregated_mom0_1,
executed_on=executed_on,
engine=engine,
run_id=run_id)
populate_mom_zero_table(config_prs=config_prs,
fits_cube_name=cube_fits_list[1],
aggregated_moment_zero=aggregated_mom0_2,
moment_zero_map_fits=_mom0_out_cube2,
executed_on=executed_on,
engine=engine,
run_id=run_id)
populate_line_ratios_table(config_prs=config_prs,
moment_zero_map_fits_list=[_mom0_out_cube1, _mom0_out_cube2],
ratio_map_name=ratio_fits_name,
aggregated_ratio=aggregated_image_ratio,
engine=engine,
executed_on=executed_on,
run_id=run_id)
return ratio_fits_name
if __name__ == '__main__':
main(cube_fits_list=['test_cube.fits', 'test_cube.fits'],
mom0_out_cube1='test_cube_mom0.fits',
mom0_out_cube2='test_cube_mom0_1.fits',
run_id='test_run')
import operator
import pandas as pd
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
from itertools import product
import yaml
from sklearn.neighbors import KernelDensity
from sklearn.model_selection import GridSearchCV
from assets.commons import (load_config_file,
validate_parameter,
setup_logger,
get_postprocessed_data)
from assets.constants import line_ratio_mapping
from multiprocessing import Pool
from typing import Tuple, Union, List
from scipy.integrate import cumtrapz
from scipy.stats import truncnorm
from scipy.interpolate import RegularGridInterpolator
from functools import reduce
def get_truncated_normal(mean: float = 0,
sd: float = 1,
lower_bound: float = 0,
upper_bound: float = 10):
return truncnorm(
(lower_bound - mean) / sd, (upper_bound - mean) / sd, loc=mean, scale=sd)
def train_kde_model(ratio: List[str],
training_data: pd.DataFrame,
points_per_axis: int,
best_bandwidth: Union[None, dict],
model_root_folder: Union[str, None] = None,
ratio_limits: Union[None, dict] = None,
rt_adjustment_factor: Union[int, float] = 2) -> Tuple[str, KernelDensity]:
"""
Create the KDE from the modelled datapoints.
:param points_per_axis: number of points in the KDE grid evaluation
:param ratio: the line ratio to use in the model, in the form of a list of 2 line indices, according to the
collisional coefficients file
:param best_bandwidth: bandwidth to use for the KDE smoothing
:param training_data: the dataframe containing the data for the KDE modelling
:param model_root_folder: the folder referring to the model used for computation;
defaults to fiducial model (constant_abundance_p15_q05)
:param rt_adjustment_factor: a scaling factor to apply to the specified bandwidth for the computation of the KDE
for the sparser RT-only data
:param ratio_limits: fixed plotting limits
:return: the string representing the ratio modelled and the KDE itself
"""
_model_root_folder = validate_parameter(
model_root_folder,
default=os.path.join('prs', 'output', 'run_type', 'constant_abundance_p15_q05')
)
ratio_string = '-'.join(ratio)
scaled_grid, model, positions, x, y, z = get_kde(points_per_axis=points_per_axis,
ratio_string=ratio_string,
training_data=training_data,
best_bandwidth=best_bandwidth[ratio_string])
grid = scaled_grid.copy()
grid[0] = scaled_grid[0] * 0.2
grid[1] = scaled_grid[1] * best_bandwidth[ratio_string]
plot_kde_ratio_nh2(grid=grid,
values_on_grid=z.reshape(points_per_axis, points_per_axis),
ratio_string=ratio_string,
model_root_folder=_model_root_folder,
training_data=training_data,
suffix_outfile='_ml',
ratio_limits=ratio_limits[ratio_string])
scaled_grid, _, _, _, _, z_rt_only = get_kde(points_per_axis=points_per_axis,
ratio_string=ratio_string,
training_data=training_data[training_data['source'] == 'RT'],
best_bandwidth=best_bandwidth[ratio_string],
x=x / rt_adjustment_factor,
y=y / rt_adjustment_factor,
bw_adjustment_factor=rt_adjustment_factor)
grid *= rt_adjustment_factor
plot_kde_ratio_nh2(grid=grid,
values_on_grid=z_rt_only.reshape(points_per_axis, points_per_axis),
ratio_string=ratio_string,
model_root_folder=_model_root_folder,
training_data=training_data[training_data['source'] == 'RT'],
ratio_limits=ratio_limits[ratio_string])
with open(
os.path.join(_model_root_folder, 'trained_model', f'ratio_density_kde_{ratio_string}.pickle'), 'wb'
) as outfile:
pickle.dump({'x': x, 'y': y, 'positions': positions, 'values': z, 'values_rt_only': z_rt_only}, outfile)
return ratio_string, model
def get_kde(points_per_axis: int,
ratio_string: str,
training_data: pd.DataFrame,
x: np.array = None,
y: np.array = None,
best_bandwidth: float = None,
bw_adjustment_factor: Union[float, int] = 1) -> tuple:
"""
Compute the Kernel Density Estimate (KDE) for a given ratio and training data.
:param points_per_axis: Number of points to use along each axis for the KDE grid.
:param ratio_string: The ratio string indicating which ratio of the training data to use.
:param training_data: The DataFrame containing the training data.
:param x: Optional. The x-axis values for the KDE grid. Defaults to a computed range if None.
:param y: Optional. The y-axis values for the KDE grid. Defaults to a computed range if None.
:param best_bandwidth: The best bandwidth to use for KDE. Defaults to 0.2 if None.
:param bw_adjustment_factor: The adjustment factor to apply to the bandwidth. Defaults to 1.
:return: A tuple containing the grid, the KDE model, the positions, x-axis values, y-axis values, and the
computed KDE values.
"""
_best_bandwidth = bw_adjustment_factor * validate_parameter(best_bandwidth, 0.2)
_x_bandwidth = bw_adjustment_factor * 0.2
log_nh2 = np.log10(training_data['avg_nh2'])
xy_train = np.array([log_nh2, training_data[f'ratio_{ratio_string}']]).T
xy_train_scaled = xy_train / np.array([_x_bandwidth, _best_bandwidth])
# The grid search suggests a bandwidth of 0.1, but I need to make it larger due to the relatively large spacing
# between models
model = KernelDensity(bandwidth=1, kernel='epanechnikov')
model.fit(xy_train_scaled)
if x is None and y is None:
xmin = log_nh2.min() - 0.5
xmax = log_nh2.max() + 0.5
ymin = np.max([training_data[f'ratio_{ratio_string}'].min() - 0.5, 0])
ymax = training_data[f'ratio_{ratio_string}'].max() * 1.15
x = np.linspace(xmin, xmax, points_per_axis) / _x_bandwidth
y = np.linspace(ymin, ymax, points_per_axis) / _best_bandwidth
grid = np.meshgrid(x, y, indexing='ij')
positions = np.vstack([grid[0].ravel(), grid[1].ravel()])
z = np.exp(model.score_samples(positions.T)) + 1e-5
z /= z.max()
return grid, model, positions, x, y, z
def plot_kde_ratio_nh2(grid: np.array,
values_on_grid: np.array,
ratio_string: str,
model_root_folder: str,
training_data: pd.DataFrame,
suffix_outfile: str = None,
ratio_limits: Union[None, list] = None):
"""
Plot the Kernel Density Estimate (KDE) of a ratio against average H2 density along the line-of-sight and save
the plot as a PNG file.
:param grid: The grid of x and y values used for the KDE.
:param values_on_grid: The computed KDE values on the grid.
:param ratio_string: The ratio string indicating which ratio of the training data to plot.
:param model_root_folder: The root folder where the model and figures are stored.
:param training_data: The DataFrame containing the training data.
:param suffix_outfile: Optional. The suffix to append to the output file name. Defaults to an empty string if None.
:param ratio_limits: Optional. The limits for the ratio axis. Defaults to None, which auto-scales the axis.
:return: None. Saves the plot as a PNG file in the specified folder.
"""
plt.rcParams.update({'font.size': 20})
_suffix_outfile = validate_parameter(suffix_outfile, default='')
plt.clf()
plt.figure(figsize=(8, 6))
plt.scatter(training_data['avg_nh2'], training_data[f'ratio_{ratio_string}'], marker='+', alpha=0.1,
facecolor='grey')
plt.contour(10 ** grid[0], grid[1], values_on_grid, levels=np.arange(0.05, 0.95, 0.15))
plt.semilogx()
plt.xlabel(r'<$n$(H$_2$)> [cm$^{-3}$]')
plt.ylabel(f'Ratio {line_ratio_mapping[ratio_string]}')
plt.ylim(ratio_limits)
plt.tight_layout()
plt.savefig(os.path.join(
model_root_folder,
'figures',
f'ratio_vs_avg_density_los_kde_{ratio_string}{_suffix_outfile}.png'))
def get_results(x_array: np.array,
probability_density: np.array,
probability_threshold: float = 0.05,
interp_points: int = 100) -> Tuple[float, float, List[float]]:
"""
Perform the data fitting, computing the best fit and the highest probability density interval
:param x_array: the density array
:param probability_density: the probability density values
:param probability_threshold: the probability mass contained in the wings
:param interp_points: the number of points used to interpolate the probability density
:return: the probability density threshold, the best-fit value for density, and the HPD interval
"""
probability, centered_probability_density, ordered_idxs = \
get_probability_distribution(probability_density=probability_density,
x_array=x_array)
probability_density_threshold = get_probability_density_threshold(
ordered_probability=probability[ordered_idxs],
ordered_probability_density=centered_probability_density[ordered_idxs],
probability_threshold=probability_threshold
)
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=probability_density_threshold,
interp_points=interp_points)
return probability_density_threshold, x_array[ordered_idxs][-1], hpd_interval
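# Example usage (illustrative only, assuming a roughly Gaussian posterior on a log-density grid):
#   x = np.linspace(3, 7, 200)
#   pdf = np.exp(-0.5 * ((x - 5) / 0.3) ** 2)
#   pdf /= np.trapz(pdf, x)
#   threshold, best_fit, hpd = get_results(x_array=x, probability_density=pdf,
#                                           probability_threshold=0.05)
#   # best_fit is close to 5 (the mode); hpd spans roughly [4.4, 5.6] (the ~95% HPD interval)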
def get_probability_density_threshold(ordered_probability: np.array,
ordered_probability_density: np.array,
probability_threshold=0.05) -> float:
"""
Compute the threshold in probability density that leaves out the requested probability mass
:param ordered_probability: the probability array, ordered according to increasing probability density
:param ordered_probability_density: the ordered probability density array, in ascending order
:param probability_threshold: the probability mass in the tails, outside the HPD
:return: the threshold in HPD that leaves the specified probability mass in the tails
"""
ctrapz_ordered = np.cumsum(ordered_probability)
threshold_idx = np.argmin(np.abs(ctrapz_ordered - probability_threshold))
lower_index = np.max([threshold_idx - 1, 0])
upper_index = np.min([threshold_idx + 1, len(ctrapz_ordered) - 1])
x_interp = np.linspace(lower_index, upper_index, 200)
interpolated_probability = np.interp(x_interp,
[lower_index, threshold_idx, upper_index],
np.take(ctrapz_ordered, [lower_index, threshold_idx, upper_index]))
interp_idx = np.argmin(abs(interpolated_probability - probability_threshold))
weighted_probability_density_threshold = np.interp(x_interp,
[lower_index, threshold_idx, upper_index],
np.take(ordered_probability_density,
[lower_index, threshold_idx, upper_index]))
return weighted_probability_density_threshold[interp_idx]
def get_hpd_interval(x_array: np.array,
probability_density: np.array,
hpd_threshold: float,
interp_points: int = 100) -> List[float]:
"""
Compute the highest probability density (HPD) interval, excluding the given probability mass
:param x_array: the density array
:param probability_density: the probability density array
:param hpd_threshold: the probability density threshold
:param interp_points: the number of points used to interpolate the probability density
:return: a list of the boundaries for the HPD (which can consist of multiple intervals)
"""
hpd_interval = []
elements_above_threshold = np.argwhere(probability_density - hpd_threshold >= 0).flatten()
intervals = np.split(elements_above_threshold, np.nonzero(np.diff(elements_above_threshold) != 1)[0] + 1)
idx_list = []
try:
for interval in intervals:
idx_list.append([np.min(interval) - 1, np.max(interval)])
flat_list = [item for sublist in idx_list for item in sublist]
for idx in flat_list:
if idx < 0:
hpd_interval.append(x_array[0])
elif idx == len(x_array) - 1:
hpd_interval.append(x_array[-1])
else:
x_interp = np.linspace(x_array[idx], x_array[idx + 1], interp_points)
interpolated_probability_density = np.interp(x_interp, x_array[idx:idx + 2],
probability_density[idx:idx + 2])
interp_idx = np.argmin(abs(interpolated_probability_density - hpd_threshold))
hpd_interval.append(x_interp[interp_idx])
except ValueError:
pass
return hpd_interval
def get_probability_distribution(probability_density: np.array,
x_array: np.array) -> Tuple[np.array, np.array, np.array]:
"""
Compute the probability mass distribution, and return the bin-centered probability density array and the ordered
indices that sort the probability density in ascending order
:param probability_density: the probability density array
:param x_array: the density array
:return: the normalized probability mass distribution, the centered probability density array, and the indices that
sort the probability density array in ascending order
"""
dx_array = np.diff(x_array)
avg_y_array = 0.5 * (probability_density[1:] + probability_density[0:-1])
probability = avg_y_array * dx_array
probability /= probability.sum()
ordered_idxs = np.argsort(avg_y_array)
return probability, avg_y_array, ordered_idxs
def compute_best_bandwidth(data: pd.DataFrame,
bandwidths: List[float],
line_pairs: List[List[str]],
nthreads: int = 15,
crossvalidation_folds: int = 5,
training_thinning: int = 10,
kernel: str = None) -> List[float]:
"""
Compute the best bandwidth for the KDE via cross-validation. Note that, due to the coarse spacing in the
characteristic density of the clumps, the best bandwidth found is too small; we suggest scaling it up by a factor
of 2.
:param data: the data to be used for the cross validation
:param bandwidths: the list of bandwidths to be tested
:param line_pairs: the list of ratios for which the best bandwidth should be determined
:param nthreads: the number of threads to use
:param crossvalidation_folds: the number of folds for cross validation
:param training_thinning: thinning factor for training data set (reduces computational times)
:param kernel: the kernel type, as supported by sklearn; defaults to epanechnikov
:return: the list of best bandwidths, one for each ratio
"""
_kernel = validate_parameter(kernel, 'epanechnikov')
best_bandwidths = []
for ratio in line_pairs:
ratio_string = '-'.join(ratio)
y_train = data[[f'ratio_{ratio_string}']]
model_y = KernelDensity(kernel=_kernel)
cv_grid_y = GridSearchCV(model_y, {'bandwidth': bandwidths}, cv=crossvalidation_folds, n_jobs=nthreads,
verbose=4)
cv_grid_y.fit(y_train.sample(frac=1 / training_thinning))
best_bandwidths.append(cv_grid_y.best_estimator_.bandwidth)
return best_bandwidths
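# Example usage (sketch, assuming `data` already contains the ratio_* columns used above):
#   candidate_bandwidths = list(np.linspace(0.05, 0.5, 10))
#   best = compute_best_bandwidth(data=data, bandwidths=candidate_bandwidths,
#                                 line_pairs=[['87', '86'], ['257', '256']])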
def recompute_and_save_kdes(data: pd.DataFrame,
line_pairs: List[List[str]],
points_per_axis: int,
nthreads: int = 10,
pickle_models_dict_filename: Union[None, str] = None,
model_root_folder: Union[None, str] = None,
best_bandwidths: Union[None, dict] = None,
ratio_limits: Union[None, dict] = None):
"""
Recompute the KDE models for all the requested ratios and pickle the resulting dictionary of models
:param points_per_axis: number of points for the KDE grid evaluation
:param data: the data to be used for retraining
:param line_pairs: all the lines pairs that constitute the ratios
:param nthreads: the number of separate threads to use
:param pickle_models_dict_filename: the filename of the pickle file containing the model dictionary
:param model_root_folder: the folder referring to the model used for computation;
defaults to fiducial model (constant_abundance_p15_q05)
:param best_bandwidths: kernel bandwidths to use for each ratio
:param ratio_limits: fixed plotting limits
"""
_model_root_folder = validate_parameter(
model_root_folder,
default=os.path.join('prs', 'output', 'run_type', 'constant_abundance_p15_q05')
)
_pickle_models_dict_filename = validate_parameter(pickle_models_dict_filename, default='models_dict.pickle')
models_dict = {}
default_bandwidths = {
'87-86': 0.1,
'88-86': 0.1,
'88-87': 0.1,
'257-256': 0.4,
'381-380': 0.2,
}
_best_bandwidths = validate_parameter(best_bandwidths, default=default_bandwidths)
parallel_args = product(line_pairs, [data], [points_per_axis],
[_best_bandwidths], [_model_root_folder], [ratio_limits])
with Pool(nthreads) as pool:
results = pool.starmap(train_kde_model, parallel_args)
for ratio_string, model in results:
models_dict[ratio_string] = model
with open(os.path.join(_model_root_folder, 'trained_model', _pickle_models_dict_filename), 'wb') as outfile:
pickle.dump(models_dict, outfile)
def plot_results(log_x_grid: np.array,
probability_density: np.array,
probability_density_threshold: float,
hpd_interval: List[float],
source_name: Union[str, None] = None,
model_root_folder: Union[None, str] = None):
"""
Plot the results of the fitting, the posterior of the number density of H2
:param source_name: the source name string
:param hpd_interval: the list of HPD interval boundaries
:param log_x_grid: the logarithmic grid in number density
:param probability_density: the array with the probability density values
:param probability_density_threshold: the threshold where the probability density is to be cut to get the HPD
interval
:param model_root_folder: the folder referring to the model used for computation;
defaults to fiducial model (constant_abundance_p15_q05)
"""
_model_root_folder = validate_parameter(
model_root_folder,
default=os.path.join('prs', 'output', 'run_type', 'constant_abundance_p15_q05')
)
if (source_name is not None) and (source_name != 'default_source'):
_source_name_string = source_name
else:
_source_name_string = ''
plt.figure(figsize=(8, 6))
plt.rcParams.update({'font.size': 16})
plt.plot(10 ** log_x_grid, probability_density, marker='x')
plt.axhline(y=probability_density_threshold, color='r', linestyle='--')
plt.xlim(0.5 * np.min(np.array(hpd_interval)),
2 * np.max(np.array(hpd_interval)))
plt.semilogx()
plt.xlabel(r'Number density [cm$^{-3}$]')
plt.ylabel(r'Probability density')
plt.title('$n$(H$_{2}$)' + f' posterior for {_source_name_string}')
plt.tight_layout()
plt.savefig(os.path.join(_model_root_folder, 'figures', f'density_pdf{_source_name_string}.png'))
plt.clf()
def normalize_probability_density(log_x_grid: np.array,
probability_density: np.array) -> Tuple[np.array, np.array]:
"""
Normalize the probability density, so that the integral of the probability is 1
:param log_x_grid: the logarithmic grid in number density
:param probability_density: the array with the probability density values
:return: the normalized probability density function and the corresponding ECDF
"""
ecdf = cumtrapz(probability_density, 10 ** log_x_grid)
renorm_constant = ecdf[-1]
ecdf /= renorm_constant
probability_density /= renorm_constant
return probability_density, ecdf
def compute_probability_density(x_grid: np.array,
measured_integrated_intensities: Tuple[float, float],
integrated_intensities_uncertainty: Tuple[float, float],
model: KernelDensity,
ratio_realizations: int = 1000,
best_bandwidth: Union[None, float] = None,
ratio_string: Union[None, str] = None) -> dict:
"""
Compute the PDF from the model and the realizations of the ratio, given its measurement and the uncertainties.
For now, the ratio measurement is assumed to have a Gaussian uncertainty.
:param x_grid: the logarithmic grid in number density
:param measured_integrated_intensities: the measured value of the integrated intensities
:param integrated_intensities_uncertainty: the rms uncertainty on the integrated intensity
:param model: the KDE model for the ratio
:param ratio_realizations: the number of realizations of the ratio, to compute the PDF
:param best_bandwidth: the bandwidth to use for KDE
:param ratio_string: the identifier of the ratio, used as the key of the returned dictionary
:return: a dictionary mapping the ratio string to the probability density of the number density
"""
_best_bandwidth = validate_parameter(best_bandwidth, 0.2)
_ratio_string = validate_parameter(ratio_string, default='not_specified')
simulated_integrated_intensities_0 = get_truncated_normal(mean=measured_integrated_intensities[0],
sd=integrated_intensities_uncertainty[0],
lower_bound=1e-5,
upper_bound=measured_integrated_intensities[0] + 10 *
integrated_intensities_uncertainty[0]) \
.rvs(size=ratio_realizations)
simulated_integrated_intensities_1 = get_truncated_normal(mean=measured_integrated_intensities[1],
sd=integrated_intensities_uncertainty[1],
lower_bound=1e-5,
upper_bound=measured_integrated_intensities[1] + 10 *
integrated_intensities_uncertainty[1]) \
.rvs(size=ratio_realizations)
simulated_ratios = simulated_integrated_intensities_0 / simulated_integrated_intensities_1
interp_function = RegularGridInterpolator((model['x'], model['y']),
model['values'].reshape(model['x'].shape[0], model['y'].shape[0]),
bounds_error=False,
fill_value=0)
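# Marginalize over the ratio uncertainty with a simple Monte Carlo: evaluate the trained KDE on
# every (density, simulated ratio) pair and sum the contributions of all realizations at each
# density grid point (both axes are rescaled by the same bandwidths used in training).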
positions = np.array(list(product(x_grid / 0.2, simulated_ratios / _best_bandwidth)))
z = interp_function(positions)
probability_density = np.nansum(z.reshape((len(x_grid), ratio_realizations)).T, axis=0)
probability_density /= np.trapz(probability_density, x_grid)
return {_ratio_string: probability_density}
def main(measured_integrated_intensity_dict: dict,
integrated_intensity_uncertainty_dict: dict,
ratio_list: list,
probability_threshold: float,
source_names: list,
recompute_kde: bool = False,
points_per_axis: int = 200,
ratio_realizations: int = 1000,
nthreads: int = 10,
limit_rows: Union[None, int] = None,
use_model_for_inference: Union[None, str] = None,
best_bandwidths: Union[None, dict] = None,
ratio_limits: Union[None, dict] = None):
_use_model_for_inference = validate_parameter(
use_model_for_inference,
default='constant_abundance_p15_q05'
)
_model_root_folder = os.path.join('prs', 'output', 'run_type', _use_model_for_inference)
data, line_pairs = get_inference_data(use_model_for_inference=_use_model_for_inference,
limit_rows=limit_rows)
measured_integrated_intensity_coupled = {}
integrated_intensity_uncertainty_coupled = {}
if recompute_kde is True:
recompute_and_save_kdes(data=data,
line_pairs=line_pairs,
nthreads=nthreads,
points_per_axis=points_per_axis,
model_root_folder=_model_root_folder,
best_bandwidths=best_bandwidths,
ratio_limits=ratio_limits)
x_grid = np.linspace(np.log10(0.7 * np.nanmin(data['avg_nh2'])),
np.log10(1.3 * np.nanmax(data['avg_nh2'])), points_per_axis)
results_dict = {}
posteriors = {
'PDF': {},
'ECDF': {},
'density_grid': {}
}
for source_name in source_names:
_ratio_list_per_source = []
for ratio_string in ratio_list:
line_ids = validate_line_ids(line_pairs, ratio_string)
if np.isnan(measured_integrated_intensity_dict[line_ids[0]][source_name]) or \
np.isnan(measured_integrated_intensity_dict[line_ids[1]][source_name]):
logger.warning(f'The ratio {ratio_string} is not available for source {source_name}. Proceeding with the remaining ratios...')
else:
measured_integrated_intensity_coupled[ratio_string] = [
measured_integrated_intensity_dict[line_ids[0]][source_name],
measured_integrated_intensity_dict[line_ids[1]][source_name]
]
integrated_intensity_uncertainty_coupled[ratio_string] = [
integrated_intensity_uncertainty_dict[line_ids[0]][source_name],
integrated_intensity_uncertainty_dict[line_ids[1]][source_name]
]
_ratio_list_per_source.append(ratio_string)
# Compute combined probability density as the product of the individual ratio probability densities
parallel_args = []
for ratio_string in _ratio_list_per_source:
with open(
os.path.join(_model_root_folder, 'trained_model', f'ratio_density_kde_{ratio_string}.pickle'), 'rb'
) as infile:
model_dict = pickle.load(infile)
parallel_args.append([x_grid,
measured_integrated_intensity_coupled[ratio_string],
integrated_intensity_uncertainty_coupled[ratio_string],
model_dict,
ratio_realizations,
best_bandwidths[ratio_string],
ratio_string])
with Pool(nthreads) as pool:
results = pool.starmap(compute_probability_density, parallel_args)
results = reduce(operator.ior, results, {})
combined_probability_density = np.array(list(results.values())).prod(axis=0)
if np.isnan(combined_probability_density.sum()):
logger.warning(f'Source {source_name} does not have ratios compatible with modelling with this grid')
problematic_ratios = [key for key in results.keys() if np.isnan(results[key].sum())]
logger.warning(f'The ratios outside the simulated boundaries are {problematic_ratios}')
results_dict[source_name] = {
'best_fit': np.nan,
'hpd_interval': [np.nan, np.nan]
}
else:
combined_probability_density, combined_ecdf = normalize_probability_density(x_grid,
combined_probability_density)
probability_density_threshold, best_fit, hpd_interval = \
get_results(x_array=10 ** x_grid,
probability_density=combined_probability_density,
probability_threshold=probability_threshold)
plot_results(log_x_grid=x_grid,
probability_density=combined_probability_density,
probability_density_threshold=probability_density_threshold,
hpd_interval=hpd_interval,
source_name=source_name,
model_root_folder=_model_root_folder)
logger.info(f'Processed source {source_name}')
logger.info(f'The best fit value is {best_fit}, with an HPD interval of {hpd_interval}')
logger.info(f'The threshold is {probability_density_threshold}')
results_dict[source_name] = {
'best_fit': float(best_fit),
'hpd_interval': [float(element) for element in hpd_interval]
}
posteriors['PDF'][source_name] = combined_probability_density
posteriors['ECDF'][source_name] = combined_ecdf
posteriors['density_grid'][source_name] = 10 ** x_grid
with open(os.path.join(_model_root_folder, 'volume_density_results.yml'), 'w') as outfile:
yaml.dump(results_dict, outfile)
with open(os.path.join(_model_root_folder, 'posteriors.pickle'), 'wb') as outfile:
pickle.dump(posteriors, outfile)
def validate_line_ids(line_pairs, ratio_string):
line_ids = ratio_string.split('-')
if line_ids not in line_pairs:
raise ValueError(f'Specified ratio {ratio_string} not present in the model ({line_pairs})')
return line_ids
def get_inference_data(use_model_for_inference: str, limit_rows: int):
line_pairs, data = get_postprocessed_data(limit_rows=limit_rows,
use_model_for_inference=use_model_for_inference)
data['source'] = 'RT'
try:
_inferred_data_file = os.path.join(os.path.join('prs', 'output', 'run_type', use_model_for_inference),
'data',
'inferred_data.csv')
inferred_data = pd.read_csv(_inferred_data_file, index_col=0)
inferred_data['source'] = 'ML'
data = pd.concat([data, inferred_data], axis=0, ignore_index=True)
except IOError:
logger.warning('Inferred data not found. Proceeding with RT-generated data only...')
return data, line_pairs
if __name__ == '__main__':
external_input = load_config_file(config_file_path='config/density_inference_input.yml')
logger = setup_logger(name='PRS - DENSITY INFERENCE')
try:
limit_rows = external_input['limit_rows']
except KeyError:
limit_rows = None
try:
use_model_for_inference = external_input['use_model_for_inference']
if use_model_for_inference == 'PLACEHOLDER':
logger.warning('No model specified for inference in density_inference_input.yml. Using fiducial.')
use_model_for_inference = 'constant_abundance_p15_q05'
except KeyError:
use_model_for_inference = None
try:
points_per_axis = external_input['points_per_axis']
except KeyError:
points_per_axis = 200
try:
nthreads = external_input['nthreads']
except KeyError:
nthreads = 1
try:
sources = set()
for line_id in external_input['measured_integrated_intensities']:
sources = sources.union(set(external_input['measured_integrated_intensities'][line_id].keys()))
_intensities_dict = external_input['measured_integrated_intensities']
_uncertainty_dict = external_input['integrated_intensities_uncertainties']
except AttributeError:
sources = ['default_source']
_intensities_dict = {line_id: {'default_source': external_input['measured_integrated_intensities'][line_id]}
for line_id in external_input['measured_integrated_intensities']}
_uncertainty_dict = {
line_id: {'default_source': external_input['integrated_intensities_uncertainties'][line_id]}
for line_id in external_input['integrated_intensities_uncertainties']}
sources = list(sources)
sources.sort()
logger.info(f'Using {use_model_for_inference} for inference')
main(measured_integrated_intensity_dict=_intensities_dict,
integrated_intensity_uncertainty_dict=_uncertainty_dict,
source_names=sources,
ratio_list=external_input['ratios_to_include'],
probability_threshold=external_input['probability_threshold'],
recompute_kde=external_input['recompute_kde'],
points_per_axis=points_per_axis,
ratio_realizations=external_input['simulated_ratio_realizations'],
nthreads=nthreads,
limit_rows=limit_rows,
use_model_for_inference=use_model_for_inference,
best_bandwidths=external_input['best_bandwidths'],
ratio_limits=external_input['ratio_limits'])
import pandas as pd
import os
import pickle
import numpy as np
from itertools import product
from assets.commons import (load_config_file,
validate_parameter,
setup_logger)
from assets.commons.parsing import parse_grid_overrides
from assets.commons.training_utils import (compute_and_add_similarity_cols,
plot_results,
get_avg_profiles)
from assets.utils import get_profile_input
from typing import Union, List
def main(species: Union[str, None] = None,
tdust: float = 21.0,
nh2: float = 1e5,
model_root_folder: Union[str, None] = None,
ratios_to_process: Union[List[List[str]], None] = None) -> pd.DataFrame:
_model_root_folder = validate_parameter(
model_root_folder,
default=os.path.join('prs', 'output', 'run_type', 'constant_abundance_p15_q05')
)
_ratios_to_process = validate_parameter(
ratios_to_process,
default=[['87', '86'], ['88', '87'], ['88', '86'], ['257', '256'], ['381', '380']]
)
line_set = set([line_id for ratio in _ratios_to_process for line_id in ratio])
(overrides_grid, overrides_abundances) = get_profile_input(species=species,
model_root_folder=_model_root_folder)
(avg_density_profile, avg_temperature_profile,
molecule_coldens, std_density) = \
get_avg_profiles(
value_at_reference_density=nh2,
value_at_reference_temperature=tdust,
**overrides_abundances,
**overrides_grid
)
x_interp = pd.DataFrame()
x_interp['avg_nh2'] = np.log10(avg_density_profile)
x_interp['avg_tdust'] = np.log10(avg_temperature_profile)
x_interp['molecule_column_density'] = np.log10(molecule_coldens)
x_interp['px_index'] = x_interp.index.copy()
x_interp['nh2'] = np.log10(nh2)
x_interp['tdust'] = np.log10(tdust)
x_interp['std_nh2'] = np.log10(std_density)
results_df = pd.DataFrame()
x_interp['px_index'] = x_interp.index.copy()
x_interp = x_interp.dropna(subset=['avg_nh2'])
indices = x_interp['px_index'].reset_index(drop=True)
results_df['px_index'] = indices
for line in line_set:
with open(os.path.join(_model_root_folder, 'trained_model', f'average_features_per_target_bin_{line}.pickle'),
'rb') as infile:
average_features_per_target_bin = pickle.load(infile)
x_interp = compute_and_add_similarity_cols(average_features_per_target_bin=average_features_per_target_bin,
input_df=x_interp,
columns_to_drop=['px_index'],
similarity_bins=50)
with open(os.path.join(_model_root_folder, 'trained_model', f'ml_scaler_{line}.pickle'), 'rb') as infile:
scaler = pickle.load(infile)
with open(os.path.join(_model_root_folder, 'trained_model', f'ml_model_{line}.pickle'), 'rb') as infile:
model = pickle.load(infile)
columns = model.feature_names_in_
x_interp_transformed = pd.DataFrame(scaler.transform(x_interp[columns]), columns=columns)
results_df[f'mom_zero_{line}'] = model.predict(x_interp_transformed)
for column in x_interp.columns:
if column.startswith('sim_'):
x_interp.drop(columns=column, inplace=True)
results_df = results_df.merge(x_interp, on=['px_index'], how='inner', validate='1:1')
ratio_string_list = []
for line_pairs in _ratios_to_process:
ratio_string = f'ratio_{line_pairs[0]}-{line_pairs[1]}'
ratio_string_list.append(ratio_string)
results_df[ratio_string] = \
10 ** results_df[f'mom_zero_{line_pairs[0]}'] / 10 ** results_df[f'mom_zero_{line_pairs[1]}']
return results_df[['nh2', 'tdust', 'avg_tdust', 'avg_nh2', 'px_index'] + ratio_string_list]
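# Example usage (sketch, assuming the fiducial trained models are available on disk):
#   emulated = main(nh2=5e4, tdust=25.0)
#   print(emulated[['nh2', 'tdust', 'avg_nh2', 'ratio_87-86']].head())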
if __name__ == '__main__':
config = load_config_file(config_file_path=os.path.join('config', 'config.yml'))
external_input = load_config_file(config_file_path=os.path.join('config', 'ml_modelling.yml'))
logger = setup_logger(name='PRS - ML modelling')
try:
limit_rows = external_input['limit_rows']
except KeyError:
limit_rows = None
try:
use_model_for_training = external_input['use_model_for_training']
except KeyError:
use_model_for_training = None
_use_model_for_training = validate_parameter(
use_model_for_training,
default='constant_abundance_p15_q05'
)
_model_root_folder = os.path.join('prs', 'output', 'run_type', _use_model_for_training)
logger.info(f'Using {_use_model_for_training} for training in folder {_model_root_folder}')
tdust_grid = parse_grid_overrides(par_name='tdust', config=external_input)
nh2_grid = parse_grid_overrides(par_name='nh2', config=external_input)
inferred_data_list = []
for (tdust, nh2) in product(tdust_grid, nh2_grid):
logger.info(f'Producing emulated data for {nh2}, {tdust}')
if nh2 not in [1e3, 3e3, 9e3, 27e3, 81e3, 243e3, 729e3, 2187e3]:
inferred_data_list.append(
main(nh2=nh2,
tdust=tdust,
model_root_folder=_model_root_folder,
ratios_to_process=config['overrides']['lines_to_process'])
)
inferred_data = pd.concat(inferred_data_list, axis=0, ignore_index=True)
plot_results(inferred_data=inferred_data,
use_model_for_inference=_use_model_for_training,
ratios_to_process=config['overrides']['lines_to_process'])
inferred_data['avg_nh2'] = 10 ** inferred_data['avg_nh2']
inferred_data.to_csv(os.path.join(_model_root_folder, 'data', 'inferred_data.csv'))
logger.info('Completed emulation')
import pandas as pd
import os
import pickle
import numpy as np
from itertools import product
from assets.commons import (load_config_file,
validate_parameter,
setup_logger)
from assets.commons.parsing import parse_grid_overrides
from assets.commons.training_utils import compute_and_add_similarity_cols
from assets.commons.training_utils import plot_results, get_avg_profiles
from typing import Tuple, Union, List
from assets.utils import get_profile_input
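# Note: unlike the script above, which emulates the moment zero of each line and then combines
# them, this variant loads one trained model per ratio and predicts the line ratios directly.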
def main(species: Union[str, None] = None,
tdust: float = 21.0,
nh2: float = 1e5,
model_root_folder: Union[str, None] = None,
ratios_to_process: Union[List[List[str]], None] = None) -> pd.DataFrame:
_model_root_folder = validate_parameter(
model_root_folder,
default=os.path.join('prs', 'output', 'run_type', 'constant_abundance_p15_q05')
)
_ratios_to_process = validate_parameter(
ratios_to_process,
default=[['87', '86'], ['88', '87'], ['88', '86'], ['257', '256'], ['381', '380']]
)
(overrides_grid, overrides_abundances) = get_profile_input(species=species,
model_root_folder=_model_root_folder)
(avg_density_profile, avg_temperature_profile,
molecule_coldens, std_density) = \
get_avg_profiles(
value_at_reference_density=nh2,
value_at_reference_temperature=tdust,
**overrides_abundances,
**overrides_grid
)
x_interp = pd.DataFrame()
x_interp['avg_nh2'] = np.log10(avg_density_profile)
x_interp['avg_tdust'] = np.log10(avg_temperature_profile)
x_interp['molecule_column_density'] = np.log10(molecule_coldens)
x_interp['px_index'] = x_interp.index.copy()
x_interp['nh2'] = np.log10(nh2)
x_interp['tdust'] = np.log10(tdust)
x_interp['std_nh2'] = np.log10(std_density)
results_df = pd.DataFrame()
x_interp['px_index'] = x_interp.index.copy()
x_interp = x_interp.dropna(subset=['avg_nh2'])
indices = x_interp['px_index'].reset_index(drop=True)
results_df['px_index'] = indices
ratio_string_list = []
for line_ratio in _ratios_to_process:
line_ratio_string = '-'.join(line_ratio)
ratio_string_list.append(f'ratio_{line_ratio_string}')
with open(os.path.join(_model_root_folder, 'trained_model', f'average_features_per_target_bin_{line_ratio_string}.pickle'),
'rb') as infile:
average_features_per_target_bin = pickle.load(infile)
x_interp = compute_and_add_similarity_cols(average_features_per_target_bin=average_features_per_target_bin,
input_df=x_interp,
columns_to_drop=['px_index'],
similarity_bins=50)
with open(os.path.join(_model_root_folder, 'trained_model', f'ml_scaler_{line_ratio_string}.pickle'), 'rb') as infile:
scaler = pickle.load(infile)
with open(os.path.join(_model_root_folder, 'trained_model', f'ml_model_{line_ratio_string}.pickle'), 'rb') as infile:
model = pickle.load(infile)
columns = model.feature_names_in_
x_interp_transformed = pd.DataFrame(scaler.transform(x_interp[columns]), columns=columns)
results_df[f'ratio_{line_ratio_string}'] = 10 ** model.predict(x_interp_transformed)
for column in x_interp.columns:
if column.startswith('sim_'):
x_interp.drop(columns=column, inplace=True)
results_df = results_df.merge(x_interp, on=['px_index'], how='inner', validate='1:1')
return results_df[['nh2', 'tdust', 'avg_tdust', 'avg_nh2', 'px_index'] + ratio_string_list]
if __name__ == '__main__':
config = load_config_file(config_file_path=os.path.join('config', 'config.yml'))
external_input = load_config_file(config_file_path=os.path.join('config', 'ml_modelling.yml'))
logger = setup_logger(name='PRS - ML modelling')
try:
limit_rows = external_input['limit_rows']
except KeyError:
limit_rows = None
try:
use_model_for_training = external_input['use_model_for_training']
except KeyError:
use_model_for_training = None
_use_model_for_training = validate_parameter(
use_model_for_training,
default='constant_abundance_p15_q05'
)
_model_root_folder = os.path.join('prs', 'output', 'run_type', _use_model_for_training)
logger.info(f'Using {_use_model_for_training} for training in folder {_model_root_folder}')
tdust_grid = parse_grid_overrides(par_name='tdust', config=external_input)
nh2_grid = parse_grid_overrides(par_name='nh2', config=external_input)
inferred_data_list = []
for (tdust, nh2) in product(tdust_grid, nh2_grid):
logger.info(f'Producing emulated data for {nh2}, {tdust}')
if nh2 not in [1e3, 3e3, 9e3, 27e3, 81e3, 243e3, 729e3, 2187e3]:
inferred_data_list.append(
main(nh2=nh2,
tdust=tdust,
model_root_folder=_model_root_folder,
ratios_to_process=config['overrides']['lines_to_process'])
)
inferred_data = pd.concat(inferred_data_list, axis=0, ignore_index=True)
plot_results(inferred_data=inferred_data,
use_model_for_inference=_use_model_for_training,
ratios_to_process=config['overrides']['lines_to_process'])
inferred_data['avg_nh2'] = 10 ** inferred_data['avg_nh2']
inferred_data.to_csv(os.path.join(_model_root_folder, 'data', 'inferred_data.csv'))
logger.info('Completed emulation')
import matplotlib.pyplot as plt
import numpy as np
import os
import seaborn as sns
import pandas as pd
import sqlalchemy
import xarray as xr
from typing import Union, Tuple, List
from contextlib import closing
from itertools import product
from astropy.io import fits
from astropy import units as u
from sqlalchemy.orm import Session, aliased
from sqlalchemy import and_, or_, func, cast, Numeric
from sqlalchemy import engine as sqla_engine
from stg.stg_build_db_structure import (GridPars,
GridFiles,
StarsPars,
ModelPars,
RatioMaps,
MomentZeroMaps)
from assets.commons import (load_config_file,
setup_logger,
validate_parameter)
from assets.commons.parsing import parse_grid_overrides
from assets.commons.grid_utils import compute_los_average_weighted_profile
from assets.commons.db_utils import get_pg_engine
from assets.constants import line_ratio_mapping
plt.rcParams.update({'font.size': 20})
logger = setup_logger(name='PRS - INSPECT')
def get_clump_avg_density(density_at_ref_value: np.array,
avg_densities_dict: dict) -> np.array:
"""
Create the numpy array of the average densities over the entire clump for all the models in the grid.
:param density_at_ref_value: the array with the characteristic values of the density
:param avg_densities_dict: the dictionary of the clump densities, indexed by the characteristic density at reference value
:return: an array with the sorted average number densities
"""
_avg_densities = np.zeros(shape=len(density_at_ref_value))
for (idx, dens) in enumerate(density_at_ref_value):
_avg_densities[idx] = avg_densities_dict[dens]
return _avg_densities
def get_aggregated_ratio_from_db(
dust_temperature: float,
gas_density: float,
lines: Union[list, tuple],
session: Session,
run_id: str,
is_isothermal: bool = False,
is_homogeneous: bool = False) -> float:
"""
Get the aggregated ratio from the database, according to the aggregation function specified in the prs config file;
this function works for a homogeneous model. For more complex models the query must be revised.
:param dust_temperature: the dust temperature of the model
:param gas_density: the gas density of the model
:param lines: the lines used to compute the ratio
:param session: the SQLAlchemy session to use
:param run_id: the id of the run
:param is_isothermal: whether the model is isothermal
:param is_homogeneous: whether the model has a homogeneous density distribution
:return: the aggregated ratio
"""
_dust_temperature = np.round(dust_temperature, 2)
_gas_density = np.round(gas_density, 2)
density_column, dust_temperature_column = get_filter_columns(is_homogeneous=is_homogeneous,
is_isothermal=is_isothermal)
mom_zero_1, mom_zero_2 = aliased(MomentZeroMaps), aliased(MomentZeroMaps)
model_pars_1, model_pars_2 = aliased(ModelPars), aliased(ModelPars)
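# Two aliases of MomentZeroMaps/ModelPars are needed because each ratio map references two
# moment-zero maps (numerator and denominator), each coming from a different cube/line.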
results = session.query(RatioMaps) \
.join(mom_zero_1,
and_(RatioMaps.mom_zero_name_1 == mom_zero_1.mom_zero_name,
RatioMaps.run_id == mom_zero_1.run_id)) \
.join(model_pars_1,
and_(mom_zero_1.fits_cube_name == model_pars_1.fits_cube_name,
mom_zero_1.run_id == model_pars_1.run_id)) \
.join(mom_zero_2,
and_(RatioMaps.mom_zero_name_2 == mom_zero_2.mom_zero_name,
RatioMaps.run_id == mom_zero_2.run_id)) \
.join(model_pars_2,
and_(mom_zero_2.fits_cube_name == model_pars_2.fits_cube_name,
mom_zero_2.run_id == model_pars_2.run_id)) \
.join(GridPars) \
.filter(
and_(func.round(cast(dust_temperature_column, Numeric), 2) == _dust_temperature,
func.round(cast(density_column, Numeric), 2) == _gas_density,
GridPars.run_id == run_id,
model_pars_1.iline == lines[0],
model_pars_2.iline == lines[1])).order_by(GridPars.created_on.desc()).first()
return results.aggregated_ratio
def get_filter_columns(is_homogeneous: bool,
is_isothermal: bool) -> Tuple[sqlalchemy.Column, sqlalchemy.Column]:
"""
Get the appropriate columns for filtering based on the input parameters.
:param is_homogeneous: Flag indicating if the grid is homogeneous or not.
:param is_isothermal: Flag indicating if the grid is isothermal or not.
:return: A tuple containing the appropriate density and dust temperature
columns for the given filter conditions.
"""
# Select correct column for filtering
if is_isothermal is True:
dust_temperature_column = GridPars.dust_temperature
else:
dust_temperature_column = GridPars.dust_temperature_at_reference
if is_homogeneous is True:
density_column = GridPars.central_density
else:
density_column = GridPars.density_at_reference
return density_column, dust_temperature_column
def get_results(
dust_temperature: float,
gas_density: float,
lines: Union[list, tuple],
run_id: str,
session: Session,
is_isothermal: bool = False,
is_homogeneous: bool = False) -> Tuple[float, str, str, float, str, str, tuple, np.array]:
"""
Get results from the database, given the parameters of the model
:param dust_temperature: the characteristic dust temperature
:param gas_density: the characteristic number density of molecular hydrogen
:param lines: the lines used to compute the ratio
:param run_id: the run_id of the model
:param session: a session to query the database
:param is_isothermal: whether the model is isothermal
:param is_homogeneous: whether the model has a homogeneous density distribution
:return: a tuple containing the aggregated line ratios, the moment zero maps names, the pixel size, the ratio map
name, the fits grid name (for the number density), the scaling factor between the grid and image pixel (must be
integer!), and the pixel-by-pixel measured line ratios
"""
density_column, dust_temperature_column = get_filter_columns(is_homogeneous=is_homogeneous,
is_isothermal=is_isothermal)
mom_zero_1, mom_zero_2 = aliased(MomentZeroMaps), aliased(MomentZeroMaps)
model_pars_1, model_pars_2 = aliased(ModelPars), aliased(ModelPars)
# Query database
results = session.query(GridPars, RatioMaps, GridFiles, model_pars_1) \
.join(mom_zero_1,
and_(RatioMaps.mom_zero_name_1 == mom_zero_1.mom_zero_name,
RatioMaps.run_id == mom_zero_1.run_id)) \
.join(model_pars_1,
and_(mom_zero_1.fits_cube_name == model_pars_1.fits_cube_name,
mom_zero_1.run_id == model_pars_1.run_id)) \
.join(mom_zero_2,
and_(RatioMaps.mom_zero_name_2 == mom_zero_2.mom_zero_name,
RatioMaps.run_id == mom_zero_2.run_id)) \
.join(model_pars_2,
and_(mom_zero_2.fits_cube_name == model_pars_2.fits_cube_name,
mom_zero_2.run_id == model_pars_2.run_id)) \
.join(GridPars) \
.join(GridFiles) \
.join(StarsPars, isouter=True) \
.filter(
and_(dust_temperature_column == dust_temperature,
density_column == gas_density,
GridFiles.quantity == 'gas_number_density',
GridPars.run_id == run_id),
model_pars_1.iline == lines[0],
model_pars_2.iline == lines[1]).order_by(GridPars.created_on.desc()).first()
with fits.open(os.path.join('prs', 'fits', 'ratios', results[1].ratio_map_name)) as ratio_fitsfile:
ratio_values = ratio_fitsfile[0].data
assert (results[3].npix % results[0].grid_shape_1 == 0) and (results[3].npix % results[0].grid_shape_2 == 0)
return results[1].aggregated_ratio, \
results[1].mom_zero_name_1, \
results[1].mom_zero_name_2, \
(results[0].grid_size_3 * u.pc).to(u.cm).value / results[0].grid_shape_3, \
results[1].ratio_map_name, \
results[2].fits_grid_name, \
(int(results[3].npix / results[0].grid_shape_1), int(results[3].npix / results[0].grid_shape_2)), \
ratio_values
def get_grid_values(
quantity_name: str,
dust_temperature: float,
gas_density: float,
run_id: str,
session: Session,
is_isothermal: bool = False,
is_homogeneous: bool = False) -> np.array:
"""
Retrieve the grid values, given the model parameters
:param quantity_name: the quantity to extract
:param dust_temperature: the characteristic dust temperature
:param gas_density: the characteristic number density of molecular hydrogen
:param run_id: the run_id to process
:param session: a session to query the database
:param is_isothermal: whether the model is isothermal
:param is_homogeneous: whether the model has a homogeneous density distribution
:return: the array of grid values
"""
density_column, dust_temperature_column = get_filter_columns(is_homogeneous=is_homogeneous,
is_isothermal=is_isothermal)
# Query database
results = session.query(GridPars, GridFiles) \
.join(GridFiles) \
.filter(
and_(dust_temperature_column == dust_temperature,
density_column == gas_density,
GridFiles.quantity == quantity_name,
GridPars.run_id == run_id)).order_by(GridPars.created_on.desc()).first()
with fits.open(os.path.join('prs', 'fits', 'grids', results[1].fits_grid_name)) as fitsfile:
data = fitsfile[0].data
return data
def get_column_density_vs_mom0(molecule_column_density_grid: np.array,
h2_volume_density_grid: np.array,
mom_zero_name_1: str,
mom_zero_name_2: str,
grid_pixel_size: float,
lines: Tuple[str, str],
gas_density: float,
dust_temperature: float,
zoom_ratios: Tuple[int, int]) -> pd.DataFrame:
"""
Get a DataFrame containing the column density of H2 and the moment zero.
:param molecule_column_density_grid: Array of column densities for the molecule of interest.
    :param h2_volume_density_grid: Array of H2 volume number densities.
:param mom_zero_name_1: Name of the first moment file.
:param mom_zero_name_2: Name of the second moment file.
:param grid_pixel_size: Size of the grid pixels.
    :param lines: Tuple of the line ids for which to compute the moment zero.
:param gas_density: Gas density at the scaling radius.
:param dust_temperature: Dust temperature at the scaling radius.
:param zoom_ratios: Tuple of the horizontal and vertical zoom ratios.
:return: A DataFrame containing the column density of H2 and the molecule of interest as a function of mom0.
"""
h2_column_density_map = np.nansum(h2_volume_density_grid * grid_pixel_size, axis=2).flatten()
molecule_column_density_map = np.nansum(molecule_column_density_grid * grid_pixel_size, axis=2).flatten()
df_results = pd.DataFrame(data=h2_column_density_map, columns=['H2_column_density'])
df_results['molecule_column_density'] = molecule_column_density_map
df_results['nh2'] = gas_density
df_results['tdust'] = dust_temperature
with fits.open(os.path.join('prs', 'fits', 'moments', mom_zero_name_1)) as fitsfile:
df_results[f'mom_zero_{lines[0]}'] = fitsfile[0].data[::zoom_ratios[0],
::zoom_ratios[1]].byteswap().newbyteorder().flatten()
with fits.open(os.path.join('prs', 'fits', 'moments', mom_zero_name_2)) as fitsfile:
df_results[f'mom_zero_{lines[1]}'] = fitsfile[0].data[::zoom_ratios[0],
::zoom_ratios[1]].byteswap().newbyteorder().flatten()
return df_results[df_results['H2_column_density'] > 0]
def plot_coldens_vs_integrated_intensity(df_coldens_mom0_list: List[pd.DataFrame],
lines: Tuple[str, str],
run_type: str) -> pd.DataFrame:
"""
Plots the column densities vs integrated intensity for each line.
    :param df_coldens_mom0_list: List of DataFrames containing the column densities and moment-zero values.
    :param lines: Tuple of the ids of the lines.
    :param run_type: string identification of the model type.
    :return: A DataFrame containing the column densities of interest vs. the moment zero.
"""
df_coldens_mom0 = pd.concat(df_coldens_mom0_list)
plt.xlabel('H_2 column density')
plt.ylabel('Moment 0')
plt.loglog()
plt.scatter(df_coldens_mom0['H2_column_density'], df_coldens_mom0[f'mom_zero_{lines[0]}'],
label=f'mom_zero_{lines[0]}')
plt.scatter(df_coldens_mom0['H2_column_density'], df_coldens_mom0[f'mom_zero_{lines[1]}'],
label=f'mom_zero_{lines[1]}')
plt.legend()
plt.savefig(
os.path.join(
'prs',
'output',
'run_type',
run_type,
'figures',
f'coldens_moments_lines_{"-".join(lines)}.png'
)
)
plt.clf()
return df_coldens_mom0
def plot_ratio_vs_density(lines: Tuple[str, str],
results: xr.DataArray,
avg_density: np.array,
run_type: str):
"""
Plots the line ratio vs. the average LOS volume density, and the integrated ratio as a function of the
model-specific density and temperature at the scaling radius.
:param lines: Tuple of the id of the lines.
:param results: DataArray containing the ratios and volume densities.
:param avg_density: numpy array with the overall average densities of the clumps.
:param run_type: string identification of the model type.
"""
plt.figure(figsize=(8, 6))
plt.semilogx()
plt.xlabel(r'<n(H$_2$) [cm$^{-3}$]>')
plt.ylabel('simulated ratio')
plt.title(f'Ratio {line_ratio_mapping["-".join(lines)]}')
plt.tight_layout()
plt.savefig(
os.path.join(
'prs',
'output',
'run_type',
run_type,
'figures',
f'ratio_vs_avg_density_los_{"-".join(lines)}.png'
)
)
plt.clf()
plt.figure(figsize=(8, 6))
results.plot(x='dust_temperature', y='gas_density', yscale='log', cbar_kwargs={'pad': 0.2})
plt.xlabel('$T_{dust}(r_0)$ [K]')
plt.ylabel('$n$(H$_2$, r$_0$) [cm$^{-3}$]')
# plt.title(f'Integrated line ratio ({"-".join(lines)}) grid', y=1.02)
ref_density_limits = plt.gca().get_ylim()
ref_density_values_limits = np.array([min(results['gas_density'].data), max(results['gas_density'].data)])
ax2 = plt.twinx()
# Aligning the secondary axis values with the pixel centres
ax2.set_ylim(np.array([ref_density_limits[0] / ref_density_values_limits[0] * min(avg_density),
ref_density_limits[1] / ref_density_values_limits[1] * max(avg_density)]))
ax2.semilogy()
ax2.set_ylabel(r'<n(H$_2$) [cm$^{-3}$]>')
plt.tight_layout()
plt.savefig(
os.path.join(
'prs',
'output',
'run_type',
run_type,
'figures',
f'ratio_grid_lines_{"-".join(lines)}.png'
)
)
plt.clf()
def main(run_id: str,
is_isothermal: bool,
is_homogeneous: bool,
molecule_used_to_weight_los_quantity: Union[str, None] = None,
engine: sqla_engine = None,
run_type: Union[str, None] = None):
if engine is None:
engine = get_pg_engine(logger=logger)
config = load_config_file(os.path.join('config', 'config.yml'))
config_stg = load_config_file(os.path.join('stg', 'config', 'config.yml'))
_run_type = validate_parameter(
run_type,
default='constant_abundance_p15_q05'
)
# grid definition
dust_temperatures = parse_grid_overrides(par_name='dust_temperature',
config=config)
central_densities = parse_grid_overrides(par_name='gas_density',
config=config)
line_pairs = config['overrides']['lines_to_process']
# If not specified, assumes that the first molecule should be used for weighting
_molecule_used_to_weight_los_quantity = validate_parameter(molecule_used_to_weight_los_quantity,
default=config_stg['lines']['species_to_include'][0])
results = xr.DataArray(np.empty(shape=[len(dust_temperatures), len(central_densities)]),
dims=('dust_temperature', 'gas_density'),
coords={
'dust_temperature': dust_temperatures,
'gas_density': central_densities
})
results_dict = {}
avg_densities_dict = {}
with closing(Session(engine)) as session:
for lines in line_pairs:
df_coldens_mom0_list = []
results_dict[f'{"-".join(lines)}'] = {}
for (tdust, nh2) in product(dust_temperatures, central_densities):
aggregated_ratio = get_aggregated_ratio_from_db(dust_temperature=tdust,
gas_density=nh2,
lines=lines,
session=session,
run_id=run_id,
is_isothermal=is_isothermal,
is_homogeneous=is_homogeneous)
results.loc[tdust, nh2] = aggregated_ratio
logger.debug(f'The aggregated ratio for lines {lines}, using {nh2}, {tdust} is: {aggregated_ratio}')
_, \
mom0_name_1, \
mom0_name_2, \
grid_pixel_size, \
_, _, \
zoom_ratios, \
ratio_values = \
get_results(
dust_temperature=tdust,
gas_density=nh2,
lines=lines,
session=session,
run_id=run_id,
is_isothermal=is_isothermal,
is_homogeneous=is_homogeneous
)
density_grid = get_grid_values(
quantity_name='gas_number_density',
dust_temperature=tdust,
gas_density=nh2,
session=session,
run_id=run_id,
is_isothermal=is_isothermal,
is_homogeneous=is_homogeneous
)
temperature_grid = get_grid_values(
quantity_name='dust_temperature',
dust_temperature=tdust,
gas_density=nh2,
session=session,
run_id=run_id,
is_isothermal=is_isothermal,
is_homogeneous=is_homogeneous
)
molecule_grid = get_grid_values(
quantity_name=_molecule_used_to_weight_los_quantity,
dust_temperature=tdust,
gas_density=nh2,
session=session,
run_id=run_id,
is_isothermal=is_isothermal,
is_homogeneous=is_homogeneous
)
avg_density_map = compute_los_average_weighted_profile(profile=density_grid, weights=molecule_grid)
avg_density = np.nansum(np.where(density_grid == 0, np.nan, density_grid * molecule_grid)) / \
np.nansum(np.where(density_grid == 0, np.nan, molecule_grid))
avg_densities_dict[nh2] = avg_density
std_density_map = np.nanstd(np.where(density_grid == 0, np.nan, molecule_grid), axis=2)
avg_temperature_map = compute_los_average_weighted_profile(profile=temperature_grid,
weights=molecule_grid)
correlation_data = np.array([avg_density_map.flatten(),
avg_temperature_map.flatten(),
std_density_map.flatten(),
ratio_values[::zoom_ratios[0], ::zoom_ratios[1]].flatten()])
results_dict[f'{"-".join(lines)}'][f'{"_".join([str(tdust), str(nh2)])}'] = correlation_data
plt.scatter(correlation_data[0], correlation_data[3])
df_coldens_mom0_list.append(get_column_density_vs_mom0(molecule_column_density_grid=molecule_grid,
h2_volume_density_grid=density_grid,
mom_zero_name_1=mom0_name_1,
mom_zero_name_2=mom0_name_2,
lines=lines,
gas_density=nh2,
dust_temperature=tdust,
zoom_ratios=zoom_ratios,
grid_pixel_size=grid_pixel_size))
avg_densities = get_clump_avg_density(density_at_ref_value=results['gas_density'].data,
avg_densities_dict=avg_densities_dict)
plot_ratio_vs_density(lines=lines,
results=results,
avg_density=avg_densities,
run_type=_run_type)
df_coldens_mom0 = plot_coldens_vs_integrated_intensity(df_coldens_mom0_list=df_coldens_mom0_list,
lines=lines,
run_type=_run_type).reset_index(). \
rename(columns={'index': 'px_index'})
df_coldens_mom0.to_csv(
os.path.join(
'prs',
'output',
'run_type',
_run_type,
'data',
f'full_dataset_NH2_lines_{"-".join(lines)}.csv'
)
)
df_list_concat = []
for (tdust, nh2) in product(dust_temperatures, central_densities):
df_list = []
for lines in line_pairs:
df = pd.DataFrame(
data=results_dict[f'{"-".join(lines)}'][f'{"_".join([str(tdust), str(nh2)])}'].transpose(),
columns=['avg_nh2', 'avg_tdust', 'std_nh2', f'ratio_{"-".join(lines)}'])
df_list.append(df.dropna())
df_tmp = pd.concat(df_list, axis=1)
df_tmp = df_tmp.loc[:, ~df_tmp.columns.duplicated()].copy()
df_tmp['nh2'] = nh2
df_tmp['tdust'] = tdust
df_list_concat.append(df_tmp)
df_all = pd.concat(df_list_concat).reset_index(). \
rename(columns={'index': 'px_index'})
for lines in line_pairs:
g = sns.jointplot(x=np.log10(df_all['avg_nh2']), y=df_all[f'ratio_{"-".join(lines)}'],
kind='kde', joint_kws={'bw_adjust': 2})
g.plot_joint(sns.scatterplot, s=100, alpha=.5, marker='+', color='orange')
plt.savefig(
os.path.join(
'prs',
'output',
'run_type',
_run_type,
'figures',
f'ratio_vs_avg_density_los_kde_{"-".join(lines)}.png'
)
)
plt.clf()
df_all.to_csv(
os.path.join(
'prs',
'output',
'run_type',
_run_type,
'data',
'full_dataset.csv'
)
)
if __name__ == '__main__':
main(run_id='7dd5b365-875e-4857-ae11-2707820a33c1', is_isothermal=True, is_homogeneous=True, run_type='uniform')
import os
import numpy as np
import argparse
from assets.commons import (load_config_file,
validate_parameter,
setup_logger)
from assets.commons.training_utils import train_models_wrapper
if __name__ == '__main__':
external_input = load_config_file(config_file_path='config/ml_modelling.yml')
logger = setup_logger(name='PRS - ML training')
parser = argparse.ArgumentParser()
parser.add_argument('--line', default=None)
parser.add_argument('--distributed', default='true')
args = parser.parse_args()
try:
limit_rows = external_input['limit_rows']
except KeyError:
limit_rows = None
try:
use_model_for_training = external_input['use_model_for_training']
except KeyError:
use_model_for_training = None
_distributed = validate_parameter(args.distributed, default='false').lower() == 'true'
_use_model_for_training = validate_parameter(
use_model_for_training,
default='constant_abundance_p15_q05'
)
_model_root_folder = os.path.join('prs', 'output', 'run_type', _use_model_for_training)
logger.info(f'Using {_use_model_for_training} for training in folder {_model_root_folder}')
if external_input['retrain'] is True:
if external_input['model_type'] == 'XGBoost_gridsearch':
_model_kwargs = external_input['model_parameters']['param_grid'].copy()
for par in _model_kwargs:
_model_kwargs[par] = np.arange(_model_kwargs[par][0], _model_kwargs[par][1], _model_kwargs[par][2])
external_input['model_parameters']['param_grid'] = _model_kwargs
if _distributed is True:
_line = validate_parameter(args.line, default='86')
if args.line is None:
logger.warning(f'No line ID specified, proceeding with {_line}')
train_models_wrapper(target_name=f'mom_zero_{_line}',
model_type=external_input['model_type'],
model_kwargs=external_input['model_parameters'],
limit_rows=limit_rows,
model_root_folder=_model_root_folder,
use_validation=external_input['use_validation'],
logger=logger)
else:
lines_to_process = (args.line,) if (args.line is not None) \
else ('86', '87', '88', '256', '257', '381', '380')
for line in lines_to_process:
train_models_wrapper(target_name=f'mom_zero_{line}',
model_type=external_input['model_type'],
model_kwargs=external_input['model_parameters'],
limit_rows=limit_rows,
model_root_folder=_model_root_folder,
use_validation=external_input['use_validation'],
logger=logger)
logger.info(f'Completed training for line {args.line}')
import os
import numpy as np
import argparse
from assets.commons import (load_config_file,
validate_parameter,
setup_logger)
from assets.commons.training_utils import train_models_wrapper
if __name__ == '__main__':
external_input = load_config_file(config_file_path=os.path.join('config', 'ml_modelling.yml'))
logger = setup_logger(name='PRS - ML training')
parser = argparse.ArgumentParser()
parser.add_argument('--line_ratio', default=None)
parser.add_argument('--distributed', default='true')
args = parser.parse_args()
try:
limit_rows = external_input['limit_rows']
except KeyError:
limit_rows = None
try:
use_model_for_training = external_input['use_model_for_training']
except KeyError:
use_model_for_training = None
_distributed = validate_parameter(args.distributed, default='false').lower() == 'true'
_use_model_for_training = validate_parameter(
use_model_for_training,
default='constant_abundance_p15_q05'
)
_model_root_folder = os.path.join('prs', 'output', 'run_type', _use_model_for_training)
logger.info(f'Using {_use_model_for_training} for training in folder {_model_root_folder}')
if external_input['retrain'] is True:
if external_input['model_type'] == 'XGBoost_gridsearch':
_model_kwargs = external_input['model_parameters']['param_grid'].copy()
for par in _model_kwargs:
_model_kwargs[par] = np.arange(_model_kwargs[par][0], _model_kwargs[par][1], _model_kwargs[par][2])
external_input['model_parameters']['param_grid'] = _model_kwargs
if _distributed is True:
_line_ratio = validate_parameter(args.line_ratio, default='87-86')
if args.line_ratio is None:
logger.warning(f'No line ratio specified, proceeding with {_line_ratio}')
train_models_wrapper(target_name=f'ratio_{_line_ratio}',
model_type=external_input['model_type'],
model_kwargs=external_input['model_parameters'],
limit_rows=limit_rows,
model_root_folder=_model_root_folder,
use_validation=external_input['use_validation'],
logger=logger)
else:
            ratios_to_process = (args.line_ratio,) if (args.line_ratio is not None) \
                else ('87-86', '88-87', '88-86', '257-256', '381-380')
for ratio in ratios_to_process:
train_models_wrapper(target_name=f'ratio_{ratio}',
model_type=external_input['model_type'],
model_kwargs=external_input['model_parameters'],
limit_rows=limit_rows,
model_root_folder=_model_root_folder,
use_validation=external_input['use_validation'],
logger=logger)
logger.info(f'Completed training for line ratio {args.line_ratio}')
import os
import pandas as pd
import seaborn as sns
import pickle
import numpy as np
import matplotlib.pyplot as plt
from astropy.constants import G, m_p
import astropy.units as u
from assets.utils import get_poc_results
plt.rcParams.update({'font.size': 16})
def make_ecdf_figure(df_full: pd.DataFrame):
sns.kdeplot(data=df_full, x='best_fit', hue='Class')
plt.clf()
plt.figure(figsize=(8, 6))
sns.ecdfplot(data=df_full, x='best_fit', hue='Class', hue_order=['Quiescent', 'Protostellar', 'MYSOs', 'HII'])
plt.semilogx()
plt.xlim([1e5, 5e6])
plt.xlabel('<$n$(H$_{2}$)> [cm$^{-3}$]')
plt.ylabel('Cumulative probability')
plt.title('Volume density ECDF per class')
plt.tight_layout()
plt.savefig(os.path.join('prs', 'output', 'poc_figures', 'best_fit_nh2.png'))
def make_violin_figure(df_full: pd.DataFrame, nsamples: int = 10000):
with open(os.path.join('prs', 'output', 'run_type', 'constant_abundance_p15_q05', 'posteriors.pickle'),
'rb') as infile:
posteriors = pickle.load(infile)
df_list = []
for source in posteriors['PDF']:
tmp_df = pd.DataFrame(columns=['source_name', 'Class', 'density'])
try:
class_phys = df_full[df_full['source_name'] == source]['Class'].iloc[0]
samples = sampler(ecdf=posteriors['ECDF'][source], density_grid=posteriors['density_grid'][source],
nsamples=nsamples)
tmp_df['density'] = samples
tmp_df['source_name'] = source
tmp_df['Class'] = class_phys
df_list.append(tmp_df)
except IndexError:
pass
plt.clf()
plt.figure(figsize=(8, 6))
df_generated_densities = pd.concat(df_list)
sns.violinplot(df_generated_densities,
x='Class',
y='density',
order=['Quiescent', 'Protostellar', 'MYSOs', 'HII'],
bw=0.2)
plt.semilogy()
plt.ylim([5e4, 5e6])
plt.ylabel('<$n$(H$_{2}$)> [cm$^{-3}$]')
plt.title('Bootstrapped volume density distributions')
plt.tight_layout()
plt.savefig(os.path.join('prs', 'output', 'poc_figures', 'nh2_violin.png'))
def sampler(ecdf, density_grid, nsamples):
generated_densities = np.zeros(shape=nsamples)
for idx in range(nsamples):
a = np.random.default_rng().uniform(0, 1)
generated_index = np.argmax(ecdf >= a) - 1
# In principle this can be interpolated for a smoother density distribution
generated_densities[idx] = density_grid[generated_index]
return generated_densities
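# Usage sketch for the inverse-transform sampler above (assumes `posteriors` has been loaded as in
# make_violin_figure; purely illustrative):
# samples = sampler(ecdf=posteriors['ECDF'][source],
#                   density_grid=posteriors['density_grid'][source],
#                   nsamples=1000)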
def make_volume_densities_comparison_figure(df_full: pd.DataFrame):
df_volume_density_sed = pd.read_csv(os.path.join('assets', 'data', 'top100_density.csv'),
skiprows=4)
df_merge = df_full.merge(df_volume_density_sed, left_on='AGAL_name', right_on='agal_name')
hpd = df_full['hpd_interval'].values
filtered_intervals = []
for intervals in hpd:
filtered_intervals.append(intervals[-2:])
hpd_array = np.array(filtered_intervals)
uncertainty_array = np.abs(hpd_array - np.array(df_full['best_fit'])[:, np.newaxis]).T
plt.clf()
plt.figure(figsize=(8, 6))
plt.scatter(df_merge['volume_density'],
df_merge['best_fit'],)
plt.errorbar(df_merge['volume_density'],
df_merge['best_fit'],
yerr=uncertainty_array,
fmt='none')
    plt.plot([2e4, 8e5], [2e4, 8e5], color='red')  # 1:1 reference line
plt.loglog()
plt.xlabel('<$n$(H$_{2, dust}$)> [cm$^{-3}$]')
plt.ylabel('<$n$(H$_{2, SAK}$)> [cm$^{-3}$]')
plt.title('Comparison of the inferred volume density')
plt.tight_layout()
plt.savefig(os.path.join('prs', 'output', 'poc_figures', 'volume_density_comparison.png'))
def main():
df_full = get_poc_results(line_fit_filename='ch3oh_data_top35.csv')
df_full.rename(columns={'class_phys': 'Class'}, inplace=True)
df_full = df_full[(df_full['mass'] < 10000) & (df_full['mass'] > 300)]
make_ecdf_figure(df_full=df_full)
make_violin_figure(df_full=df_full)
make_volume_densities_comparison_figure(df_full=df_full)
if __name__ == '__main__':
main()
import os
import pandas as pd
import numpy as np
from assets.utils import get_poc_results
from collections import OrderedDict
def divide_chunks(l, n):
for i in range(0, len(l), n):
yield list(l[i:i + n])
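# Example: list(divide_chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]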
def mask_non_detections(data: pd.DataFrame,
transition_id: str) -> pd.DataFrame:
_data = data.copy()
_data.loc[_data[f'tpeak_{transition_id}'] < 2 * _data[f'rms_noise_{transition_id}'],
f'tpeak_{transition_id}'] = -1
return _data
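# Peaks below 2x the rms noise are flagged as non-detections (tpeak set to -1); these entries are
# rendered as ellipses in the LaTeX tables below.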
df_full = get_poc_results(line_fit_filename='ch3oh_data_top35.csv')
# Make POC LaTex table
pi230_beam_eff = 0.7
flash_beam_eff = 0.73
df_table = df_full.copy()
df_table[['tpeak_256', 'tpeak_257', 'rms_noise_256', 'rms_noise_257']] /= pi230_beam_eff
df_table[['area_256', 'area_257', 'area_unc_256', 'area_unc_257']] /= pi230_beam_eff
df_table[['tpeak_380', 'tpeak_381', 'rms_noise_380', 'rms_noise_381']] /= flash_beam_eff
df_table[['area_380', 'area_381', 'area_unc_380', 'area_unc_381']] /= flash_beam_eff
for transition_id in ['86', '87', '88', '256', '257', '380', '381']:
df_table = mask_non_detections(data=df_table, transition_id=transition_id)
df_table['best_fit'] /= 1e5
df_table['best_fit'] = df_table['best_fit'].round(1)
df_table['hpd_interval'] = df_table['hpd_interval'].apply(lambda row: (np.array(row) / 1e5).round(1))
df_table['hpd_interval'] = df_table['hpd_interval'].apply(lambda row: list(divide_chunks(row, 2)))
df_table['mass'] = (df_table['mass'] / 100).round(1).astype(str)
df_table['distance'] = (df_table['distance']).round(1).astype(str)
density_format = r'$[10^{5}\,\mathrm{cm^{-3}}]$'
fwhm_units = '[km s$^{-1}$]'
df_fit_results = df_table[['AGAL_name', 'class_phys', 'best_fit', 'hpd_interval','mass', 'distance']].copy()
df_source_properties = df_table[['AGAL_name', 'tpeak_86', 'tpeak_87', 'tpeak_88', 'tpeak_256', 'tpeak_257', 'tpeak_380',
'tpeak_381', 'rms_noise_86', 'rms_noise_87', 'rms_noise_88', 'rms_noise_256',
'rms_noise_257', 'rms_noise_380', 'rms_noise_381', 'linewidth_86', 'linewidth_87', 'linewidth_88', 'linewidth_256', 'linewidth_257', 'linewidth_380',
'linewidth_381', 'linewidth_unc_86', 'linewidth_unc_87', 'linewidth_unc_88', 'linewidth_unc_256',
'linewidth_unc_257', 'linewidth_unc_380', 'linewidth_unc_381']].copy()
for transition_id in ['86', '87', '88', '256', '257', '380', '381']:
    df_source_properties[f'tpeak_{transition_id}'] = '$' + df_source_properties[f'tpeak_{transition_id}'].round(2).astype(str) + r' \pm ' + df_table[f'rms_noise_{transition_id}'].round(2).astype(str) + '$'
    df_source_properties[f'linewidth_{transition_id}'] = '$' + df_source_properties[f'linewidth_{transition_id}'].round(2).astype(str) + r' \pm ' + df_table[f'linewidth_unc_{transition_id}'].round(2).astype(str) + '$'
df_source_properties.loc[df_source_properties[f'tpeak_{transition_id}'].str.startswith('$-1.0'), [f'tpeak_{transition_id}', f'linewidth_{transition_id}']] = '-1.00'
df_source_properties.loc[df_source_properties[f'tpeak_{transition_id}'].str.startswith('$nan'), [f'tpeak_{transition_id}', f'linewidth_{transition_id}']] = 'N/A'
df_source_properties.drop(columns=[f'rms_noise_{transition_id}', f'linewidth_unc_{transition_id}'], inplace=True)
remap_columns = OrderedDict([
('AGAL_name', ('Source', '')),
('class_phys', ('Classification', '')),
    ('best_fit', (r'best fit($n\mathrm{H_2}$)', density_format)),
    ('hpd_interval', (r'67\% HPD', density_format)),
    ('mass', ('Mass', r'$[10^2 \mathrm{M_\odot}]$')),
('distance', ('Distance', '[kpc]')),
('tpeak_86', ('$T_{MB, 86}$', '[K]')),
('linewidth_86', ('$FWHM_{(2_K-1_K)}$', fwhm_units)),
('tpeak_87', ('$T_{MB, 87}$', '[K]')),
('linewidth_87', ('$FWHM_{87}$', '[km s$^{-1}]$')),
('tpeak_88', ('$T_{MB, 88}$', '[K]')),
('linewidth_88', ('$FWHM_{88}$', '[km s$^{-1}]$')),
# ('rms_noise_86', ('$\sigma_{96.7GHz}$', '[K]')),
('tpeak_256', ('$T_{MB, 256}$', '[K]')),
('linewidth_256', ('$FWHM_{256}$', fwhm_units)),
('tpeak_257', ('$T_{MB, 257}$', '[K]')),
('linewidth_257', ('$FWHM_{(5_K-4_K)}$', fwhm_units)),
# ('rms_noise_256', ('$\sigma_{241.7GHz}$', '[K]')),
('tpeak_380', ('$T_{MB, 380}$', '[K]')),
('linewidth_380', ('$FWHM_{380}$', fwhm_units)),
('tpeak_381', ('$T_{MB, 381}$', '[K]')),
('linewidth_381', ('$FWHM_{(7_K-6_K)}$', fwhm_units)),
# ('rms_noise_380', ('$\sigma_{338.3GHz}$', '[K]')),
])
df_table = df_table[remap_columns.keys()]
df_table.rename(columns=remap_columns, inplace=True)
df_table.columns = pd.MultiIndex.from_tuples(df_table.columns)
df_source_properties.rename(columns=remap_columns, inplace=True)
df_source_properties.columns = pd.MultiIndex.from_tuples(df_source_properties.columns)
df_fit_results.rename(columns=remap_columns, inplace=True)
df_fit_results.columns = pd.MultiIndex.from_tuples(df_fit_results.columns)
caption = r'Sources included in the proof-of-concept, classification, distance, mass, and number density results. We indicate the highest-probability density interval containing 67\% of the probability mass as HPD 67\%.'
label = 'tab:poc_results'
latex_table = (df_fit_results.style.hide(axis='index').format({
    (r'best fit($n\mathrm{H_2}$)', r'$[10^{5}\,\mathrm{cm^{-3}}]$'): '{:,.1f}'})
.to_latex(caption=caption, label=label, hrules=True, environment='table*', position_float='centering'))
latex_table = latex_table.replace('[[', '[').replace(']]', ']')
with open(os.path.join('prs', 'output', 'poc_tables', 'fit_results.tex'), 'w') as outfile:
outfile.write(latex_table)
table_cols = [
[('Source', ''), ('$T_{MB, 86}$', '[K]'), ('$T_{MB, 87}$', '[K]'), ('$T_{MB, 88}$', '[K]'), ('$FWHM_{(2_K-1_K)}$',
fwhm_units)],
[('Source', ''), ('$T_{MB, 256}$', '[K]'), ('$T_{MB, 257}$', '[K]'), ('$FWHM_{(5_K-4_K)}$', fwhm_units), ('$T_{MB, 380}$', '[K]'), ('$T_{MB, 381}$', '[K]'), ('$FWHM_{(7_K-6_K)}$',
fwhm_units), ]
]
captions = [
'Line properties of the lines in the $(2_K-1_K)$ methanol band. Only one FWHM is listed because the fit is performed forcing all lines to have the same width.',
'Line properties of the lines in the $(5_K-4_K)$ and $(7_K-6_K)$ methanol bands. Only one FWHM is listed per band because the fit is performed forcing all lines to have the same width.',
]
labels = [
'tab:poc_lines_3mm',
'tab:poc_lines_hf',
]
outfiles = [
'line_table_3mm.tex',
'line_table_hf.tex',
]
for cols, caption, label, filename in zip(table_cols, captions, labels, outfiles):
latex_table = (df_source_properties[cols].style.hide(axis='index')
.to_latex(caption=caption, label=label, hrules=True, environment='table*', position_float='centering'))
latex_table = latex_table.replace('[[', '[').replace(']]', ']')
    latex_table = latex_table.replace('-1.00', r'\dots')
with open(os.path.join('prs', 'output', 'poc_tables', filename), 'w') as outfile:
outfile.write(latex_table)
numpy==1.23.4
PyYAML==6.0
astropy==5.1
matplotlib==3.6.2
scipy==1.9.3
tqdm==4.64.1
xarray==2022.10.0
sqlalchemy==1.4.42
psycopg2-binary==2.9.5
seaborn==0.12.2
scikit-learn==1.3.0
xgboost==2.0.0
keras==2.12.0
\ No newline at end of file
#!/bin/bash
read -p "This will delete all of the results, are you sure? " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]
then
folder_list=("prs/fits/cubes" "prs/fits/moments" "prs/fits/ratios" "prs/fits/grids")
    for folder_name in "${folder_list[@]}"
do
rm $folder_name/*.fits
done
rm prs/output/*.png
rm stg/archive/*.zip
rm full_dataset*.csv
fi
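# Note: all paths are relative, so this script is meant to be executed from the repository root.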
grid:
grid_type: regular
coordinate_system: cartesian
central_density: 1e8
density_unit: "cm^-3"
density_powerlaw_idx: -1.5
density_at_reference: 1e6
maximum_radius: 0.9
maximum_radius_unit: 'pc'
distance_reference: 0.5
distance_reference_unit: 'pc'
dust_temperature: 2000
dust_temperature_at_reference: 30
dust_temperature_unit: 'K'
dust_temperature_powerlaw_idx: -0.5
microturbulence: 1.5
microturbulence_unit: 'km/s'
dim1: {"size": 2, "size_units": "pc", "shape": 101, "refpix": 50}
velocity_field: 'solid'
velocity_gradient: 2
velocity_gradient_unit: "km/(s pc)"
#stars:
# nstars: 1
# rstars: [6.96e10, ]
# mstars: [1.99e36, ]
# star_positions: [[0, 0, 0]]
# star_fluxes: [[-32000,], ]
# nlambdas: 300
# spacing: 'log'
# lambdas_micron_limits: [0.001, 1300]
lines:
species_to_include: ['e-ch3oh']
molecular_abundances: {
"e-ch3oh": 1e-9,
"p-h2": 1,
}
hot_core_specs: {
"e-ch3oh": {
"threshold": 90,
"abundance_jump": 1
}
}
lines_mode: 'lvg'
collision_partners: ['p-h2']
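# Note (illustrative): quantities with a '*_at_reference' value are scaled as power laws anchored at
# distance_reference, i.e. q(r) = q_ref * (r / distance_reference)^powerlaw_idx; with the values above,
# n(r) = 1e6 cm^-3 * (r / 0.5 pc)^-1.5 and T_dust(r) = 30 K * (r / 0.5 pc)^-0.5.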
import logging
import os
import time
from sqlalchemy import (Column,
ForeignKey,
Integer,
Sequence,
String,
Float,
DateTime,
Boolean,
ARRAY,
ForeignKeyConstraint)
from assets.commons import (setup_logger)
from assets.commons.db_utils import get_pg_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.exc import OperationalError
logger = setup_logger(name='DB_SETUP')
Base = declarative_base()
class GridFiles(Base):
__tablename__ = "grid_files"
__table_args__ = (
ForeignKeyConstraint(
('zipped_grid_name', 'run_id'),
['grid_parameters.zipped_grid_name', 'grid_parameters.run_id']
),
)
zipped_grid_name = Column(String(150), primary_key=True)
quantity = Column(String(30), primary_key=True)
fits_grid_name = Column(String)
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
class GridPars(Base):
__tablename__ = "grid_parameters"
zipped_grid_name = Column(String(150), primary_key=True)
species_and_partners = relationship("SpeciesAndPartners", cascade="all, delete-orphan")
fits_cube_name = relationship("ModelPars", cascade="all, delete-orphan")
line_pars = relationship("LinePars", cascade="all, delete-orphan")
stars_pars = relationship("StarsPars", cascade="all, delete-orphan")
grid_type = Column(String)
coordinate_system = Column(String)
central_density = Column(Float)
density_powerlaw_index = Column(Float)
density_at_reference = Column(Float)
dust_temperature = Column(Float)
dust_temperature_powerlaw_index = Column(Float)
dust_temperature_at_reference = Column(Float)
microturbulence = Column(Float)
velocity_field = Column(String)
velocity_gradient = Column(String)
velocity_powerlaw_index = Column(Float)
velocity_at_reference = Column(Float)
distance_reference = Column(Float)
maximum_radius = Column(Float)
grid_size_1 = Column(Float)
grid_shape_1 = Column(Float)
grid_refpix_1 = Column(Float)
grid_size_2 = Column(Float)
grid_shape_2 = Column(Float)
grid_refpix_2 = Column(Float)
grid_size_3 = Column(Float)
grid_shape_3 = Column(Float)
grid_refpix_3 = Column(Float)
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
class StarsPars(Base):
__tablename__ = "stars_parameters"
__table_args__ = (
ForeignKeyConstraint(
('zipped_grid_name', 'run_id'),
['grid_parameters.zipped_grid_name', 'grid_parameters.run_id']
),
)
zipped_grid_name = Column(String(150), primary_key=True)
nstars = Column(Integer)
rstars = Column(ARRAY(Float))
mstars = Column(ARRAY(Float))
star_positions = Column(ARRAY(Float))
star_fluxes = Column(ARRAY(Float))
nlambdas = Column(Integer)
spacing = Column(String)
lambdas_micron_limits = Column(ARRAY(Float))
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
class LinePars(Base):
__tablename__ = "lines_parameters"
__table_args__ = (
ForeignKeyConstraint(
('zipped_grid_name', 'run_id'),
['grid_parameters.zipped_grid_name', 'grid_parameters.run_id']
),
)
zipped_grid_name = Column(String(150), primary_key=True)
lines_mode = Column(String(20))
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
class SpeciesAndPartners(Base):
__tablename__ = "species_and_partners"
__table_args__ = (
ForeignKeyConstraint(
('zipped_grid_name', 'run_id'),
['grid_parameters.zipped_grid_name', 'grid_parameters.run_id']
),
)
zipped_grid_name = Column(String(150), primary_key=True)
species_to_include = Column(String(100), primary_key=True)
molecular_abundance = Column(Float)
threshold = Column(Float)
abundance_jump = Column(Float)
collision_partner = Column(String(100), primary_key=True)
molecular_abundance_collision_partner = Column(Float)
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
class ModelPars(Base):
__tablename__ = "model_parameters"
__table_args__ = (
ForeignKeyConstraint(
('zipped_grid_name', 'run_id'),
['grid_parameters.zipped_grid_name', 'grid_parameters.run_id']
),
)
zipped_grid_name = Column(String(150), nullable=False)
fits_cube_name = Column(String(150), primary_key=True)
mom_zero_name = relationship("MomentZeroMaps", cascade="all, delete-orphan")
nphotons = Column(Float)
scattering_mode_max = Column(Integer)
iranfreqmode = Column(Integer)
tgas_eq_tdust = Column(Integer)
inclination = Column(Float)
position_angle = Column(Float)
imolspec = Column(Integer)
iline = Column(Integer)
width_kms = Column(Float)
nchannels = Column(Integer)
npix = Column(Integer)
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
class MomentZeroMaps(Base):
__tablename__ = "moment_zero_maps"
__table_args__ = (
ForeignKeyConstraint(
('fits_cube_name', 'run_id'),
['model_parameters.fits_cube_name', 'model_parameters.run_id']
),
)
mom_zero_name = Column(String(150), primary_key=True)
fits_cube_name = Column(String(150), nullable=False)
integration_limit_low = Column(Float)
integration_limit_high = Column(Float)
aggregated_moment_zero = Column(Float)
aggregation_function = Column(String(20))
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
class RatioMaps(Base):
__tablename__ = "ratio_maps"
__table_args__ = (
ForeignKeyConstraint(
('mom_zero_name_1', 'run_id'),
['moment_zero_maps.mom_zero_name', 'moment_zero_maps.run_id'],
),
ForeignKeyConstraint(
('mom_zero_name_2', 'run_id'),
['moment_zero_maps.mom_zero_name', 'moment_zero_maps.run_id'],
),
)
ratio_map_name = Column(String(150), primary_key=True)
mom_zero_name_1 = Column(String(150), nullable=False)
mom_zero_name_2 = Column(String(150), nullable=False)
aggregated_ratio = Column(Float)
aggregation_function = Column(String(20))
created_on = Column(DateTime)
run_id = Column(String, primary_key=True)
mom_zero_map_1 = relationship("MomentZeroMaps", foreign_keys=[mom_zero_name_1, run_id])
mom_zero_map_2 = relationship("MomentZeroMaps", foreign_keys=[mom_zero_name_2, run_id])
class TmpExecutionQueue(Base):
__tablename__ = "tmp_execution_queue"
row_id = Column(Integer, Sequence('row_id_seq'))
run_id = Column(String, primary_key=True)
dust_temperature = Column(Float, primary_key=True)
density = Column(Float, primary_key=True)
line = Column(Integer, primary_key=True)
density_keyword = Column(String, primary_key=True)
dust_temperature_keyword = Column(String, primary_key=True)
fits_cube_name = Column(String)
done = Column(Boolean)
def init_db():
engine = get_pg_engine(logger=logger)
try:
Base.metadata.create_all(bind=engine)
except OperationalError:
logger.error('Connection failed. Sleeping and retrying')
time.sleep(3)
Base.metadata.create_all(bind=engine)
logger.info('Connection successful! DB initialized as needed.')
engine.dispose()
if __name__ == '__main__':
init_db()
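# Running this module directly (python stg/stg_build_db_structure.py) creates any missing tables;
# existing tables are left untouched by Base.metadata.create_all.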
import logging
import uuid
import numpy as np
import os
import shutil
from datetime import datetime
from typing import Union, List, Tuple
from sqlalchemy import engine as sqla_engine
from assets.commons import (load_config_file,
get_moldata,
compute_unique_hash_filename,
make_archive,
validate_parameter,
setup_logger,
cleanup_directory,
get_value_if_specified)
from assets.commons.grid_utils import (compute_molecular_number_density_hot_core,
compute_power_law_radial_profile,
extract_grid_metadata)
from assets.commons.parsing import read_abundance_variation_schema
from assets.commons.db_utils import upsert, get_pg_engine
from assets.constants import (mean_molecular_mass,
radmc_input_headers,
radmc_lines_mode_mapping)
from stg.stg_build_db_structure import GridPars, GridFiles, StarsPars
from astropy import units as u
from astropy import constants as cst
from astropy.io import fits
logger = setup_logger(name='STG')
def write_radmc_input(filename: str,
quantity: np.array,
grid_metadata: dict,
path: Union[None, str] = None,
override_defaults: Union[None, dict] = None,
flatten_style: Union[None, str] = None):
"""
Write RADMC-3D input files with the specified grid metadata and quantities.
:param filename: The name of the file to write.
:param quantity: The array of quantities to be written to the file.
:param grid_metadata: A dictionary containing metadata about the grid.
:param path: Optional. The directory path where the file will be saved. Defaults to the current directory if None.
:param override_defaults: Optional. A dictionary to override default header values. Defaults to None.
:param flatten_style: Optional. The style to flatten the array (Fortran 'F' or C 'C' order). Defaults to 'F'.
:return: None. Writes the formatted data to the specified file.
"""
rt_metadata_default = {
'iformat': 1,
'grid_type': grid_metadata['grid_type'],
'coordinate_system': grid_metadata['coordinate_system'],
'grid_info': 0, ##
'grid_shape': grid_metadata['grid_shape'],
'ncells': np.prod(grid_metadata['grid_shape']),
'ncells_per_axis': ' '.join([str(axis_size) for axis_size in grid_metadata['grid_shape']]),
'dust_species': 1, ##
'active_axes': ' '.join(['1'] * len(grid_metadata['grid_shape'])),
'continuum_lambdas': grid_metadata['continuum_lambdas'], ##
}
_path = validate_parameter(path, default='.')
_filename = 'numberdens_mol.inp' if filename.startswith('numberdens_') else filename
_flatten_style = validate_parameter(flatten_style, default='F')
with open(os.path.join(_path, filename), "w") as outfile:
for key in radmc_input_headers[_filename]:
try:
header_line = override_defaults[key]
except (KeyError, TypeError):
header_line = rt_metadata_default[key]
outfile.write(f'{header_line}\n')
outfile.write('\n'.join(quantity.flatten(_flatten_style).astype(str)))
def write_radmc_lines_input(line_config: dict,
logger: logging.Logger,
path: Union[None, str] = None):
"""
Write the 'lines.inp' input file for RADMC-3D based on the provided line configuration.
:param line_config: A dictionary containing the line configuration, including mode, species, and collision partners.
:param logger: A logging.Logger instance for logging warnings and information.
:param path: Optional. The directory path where the file will be saved. Defaults to the current directory if None.
:return: None. Writes the 'lines.inp' file based on the configuration.
"""
_path = validate_parameter(path, default='.')
    if line_config['lines_mode'] != 'lte':
        assert len(line_config['collision_partners']) > 0
    else:
        if len(line_config['collision_partners']) > 0:
            logger.warning('Some collision partners were specified, but LTE populations were requested. '
                           'Ignoring collision partner data...')
        line_config['collision_partners'] = []
with open(os.path.join(_path, 'lines.inp'), "w") as outfile:
# iformat
outfile.write('2\n')
outfile.write(f'{str(len(line_config["species_to_include"]))}\n')
for species in line_config["species_to_include"]:
outfile.write(
f'{species} leiden 0 0 {str(len(line_config["collision_partners"]))}\n')
for coll_partner in line_config['collision_partners']:
outfile.write(f'{coll_partner}\n')
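# For a configuration with a single species ('e-ch3oh') and one collision partner ('p-h2') in LVG mode,
# as in the example config in this repository, the generated lines.inp would read:
# 2
# 1
# e-ch3oh leiden 0 0 1
# p-h2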
def copy_additional_files(files_to_transfer: Union[None, List[str]] = None,
dst_path: Union[str, None] = None):
"""
Copy constant files from cache
:param files_to_transfer: list of files to transfer
:param dst_path: destination path of the files
"""
_dst_path = validate_parameter(dst_path, default=os.path.join('mdl', 'radmc_files'))
_files_to_transfer = validate_parameter(files_to_transfer, default=['dustkappa_silicate.inp',
'dustopac.inp'])
_path = os.path.join('mdl', 'data')
if not os.path.isdir(_dst_path):
os.mkdir(_dst_path)
for filename in _files_to_transfer:
shutil.copy2(os.path.join(_path, filename), _dst_path)
def write_radmc_main_input_file(config_mdl: dict,
config_lines: dict,
path: Union[None, str] = None):
"""
Creates the main input file for RADMC, which can be used to reprocess the input
:param config_mdl: the model configuration
    :param config_lines: the line configuration, from the STG layer
:param path: the path where to put the input file
"""
_path = validate_parameter(path, default='.')
with open(os.path.join(_path, 'radmc3d.inp'), "w") as outfile:
for parameter in config_mdl["radmc_postprocessing"]:
outfile.write(f'{parameter} = {config_mdl["radmc_postprocessing"][parameter]}\n')
outfile.write(f'lines_mode = {radmc_lines_mode_mapping[config_lines["lines_mode"]]}\n')
def get_solid_body_rotation_y(grid_metadata: dict) -> Tuple[np.array, np.array]:
"""
Generate the velocity field for solid body rotation around the y-axis according to the configuration file
:param grid_metadata: the grid metadata
    :return: a tuple with the velocity along the x- and z-axes
"""
distance_xz = grid_metadata['distance_matrix'][:, grid_metadata['grid_refpix'][1], :]
gradient = float(grid_metadata['velocity_gradient']) * u.Unit(grid_metadata['velocity_gradient_unit'])
radial_velocity = (distance_xz * u.cm * gradient).to(u.cm / u.second)
angle_in_radians = np.arctan(
abs(grid_metadata['centered_indices'][2, ...] / grid_metadata['centered_indices'][0, ...]))
velocity_x = radial_velocity[:, np.newaxis, :] * -np.sin(angle_in_radians) * np.sign(
grid_metadata['centered_indices'][2, ...])
velocity_z = radial_velocity[:, np.newaxis, :] * np.cos(angle_in_radians) * np.sign(
grid_metadata['centered_indices'][0, ...])
velocity_x = np.where(np.isnan(velocity_x), 0, velocity_x)
velocity_z = np.where(np.isnan(velocity_z), 0, velocity_z)
return velocity_x, velocity_z
def get_profiles(grid_metadata: dict) -> dict:
"""
Function that computes the profiles of the physical quantities needed for the model.
:param grid_metadata: the grid metadata obtained parsing the config file
:return: a dictionary of arrays containing the profiles of physical quantities
"""
density_ref = get_reference_value(grid_metadata=grid_metadata,
quantity_name='density',
desired_unit='cm^-3')
tdust_ref = get_reference_value(grid_metadata=grid_metadata,
quantity_name='dust_temperature',
desired_unit='K')
profiles_mapping = {
'gas_number_density': {
'central_value': (float(grid_metadata['central_density']) * u.Unit(grid_metadata['density_unit']))
.to(u.cm ** -3).value,
'power_law_index': float(grid_metadata['density_powerlaw_idx']),
'value_at_reference': density_ref['value_at_reference'],
'distance_reference': density_ref['distance_reference']
},
'dust_temperature': {
'central_value': float(grid_metadata['dust_temperature']),
'power_law_index': float(grid_metadata['dust_temperature_powerlaw_idx']),
'value_at_reference': tdust_ref['value_at_reference'],
'distance_reference': tdust_ref['distance_reference']
},
'microturbulence': {
'central_value': (float(grid_metadata['microturbulence']) * u.Unit(grid_metadata['microturbulence_unit']))
.to(u.cm / u.second).value,
'power_law_index': 0,
'value_at_reference': None,
'distance_reference': 1.0
},
'escprob_lengthscale': {
'central_value': min((np.max(grid_metadata['grid_size']) *
u.Unit(grid_metadata['grid_size_units'][np.argmax(grid_metadata['grid_size'])]))
.to(u.cm).value,
(2 * grid_metadata['maximum_radius'] * u.Unit(grid_metadata['maximum_radius_unit']))
.to(u.cm).value),
'power_law_index': 0,
'value_at_reference': None,
'distance_reference': 1.0
},
'velocity_x': {
'central_value': 0,
'power_law_index': 0,
'value_at_reference': None,
'distance_reference': 1.0
},
'velocity_y': {
'central_value': 0,
'power_law_index': 0,
'value_at_reference': None,
'distance_reference': 1.0
},
'velocity_z': {
'central_value': 0,
'power_law_index': 0,
'value_at_reference': None,
'distance_reference': 1.0
},
}
profiles = {}
for profile in profiles_mapping:
profiles[profile] = compute_power_law_radial_profile(
central_value=profiles_mapping[profile]['central_value'],
power_law_index=profiles_mapping[profile]['power_law_index'],
distance_matrix=grid_metadata['distance_matrix'],
value_at_reference=profiles_mapping[profile]['value_at_reference'],
distance_reference=profiles_mapping[profile]['distance_reference'],
maximum_radius=grid_metadata['maximum_radius'] * u.Unit(grid_metadata['maximum_radius_unit']).to(u.cm)
)
if grid_metadata['velocity_field'] == 'solid':
profiles['velocity_x'], profiles['velocity_z'] = get_solid_body_rotation_y(grid_metadata=grid_metadata)
profiles['velocity_field'] = np.array([profiles['velocity_x'], profiles['velocity_y'], profiles['velocity_z']])
if 'floor_temperature' in grid_metadata.keys():
profiles['dust_temperature'] = np.where(
profiles['dust_temperature'] < float(grid_metadata['floor_temperature']),
float(grid_metadata['floor_temperature']),
profiles['dust_temperature']
)
return profiles
def get_reference_value(grid_metadata: dict,
quantity_name: str,
desired_unit: str) -> dict:
"""
Compile a dictionary of reference values for the grid, applying a conversion, if necessary
:param grid_metadata: the grid metadata
:param quantity_name: the quantity to process
:param desired_unit: the output unit to use
:return: the dictionary with the value at reference in the desired units and the distance reference used to scale
the power law
"""
try:
reference_value_dict = {
'value_at_reference': (
(float(grid_metadata[f'{quantity_name}_at_reference']) * u.Unit(grid_metadata[f'{quantity_name}_unit']))
.to(u.Unit(desired_unit))).value,
'distance_reference': (float(grid_metadata['distance_reference']) * u.Unit(
grid_metadata['distance_reference_unit'])).to(u.cm).value
}
except (KeyError, AttributeError):
reference_value_dict = {
'value_at_reference': None,
'distance_reference': 1.0
}
return reference_value_dict
def write_grid_input_files(profiles: dict,
grid_metadata: dict,
wavelengths_micron: np.array,
path: Union[str, None] = None):
"""
Write the grid input files needed by RADMC3d
:param profiles: the dictionary of profiles for physical quantities that define the model
:param grid_metadata: the grid metadata
:param wavelengths_micron: the array of wavelengths to model
:param path: the output path
"""
_path = validate_parameter(path, default=os.path.join('mdl', 'radmc_files'))
quantity_mapping = {
'amr_grid.inp': {
'quantity': grid_metadata['grid_edges'],
},
'dust_density.inp': {
'quantity': (profiles['gas_number_density'] * u.cm ** -3 / 100. * cst.m_p.to(
u.gram) * mean_molecular_mass).value,
},
'dust_temperature.dat': {
'quantity': profiles['dust_temperature'],
},
'microturbulence.inp': {
'quantity': profiles['microturbulence'],
},
'gas_velocity.inp': {
'quantity': profiles['velocity_field'],
},
'wavelength_micron.inp': {
'quantity': wavelengths_micron,
},
'escprob_lengthscale.inp': {
'quantity': profiles['escprob_lengthscale'],
},
}
for filename in quantity_mapping:
write_radmc_input(filename=filename,
quantity=quantity_mapping[filename]['quantity'],
path=_path,
grid_metadata=grid_metadata)
def write_molecular_number_density_profiles(profiles: dict,
line_config: dict,
grid_metadata: dict,
path: Union[str, None] = None) -> dict:
"""
    Write the molecular number density profiles. These are handled separately from the other profiles because they
    depend on the gas density and temperature profiles, as well as on the molecular abundances.
:param profiles: the dictionary of profiles for physical quantities that define the model
:param line_config: the dictionary of the line configurations, for the abundances
:param grid_metadata: the grid metadata
:param path: the output path
"""
_path = validate_parameter(path, default=os.path.join('mdl', 'radmc_files'))
hot_core_specs = read_abundance_variation_schema(line_config=line_config)
species_profiles = {}
for species in line_config['species_to_include'] + line_config['collision_partners']:
species_profiles[species] = compute_molecular_number_density_hot_core(
gas_number_density_profile=profiles['gas_number_density'],
abundance=float(line_config['molecular_abundances'][species]),
temperature_profile=profiles['dust_temperature'],
threshold=float(hot_core_specs[species]['threshold']),
abundance_jump=float(hot_core_specs[species]['abundance_jump']))
write_radmc_input(filename=f'numberdens_{species}.inp',
quantity=species_profiles[species],
path=_path,
grid_metadata=grid_metadata)
return species_profiles
def save_fits_grid_profile(quantity: np.array,
filename: str,
path: str = None):
"""
Save the model grid to a fits file
:param quantity: the array to be stored in the fits file
:param filename: the name of the file to be created
:param path: the path where the file will be stored
"""
_path = validate_parameter(path, default=os.path.join('prs', 'fits', 'grids'))
if not os.path.isfile(os.path.join(_path, filename)):
try:
fits.writeto(os.path.join(_path, filename), quantity)
except OSError:
pass
else:
logger.info('Skipping saving of fits grid. File already present!')
def convert_dimensional_unit(value: Union[float, None, str],
current_unit: str,
desired_unit: str) -> Union[float, None]:
"""
Convert the quantities specified in the configuration file to standard ones
:param value: the value to convert
:param current_unit: unit specified in the config file
:param desired_unit: the output unit
:return: the converted value, if defined
"""
if value is not None:
return (float(value) * u.Unit(current_unit)).to(u.Unit(desired_unit)).value
else:
return None
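# Example: convert_dimensional_unit(0.5, 'pc', 'cm') ~ 1.54e18 (0.5 pc expressed in cm)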
def remap_metadata_to_grid_row(zipped_grid_name: str,
grid_metadata: dict,
config: dict) -> dict:
"""
Create a dictionary that can be persisted in the database, according to the GridPars table structure
:param zipped_grid_name: the name of the compressed archive that contains grid files
:param grid_metadata: the grid metadata
:param config: the configuration dictionary, to store human-readable grid- and coordinate types
:return: the dictionary to be inserted in the database
"""
_config = config['grid']
dimensional_quantities = {
'central_density': {'unit_key': 'density_unit', 'desired_unit': 'cm^-3'},
'density_at_reference': {'unit_key': 'density_unit', 'desired_unit': 'cm^-3'},
'microturbulence': {'unit_key': 'microturbulence_unit', 'desired_unit': 'km/s'},
'velocity_gradient': {'unit_key': 'velocity_gradient_unit', 'desired_unit': 'km/(s pc)'},
'velocity_at_reference': {'unit_key': 'velocity_unit', 'desired_unit': 'km/s'},
'distance_reference': {'unit_key': 'distance_reference_unit', 'desired_unit': 'pc'},
'maximum_radius': {'unit_key': 'maximum_radius_unit', 'desired_unit': 'pc'},
}
grid_row = {
'zipped_grid_name': zipped_grid_name,
'grid_type': _config['grid_type'],
'coordinate_system': _config['coordinate_system'],
'central_density': get_value_if_specified(parameters_dict=grid_metadata, key='central_density'),
'density_powerlaw_index': get_value_if_specified(parameters_dict=grid_metadata, key='density_powerlaw_idx'),
'density_at_reference': get_value_if_specified(parameters_dict=grid_metadata, key='density_at_reference'),
'dust_temperature': get_value_if_specified(parameters_dict=grid_metadata, key='dust_temperature'),
'dust_temperature_powerlaw_index': get_value_if_specified(parameters_dict=grid_metadata,
key='dust_temperature_powerlaw_idx'),
'dust_temperature_at_reference': get_value_if_specified(parameters_dict=grid_metadata,
key='dust_temperature_at_reference'),
'microturbulence': get_value_if_specified(parameters_dict=grid_metadata, key='microturbulence'),
'velocity_field': get_value_if_specified(parameters_dict=grid_metadata, key='velocity_field'),
'velocity_gradient': get_value_if_specified(parameters_dict=grid_metadata, key='velocity_gradient'),
'velocity_powerlaw_index': get_value_if_specified(parameters_dict=grid_metadata, key='velocity_powerlaw_index'),
'velocity_at_reference': get_value_if_specified(parameters_dict=grid_metadata, key='velocity_at_reference'),
'distance_reference': get_value_if_specified(parameters_dict=grid_metadata, key='distance_reference'),
'maximum_radius': get_value_if_specified(parameters_dict=grid_metadata, key='maximum_radius'),
'grid_size_1': grid_metadata['grid_size'][0],
'grid_shape_1': grid_metadata['grid_shape'][0],
'grid_refpix_1': grid_metadata['grid_refpix'][0],
'grid_size_2': grid_metadata['grid_size'][1],
'grid_shape_2': grid_metadata['grid_shape'][1],
'grid_refpix_2': grid_metadata['grid_refpix'][1],
'grid_size_3': grid_metadata['grid_size'][2],
'grid_shape_3': grid_metadata['grid_shape'][2],
'grid_refpix_3': grid_metadata['grid_refpix'][2],
}
for qty in dimensional_quantities:
try:
grid_row[qty] = convert_dimensional_unit(value=grid_row[qty],
current_unit=u.Unit(
grid_metadata[dimensional_quantities[qty]['unit_key']]),
desired_unit=u.Unit(dimensional_quantities[qty]['desired_unit']))
except KeyError:
grid_row[qty] = None
for dim_idx in range(2):
grid_row[f'grid_size_{dim_idx + 1}'] = (
grid_row[f'grid_size_{dim_idx + 1}'] * u.Unit(grid_metadata['grid_size_units'][dim_idx])).to(
u.pc).value
return grid_row
def populate_grid_table(config: dict,
engine: sqla_engine,
grid_metadata: dict,
run_id: str) -> str:
"""
Upsert the grid data to the postgres DB
:param config: the configuration dictionary
:param engine: the SQLAlchemy engine to use
:param grid_metadata: the grid metadata
    :param run_id: the unique identifier of the run
    :return: the compressed archive filename
"""
output_filename = f'{compute_unique_hash_filename(config=config)}.zip'
remapped_row = remap_metadata_to_grid_row(
zipped_grid_name=output_filename,
grid_metadata=grid_metadata,
config=config
)
remapped_row['run_id'] = run_id
remapped_row['created_on'] = datetime.now()
upsert(
table_object=GridPars,
row_dict=remapped_row,
conflict_keys=[GridPars.zipped_grid_name, GridPars.run_id],
engine=engine
)
return output_filename
def populate_grid_files(quantity_name: str,
engine: sqla_engine,
zip_filename: str,
filename: str,
run_id: str):
"""
Stores into the DB the grid filename for later reference
:param quantity_name: the name of the quantity to store
:param engine: the SQLAlchemy engine
:param zip_filename: the name of the grid archive containing the radmc3d input files
:param filename: the fits filename where the grid is stored
:param run_id: the ID of the run
"""
raw_insert_entry = {'zipped_grid_name': zip_filename,
'quantity': quantity_name,
'fits_grid_name': filename,
'created_on': datetime.now(),
'run_id': run_id}
upsert(
table_object=GridFiles,
row_dict=raw_insert_entry,
conflict_keys=[GridFiles.zipped_grid_name, GridFiles.quantity, GridFiles.run_id],
engine=engine
)
def populate_stars_table(config_stars: dict,
engine: sqla_engine,
grid_zipfile: str,
run_id: str):
"""
Populate the stars table in the DB
:param config_stars: the dictionary containing the stars configurations
:param engine: the SQLAlchemy engine to use
    :param grid_zipfile: the grid archive name, to be used as key
:param run_id: the run unique identifier
"""
_config_stars = config_stars.copy()
_config_stars['zipped_grid_name'] = grid_zipfile
_config_stars['created_on'] = datetime.now()
_config_stars['run_id'] = run_id
upsert(
table_object=StarsPars,
row_dict=_config_stars,
conflict_keys=[StarsPars.zipped_grid_name, StarsPars.run_id],
engine=engine
)
def write_stellar_input_file(stars_metadata: dict,
grid_metadata: dict,
path: str,
wavelengths_micron: np.array):
"""
Create the input file to insert stars in the computation
:param stars_metadata: the metadata about the stars, as specified in the configuration file
:param grid_metadata: the grid metadata
:param path: the path where the files will be created
    :param wavelengths_micron: the wavelengths (in micron) to consider
"""
star_properties = [' '.join([str(rstar), str(mstar), str(pos[0]), str(pos[1]), str(pos[2])]) for rstar, mstar, pos
in zip(stars_metadata['rstars'], stars_metadata['mstars'], stars_metadata['star_positions'])]
override_defaults = {
'iformat': 2,
'nstars': stars_metadata['nstars'],
'continuum_lambdas': stars_metadata['nlambdas'],
'stars_properties': '\n'.join(star_properties),
'lambdas': ' \n'.join(wavelengths_micron.astype(str)),
}
write_radmc_input(filename='stars.inp',
path=path,
quantity=np.array(stars_metadata['star_fluxes']),
grid_metadata=grid_metadata,
override_defaults=override_defaults,
flatten_style='C')
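def _example_stars_metadata_sketch():
    # Hedged sketch of the stars metadata consumed above, inferred from the keys accessed
    # in this module; all values are placeholders, not a validated configuration. The
    # negative flux entry follows the radmc3d stars.inp convention of reading it as a
    # blackbody temperature (assumption), and rstars/mstars are assumed to be in cgs.
    return {
        'nstars': 1,
        'nlambdas': 100,
        'rstars': [6.96e10],
        'mstars': [1.989e33],
        'star_positions': [[0.0, 0.0, 0.0]],
        'star_fluxes': [-5772.0],
    }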
def get_grid_name(method: Union[str, None] = None,
zip_filename: Union[str, None] = None,
quantity_name: Union[str, None] = None):
"""
Compute the name of the grid file
:param method: method used to compute the file name; uuid generates a random unique identifier, composite_grid
appends the quantity name to the zipped archive name
:param zip_filename: the zipped archive name
:param quantity_name: the name of the quantity that should be processed
:return: the name of the file
"""
_method = validate_parameter(method, default='uuid')
allowed_methods = ('uuid', 'composite_grid')
    if _method == 'uuid':
        return f'{str(uuid.uuid4())}.fits'
    elif _method == 'composite_grid':
assert ((zip_filename is not None) and (quantity_name is not None))
return f'{".".join(zip_filename.split(".")[0:-1])}_{quantity_name}.fits'
else:
raise NotImplementedError(
f'The chosen method is not available. Allowed options are: {" ".join(allowed_methods)}')
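def _example_get_grid_name_usage():
    # Hedged usage sketch of the two naming strategies above.
    random_name = get_grid_name(method='uuid')  # e.g. '<random uuid4>.fits'
    composite_name = get_grid_name(method='composite_grid',
                                   zip_filename='abc123.zip',
                                   quantity_name='dust_temperature')
    # composite_name == 'abc123_dust_temperature.fits'
    return random_name, composite_name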
def main(run_id: str,
override_config: Union[dict, None] = None,
path_radmc_files: Union[str, None] = None,
compute_dust_temperature: bool = True,
engine: sqla_engine = None) -> str:
_override_config = validate_parameter(override_config, default={'grid_lines': {}, 'model': {}})
config = load_config_file(os.path.join('stg', 'config', 'config.yml'),
override_config=_override_config['grid_lines'])
config_lines = config['lines']
config_mdl = load_config_file(os.path.join('mdl', 'config', 'config.yml'),
override_config=_override_config['model'])
input_files_dir = validate_parameter(path_radmc_files, default=os.path.join('mdl', 'radmc_files'))
cleanup_directory(directory=input_files_dir,
logger=logger)
copy_additional_files(dst_path=input_files_dir)
grid_metadata = extract_grid_metadata(config=config)
if 'stars' in config:
wavelengths_micron = manage_wavelength_grid_with_stars(config, grid_metadata, input_files_dir)
else:
wavelengths_micron = np.logspace(np.log10(5),
np.log10(1300),
100)
grid_metadata['continuum_lambdas'] = len(wavelengths_micron)
profiles = get_profiles(grid_metadata=grid_metadata)
write_grid_input_files(grid_metadata=grid_metadata,
profiles=profiles,
path=input_files_dir,
wavelengths_micron=wavelengths_micron)
get_moldata(species_names=config['lines']['species_to_include'],
logger=logger,
path=input_files_dir,
use_cache=True)
write_radmc_lines_input(line_config=config['lines'],
path=input_files_dir,
logger=logger)
molecular_number_density_profiles = write_molecular_number_density_profiles(profiles=profiles,
grid_metadata=grid_metadata,
line_config=config['lines'],
path=input_files_dir)
if engine is None:
engine = get_pg_engine(logger=logger)
zip_filename = populate_grid_table(config=config,
engine=engine,
grid_metadata=grid_metadata,
run_id=run_id)
engine.dispose()
write_radmc_main_input_file(config_mdl=config_mdl,
config_lines=config_lines,
path=input_files_dir)
execution_dir = os.getcwd()
os.chdir(input_files_dir)
# Recompute dust temperature distribution if needed, based on star positions and properties
if 'stars' in config:
if compute_dust_temperature is True:
logger.info('Computing dust temperature distribution using the stars in the configuration file')
try:
_threads = config_mdl['radmc_observation']['threads']
except KeyError:
_threads = 4
os.system(f'radmc3d mctherm setthreads {str(_threads)}')
populate_stars_table(config_stars=config['stars'],
engine=engine,
grid_zipfile=zip_filename,
run_id=run_id)
else:
logger.info('Using cached dust temperature distribution')
shutil.copy(os.path.join(execution_dir, 'model', 'data', 'dust_temperature.dat'),
os.path.join('.', 'dust_temperature.dat'))
# TODO: Should I apply the floor temperature here as well?
os.chdir(execution_dir)
if engine is None:
engine = get_pg_engine(logger=logger)
for quantity_name in ('gas_number_density', 'dust_temperature'):
save_and_persist_grid(engine=engine,
profiles=profiles,
run_id=run_id,
zip_filename=zip_filename,
quantity_name=quantity_name)
for species in molecular_number_density_profiles:
save_and_persist_grid(engine=engine,
profiles=molecular_number_density_profiles,
run_id=run_id,
zip_filename=zip_filename,
quantity_name=species)
engine.dispose()
make_archive(output_filename=zip_filename,
source_dir=input_files_dir,
archive_path=os.path.join('stg', 'archive'))
return zip_filename
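def _example_main_usage():
    # Hedged usage sketch (illustrative only): stage the grid for a hypothetical run id,
    # reusing a cached dust_temperature.dat (assumed to exist under model/data) instead
    # of recomputing it with radmc3d mctherm.
    return main(run_id='example-run',
                override_config=None,
                compute_dust_temperature=False)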
def manage_wavelength_grid_with_stars(config: dict,
grid_metadata: dict,
input_files_dir: str) -> np.array:
stars_metadata = config['stars']
if stars_metadata['spacing'] == 'log':
wavelengths_micron = np.logspace(np.log10(stars_metadata['lambdas_micron_limits'][0]),
np.log10(stars_metadata['lambdas_micron_limits'][1]),
stars_metadata['nlambdas'])
elif stars_metadata['spacing'] == 'linear':
wavelengths_micron = np.linspace(stars_metadata['lambdas_micron_limits'][0],
stars_metadata['lambdas_micron_limits'][1],
stars_metadata['nlambdas'])
else:
        raise NotImplementedError('Spacing not defined. Choose between {linear, log}')
write_stellar_input_file(stars_metadata=stars_metadata,
grid_metadata=grid_metadata,
path=input_files_dir,
wavelengths_micron=wavelengths_micron)
return wavelengths_micron
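def _example_stars_wavelength_config_sketch():
    # Hedged sketch of the wavelength-related keys of the 'stars' configuration consumed
    # above; the numbers are placeholders, not a recommended setup.
    return {
        'spacing': 'log',  # or 'linear'
        'lambdas_micron_limits': [0.1, 1000.0],
        'nlambdas': 100,
    }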
def save_and_persist_grid(engine: sqla_engine,
profiles: dict,
run_id: str,
zip_filename: str,
quantity_name: str,
method: Union[str, None] = None):
_method = validate_parameter(method, default='composite_grid')
grid_file_name = get_grid_name(method=_method,
zip_filename=zip_filename,
quantity_name=quantity_name)
save_fits_grid_profile(quantity=profiles[quantity_name],
filename=grid_file_name)
populate_grid_files(quantity_name=quantity_name,
engine=engine,
zip_filename=zip_filename,
filename=grid_file_name,
run_id=run_id)
if __name__ == '__main__':
main(run_id='test_run')
import os
from unittest import TestCase
import numpy as np
import pandas as pd
from astropy import units as u
from assets.commons import (load_config_file,
validate_parameter,
setup_logger,
get_moldata)
from assets.commons.db_utils import get_credentials
from assets.commons.grid_utils import (get_grid_edges,
get_physical_px_size,
compute_cartesian_coordinate_grid,
compute_power_law_radial_profile,
get_distance_matrix,
get_centered_indices,
extract_grid_metadata,
compute_los_average_weighted_profile)
from assets.commons.parsing import (get_grid_properties,
parse_grid_overrides)
from assets.commons.training_utils import (compute_and_add_similarity_cols,
split_data)
def create_test_config(config_dict: dict,
config_filename: str):
with open(config_filename, 'w') as config_file:
config_file.write('grid:\n')
for key in config_dict:
config_file.write(f' {key}: {config_dict[key]}\n')
class TestCommons(TestCase):
def setUp(self):
self.config_filename = 'config.yml'
self.logger = setup_logger(name='TEST_COMMONS')
def test_compute_power_law_radial_profile_flat(self):
indices = np.indices([3, 3]) - 1
distance_matrix = np.sqrt(indices[0, :, :] ** 2 + indices[1, :, :] ** 2)
radial_profile = compute_power_law_radial_profile(
central_value=1,
power_law_index=0,
distance_matrix=distance_matrix
)
self.assertTrue(np.array_equal(radial_profile, np.ones([3, 3])))
def test_compute_power_law_radial_profile_pl(self):
indices = np.indices([3, 3]) - 1
distance_matrix = np.sqrt(indices[0, :, :] ** 2 + indices[1, :, :] ** 2)
distance_matrix[1, 1] = 1.0
radial_profile = compute_power_law_radial_profile(
central_value=1,
power_law_index=-1,
distance_matrix=distance_matrix
)
self.assertTrue(np.array_equal(radial_profile, 1.0 / distance_matrix))
def test_compute_power_law_radial_profile_pl_with_reference(self):
indices = np.indices([3, 3]) - 1
distance_matrix = np.sqrt(indices[0, :, :] ** 2 + indices[1, :, :] ** 2)
radial_profile = compute_power_law_radial_profile(
central_value=2,
value_at_reference=1,
distance_reference=1,
power_law_index=-1,
distance_matrix=distance_matrix
)
censored_rp = radial_profile.copy()
censored_rp[1, 1] = np.nan
self.assertEqual(radial_profile[1, 1], np.nanmax(censored_rp))
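    @staticmethod
    def _power_law_profile_sketch(distance_matrix, value_at_reference=1.0,
                                  distance_reference=1.0, power_law_index=-1.0):
        # Hedged sketch of the relation the three tests above exercise, inferred from the
        # assertions rather than the verified implementation: away from the central pixel
        # the profile follows value_at_reference * (r / distance_reference) ** index;
        # the r = 0 pixel is handled separately in the real profile (not reproduced here).
        return value_at_reference * (distance_matrix / distance_reference) ** power_law_index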
def test_get_grid_properties_uniform(self):
_config_filename = os.path.join('test_files', self.config_filename)
config_dict = {
'dim1': {"size": 0.1, "unit": "pc", "shape": 5, "refpix": 2},
}
keywords = ['size', 'unit', 'shape', 'refpix']
expected_result = {
'size': [0.1] * 3,
'unit': ['pc'] * 3,
'shape': [5] * 3,
'refpix': [2] * 3,
}
create_test_config(config_dict=config_dict,
config_filename=_config_filename)
config = load_config_file(_config_filename)
for key in keywords:
grid_properties = get_grid_properties(grid_config=config['grid'],
keyword=key)
self.assertListEqual(grid_properties, expected_result[key])
def test_get_grid_properties(self):
_config_filename = os.path.join('test_files', self.config_filename)
config_dict = {
'dim1': {"size": 0.1, "unit": "pc", "shape": 5, "refpix": 2},
'dim2': {"size": 0.2, "unit": "pc", "shape": 10, "refpix": 4.5},
'dim3': {"size": 0.3, "unit": "pc", "shape": 15, "refpix": 7},
}
keywords = ['size', 'unit', 'shape', 'refpix']
expected_result = {
'size': [0.1, 0.2, 0.3],
'unit': ['pc'] * 3,
'shape': [5, 10, 15],
'refpix': [2, 4.5, 7],
}
create_test_config(config_dict=config_dict,
config_filename=_config_filename)
config = load_config_file(_config_filename)
for key in keywords:
grid_properties = get_grid_properties(grid_config=config['grid'],
keyword=key)
self.assertListEqual(grid_properties, expected_result[key])
def test_get_grid_edges(self):
grid_metadata = {
'grid_shape': [2, 2],
'physical_px_size': [1 * u.cm, 1 * u.cm],
'grid_refpix': [0.5, 0.5]
}
expected_results_per_axis = np.array([-1, 0, 1])
grid_edges = get_grid_edges(grid_metadata=grid_metadata)
for axis_idx in range(len(grid_metadata['grid_shape'])):
self.assertTrue(np.array_equal(expected_results_per_axis, grid_edges[:, axis_idx]))
def test_get_physical_px_size(self):
_config_filename = os.path.join('test_files', self.config_filename)
grid_size = 0.2
npix = 5
config_dict = {
'grid_type': 'regular',
'coordinate_system': 'cartesian',
'central_density': '1e6',
'density_unit': 'cm^-3',
'dim1': {"size": grid_size, "size_units": "pc", "shape": npix, "refpix": 2},
}
expected_result = [(grid_size * u.pc).to(u.cm) / npix] * 3
create_test_config(config_dict=config_dict,
config_filename=_config_filename)
config = load_config_file(_config_filename)
grid_metadata = extract_grid_metadata(config=config)
physical_px_size = get_physical_px_size(grid_metadata)
self.assertListEqual(expected_result, physical_px_size)
def test_compute_cartesian_coordinate_grid(self):
indices = np.array([0, 1, 2])
physical_px_size = [0.1 * u.pc]
expected_results = indices * (physical_px_size[0]).to(u.cm).value
computed_grid = compute_cartesian_coordinate_grid(indices=indices,
physical_px_size=physical_px_size)
self.assertTrue(np.array_equal(expected_results, computed_grid))
def test_compute_cartesian_coordinate_grid_2d(self):
indices = np.indices([3, 4])
physical_px_size = [1 * u.cm, 1 * u.cm]
expected_results = indices
computed_grid = compute_cartesian_coordinate_grid(indices=indices,
physical_px_size=physical_px_size)
self.assertTrue(np.array_equal(expected_results, computed_grid))
def test_get_distance_matrix_2d(self):
grid_metadata = {
'grid_shape': [2, 2],
'physical_px_size': [1 * u.cm, 1 * u.cm]
}
indices = np.indices(grid_metadata['grid_shape'])
expected_results = np.array([[0, 1], [1, np.sqrt(2)]])
distance_matrix = get_distance_matrix(grid_metadata=grid_metadata,
indices=indices)
self.assertTrue(np.array_equal(expected_results, distance_matrix))
def test_get_distance_matrix(self):
grid_metadata = {
'grid_shape': [2, 2, 2],
'physical_px_size': [1 * u.cm, 1 * u.cm, 1 * u.cm]
}
indices = np.indices(grid_metadata['grid_shape'])
expected_results = np.array([[[0, 1], [1, np.sqrt(2)]],
[[1, np.sqrt(2)], [np.sqrt(2), np.sqrt(3)]]])
distance_matrix = get_distance_matrix(grid_metadata=grid_metadata,
indices=indices)
self.assertTrue(np.array_equal(expected_results, distance_matrix))
def test_get_distance_matrix_symmetric(self):
grid_metadata = {
'grid_shape': [5, 5, 5],
'physical_px_size': [1 * u.cm, 1 * u.cm, 1 * u.cm]
}
indices = np.indices(grid_metadata['grid_shape']) - 3
distance_matrix = get_distance_matrix(grid_metadata=grid_metadata,
indices=indices)
self.assertTrue(np.array_equal(distance_matrix, distance_matrix.T))
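    @staticmethod
    def _distance_matrix_sketch(indices, physical_px_size):
        # Hedged sketch of the Euclidean distance the tests above check, assuming that
        # get_distance_matrix scales each index axis by its physical pixel size:
        # d = sqrt(sum_k (index_k * px_size_k) ** 2), here expressed in cm.
        scaled_axes = [indices[axis] * physical_px_size[axis].to(u.cm).value
                       for axis in range(indices.shape[0])]
        return np.sqrt(sum(axis ** 2 for axis in scaled_axes))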
def test_get_centered_indices(self):
grid_metadata = {
'grid_shape': [5, 3],
'grid_refpix': [2, 1]
}
expected_indices = [[[-2, -2, -2],
[-1, -1, -1],
[0, 0, 0],
[1, 1, 1],
[2, 2, 2]],
[[-1, 0, 1],
[-1, 0, 1],
[-1, 0, 1],
[-1, 0, 1],
[-1, 0, 1]]]
indices = get_centered_indices(grid_metadata=grid_metadata)
self.assertTrue(np.array_equal(np.array(expected_indices), indices))
def test_validate_parameter_none(self):
parameter = None
default = 10
result = validate_parameter(param_to_validate=parameter,
default=default)
self.assertEqual(default, result)
def test_validate_parameter(self):
parameter = 100
default = 10
result = validate_parameter(param_to_validate=parameter,
default=default)
self.assertEqual(parameter, result)
def test_parse_grid_overrides_linear(self):
config = {
'overrides': {
'dust_temperature_grid_type': 'linear',
'dust_temperature_limits': [10, 30],
'dust_temperature_step': 1,
}
}
expected_results = np.arange(config['overrides']['dust_temperature_limits'][0],
config['overrides']['dust_temperature_limits'][1],
config['overrides']['dust_temperature_step'])
grid_values = parse_grid_overrides(par_name='dust_temperature',
config=config)
self.assertTrue(np.array_equal(expected_results, grid_values))
def test_parse_grid_overrides_log(self):
config = {
'overrides': {
'dust_temperature_grid_type': 'log',
'dust_temperature_limits': [10, 200],
'dust_temperature_step': 2,
}
}
expected_results = np.array([10, 20, 40, 80, 160])
grid_values = parse_grid_overrides(par_name='dust_temperature',
config=config)
self.assertTrue(np.array_equal(expected_results, grid_values.astype(int)))
def test_parse_grid_overrides_log_exponent(self):
exponent = 0.25
config = {
'overrides': {
'dust_temperature_grid_type': 'log',
'dust_temperature_limits': [10, 100],
'dust_temperature_step': exponent,
}
}
expected_results = np.array(
[10, 10 * 10 ** exponent, 10 * 10 ** (2 * exponent), 10 * 10 ** (3 * exponent), 10 * 10 ** (4 * exponent)])
grid_values = parse_grid_overrides(par_name='dust_temperature',
config=config)
        self.assertTrue(np.allclose(expected_results, grid_values, rtol=1e-5))
def test_get_credentials(self):
expected_results = {
'DB_USER': 'pippo',
'DB_PASS': 'pluto',
'DB_HOST': 'localhost',
'DB_NAME': 'postgres',
'DB_PORT': 5432,
}
credentials = get_credentials(logger=self.logger,
credentials_filename=os.path.join('test_files', 'credentials.yaml'),
set_envs=True)
self.assertDictEqual(credentials, expected_results)
for key in expected_results:
self.assertEqual(str(expected_results[key]), os.getenv(key))
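    @staticmethod
    def _decode_credential_sketch(encoded_value):
        # Hedged sketch of the decoding the test above implies: DB_USER and DB_PASS are
        # stored base64-encoded in credentials.yaml (e.g. 'cGlwcG8=' -> 'pippo') and are
        # decoded before being returned and exported as environment variables.
        import base64
        return base64.b64decode(encoded_value).decode('utf-8')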
def test_get_moldata_from_cache(self):
species = 'e-ch3oh'
os.chdir('..')
expected_path = os.path.join('tests', 'test_files', f'molecule_{species}.inp')
try:
os.remove(expected_path)
except IOError:
pass
get_moldata(species_names=[species],
logger=self.logger,
path=os.path.join('tests', 'test_files'),
use_cache=True)
self.assertTrue(os.path.isfile(expected_path))
os.remove(expected_path)
os.chdir('tests')
def test_get_moldata_negate_cache(self):
species = 'e-ch3oh'
os.chdir('..')
expected_path = os.path.join('tests', 'test_files', f'molecule_{species}.inp')
try:
os.remove(expected_path)
except IOError:
pass
get_moldata(species_names=[species],
logger=self.logger,
path=os.path.join('tests', 'test_files'),
use_cache=False)
self.assertTrue(os.path.isfile(expected_path))
os.remove(expected_path)
os.chdir('tests')
def test_compute_los_average_weighted_profile(self):
indices = np.indices([3, 3, 3]) - 1
distance_matrix = np.sqrt(indices[0, :, :] ** 2 + indices[1, :, :] ** 2 + indices[2, :, :] ** 2)
radial_profile = compute_power_law_radial_profile(
central_value=1,
power_law_index=0,
distance_matrix=distance_matrix
)
rng = np.random.default_rng(seed=3)
weights = rng.random(radial_profile.shape)
los_average = compute_los_average_weighted_profile(profile=radial_profile,
weights=weights)
expected_result = np.ones_like(los_average)
self.assertTrue(np.array_equal(expected_result, los_average))
def test_compute_los_average_weighted_profile_pl(self):
indices = np.indices([3, 3, 3]) - 1
distance_matrix = np.sqrt(indices[0, :, :] ** 2 + indices[1, :, :] ** 2 + indices[2, :, :] ** 2)
radial_profile = compute_power_law_radial_profile(
central_value=1,
power_law_index=-1,
distance_matrix=distance_matrix
)
weights = np.ones_like(radial_profile)
los_average = compute_los_average_weighted_profile(profile=radial_profile,
weights=weights)
expected_result = np.where(~np.isfinite(np.mean(1.0 / distance_matrix, axis=2)),
1,
np.mean(1.0 / distance_matrix, axis=2))
self.assertTrue(np.array_equal(expected_result, los_average))
def test_compute_los_average_weighted_profile_pl_weights(self):
indices = np.indices([3, 3, 3]) - 1
distance_matrix = np.sqrt(indices[0, :, :] ** 2 + indices[1, :, :] ** 2 + indices[2, :, :] ** 2)
radial_profile = compute_power_law_radial_profile(
central_value=1,
power_law_index=-1,
distance_matrix=distance_matrix
)
rng = np.random.default_rng(seed=3)
weights = rng.random(radial_profile.shape)
los_average = compute_los_average_weighted_profile(profile=radial_profile,
weights=weights)
weights /= np.sum(weights, axis=2, keepdims=True)
expected_radial_profile = np.where(~np.isfinite(1.0 / distance_matrix),
1, 1.0 / distance_matrix)
self.assertTrue(np.array_equal(radial_profile, expected_radial_profile))
expected_result = np.where(~np.isfinite(1.0 / distance_matrix),
1, np.sum(radial_profile * weights, axis=2))
self.assertTrue(np.allclose(expected_result, los_average, 1e-5))
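    @staticmethod
    def _los_weighted_average_sketch(profile, weights):
        # Hedged sketch of the quantity the three tests above check: a weighted mean of
        # the profile along the last (line-of-sight) axis with per-pixel weights
        # normalised along that axis; the treatment of non-finite central-pixel values
        # is not reproduced here.
        normalised_weights = weights / np.sum(weights, axis=2, keepdims=True)
        return np.sum(profile * normalised_weights, axis=2)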
class TestTraining(TestCase):
def setUp(self):
feature_names = ['feature_01', 'feature_02']
self.df_data = pd.DataFrame(
data=[
[1, -1],
[1, 1],
[-1, -1],
[-1, 0],
[2, 1],
[0, 1],
[0, 0]
],
columns=feature_names
)
self.average_features_per_target_bin = pd.DataFrame(
data=[
[1, 1],
[-1, -1]
],
columns=feature_names
)
def test_compute_and_add_similarity_cols_output_features(self):
test_df = compute_and_add_similarity_cols(average_features_per_target_bin=self.average_features_per_target_bin,
input_df=self.df_data,
similarity_bins=2)
self.assertListEqual(
list1=list(test_df.columns),
list2=list(self.df_data.columns) + [f'sim_{str(idx).zfill(2)}' for idx in range(2)]
)
def test_compute_and_add_similarity_cols_drop_features_output(self):
df_data_with_additional_column = self.df_data.copy()
df_data_with_additional_column['additional_column'] = 1
assert 'additional_column' in df_data_with_additional_column.columns
test_df = compute_and_add_similarity_cols(average_features_per_target_bin=self.average_features_per_target_bin,
input_df=df_data_with_additional_column,
similarity_bins=2)
self.assertListEqual(
list1=list(test_df.columns),
list2=list(df_data_with_additional_column.columns) + [f'sim_{str(idx).zfill(2)}' for idx in range(2)]
)
def test_compute_and_add_similarity_cols(self):
test_df = compute_and_add_similarity_cols(average_features_per_target_bin=self.average_features_per_target_bin,
input_df=self.df_data,
similarity_bins=2)
df_data_l2_norm = np.sqrt(np.sum(self.df_data.values ** 2, axis=1)).reshape(len(self.df_data), 1)
average_features_per_target_bin_l2_norm = (np.sqrt(np.sum(self.average_features_per_target_bin.values ** 2,
axis=1))
.reshape(1, len(self.average_features_per_target_bin)))
expected_result = np.dot(
self.df_data,
self.average_features_per_target_bin.T
) / np.dot(df_data_l2_norm,
average_features_per_target_bin_l2_norm)
self.assertTrue(
np.array_equal(
test_df[['sim_00', 'sim_01']].values.round(5),
np.nan_to_num(expected_result.round(5), nan=0)
)
)
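    @staticmethod
    def _cosine_similarity_sketch(features, bin_averages):
        # Hedged sketch of the similarity reconstructed in the test above: the cosine
        # similarity between each input row and each per-target-bin average feature
        # vector, which compute_and_add_similarity_cols is expected to append as
        # sim_00, sim_01, ... columns; zero-norm rows map to 0.
        numerator = np.dot(features, bin_averages.T)
        denominator = np.outer(np.linalg.norm(features, axis=1),
                               np.linalg.norm(bin_averages, axis=1))
        return np.nan_to_num(numerator / denominator, nan=0)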
def test_split_data(self):
merged = pd.DataFrame(
data=[
[2.7e4, 1, 1, 2],
[2.187e6, 2, 2, 4],
[1e3, 3, 3, 6],
[1e4, 4, 4, 8],
[1e5, 5, 5, 10],
[1e6, 6, 6, 12],
[1e7, 7, 7, 14]
],
columns=['nh2', 'tdust', 'predictor', 'target'])
x_test, x_train, x_validation, y_test, y_train, y_validation = split_data(
merged=merged,
target_column='target',
predictor_columns=['nh2', 'tdust', 'predictor']
)
self.assertListEqual(list((10**x_test['nh2'].unique()).round(1)), [2.7e4])
self.assertListEqual(list((10**x_validation['nh2'].unique()).round(1)), [2.187e6])
DB_USER: cGlwcG8=
DB_PASS: cGx1dG8=
DB_HOST: localhost
DB_NAME: postgres
DB_PORT: 5432