Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • main
  • v0.1
  • v0.2
  • v0.2.1
  • v0.3
  • v0.3.1
  • v0.3.2
  • v0.3.3
8 results

Target

Select target project
  • andrea.giannetti/swiss_army_knife_stable
1 result
Select Git revision
  • main
  • v0.1
  • v0.2
  • v0.2.1
  • v0.3
  • v0.3.1
  • v0.3.2
  • v0.3.3
8 results
Show changes
from itertools import groupby
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
from assets.commons import (get_data)
from astropy import units as u
from astropy import constants
from sklearn.linear_model import LinearRegression
from typing import Tuple
plt.rcParams.update({'font.size': 16})
def get_moldata() -> pd.DataFrame:
source_file = 'mdl/data/molecule_e-ch3oh.inp'
rows_list = []
with open(source_file, 'r') as infile:
file_content = infile.readlines()
file_content_sections = [list(g) for k, g in groupby(file_content, lambda x: x.startswith('!')) if not k]
levels = file_content_sections[3]
for line in levels:
split_line = line.split()
rows_list.append({
'original_index': int(split_line[0]),
'energy': float(split_line[1]),
'g': float(split_line[2]),
'weight': 2 * float(split_line[2]) + 1,
'J_K': f'{split_line[3]}'
})
energy_levels = pd.DataFrame(rows_list)
return energy_levels.sort_values(by='energy')
def compute_partition_function(temperature: float,
energies_and_gs: pd.DataFrame) -> float:
partition_function = 0
for level in energies_and_gs.iterrows():
partition_function += float(level[1]['g']) * np.exp(-float(level[1]['energy']) / temperature)
return partition_function
def get_coldens_z_temperature(energies: np.array, y_rotational_diagram: np.array) -> Tuple[np.array, np.array]:
coldens_z = []
temperature = []
for pix_y_rotdia in y_rot_dia.value:
reg = LinearRegression().fit(np.log10(np.e) * energies.reshape(-1, 1), np.log10(pix_y_rotdia))
coldens_z.append(reg.intercept_)
temperature.append(-1 / reg.coef_[0])
temperature = np.array(temperature)
temperature = np.where((temperature > 200) | (temperature <= 0), 200, temperature)
return np.array(coldens_z), temperature
def get_y_rotational_diagram(mom_zero_data: pd.DataFrame,
frequencies: np.array,
einstein_a: np.array,
gs: np.array):
integrated_intensities = np.array(
mom_zero_data[['mom_zero_86', 'mom_zero_87', 'mom_zero_88']]) * u.Jansky.decompose() * u.km / u.s * \
(constants.c.cgs ** 2 / (2 * constants.k_B.decompose() * (frequencies * u.GHz).to(
u.Hz) ** 2).decompose()) / (np.arcsin(2 / 101)) ** 2
integrated_intensities = integrated_intensities.decompose().cgs
y_rot_dia = 8 * np.pi * constants.k_B.cgs * (frequencies * u.GHz).to(u.Hz) ** 2 / \
(constants.h.cgs * constants.c.cgs ** 3 * einstein_a * (u.s ** -1)) * \
integrated_intensities / gs
y_rot_dia = y_rot_dia.decompose().cgs
return y_rot_dia
# Transitions data
# 86, 87, 88
gs = np.array([5, 5, 5])
energies = np.array([12.5, 20.1, 28.0])
einstein_a = np.array([2.557794E-06, 3.407341E-06, 2.624407E-06])
frequencies = np.array([96.739358, 96.744545, 96.75550])
data = get_data(limit_rows=None)
molecular_data = get_moldata()
y_rot_dia = get_y_rotational_diagram(mom_zero_data=data,
frequencies=frequencies,
einstein_a=einstein_a,
gs=gs)
coldens_z, temperature = get_coldens_z_temperature(energies=energies,
y_rotational_diagram=y_rot_dia)
Z = compute_partition_function(energies_and_gs=molecular_data, temperature=temperature)
data['lte_coldens'] = coldens_z + np.log10(Z) #np.mean(log_n, axis=1)
plt.clf()
plt.scatter(10**data.lte_coldens, 10**data.molecule_column_density, alpha=0.3)
# sns.kdeplot(x=10**data.lte_coldens, y=10**data.molecule_column_density)
plt.loglog()
plt.plot([1e11, 1e17], [1e11, 1e17], color='red')
plt.xlabel(r'N(CH$_3$OH) LTE [cm$^{-2}$]')
plt.ylabel(r'N(CH$_3$OH) Model [cm$^{-2}$]')
plt.tight_layout()
plt.savefig(os.path.join('prs', 'output', 'lte_approx', 'lte_approximation_coldens.png'))
from mdl.mdl_execute_radmc_command import (get_command_options,
check_config)
from unittest import TestCase
class Test(TestCase):
def test_get_command_options(self):
config = {
'radmc_observation': {
'inclination': 30,
'position_angle': 45,
'imolspec': 1,
'iline': 3
}
}
options_set = get_command_options(config=config)
expected_result = {
'incl 30',
'phi 45',
'imolspec 1',
'iline 3'
}
self.assertSetEqual(options_set, expected_result)
def test_check_config_pass(self):
config = {
'inclination': 0,
'position_angle': 0
}
self.assertTrue(check_config(config))
def test_check_config_fail(self):
config = {
'inclination': 0,
'imolspec': 1
}
self.assertFalse(check_config(config))
import numpy as np
from unittest import TestCase
from prs.prs_density_inference import (get_probability_density_threshold,
get_probability_distribution,
get_results,
get_hpd_interval)
class Test(TestCase):
def test_get_probability_density_threshold(self):
x_array = np.arange(8)
probability_density = np.array([0, 5, 15, 30, 30, 15, 5, 0]) / 100.
expected_result = 0.025
probability, centered_probability_density, ordered_idxs = get_probability_distribution(
probability_density=probability_density,
x_array=x_array)
computed_threshold = get_probability_density_threshold(
ordered_probability=probability[ordered_idxs],
ordered_probability_density=centered_probability_density[ordered_idxs],
probability_threshold=0.05
)
self.assertAlmostEqual(expected_result, computed_threshold, 5)
def test_get_hpd_interval_last(self):
x_array = np.arange(8)
probability_density = np.array([0, 5, 15, 30, 10, 15, 15, 15]) / 100.
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=0.05,
interp_points=1000)
expected_hpd_interval = [1, 7]
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_hpd_interval_first(self):
x_array = np.arange(8)
probability_density = np.array([15, 15, 15, 10, 30, 15, 5, 0]) / 100.
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=0.05,
interp_points=1000)
expected_hpd_interval = [0, 6]
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_hpd_interval_double(self):
x_array = np.arange(8)
probability_density = np.array([10, 20, 15, 10, 30, 15, 5, 0]) / 100.
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=0.15,
interp_points=1000)
expected_hpd_interval = [0.5, 2, 3.25, 5]
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_hpd_interval_edge_case_below(self):
x_array = np.arange(8)
probability_density = np.array([30, 20, 15, 10, 10, 15, 5, 0]) / 100.
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=0.15,
interp_points=1000)
expected_hpd_interval = [0, 2, 5, 5]
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_hpd_interval_edge_case_constant(self):
x_array = np.arange(8)
probability_density = np.array([30, 20, 15, 15, 10, 10, 5, 0]) / 100.
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=0.1,
interp_points=1000)
expected_hpd_interval = [0, 5]
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_hpd_interval_edge_case_constant2(self):
x_array = np.arange(8)
probability_density = np.array([10, 10, 35, 25, 10, 10, 5, 0]) / 100.
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=0.1,
interp_points=1000)
expected_hpd_interval = [0, 5]
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_hpd_interval_edge_case_above(self):
x_array = np.arange(8)
probability_density = np.array([30, 20, 15, 10, 11, 15, 5, 1]) / 100.
hpd_interval = get_hpd_interval(x_array=x_array,
probability_density=probability_density,
hpd_threshold=0.1,
interp_points=1000)
expected_hpd_interval = [0, 5.5]
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_results(self):
x_array = np.arange(8)
probability_density = np.array([0, 5, 15, 30, 30, 15, 5, 0]) / 100.
computed_threshold, best_fit, hpd_interval = get_results(x_array=x_array,
probability_density=probability_density,
probability_threshold=0.05,
interp_points=1000)
expected_threshold = 0.025
expected_best_fit = 3
expected_hpd_interval = [0.5, 6.5]
self.assertAlmostEqual(expected_threshold, computed_threshold, 5)
self.assertEqual(expected_best_fit, best_fit)
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-2))
def test_get_results_asymmetric(self):
x_array = np.array([1, 2, 4, 8, 16, 32, 64, 128])
probability_density = np.array([0, 5, 15, 34, 40, 5, 1, 0]) / 100.
computed_threshold, best_fit, hpd_interval = get_results(x_array=x_array,
probability_density=probability_density,
probability_threshold=0.05,
interp_points=1000)
expected_threshold = 0.02557788944723618
expected_best_fit = 8
expected_hpd_interval = [1.5115, 51.5395]
self.assertAlmostEqual(expected_threshold, computed_threshold, 5)
self.assertEqual(expected_best_fit, best_fit)
self.assertTrue(np.allclose(np.array(expected_hpd_interval), hpd_interval, rtol=1e-4))
import os
import numpy as np
from stg.stg_radmc_input_generator import (get_solid_body_rotation_y,
get_grid_name)
from assets.commons.parsing import read_abundance_variation_schema
from astropy import units as u
from unittest import TestCase
from assets.commons import load_config_file
from assets.commons.grid_utils import (compute_molecular_number_density_hot_core,
extract_grid_metadata,
compute_power_law_radial_profile)
def create_test_config(config_dict: dict,
config_filename: str):
with open(config_filename, 'w') as config_file:
config_file.write('grid:\n')
for key in config_dict:
config_file.write(f' {key}: {config_dict[key]}\n')
class Test(TestCase):
def setUp(self):
self.config_filename = 'config.yml'
def test_compute_power_law_radial_profile(self):
profile = compute_power_law_radial_profile(central_value=2000.0,
power_law_index=0,
distance_matrix=np.array([
[np.sqrt(2), 1, np.sqrt(2)],
[1, 0, 1],
[np.sqrt(2), 1, np.sqrt(2)],
]),
maximum_radius=np.sqrt(2),
value_at_reference=15.0,
distance_reference=3e18)
expected_result = np.array([
[15, 15, 15],
[15, 15, 15],
[15, 15, 15],
])
self.assertTrue(np.allclose(profile, expected_result, 1.0e-5))
def test_read_abundance_variation_schema(self):
line_config = {
'species_to_include': ['e-ch3oh'],
'molecular_abundances': {
"e-ch3oh": 1e-9,
"p-h2": 1,
},
'hot_core_specs': {
"e-ch3oh": {
'threshold': 90,
'abundance_jump': 100
}
},
'lines_mode': 'lvg',
'collision_partners': ['p-h2']
}
result_dict = read_abundance_variation_schema(line_config=line_config)
expected_dict = {
'e-ch3oh': {
'threshold': 90,
'abundance_jump': 100
},
'p-h2': {
'threshold': 20,
'abundance_jump': 1
},
}
self.assertDictEqual(result_dict, expected_dict)
def test_compute_molecular_number_density_hot_core(self):
gas_number_density_profile = np.array([1, 1, 2, 3])
temperature_profile = np.array([10, 100, 200, 20])
abundance_array = compute_molecular_number_density_hot_core(
gas_number_density_profile=gas_number_density_profile,
abundance=1e-9,
temperature_profile=temperature_profile,
abundance_jump=100,
threshold=90)
expected_results = np.array([1e-9, 1e-7, 2e-7, 3e-9])
self.assertTrue(np.allclose(abundance_array, expected_results, 1e-7))
def test_get_grid_filename_composite(self):
grid_name = get_grid_name(method='composite_grid',
zip_filename='abc.def.zip',
quantity_name='h2_density')
self.assertEqual(grid_name, 'abc.def_h2_density.fits')
def test_get_grid_filename_missing_info(self):
with self.assertRaises(AssertionError):
_ = get_grid_name(method='composite_grid',
quantity_name='h2_density')
with self.assertRaises(AssertionError):
_ = get_grid_name(method='composite_grid',
zip_filename='abc.def.zip')
def test_get_grid_filename_undefined_method(self):
with self.assertRaises(NotImplementedError):
get_grid_name(method='puzzidilontano')
def test_get_solid_body_rotation_y(self):
_config_filename = os.path.join('test_files', self.config_filename)
grid_size = 3
npix = 3
config_dict = {
'grid_type': 'regular',
'coordinate_system': 'cartesian',
'central_density': '1e6',
'density_unit': 'cm^-3',
'dim1': {"size": grid_size, "size_units": "cm", "shape": npix, "refpix": 1},
'velocity_field': 'solid',
'velocity_gradient': 1,
'velocity_gradient_unit': "cm/s cm",
}
expected_result_x = [
[
[np.sqrt(2) * np.sin(np.pi / 4), 0, -np.sqrt(2) * np.sin(np.pi / 4)],
[np.sqrt(2) * np.sin(np.pi / 4), 0, -np.sqrt(2) * np.sin(np.pi / 4)],
[np.sqrt(2) * np.sin(np.pi / 4), 0, -np.sqrt(2) * np.sin(np.pi / 4)],
],
[
[1, 0, -1],
[1, 0, -1],
[1, 0, -1],
],
[
[np.sqrt(2) * np.sin(np.pi / 4), 0, -np.sqrt(2) * np.sin(np.pi / 4)],
[np.sqrt(2) * np.sin(np.pi / 4), 0, -np.sqrt(2) * np.sin(np.pi / 4)],
[np.sqrt(2) * np.sin(np.pi / 4), 0, -np.sqrt(2) * np.sin(np.pi / 4)],
],
]
expected_result_z = [
[
[-np.sqrt(2) * np.cos(np.pi / 4), -1.0, -np.sqrt(2) * np.cos(np.pi / 4)],
[-np.sqrt(2) * np.cos(np.pi / 4), -1.0, -np.sqrt(2) * np.cos(np.pi / 4)],
[-np.sqrt(2) * np.cos(np.pi / 4), -1.0, -np.sqrt(2) * np.cos(np.pi / 4)],
],
[
[0.0, 0.0, 0.0],
[0.0, 0.0, 0.0],
[0.0, 0.0, 0.0],
],
[
[np.sqrt(2) * np.cos(np.pi / 4), 1.0, np.sqrt(2) * np.cos(np.pi / 4)],
[np.sqrt(2) * np.cos(np.pi / 4), 1.0, np.sqrt(2) * np.cos(np.pi / 4)],
[np.sqrt(2) * np.cos(np.pi / 4), 1.0, np.sqrt(2) * np.cos(np.pi / 4)],
],
]
create_test_config(config_dict=config_dict,
config_filename=_config_filename)
config = load_config_file(_config_filename)
grid_metadata = extract_grid_metadata(config=config)
velocity_x, velocity_z = get_solid_body_rotation_y(grid_metadata=grid_metadata)
self.assertTrue(np.array_equal(np.array(expected_result_x), velocity_x.value))
self.assertTrue(np.array_equal(np.array(expected_result_z), velocity_z.value))