diff --git a/Makefile b/Makefile index 13371a8f2d4e1c8e61570bd0ce23755f95f2a21f..cb3e186f36e9a7c599ec0beea574bd5a93628370 100644 --- a/Makefile +++ b/Makefile @@ -33,5 +33,4 @@ upload-war-deb: curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkb-soda-$(VERSION).war https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/vlkb-soda-$(VERSION).war curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkb-$(VERSION).deb https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/ubuntu22/vlkb-$(VERSION).deb curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkb-obscore-$(VERSION).deb https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/ubuntu22/vlkb-obscore-$(VERSION).deb - curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkbd-$(VERSION).deb https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/ubuntu22/vlkbd-$(VERSION).deb diff --git a/data-access/engine/Makefile b/data-access/engine/Makefile index d44feed36de73e02dcf9abed3627135712991e67..5c829ac91e2b2b7320f5a6da4f0ab0dcacaed5ac 100644 --- a/data-access/engine/Makefile +++ b/data-access/engine/Makefile @@ -1,7 +1,6 @@ PREFIX ?= /usr/local -AMQP_QUEUE ?= vlkbdevel INST_PATH ?= $(PREFIX) VERSION ?= $(shell git describe) @@ -13,14 +12,12 @@ all : build .PHONY: build build: make -C src/common VERSION=$(VERSION) - make -C src/vlkbd VERSION=$(VERSION) make -C src/vlkb-obscore VERSION=$(VERSION) make -C src/vlkb VERSION=$(VERSION) .PHONY: clean clean: make -C src/common clean - make -C src/vlkbd clean make -C src/vlkb-obscore clean make -C src/vlkb clean @@ -31,15 +28,11 @@ install: mkdir -p $(INST_PATH)/bin install ./src/vlkb/bin/vlkb $(INST_PATH)/bin install ./src/vlkb-obscore/bin/vlkb-obscore $(INST_PATH)/bin - install ./src/vlkbd/bin/vlkbd $(INST_PATH)/bin - install vlkbd_exec.sh $(INST_PATH)/bin .PHONY: uninstall uninstall: rm -f $(INST_PATH)/bin/vlkb rm -f $(INST_PATH)/bin/vlkb-obscore - rm -f $(INST_PATH)/bin/vlkbd - rm -f $(INST_PATH)/bin/vlkbd_exec.sh @@ -53,25 +46,8 @@ vlkb-devel: stop uninstall clean build install config start .PHONY: config config: mkdir -p $(INST_PATH)/etc/vlkb-obscore - mkdir -p $(INST_PATH)/etc/vlkbd cp config/vlkb-obscore.datasets.conf $(INST_PATH)/etc/vlkb-obscore/datasets.conf - cp config/vlkbd.datasets.conf $(INST_PATH)/etc/vlkbd/datasets.conf -.PHONY: start -start: - vlkbd_exec.sh localhost $(AMQP_QUEUE) $(INST_PATH)/etc/vlkbd/datasets.conf - -.PHONY: stop -stop: - pkill -f 'vlkbd.* $(AMQP_QUEUE).*$(INST_PATH)/etc/vlkbd/datasets.conf' - -.PHONY: status -status: - ps ax | grep vlkbd - -.PHONY: reload -reload: stop start - diff --git a/data-access/engine/src/vlkbd/Makefile b/data-access/engine/src/vlkbd/Makefile deleted file mode 100644 index ca69708c62a074f6f34abcd0acbb34ac515a006f..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/Makefile +++ /dev/null @@ -1,83 +0,0 @@ -#================================================================================ -EXEC_NAME = vlkbd -VERSION ?= $(shell git describe) -BUILD_ ?= $(shell LANG=us_US date; hostname) -#================================================================================ -DEPS_DIR := ../common ../../ext/aria-csv ../../ext/nlohmann-json -DEPS_INC := $(foreach d, $(DEPS_DIR), $d/include) -DEPS_LIB := $(foreach d, $(DEPS_DIR), $d/lib) -#================================================================================ -COMMON_DIR=../common -COMMON_LIB = $(COMMON_DIR)/lib/libvlkbcommon.a -#================================================================================ -INC_DIR=src $(DEPS_INC) $(COMMON_DIR)/include ../../ext \ - /usr/include/cfitsio \ - /usr/include/postgresql -LIB_DIR= $(DEPS_LIB) $(COMMON_DIR)/lib /usr/lib64/ast /usr/local/lib -#================================================================================ -CC=g++ -CXX_DEBUG_FLAGS=-g -DFDB_DEBUG -CXX_RELEASE_FLAGS=-O2 -CXX_DEFAULT_FLAGS=-c -x c++ -std=c++11 -fPIC -Wall -Wextra -Wconversion -fno-common -DVERSIONSTR='"$(VERSION)"' -DBUILD='"$(BUILD_)"' -# FIXME: -last_pal missing in some builds (not realluy needed only for linker) -LDFLAGS = -Wall -lvlkbcommon -lpq -lpqxx -lcfitsio -lrabbitmq -last -last_grf_2.0 -last_grf_3.2 -last_grf_5.6 -last_grf3d -last_err -pthread -lstdc++ -lm -INC_PARM=$(foreach d, $(INC_DIR), -I$d) -LIB_PARM=$(foreach d, $(LIB_DIR), -L$d) -#================================================================================ -SRC_DIR=src -OBJ_DIR=obj -BIN_DIR=bin -#================================================================================ -EXECUTABLE := $(BIN_DIR)/$(EXEC_NAME) -CPP_FILES := $(wildcard $(SRC_DIR)/*.cpp) -OBJ_FILES := $(addprefix $(OBJ_DIR)/,$(notdir $(CPP_FILES:.cpp=.o))) -#================================================================================ -NPROCS = $(shell grep -c 'processor' /proc/cpuinfo) -MAKEFLAGS += -j$(NPROCS) -#================================================================================ - -.PHONY: all -all : debug - -.PHONY: release -release: CXXFLAGS+=$(CXX_RELEASE_FLAGS) $(CXX_DEFAULT_FLAGS) -release: $(EXECUTABLE) - -.PHONY: debug -debug: CXXFLAGS+=$(CXX_DEBUG_FLAGS) $(CXX_DEFAULT_FLAGS) -debug: $(EXECUTABLE) - -$(EXECUTABLE) : $(COMMON_LIB) makedir $(OBJ_FILES) - $(CC) $(OBJ_FILES) $(LIB_PARM) $(LDFLAGS) -o $@ - -$(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp - $(CC) $(CXXFLAGS) $(INC_PARM) -o $@ $< - -.PHONY: $(COMMON_LIB) -$(COMMON_LIB) : - make -C $(COMMON_DIR) - -.PHONY: makedir -makedir: - -mkdir -p $(OBJ_DIR) $(BIN_DIR) - - -.PHONY: clean -clean : - -rm -fr $(OBJ_DIR) $(BIN_DIR) - - - -.PHONY: test -test : - @tabs 20 - @echo -e "EXEC_NAME:\t" $(EXEC_NAME) - @echo -e "VERSION:\t" $(VERSION) - @echo -e "CPP_FILES:\t" $(CPP_FILES) - @echo -e "OBJ_FILES:\t" $(OBJ_FILES) - @echo -e "C_FILES:\t" $(C_FILES) - @echo -e "C_OBJ_FILES:\t" $(C_OBJ_FILES) - @echo -e "INC_PARM:\t" $(INC_PARM) - @echo -e "LIB_PARM:\t" $(LIB_PARM) - - diff --git a/data-access/engine/src/vlkbd/src/config.cpp b/data-access/engine/src/vlkbd/src/config.cpp deleted file mode 100644 index 7f6b41778a69436ae1e2c6e71a18b1407c960435..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/config.cpp +++ /dev/null @@ -1,76 +0,0 @@ - -#include "io.hpp" -#include "config.hpp" - -#include <iostream> -#include <fstream> -#include <sstream> -#include <map> - -/*/ C -#include <stdio.h> -#include <stdlib.h> // atoi needed -#include <string.h> -*/ - -using namespace std; - -void config::read_config(const std::string & settings_path) -{ - std::ifstream settings_file(settings_path); - std::string line; - - LOG_STREAM << "config::read_config()" << endl; - - if (settings_file.fail()) - { - LOG_STREAM << "config file does not exist. Default options used." << endl; - - return; - } - - - while (std::getline(settings_file, line)) - { - std::istringstream iss(line); - std::string id, eq, val; - - if (std::getline(iss, id, '=')) - { - if (std::getline(iss, val)) - { - if (m_settings.find(id) != m_settings.end()) - { - if (val.empty()) - { - LOG_STREAM << "config " << id.c_str() - << " is empty. Keeping default " << m_settings[id].c_str() << endl; - } - else - { - m_settings[id] = val; - LOG_STREAM << "config " << id.c_str() - <<" read as " << m_settings[id].c_str() << endl; - } - } - else - { - //Not present in map - LOG_STREAM << "Setting "<< id.c_str() << " not defined, ignoring it" << endl; - continue; - } - } - else - { - // Comment line, skiping it - continue; - } - } - else - { - //Empty line, skipping it - continue; - } - } -} - diff --git a/data-access/engine/src/vlkbd/src/config.hpp b/data-access/engine/src/vlkbd/src/config.hpp deleted file mode 100644 index c6f9e98c72a938ba647912e2d369a53bb44cf082..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/config.hpp +++ /dev/null @@ -1,114 +0,0 @@ - -#ifndef CONFIG_HPP -#define CONFIG_HPP - -#include <string> -#include <map> - - -class config -{ - public: - - void read_config(const std::string & settings_path); - - std::string getLogDir() const {return m_settings.at(log_dir);} - std::string getLogFileName() const {return m_settings.at(log_filename);} - -// std::string getAuthority() const {return m_settings.at(ivoid_authority);} -// std::string getResourceKey() const {return m_settings.at(ivoid_resource_key);} -// std::string getObsCorePublisher() const {return std::string{"ivo://"} + getAuthority() + std::string{"/"} + getResourceKey();} -// std::string getObsCoreAccessFormat() const {return m_settings.at(obscore_access_format);} - -// std::string getRemoteFitsDir() const {return m_settings.at(fits_url);} - std::string getFitsDir() const {return m_settings.at(fits_dir);} - std::string getFitsCutDir() const {return m_settings.at(fits_cutdir);} -/* - std::string getDbms() const {return m_settings.at(db_dbms);} - std::string getDbHostName() const {return m_settings.at(db_host_name);} - std::string getDbPort() const {return m_settings.at(db_port);} - std::string getDbSchema() const {return m_settings.at(db_schema);} - std::string getDbName() const {return m_settings.at(db_name);} - std::string getDbUserName() const {return m_settings.at(db_user_name);} - std::string getDbPassword() const {return m_settings.at(db_password);} - - std::string getDbUri(bool with_password = false) const - {return - m_settings.at(db_dbms) - + "://" - + m_settings.at(db_user_name) - + (with_password ? ":"+m_settings.at(db_password) : "") - + "@" - + m_settings.at(db_host_name) - + ":" - + m_settings.at(db_port) - + "/" - + m_settings.at(db_name); - } - - std::string getDbPostgresConnectString(bool with_password = false) const - {return - "dbname = " + m_settings.at(db_name) - + " port = " + m_settings.at(db_port) - + " host = " + m_settings.at(db_host_name) - + " user = " + m_settings.at(db_user_name) - + (with_password ? " password = " + m_settings.at(db_password) : "") - + " options=\'-c search_path=" + m_settings.at(db_schema) + "\'"; - } -*/ - private: - std::string value(std::string key) {return m_settings.at(key);} - - const std::string fits_dir{"fits_path_surveys"}; - const std::string fits_cutdir{"fits_path_cutouts"}; - - const std::string log_dir{"log_dir"}; - const std::string log_filename{"log_filename"}; - -// const std::string fits_url{"fits_url_surveys"}; -/* const std::string ivoid_authority{"ivoid_authority"}; - const std::string ivoid_resource_key{"ivoid_resource_key"}; - const std::string obscore_access_format{"obscore_access_format"}; - - const std::string db_dbms{"db_dbms"}; - const std::string db_host_name{"db_host_name"}; - const std::string db_port{"db_port"}; - const std::string db_user_name{"db_user_name"}; - const std::string db_password{"db_password"}; - const std::string db_name{"db_name"}; - const std::string db_schema{"db_schema"}; -*/ - //------------------------------------------------- - // defaults - //------------------------------------------------- - - const std::string empty_string; - - std::map<const std::string, std::string> m_settings - { - {fits_dir, "/srv/surveys"}, - {fits_cutdir, "/srv/cutouts"}, - - {log_dir, "/tmp"}, - {log_filename, "vlkbd.log"}, -/* - {ivoid_authority, empty_string}, - {ivoid_resource_key, empty_string}, - {obscore_access_format, "application/fits"}, - - {fits_url, empty_string}, - - {db_dbms, empty_string}, - {db_host_name, empty_string}, - {db_port, empty_string}, - {db_user_name, empty_string}, - {db_password, empty_string}, - {db_name, empty_string}, - {db_schema, empty_string} -*/ - }; -}; - - -#endif - diff --git a/data-access/engine/src/vlkbd/src/json_reply.cpp b/data-access/engine/src/vlkbd/src/json_reply.cpp deleted file mode 100644 index 8ceb02e0faa822c4cedf6be8eb9d3a18e4e94690..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/json_reply.cpp +++ /dev/null @@ -1,17 +0,0 @@ - -#include "json_reply.hpp" -#include "json.hpp" -#include "io.hpp" - -#include <vector> -#include <string> -#include <stdexcept> - -#include <stdlib.h> -#include <string.h> -#include <sched.h> // sched_getcpu() - -using namespace std; - -const string ENGINE_VERSION{"engine version " + string(VERSIONSTR) + " " + string(BUILD) + " on CPU#"+to_string(sched_getcpu())}; - diff --git a/data-access/engine/src/vlkbd/src/json_reply.hpp b/data-access/engine/src/vlkbd/src/json_reply.hpp deleted file mode 100644 index 53c7248dd0e315471c5396cc0904776d89fda451..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/json_reply.hpp +++ /dev/null @@ -1,26 +0,0 @@ - -#include "cutout.hpp" -#include "cutout_nljson.hpp" -#include "mcutout.hpp" -#include "mcutout_nljson.hpp" -#include "json.hpp" - -#include <vector> -#include <string> - -/* All nlohmann-json exception are json::exception <- std::exception. - * So let them be caught by std::excpetion as 'Internal errors' in rpc-call's infinite loop, - * assuming all API syntactic errors were caught in servlet API parser */ - -class json_reply -{ - public: - json_reply() {j = nlohmann::json::object();}; - void put_cutout_result(cutout_res_s res) { j = res; }; - void put_mcutout_result(mcutout_res_s res) { j = res; }; - std::string json_str() { return j.dump();}; - - private: - nlohmann::json j; -}; - diff --git a/data-access/engine/src/vlkbd/src/json_request.cpp b/data-access/engine/src/vlkbd/src/json_request.cpp deleted file mode 100644 index 60574afd64d378041e5a8f3f340f044fad0962e8..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/json_request.cpp +++ /dev/null @@ -1,80 +0,0 @@ - - -#include "json_request.hpp" -#include "io.hpp" - -#include "cutout.hpp" -#include "cutout_nljson.hpp" -#include "mcutout.hpp" -#include "mcutout_nljson.hpp" - - -#include <stdexcept> -#include "json.hpp" - -#include <iostream> - -using json = nlohmann::json; -const bool ASSERTS = true; - -using namespace std; - - - - -NLOHMANN_JSON_SERIALIZE_ENUM( service, { - {MCUTOUT,"MCUTOUT"}, - {MERGEF, "MERGEF"}, - {MERGE1, "MERGE1"}, - {MERGE2, "MERGE2"}, - {MERGE3, "MERGE3"}, - {SUBIMG, "SUBIMG"} - }); - - -json_request::json_request(string request_json) -{ - LOG_trace(__func__); - - m_jservice = json::parse(request_json, nullptr, ASSERTS); - m_service = m_jservice.at("service"); - - LOG_STREAM << m_jservice.dump() << endl; -} - -struct coordinates json_request::coordinates() {return m_jservice.at("coordinates");} - -vector<struct cut_param_s> json_request::cut_params() -{ - LOG_trace(__func__); - - vector<struct cut_param_s> cut_pars; - json cuts = m_jservice.at("cuts"); - cut_pars = cuts.get<std::vector<struct cut_param_s>>(); - - return cut_pars; -} - -std::vector<struct fits_card> json_request::extra_cards() -{ - vector<struct fits_card> cards; - if(m_jservice.contains("extra_cards")) - { - json jcards = m_jservice.at("extra_cards"); - cards = jcards.get<std::vector<struct fits_card>>(); - } - return cards; -} - - -std::vector<std::string> json_request::get_pol() -{ - vector<string> str; - if(m_jservice.contains("pol")) - { - json j = m_jservice.at("pol"); - str = j.get<std::vector<string>>(); - } - return str; -} - diff --git a/data-access/engine/src/vlkbd/src/json_request.hpp b/data-access/engine/src/vlkbd/src/json_request.hpp deleted file mode 100644 index 178da92b37d9ee22c1f681446524d4449a430aac..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/json_request.hpp +++ /dev/null @@ -1,63 +0,0 @@ - -#include "cutout.hpp" -#include "cutout_nljson.hpp" -#include "mcutout.hpp" -#include "mcutout_nljson.hpp" - -#include "json.hpp" -#include <string> - -using json = nlohmann::json; - -/* All nlohmann-json exception are json::exception <- std::exception. - * So let them be caught by std::excpetion as 'Internal errors' in rpc-call's infinite loop, - * assuming all API syntactic errors were caught in servlet API parser */ - -enum service {SEARCH, CUTOUT, MCUTOUT, MERGEF, MERGE1, MERGE2, MERGE3, SUBIMG}; - -class json_request -{ - public: - - json_request(std::string request_json); - - bool is_search() {return m_service == SEARCH;} - bool is_cutout() {return m_service == CUTOUT;} - bool is_mcutout() {return m_service == MCUTOUT;} - bool is_mergefiles() {return m_service == MERGEF;} - bool is_mergefiles_common_header() {return m_service == MERGE1;} - bool is_mergefiles_reproject() {return m_service == MERGE2;} - bool is_mergefiles_add_reprojected() {return m_service == MERGE3;} - bool is_subimg() {return m_service == SUBIMG;} - - std::string pubdid() {return m_jservice.at("pubdid");} - - struct coordinates coordinates(); - - bool count_null_values(){return m_jservice.at("count_null_values");} - - std::vector<struct cut_param_s> cut_params(); - - std::string merge_id() {return m_jservice.at("merge_id");} - std::string dimensionality() {return m_jservice.at("dimensionality");} - std::vector<std::string> files_to_merge() {return m_jservice.at("files_to_merge");} - std::string fitsfilename() {return m_jservice.at("fits_filename");} - - /* SUBIMG */ - - std::string abs_subimg_pathname() {return m_jservice.at("subimg_filename");} - std::string img_pathname() {return m_jservice.at("img_pathname");} - int img_hdunum() {return m_jservice.at("img_hdunum");} - std::vector<struct fits_card> extra_cards(); - - /* new: no coordinates instead separate pos band time pol */ - - position get_pos() { return (m_jservice.contains("pos") ? (position) m_jservice.at("pos") : pos_none ); } - band get_band() { return (m_jservice.contains("band") ? (band) m_jservice.at("band") : band_none); } - time_axis get_time() { return (m_jservice.contains("time") ? (time_axis) m_jservice.at("time") : time_none); } - std::vector<std::string> get_pol(); - - private: - json m_jservice; - service m_service; -}; diff --git a/data-access/engine/src/vlkbd/src/json_service_call.cpp b/data-access/engine/src/vlkbd/src/json_service_call.cpp deleted file mode 100644 index 647ea9eba1bd83f58ff16cd04c794817fa721662..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/json_service_call.cpp +++ /dev/null @@ -1,159 +0,0 @@ - -// convert parameters to/from JSON and call vlkb services - -// NOTE merge_id -// jobId exists if we run under UWS (MERGE1,2,3) -// create merge-id = amqp-queuename + jobId -// such id is uniq when running MERGE parallel and more deploys -// of vlkb-dtasets present against the same amqp-broker - -#include "json_service_call.hpp" -#include "config.hpp" -#include "io.hpp" -#include "cutout.hpp" -#include "cutout_nljson.hpp" -#include "mcutout.hpp" -#include "mcutout_nljson.hpp" -#include "json_request.hpp" -#include "json_reply.hpp" - -#include "fitsfiles.hpp" // calc_nullvals - -#include <stdexcept> -#include <vector> -#include <string> - - -using namespace std; - - -string to_string(service_error serr) -{ - string str; - switch(serr) - { - case service_error::INVALID_PARAM: return "INVALID_PARAM"; break; - case service_error::SYSTEM_ERROR: return "SYSTEM_ERROR"; break; - } - - LOG_STREAM << string(__FILE__) << ":" << to_string(__LINE__) << "unrecognized value in service_error type" << endl; - - return str; -} - - - -string service_exception(service_error error, const string what) -{ - LOG_trace(__func__); - - json jreply; - - switch(error) - { - case service_error::INVALID_PARAM: - jreply["exception"] = { {"type","INVALID_PARAM"}, {"msg",what} }; - break; - case service_error::SYSTEM_ERROR: - jreply["exception"] = { {"type","SYSTEM_ERROR"}, {"msg",what} }; - break; - } - - LOG_STREAM << to_string(error) + ": " + what << endl; - - return jreply.dump(); -} - - - -string service_call(string request_json, string queuename, config conf) -{ - LOG_trace(__func__); - - const string setts_fitsdir{conf.getFitsDir()}; - const string setts_fitscutdir{conf.getFitsCutDir()}; - - LOG_STREAM << request_json << endl; - - json_request req(request_json); - - json_reply reply; - - if(req.is_subimg()) - { - cutout_res_s cutres = do_cutout_file( - req.img_pathname(), req.img_hdunum(), - req.get_pos(), req.get_band(), req.get_time(), req.get_pol(), - req.count_null_values(), - req.extra_cards(), - setts_fitsdir, - setts_fitscutdir); - - reply.put_cutout_result(cutres); - } - else if(req.is_mcutout()) - { - struct mcutout_res_s mres = mcutout(req.cut_params(), setts_fitsdir, setts_fitscutdir); - - mres.tgz_filename = setts_fitscutdir + "/" + mres.tgz_filename; - mres.filesize = fitsfiles::fileSize(mres.tgz_filename); - reply.put_mcutout_result(mres); - } - else if(req.is_mergefiles()) - { - string mergedfile_pathname; - - unsigned long fsize = xmergefiles( - req.files_to_merge(), - req.dimensionality(), - setts_fitscutdir, setts_fitscutdir, - mergedfile_pathname); - - cutout_res_s cutres{ fsize, mergedfile_pathname, {-1.0, 0, 0} }; - reply.put_cutout_result(cutres); - } - else if(req.is_mergefiles_common_header()) - { - string merge_id(queuename + "_" + req.merge_id()); - - xmergefiles_common_header( - merge_id, - req.files_to_merge(), - req.dimensionality(),//FIXME convert to int: dimensionslity - setts_fitscutdir, setts_fitscutdir); - } - else if(req.is_mergefiles_reproject()) - { - string merge_id(queuename + "_" + req.merge_id()); - - xmergefiles_reproject( - merge_id, - req.fitsfilename(), - req.dimensionality(),//FIXME convert to int: dimensionslity - setts_fitscutdir, setts_fitscutdir); - } - else if(req.is_mergefiles_add_reprojected()) - { - string merge_id(queuename + "_" + req.merge_id()); - - string mergedfile_pathname; - - unsigned long fsize = xmergefiles_add_reprojected( - merge_id, - req.dimensionality(), - setts_fitscutdir, setts_fitscutdir, - mergedfile_pathname); - - cutout_res_s cutres{ fsize, mergedfile_pathname, {-1.0, 0, 0} }; - reply.put_cutout_result(cutres); - } - else - { - throw std::runtime_error("unrecognized vlkb service"); - } - - return reply.json_str(); -} - - - diff --git a/data-access/engine/src/vlkbd/src/json_service_call.hpp b/data-access/engine/src/vlkbd/src/json_service_call.hpp deleted file mode 100644 index 9e9996dfcc6f0888cadd61bd757876bcc46b3f7f..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/json_service_call.hpp +++ /dev/null @@ -1,21 +0,0 @@ - -// in service_call(): -// queuename serves as identifier for merge working-dirs, to distinguish -// more instances of VLKB running on the same broker: each instance -// needs separate queue for rpc -// FIXME qname should come from config file - - -#ifndef JSON_SERVICE_CALL_HPP -#define JSON_SERVICE_CALL_HPP - -#include "config.hpp" -#include <string> - -std::string service_call(std::string request_json, std::string queuename, config conf); -enum class service_error {INVALID_PARAM, SYSTEM_ERROR}; - -std::string service_exception(enum service_error error, std::string what); - -#endif - diff --git a/data-access/engine/src/vlkbd/src/rpc_amqp.cpp b/data-access/engine/src/vlkbd/src/rpc_amqp.cpp deleted file mode 100644 index 09c02a3491945632565a4e3ec1a28d176199d961..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/rpc_amqp.cpp +++ /dev/null @@ -1,434 +0,0 @@ - -// RPC over AMQP - -#include "rpc_amqp.hpp" -#include "config.hpp" - -#include "rpc_amqp_utils.hpp" -#include "json_service_call.hpp" - -#include <stdexcept> -#include <string> - -#include <stdio.h> -#include <syslog.h> - -#include <amqp_tcp_socket.h> -#include <amqp.h> -#include <amqp_framing.h> - -#include "io.hpp" - -using namespace std; - -// error handling - - -void throw_ex_on_amqp_error(amqp_rpc_reply_t rc, amqp_connection_state_t conn, amqp_channel_t channel, char const *context) -{ - - std::string ct(context); - - if(rc.reply_type != AMQP_RESPONSE_NORMAL) - { - amqp_rpc_reply_t rc_ch_close = amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS); - if(rc_ch_close.reply_type != AMQP_RESPONSE_NORMAL) - throw std::runtime_error("cannot close channel after unsuccessful " + ct); - - amqp_rpc_reply_t rc_conn_close = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); - if(rc_conn_close.reply_type != AMQP_RESPONSE_NORMAL) - throw std::runtime_error("cannot close connection after unsuccessful " + ct); - - if(AMQP_STATUS_OK != amqp_destroy_connection(conn)) - throw std::runtime_error("cannot end connection after unsuccessful " + ct); - else - throw std::runtime_error(ct + " failed"); - } -} - - - -void syslog_on_amqp_error(amqp_rpc_reply_t x, char const *context) -{ - switch (x.reply_type) { - case AMQP_RESPONSE_NORMAL: - return; - - case AMQP_RESPONSE_NONE: - syslog(LOG_ERR, "%s: missing RPC reply type!\n", context); - break; - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - syslog(LOG_ERR, "%s: %s\n", context, amqp_error_string2(x.library_error)); - break; - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: - { - amqp_connection_close_t *m = - (amqp_connection_close_t *)x.reply.decoded; - syslog(LOG_ERR, "%s: server connection error %uh, message: %.*s\n", - context, m->reply_code, (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } - case AMQP_CHANNEL_CLOSE_METHOD: - { - amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded; - syslog(LOG_ERR, "%s: server channel error %uh, message: %.*s\n", - context, m->reply_code, (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } - default: - syslog(LOG_ERR, "%s: unknown server error, method id 0x%08X\n", - context, x.reply.id); - break; - } - break; - } -} - - - -// AMQP RPC -// -// establish connection to RabbitMQ-broker on "conn" and channel=1 -// use this connection [conn,channel] to: -// * create queue where Java-vlkb-client will put messages (queuename must match routingKey of Java-client config file) -// * bind the queue to pre-defined exchange "amq.direct" -// * ask the broker to start basic-consumer on that queue -// WAIT: Consume message from "conn" -// Create new reply-message with CorrdId from received message -// * publish the reply-msg to reply-to queue -// return to WAIT: ... loop forever - -amqp_connection_state_t login_to_broker(const string user_name, const string password, - const string hostname, int port) -{ - // allocate new conn and initialize - // NOTE: must destroy conn at exit - - amqp_connection_state_t conn = amqp_new_connection(); - if(conn == NULL) - throw std::runtime_error("cannot create new connection"); - - - { // open new TCP-socket and store in conn - - amqp_socket_t *socket = NULL; - socket = amqp_tcp_socket_new(conn); - if (socket == NULL) - { - if(AMQP_STATUS_OK != amqp_destroy_connection(conn)) - throw std::runtime_error("cannot end connection after unsuccessful new TCP socket"); - else - throw std::runtime_error("error creating TCP socket"); - } - int status; - status = amqp_socket_open(socket, hostname.c_str(), port); - if (status != 0) - { - if(AMQP_STATUS_OK != amqp_destroy_connection(conn)) - throw std::runtime_error("cannot end connection after unsuccessful socket open"); - else - throw std::runtime_error("error opening TCP socket");// FIXME add status to msg - } - } - - - amqp_rpc_reply_t rc; - rc = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user_name.c_str(), password.c_str()); - if(rc.reply_type != AMQP_RESPONSE_NORMAL) - { - amqp_rpc_reply_t rc_close = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); - if(rc_close.reply_type != AMQP_RESPONSE_NORMAL) - throw std::runtime_error("cannot close connection after unsuccessful amqp login"); - else if(AMQP_STATUS_OK != amqp_destroy_connection(conn)) - throw std::runtime_error("cannot end connection after unsuccessful amqp login"); - else - throw std::runtime_error("amqp_login failed"); - } - - return conn; -} - - - -// RPC-loop - - - -int channel_open(amqp_connection_state_t conn, amqp_channel_t channel) -{ - amqp_channel_open(conn, channel); - amqp_rpc_reply_t rep = amqp_get_rpc_reply(conn); - - return (rep.reply_type != AMQP_RESPONSE_NORMAL); -} - - - -void declare_nondurable_autodelete_queue( - amqp_connection_state_t conn, amqp_channel_t channel, - amqp_bytes_t queuename) -{ - amqp_queue_declare(conn, channel, - queuename, - 0, // 'passive' guarantees that this client sees the queue which was created already - 0, // 'durable' queue survives broker restarts - 0, // 'exclusive' to current connection (queue deleted when conn closes) - 1, // 'auto_delete' the queue when not used - amqp_empty_table); // amqp_table_t arguments specific for AMQP broker implementation (none in RabbitMQ) -} - - - -// start a queue consumer (e.g. start delivering msgs from the queue to this client) -// broker-implementation should support at least 16 consumers per queue -void start_basic_consumer_noack( - amqp_connection_state_t conn, amqp_channel_t channel, - amqp_bytes_t queuename) -{ - amqp_basic_consume(conn, channel, - queuename, - amqp_empty_bytes, // consumer_tag amqp_bytes_t: consumer-identifier (if empty, server generates a tag) - 0, // no_local amqp_boolean_t: broker will not send msgs to connection which published them - 1, // no_ack amqp_boolean_t: broker does not expect acknowledgement for delivered msgs - 0, // exclusive amqp_boolean_t : only this consumer can access the queue - amqp_empty_table); // arguments amqp_table_t: implementation specific args (not used in RabbitMQ) -} - - - -int consume_message_wait_forever(amqp_connection_state_t conn, amqp_envelope_t *envelope) -{ - // release memory associated with all channels - amqp_maybe_release_buffers(conn); - - amqp_rpc_reply_t res = amqp_consume_message(conn, - envelope, // message in envelope - NULL, // timeout (struct *) - 0); // flags (int) AMQP_UNUSED - - if (AMQP_RESPONSE_NORMAL != res.reply_type) - { - syslog_on_amqp_error(res, "amqp_consume_message"); - } - - return (AMQP_RESPONSE_NORMAL != res.reply_type); -} - - - -void basic_publish_on_queue_or_drop(amqp_connection_state_t conn, amqp_channel_t channel, - amqp_bytes_t queuename, - amqp_bytes_t correlation_id, - const char * msg_buff) -{ - amqp_basic_properties_t props; - - props._flags = - AMQP_BASIC_CONTENT_TYPE_FLAG | - AMQP_BASIC_DELIVERY_MODE_FLAG | - AMQP_BASIC_CORRELATION_ID_FLAG; - props.content_type = amqp_cstring_bytes("application/json");// FIXME make sure encoding is UTF-8 - //props.content_type = amqp_cstring_bytes("text/plain"); - props.delivery_mode = 2; - // 1: non-persistent - // 2: persistent (delivered even if broker re-boots - msg held on hard disk) - props.correlation_id = correlation_id; - - int rc = amqp_basic_publish(conn, channel, - amqp_empty_bytes, // exchange amqp_bytes_t: empty = default-exchange - queuename, // routingKey := queuename amqp_bytes_t - 0, // mandatory amqp_boolean_t 0: drop the msg if cannot be routed (1: return msg) - 0, // immediate amqp_boolean_t 0: queue the msg if cannot be routed immediately (1: -"-) - &props, // amqp_basic_properties_t - amqp_cstring_bytes(msg_buff)); // body - - if (rc < 0) - { - syslog(LOG_ERR, "%s: basic publish failed with %uh, message: %s\n",__func__, rc, amqp_error_string2(rc)); - } -} - - - -// run RPC-loop -// even if error happens on consume-request or publish-response -void rpc_loop_forever( - amqp_connection_state_t conn, amqp_channel_t channel, - const string queuename, - const string settings_pathname) -{ - if(channel_open(conn, channel)) - { - amqp_rpc_reply_t rep_close = amqp_connection_close(conn, AMQP_REPLY_SUCCESS); - if(rep_close.reply_type != AMQP_RESPONSE_NORMAL) - throw std::runtime_error("cannot close connection after unsuccessful channel open"); - else if(AMQP_STATUS_OK != amqp_destroy_connection(conn)) - throw std::runtime_error("cannot end connection after unsuccessful channel open"); - else - throw std::runtime_error("channel open failed"); - } - - - declare_nondurable_autodelete_queue(conn, channel, amqp_cstring_bytes(queuename.c_str())); - throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), conn, channel, "amqp queue declare"); - - start_basic_consumer_noack(conn, channel, amqp_cstring_bytes(queuename.c_str())); - throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), conn, channel, "amqp basic consume"); - - syslog(LOG_INFO,"AMQP initialized. Run RPC loop."); - - config conf; - conf.read_config(settings_pathname); - syslog(LOG_INFO, string("Will log to " + conf.getLogDir()).c_str()); - - for (;;) - { - amqp_envelope_t envelope; - - if(consume_message_wait_forever(conn, &envelope)) - { - continue; - } - - string request_json((const char*)envelope.message.body.bytes, envelope.message.body.len); - - // RPC call - - LOG_open(conf.getLogDir(), conf.getLogFileName()); - - string reply_json; - try - { - reply_json = service_call(request_json, queuename, conf); - } - catch(const invalid_argument& ex) - { - reply_json = service_exception(service_error::INVALID_PARAM, ex.what()); - } - catch(const exception& ex) - { - reply_json = service_exception(service_error::SYSTEM_ERROR, ex.what()); - } - - LOG_close(); - - - basic_publish_on_queue_or_drop(conn, channel, - envelope.message.properties.reply_to, - envelope.message.properties.correlation_id, - reply_json.c_str()); - - amqp_destroy_envelope(&envelope); - } - - // Function never returns. Terminate with signal. -} - - - -void do_cleanup(amqp_connection_state_t conn, amqp_channel_t channel) -{ - die_on_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel"); - die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); - die_on_error(amqp_destroy_connection(conn), "Ending connection"); - - LOG_close(); -} - - - -// interfaces - -// global to make accessible from signal_handler FIXME -amqp_connection_state_t conn; -amqp_channel_t channel; - - -void rpc_run(const string user_name, const string password, - const string hostname, int port, - const string rpc_queuename, - const string settings_pathname) -{ - conn = login_to_broker(user_name, password, hostname, port); - - channel = 1; // only single AMQP-channel per connection needed, use channel no. 1 - rpc_loop_forever(conn, channel, rpc_queuename, settings_pathname); // func never returns -} - - -void rpc_cleanup(void) -{ - do_cleanup(conn, channel); -} - - - - - - - - - - -/////////////////////////////////////////////////////////////////////////// -/////////////////////////////////////////////////////////////////////////// -/////////////////////////////////////////////////////////////////////////// -// NOTE: -// this was in rpc_run_loop AFTER queue_declare and BEFORE basic_consume : -#ifdef usedefaultexchange -// bind queue to exchange - -amqp_queue_bind(conn, channel, - queuename, - amqp_cstring_bytes("amq.direct"), // exchange - queuename, // routingKey := queuename - amqp_empty_table); // empty arguments -throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), "amqp queue bind"); - -// better load balancing - -amqp_basic_qos_ok_t * qok = amqp_basic_qos(conn, channel, - 0, // prefetch_size uint32_t - 1, // prefetch_count uint16_t - 0);// global amqp_boolean_t : -// =0 prefetch_count applies seperatly to each consumer -// =1 prefetch_count applices to all consumers -throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), "amqp basic QoS"); -#endif -// ask the broker to start a basic-consumer on queue "queuename" -// serves all channels in connection (envelope.channel) -> always reply to -// queue whos name is in reply-to field (no need to ditinguish channels, -// reply-to queues were created by that channel) - -// no_ack affects message consume from queue: -// broker will remove msg right after delivery without waiting for confirmation from connected peer -// improves performance on expense of reliability - - - -/* util -void print_envelope_if(int condition, amqp_envelope_t * envelope) -{ - if(condition){ - printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) envelope->delivery_tag, - (int) envelope->exchange.len, (char *) envelope->exchange.bytes, - (int) envelope->routing_key.len, (char *) envelope->routing_key.bytes); - - if (envelope->message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { - printf("Content-type: %.*s\n", - (int) envelope->message.properties.content_type.len, - (char *) envelope->message.properties.content_type.bytes); - } - printf("----\n"); - // amqp_dump(envelope->message.body.bytes, envelope->message.body.len); - } -} -*/ - diff --git a/data-access/engine/src/vlkbd/src/rpc_amqp.hpp b/data-access/engine/src/vlkbd/src/rpc_amqp.hpp deleted file mode 100644 index 42c8d91026565f6e583b2c60714bc813ba40668b..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/rpc_amqp.hpp +++ /dev/null @@ -1,16 +0,0 @@ - -#ifndef RPC_AMQP_HPP -#define RPC_AMQP_HPP - -#include <string> - -void rpc_run( - const std::string user_name, const std::string password, - const std::string hostname, int port, - const std::string rpc_queuename, - const std::string settings_pathname); - -void rpc_cleanup(void); - -#endif - diff --git a/data-access/engine/src/vlkbd/src/rpc_amqp_utils.cpp b/data-access/engine/src/vlkbd/src/rpc_amqp_utils.cpp deleted file mode 100644 index dfc4422d5b71362a31c19fecd4e1d9ab8850b1c4..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/rpc_amqp_utils.cpp +++ /dev/null @@ -1,249 +0,0 @@ -/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ - -#include "rpc_amqp_utils.hpp" - -#include <string> -#include <stdexcept> - -#include <stdarg.h> -#include <stdlib.h> -#include <stdio.h> -#include <stdint.h> -#include <string.h> -#include <ctype.h> -#include <syslog.h> - -#include <amqp.h> -#include <amqp_framing.h> - - -void die(const char *fmt, ...) -{ - const size_t BUFF_SIZE = 256; - char buff[BUFF_SIZE]; - buff[BUFF_SIZE] = 0; - va_list ap; - va_start(ap, fmt); - vsnprintf(buff, BUFF_SIZE-1, fmt, ap); - va_end(ap); - throw std::runtime_error(buff); -} - -void die_on_error(int x, char const *context) -{ - if (x < 0) { - die("%s: %s\n", context, amqp_error_string2(x)); - } -} - -// If synchronous AMQP API methods fail (return NULL) use this. -// Get the last global amqp_rpc_reply (per-connection-global amqp_rpc_reply_t) -// normal operation: AMQP_RESPONSE_NORMAL -> RPC completed successfully. -// error: AMQP_RESPONSE_SERVER_EXCEPTION (conn closed, channel closed, library exception) -void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) -{ - switch (x.reply_type) { - - case AMQP_RESPONSE_NORMAL: - return; - - - case AMQP_RESPONSE_NONE: - die("%s: missing RPC reply type!\n", context); - break; - - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - die("%s: %s\n", context, amqp_error_string2(x.library_error)); - break; - - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; - die("%s: server connection error %uh, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; - die("%s: server channel error %uh, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - default: - die("%s: unknown server error, method id 0x%08X\n", context, x.reply.id); - break; - } - break; - - - default: - die("%s: unknown server error, reply_type 0x%08X\n", context, x.reply_type); - break; - } - // code never reaches here -} - - -void throw_ex_on_amqp_error(amqp_connection_state_t conn, amqp_rpc_reply_t x, char const *context) -{ - const size_t buff_size = 255; - char buff[buff_size+1]; - - buff[buff_size+1] = 0; - - - switch (x.reply_type) { - - case AMQP_RESPONSE_NORMAL: - return; - - - case AMQP_RESPONSE_NONE: - snprintf(buff, buff_size,"%s: missing RPC reply type!\n", context); - break; - - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - snprintf(buff, buff_size,"%s: %s\n", context, amqp_error_string2(x.library_error)); - break; - - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; - snprintf(buff, buff_size,"%s: server connection error %uh, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; - snprintf(buff, buff_size,"%s: server channel error %uh, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - default: - snprintf(buff, buff_size,"%s: unknown server error, method id 0x%08X\n", context, x.reply.id); - break; - } - break; - - - default: - snprintf(buff, buff_size,"%s: unknown server error, reply_type 0x%08X\n", context, x.reply_type); - break; - } - - if (AMQP_RESPONSE_NORMAL != x.reply_type) - { - die_on_error(amqp_destroy_connection(conn), "Ending connection before throw runtime exception"); - throw std::runtime_error(buff); - } -} - - - - - - - -// output - - -static void dump_row(long count, int numinrow, int *chs) -{ - int i; - - printf("%08lX:", count - numinrow); - - if (numinrow > 0) { - for (i = 0; i < numinrow; i++) { - if (i == 8) { - printf(" :"); - } - printf(" %02X", chs[i]); - } - for (i = numinrow; i < 16; i++) { - if (i == 8) { - printf(" :"); - } - printf(" "); - } - printf(" "); - for (i = 0; i < numinrow; i++) { - if (isprint(chs[i])) { - printf("%c", chs[i]); - } else { - printf("."); - } - } - } - printf("\n"); -} - -static int rows_eq(int *a, int *b) -{ - int i; - - for (i=0; i<16; i++) - if (a[i] != b[i]) { - return 0; - } - - return 1; -} - -void amqp_dump(void const *buffer, size_t len) -{ - unsigned char *buf = (unsigned char *) buffer; - long count = 0; - int numinrow = 0; - int chs[16]; - int oldchs[16] = {0}; - int showed_dots = 0; - size_t i; - - for (i = 0; i < len; i++) { - int ch = buf[i]; - - if (numinrow == 16) { - int j; - - if (rows_eq(oldchs, chs)) { - if (!showed_dots) { - showed_dots = 1; - printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); - } - } else { - showed_dots = 0; - dump_row(count, numinrow, chs); - } - - for (j=0; j<16; j++) { - oldchs[j] = chs[j]; - } - - numinrow = 0; - } - - count++; - chs[numinrow++] = ch; - } - - dump_row(count, numinrow, chs); - - if (numinrow != 0) { - printf("%08lX:\n", count); - } -} diff --git a/data-access/engine/src/vlkbd/src/rpc_amqp_utils.hpp b/data-access/engine/src/vlkbd/src/rpc_amqp_utils.hpp deleted file mode 100644 index 2ada690149c65731f78f659e8aa2d926adc840b7..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/rpc_amqp_utils.hpp +++ /dev/null @@ -1,20 +0,0 @@ -/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ -#ifndef RPC_AMQP_UTILS_HPP -#define RPC_AMQP_UTILS_HPP - -#include <amqp.h> - -void die(const char *fmt, ...); -extern void die_on_error(int x, char const *context); -extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context); - -extern void amqp_dump(void const *buffer, size_t len); - -extern uint64_t now_microseconds(void); -extern void microsleep(int usec); - - -void throw_ex_on_amqp_error(amqp_connection_state_t conn, amqp_rpc_reply_t x, char const *context); - - -#endif diff --git a/data-access/engine/src/vlkbd/src/vlkbd.cpp b/data-access/engine/src/vlkbd/src/vlkbd.cpp deleted file mode 100644 index b0d9fcad69cedc0bd4ecffc712e99ceab23c9a33..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/src/vlkbd.cpp +++ /dev/null @@ -1,169 +0,0 @@ -// vlkbd -// daemon for VLKB services - -#include "rpc_amqp.hpp" - -#include <iostream> -#include <stdexcept> - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <stdint.h> - -// daemon related START -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/signal.h> -#include <fcntl.h> -#include <errno.h> -#include <unistd.h> -#include <syslog.h> -// daemon related END - - -#define DAEMON_NAME "vlkbd" - -char dname[256]; - - -using namespace std; - -void signal_handler(int sig) { - - switch(sig){ - case SIGTERM: - syslog(LOG_INFO,"received SIGTERM signal. Exiting ..."); - rpc_cleanup(); - exit(EXIT_SUCCESS); - break; - - default: - syslog(LOG_WARNING,"received unhandled signal (%d) %s", - sig,strsignal(sig)); - break; - - } -} - - -void daemonize(void) { - - /* Our process ID and Session ID */ - pid_t pid, sid; - - /* Fork off the parent process */ - pid = fork(); - if (pid < 0) { - exit(EXIT_FAILURE); - } - /* If we got a good PID, then - we can exit the parent process. */ - if (pid > 0) { - exit(EXIT_SUCCESS); - } - - // insert signal handler here... - signal(SIGTERM, signal_handler); - - /* Change the file mode mask */ - umask(0); - - // Open any logs here ... - setlogmask(LOG_UPTO(LOG_INFO)); - sprintf(dname,"%s[%d]",DAEMON_NAME,getpid()); - openlog(dname, LOG_CONS, LOG_USER); - syslog(LOG_INFO,"service started."); - - /* Create a new SID for the child process */ - sid = setsid(); - if (sid < 0) { - /* Log the failure */ - exit(EXIT_FAILURE); - } - - /* Change the current working directory */ - if ((chdir("/")) < 0) { - /* Log the failure */ - exit(EXIT_FAILURE); - } - - /* Keep the standard file descriptors 0, 1, 2 occupied - so as no socket can have it; to guard the socket - against 'forgotten' printf's */ - - freopen ("/dev/null", "r", stdin); - freopen ("/dev/null", "w", stdout); - freopen ("/dev/null", "w", stderr); - - /* Daemon-specific initialization goes here */ -} - - - -std::string base_name(std::string path) -{ - return path.substr(path.find_last_of("//") + 1); - // FIXME replace with basename -} - - - -void usage(const string progname) -{ - cerr - << "Usage: " << progname << " host port queuename conf-pathname [-t]" << endl - << endl - << " -t : stay on terminal, don't run as daemon" << endl - << endl - << "Version: " << VERSIONSTR << " " << BUILD << endl; -} - - - -int main(int argc, char *argv[]) -{ - const std::string progname = base_name(argv[0]); - - if ((argc != 5) && (argc != 6)) - { - usage(progname); - return 1; - } - - int daemon = (argc == 5); - if(daemon) - daemonize(); - - // now stdin/out/err are redirected - - - // start AMQP-consumer - - string hostname(argv[1]); - int port = atoi(argv[2]); - string queuename(argv[3]); - string user_name("guest"); - string password("guest"); - string settings_pathname(argv[4]); - //string settings_pathname("/etc/vlkb/datasets.conf"); - // FIXME uname passwd put to conf file or args - cannot start without them - - try - { - rpc_run(user_name, password, hostname, port, queuename, settings_pathname); - } - catch (std::runtime_error const& e) - { - if(daemon) - syslog(LOG_ERR, e.what()); - else - std::cerr << "Runtime error: " << e.what() << std::endl; - - return 2; - } - - syslog(LOG_INFO,"service stoped. Exiting..."); - - return 0; -} diff --git a/data-access/engine/src/vlkbd/vlkbd.1 b/data-access/engine/src/vlkbd/vlkbd.1 deleted file mode 100644 index d4c3cf9e3da11c257baae61e03b3dd797f249fa1..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/vlkbd.1 +++ /dev/null @@ -1,22 +0,0 @@ -.\" Hey, EMACS: -*- nroff -*- -.\" (C) Copyright 2023 ... -.\" -.TH vlkbd 1 -.SH NAME -vlkbd \- vlkbd application -.SH SYNOPSIS -.B vlkbd -.SH DESCRIPTION -The -.B vlkbd -is an engine to perform data access (cutout, multicutout and demosaicing) of FITS files. -List of actual sub-commands is printed by --help. -.SH SEE ALSO -.BR vlkb, vlkb-obscore, vlkbd (1). -.SH AUTHORS -The -.B vlkbd -was written by -RBu <rbu@ia2.inaf.it> -.PP -This document was written by RBu <rbu@ia2.inaf.it> for Debian. diff --git a/data-access/engine/src/vlkbd/vlkbd.changelog.Debian b/data-access/engine/src/vlkbd/vlkbd.changelog.Debian deleted file mode 100644 index 812a6754059b4c0dd522725f3fc145b3a0c77238..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/vlkbd.changelog.Debian +++ /dev/null @@ -1,13 +0,0 @@ -vlkbd (1.4.8) stable; urgency=low - - [ VLKB ] - * First release via deb and rpm packages. - - -- INAF <RBu@ia2.inaf.com> Thu, 23 Dec 2023 11:30:00 +0100 - -vlkbd (1.4.7) stable; urgency=low - - [ INAF ] - * Adds support for SODA parameters (http://ivoa.net/documents). - - -- INAF <RBu@ia2.inaf.org> Wed, 4 Oct 2023 11:00:00 +0100 diff --git a/data-access/engine/src/vlkbd/vlkbd.control b/data-access/engine/src/vlkbd/vlkbd.control deleted file mode 100644 index 5b0ae0c617677d6b957ba8d71ea1139f1424fe23..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/vlkbd.control +++ /dev/null @@ -1,8 +0,0 @@ -Package: vlkbd -Version: -Section: utils -Priority: optional -Architecture: all -Maintainer: VLKB <RBu@ia2.vlkb.org> -Description: This is vlkbd engine to perform data access (cutout, multi-cutout, demosaicing) of FITS-files. List of commands is printed in help. - diff --git a/data-access/engine/src/vlkbd/vlkbd.copyright b/data-access/engine/src/vlkbd/vlkbd.copyright deleted file mode 100644 index 6d86f408d7212bc8d9bdb9833dd80346e39188ed..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/vlkbd.copyright +++ /dev/null @@ -1,14 +0,0 @@ -vlkbd - -Copyright: 2023 INAF <ia2@inaf.com> - -2023-10-30 - -The entire code base may be distributed under the terms of the GNU General -Public License (GPL), which appears immediately below. Alternatively, all -of the source code as any code derived from that code may instead be -distributed under the GNU Lesser General Public License (LGPL), at the -choice of the distributor. The complete text of the LGPL appears at the -bottom of this file. - -See /usr/share/common-licenses/(GPL|LGPL) diff --git a/data-access/engine/src/vlkbd/vlkbd.datasets.conf b/data-access/engine/src/vlkbd/vlkbd.datasets.conf deleted file mode 100644 index bccc41819036738345cde389866cc381c672eb2f..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/vlkbd.datasets.conf +++ /dev/null @@ -1,10 +0,0 @@ - -# path to original files -fits_path_surveys=/srv/surveys -# path to generated cutouts -fits_path_cutouts=/srv/cutouts - -# logging records last request only -# log_dir=/tmp -# log_filename=vlkbd.log - diff --git a/data-access/engine/src/vlkbd/vlkbd.spec b/data-access/engine/src/vlkbd/vlkbd.spec deleted file mode 100644 index ff016c7ce151d3067e29e60306a70b82204d562d..0000000000000000000000000000000000000000 --- a/data-access/engine/src/vlkbd/vlkbd.spec +++ /dev/null @@ -1,34 +0,0 @@ -Name: vlkbd -Version: %{version} -Release: 1%{?dist} -Summary: vlkbd -Source1: vlkbd -License: GPLv3+ -URL: http://ia2.inaf.it -BuildRequires: gcc >= 3.2.0, glibc-devel >= 2.17, libstdc++-devel >= 4.8, ast-devel >= 7.3.4, cfitsio-devel >= 3.370, libcsv-devel >= 3.0 -Requires: glibc >= 2.17, libstdc++ >= 4.8, ast >= 7.3.4, cfitsio >= 3.370, libcsv >= 3.0 - -%description -This utility ia part of a VLKB-suite (ViaLactea Knowledge Base) to manipulate or calculate information abount -coordinates systems in a FITS-file. Set of actual commands is printed in help. - - -%prep - -%build - - -%install -mkdir -p %{buildroot}%{_prefix}/bin -install -m 755 %{SOURCE1} %{buildroot}%{_prefix}/bin -%files -%{_bindir}/vlkbd - - -%post - -%postun - - -%changelog - diff --git a/data-access/engine/vlkbd_exec.sh b/data-access/engine/vlkbd_exec.sh deleted file mode 100755 index f9c87f1064afd81482f72f9034972bf5a09313ad..0000000000000000000000000000000000000000 --- a/data-access/engine/vlkbd_exec.sh +++ /dev/null @@ -1,44 +0,0 @@ - -# how to all vlkbd in Makefile: - -# killall -q vlkbd-$(VERNUM); test $$? -eq 1 - - -if [ "$#" -lt 1 ]; then - echo -e "Run vlkbd-<version> on all CPU-cores, connecting to RabbitMQ on <amqphost>.\nUsage:\n\t $0 <amqphost> <queue_name> <datasets_conf>\n" - exit -fi - -ncores=$(grep '^processor' /proc/cpuinfo | sort -u | wc -l) - -AMQPHOST=$1 - - -for core in $(seq 0 $(expr $ncores - 1) ) -do - - # FIXME /usr/local should be configurable from engine/Makefile -> INSTALL_DIR - # or vlkbd_exec.sh sould be under resources and handled together with datasets.conf - taskset -c $core vlkbd $AMQPHOST 5672 $2 $3 - -done - -ps ax | grep vlkbd - - - -# with bitmask: - -# usage: run on 2nd cpu ore -#taskset 0x02 ./vlkb_amqp localhost 5672 test -# run on 1st cpu ore -#taskset 0x01 ./vlkb_amqp localhost 5672 test -# Note that a bitmask uses "hexadecimal" notation. -# "0x11" is "00010001" in a binary format, which corresponds -# to CPU core 0 and 4. -# CPU core 0 and 1 is represented by CPU affinity "0x3". -# taskset $1 ./vlkb_amqp localhost 5672 test - - -# use defualt exchange -#./amqp_listen localhost 5672 "" "test" diff --git a/data-access/servlet/config/Makefile b/data-access/servlet/config/Makefile index 6e549a0f124eaff9127db3a0ca219d01e46d0cba..39a1c03c1fd964eb32ed88a47c6f7f874a7a3eac 100644 --- a/data-access/servlet/config/Makefile +++ b/data-access/servlet/config/Makefile @@ -1,7 +1,5 @@ ################################################################ # args -AMQP_QUEUE ?= vlkbdevel -# if amqp_host_name empty -> uses ExecCmd vlkb for cutout (not AMQP+vlkbd) # Resolver DB: <none> localhost pasquale pasqule-devel # DBMS ?= localhost diff --git a/data-access/servlet/src/main/java/common/webapi/Settings.java b/data-access/servlet/src/main/java/common/webapi/Settings.java index 3d391830373b4707d775bccac0fa00c6f049ab07..ff860078880b5aea653226d004075dc32a9c4e84 100644 --- a/data-access/servlet/src/main/java/common/webapi/Settings.java +++ b/data-access/servlet/src/main/java/common/webapi/Settings.java @@ -56,31 +56,8 @@ class Settings } - // DEPRECATED - public static class AmqpConn - { - private String hostName; - private int portNum; - private String routingKey; - - public String hostName() { return hostName; } - public int portNumber() { return portNum; } - public String routingKey() { return routingKey; } - - public boolean isHostnameEmpty() - { - return ( (hostName == null) || hostName.trim().isEmpty() ); - } - - public String toString() - { - return hostName + " " + String.valueOf(portNum) + " " + routingKey; - } - } - public FITSPaths fitsPaths; public DBConn dbConn; - public AmqpConn amqpConn; // will not start without config-file @@ -100,9 +77,8 @@ class Settings FITSPaths fitsPaths = loadFITSPaths(properties); DBConn dbConn = loadDBConn(properties); - AmqpConn amqpConn = loadAmqpConn(properties); - return new Settings(dbConn, amqpConn, fitsPaths); + return new Settings(dbConn, fitsPaths); } else { @@ -118,12 +94,10 @@ class Settings - private Settings(DBConn dbConn, AmqpConn amqpConn, - FITSPaths fitsPaths) + private Settings(DBConn dbConn, FITSPaths fitsPaths) { this.fitsPaths = fitsPaths; this.dbConn = dbConn; - this.amqpConn = amqpConn; } @@ -147,15 +121,5 @@ class Settings return dbconn; } - private static AmqpConn loadAmqpConn(Properties properties) - { - AmqpConn amqpconn = new AmqpConn(); - amqpconn.hostName = properties.getProperty("amqp_host_name", "").strip(); - String strPortNum = properties.getProperty("amqp_port", "5672").strip(); - amqpconn.portNum = Integer.parseInt(strPortNum); - amqpconn.routingKey = properties.getProperty("amqp_routing_key", "").strip(); - return amqpconn; - } - } diff --git a/data-access/servlet/src/main/java/cutout/webapi/ServletCutout.java b/data-access/servlet/src/main/java/cutout/webapi/ServletCutout.java index 7f87953a8e31894942e8544721bcad3f3729ede7..915bf9771831fb30f5f02fc93a68307e9f2ab07c 100644 --- a/data-access/servlet/src/main/java/cutout/webapi/ServletCutout.java +++ b/data-access/servlet/src/main/java/cutout/webapi/ServletCutout.java @@ -55,7 +55,6 @@ public class ServletCutout extends HttpServlet protected static final Subsurvey[] subsurveys = Subsurvey.loadSubsurveys(settings.fitsPaths.surveysMetadataAbsPathname()); protected boolean resolveFromId = true;//FIXME separate setting authz is separate table settings.dbConn.isDbUriEmpty(); - protected boolean useEngineOverCli = settings.amqpConn.isHostnameEmpty(); final String RESPONSE_ENCODING = "utf-8"; final String DEFAULT_RESPONSEFORMAT = "application/fits"; @@ -67,9 +66,6 @@ public class ServletCutout extends HttpServlet LOGGER.config("Subsurveys loaded : " + String.valueOf(subsurveys.length)); LOGGER.config("DEFAULT_RESPONSEFORMAT : " + DEFAULT_RESPONSEFORMAT); LOGGER.config("Resolver : " + (resolveFromId ? "IVOID" : "DB")); - LOGGER.config("Engine : " + (useEngineOverCli ? "CLI" : "AMQP")); - if(!useEngineOverCli) - LOGGER.config("AMQP : " + settings.amqpConn.toString()); } @@ -158,9 +154,7 @@ public class ServletCutout extends HttpServlet final Resolver resolver = (resolveFromId ? new ResolverFromId(subsurveys) : new ResolverByObsCore(settings.dbConn, subsurveys)); - final Vlkb vlkb = (useEngineOverCli ? - new VlkbCli(settings, subsurveys) - : new VlkbAmqp(settings, subsurveys)); + final Vlkb vlkb = new VlkbCli(settings, subsurveys); resolver.resolve(id); diff --git a/data-access/servlet/src/main/java/vlkb/Vlkb.java b/data-access/servlet/src/main/java/mcutout/Vlkb.java similarity index 67% rename from data-access/servlet/src/main/java/vlkb/Vlkb.java rename to data-access/servlet/src/main/java/mcutout/Vlkb.java index f81378ed0d30f935051e9397730a7de6a58bad87..eef767d4c33421ddc82912e283ca8a0afd70225f 100644 --- a/data-access/servlet/src/main/java/vlkb/Vlkb.java +++ b/data-access/servlet/src/main/java/mcutout/Vlkb.java @@ -18,30 +18,18 @@ public interface Vlkb String cutAbsPathname) throws IOException, InterruptedException; + public void doFile(String relPathname, int hdunum, String pixels, String cutAbsPathname) throws IOException, InterruptedException; - public MCutResult doMCutout(String jdlJson, String workDir) throws IOException, InterruptedException; - public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues) - throws FileNotFoundException, IOException; - - - // misc - NullValueCount doCountNullValues(String absPathname, int hdunum) throws IOException, InterruptedException; - // deprecated - used only in VlkbAmqp - public CutResult doFileAmqp(String relPathname, int hdunum, - Pos pos, Band band, Time time, Pol pol, - boolean countNullValues, FitsCard[] extraCards, - String cutAbsPathname) - throws IOException, InterruptedException; } diff --git a/data-access/servlet/src/main/java/mcutout/VlkbCli.java b/data-access/servlet/src/main/java/mcutout/VlkbCli.java index 701d9271241ad9d4af63cec9122785930a5d660c..34626f3817a07d9fbcf4c72ded653cbd10f2f087 100644 --- a/data-access/servlet/src/main/java/mcutout/VlkbCli.java +++ b/data-access/servlet/src/main/java/mcutout/VlkbCli.java @@ -87,17 +87,6 @@ class VlkbCli implements Vlkb return new CutResult(); } - public CutResult doFileAmqp(String relPathname, int hdunum, - Pos pos, Band band, Time time, Pol pol, - boolean countNullValues, FitsCard[] extraCards, - String cutAbsPathname) - throws IOException, InterruptedException - { - LOGGER.fine("trace doFileAmqp by CLI is NOT IMPLEMENTED (only by AMQP)"); - - return new CutResult(); - } - public void doFile(String relPathname, int hdunum, diff --git a/data-access/servlet/src/main/java/mcutout/webapi/UWSMCutoutWork.java b/data-access/servlet/src/main/java/mcutout/webapi/UWSMCutoutWork.java index 1dc06af6ff3ba24ab93d21c8086c5d72e25d297a..f8e5881b0202a6233a572173f81ab7a3e3a16ff8 100644 --- a/data-access/servlet/src/main/java/mcutout/webapi/UWSMCutoutWork.java +++ b/data-access/servlet/src/main/java/mcutout/webapi/UWSMCutoutWork.java @@ -43,9 +43,7 @@ public class UWSMCutoutWork extends JobThread private Settings settings = UWSMCutout.settings; private Subsurvey[] subsurveys = UWSMCutout.subsurveys; - protected Vlkb vlkb = (settings.amqpConn.isHostnameEmpty() ? - new VlkbCli(settings, subsurveys) - : new VlkbAmqp(settings, subsurveys)); + protected Vlkb vlkb = new VlkbCli(settings, subsurveys); /* NOTE needed if cutouts dir served by vlkb-datasets */ private String webappRootRequestUrl = null; diff --git a/data-access/servlet/src/main/java/vlkb/Regrid.java b/data-access/servlet/src/main/java/vlkb/Regrid.java deleted file mode 100644 index 04ef0662988a25a28633179a43725aef8f5d83aa..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/Regrid.java +++ /dev/null @@ -1,254 +0,0 @@ -// -// Access to service-engines (abstract methods) plus common functionality (merge): -// - search, cutout, mergefiles : abstract webservices funcs (overrride for AMQP, JNI) -// - merge : high level, which uses mergefiles after cutout - -import java.util.logging.Logger; -//import java.util.logging.Level; - -import java.io.IOException; -import java.util.*; // ArrayList<String> -import java.io.File; -import nom.tam.fits.*;// Fits - for regridding - - - -class Regrid -{ - private static final Logger LOGGER = Logger.getLogger(Regrid.class.getName()); - - private Double average(ArrayList<Double> dd) { - - Double sum = 0.0; - int sz = dd.size(); - - for (int i=0; i < sz; i++) { - sum += dd.get(i); - } - return sum / sz; - } - - - // returns number of non-degenerate axes in fitsfiles - // this is utility for merge, where files must be homogenious, - // so number of dimensions must be the same in - // all files otherwise merge not possible - // - // Counts only non-degenerate axes (NAXISi >1). - // - // in: fitsfiles : list of filenames with full path !? - // out: int : number of (non-degenerate) dimensions common to all fitsfiles - public int dimensions(final String[] fitsfiles) - //throws FitsException, IOException - { - int dim = 0; - - ArrayList<Integer> Vnaxis = new ArrayList<Integer>(); - - for(String fitsfile : fitsfiles) { - - // check if empty filename (;; typed on input) - if(fitsfile.length() == 0) - continue; - - try { - - Fits f = new Fits(fitsfile); - Integer naxis = f.getHDU(0).getHeader().getIntValue("NAXIS"); - - Integer nx = 0; // naxis excluding degenerate axes - for(int i=1; i<=naxis; i++){ - if( 1 < f.getHDU(0).getHeader().getIntValue("NAXIS" + i)) - nx++; - } - Vnaxis.add(nx); - - } catch (FitsException e) { - LOGGER.severe("dimensions: " + e.getMessage()); - } catch (IOException e) { - LOGGER.severe("dimensions:" + e.getMessage()); - } - } - // check that dimensions in all files match - dim = Vnaxis.get(0); - for( int ix = 1; ix < Vnaxis.size() ; ix++ ) { - if (Vnaxis.get(ix) != dim) { - // FIXME throw exception - dim = 0; - }; - } - return dim; - } - - // regrid variant 2: - // CRPIX can be different in files to be merged -> we'll adjust shift CRPIX for common CRVAL - // attempt regrid velocity if needed and possible - // returns true: add msg to client: "Keywords CDLET3, CRVAL3 - // were modified for merge, see header of the merged file." - // returns false: do nothing - // - // use closest neighbour algorithm - changes only metadata - // possible: if CDELT3 does not differ more then one pixel (=size CDELT3) - // between the files to be merged and - // NAXIS3 is exactly the same (in cutouts should be) - // needed: if CDELT3 & CRVAL3 are not (exactly) equal between the files to be merged - // - // WARN: assumes (as Montage does too) 3rd axis is velocity (compatible) - public Boolean regrid_vel2(final String[] fitsfiles) - // Boolean regrid_vel2(final ArrayList<String> fitsfiles) - //throws FitsException, IOException - { - ArrayList<Double> Vcrval = new ArrayList<Double>(); - ArrayList<Double> Vcrpix = new ArrayList<Double>(); - ArrayList<Double> Vcdelt = new ArrayList<Double>(); - ArrayList<Long> Vnaxis = new ArrayList<Long>(); - - // - // 1, read needed keywords from files to be merged - // - for(String fitsfile : fitsfiles) { - - // check if empty filename (;; typed on input) - if(fitsfile.length() == 0) - continue; - - // read orig keyword values - - try { - - // FITS - Fits f = new Fits(fitsfile); - - // we should check here that 3rd axis is velocity-compatible (freq etc) - // for now, remains only assumption (also Montage only assumes and offers mTransform if it is not so) - // String oldkwd = f.getHDU(0).getHeader().getStringValue("CTYPE3"); - - // get card values as string (for exact match comparison to avoid string -> double conversion artifacts) - // String allcardstr = f.getHDU(0).getHeader().findKey("CRVAL3"); - // String cardvaluestr = f.getHDU(0).getHeader().findCard("CRVAL3").getValue(); - // LOGGER.finest("CRVAL3 as string: " + cardvaluestr); - - Vcrval.add(f.getHDU(0).getHeader().getDoubleValue("CRVAL3")); - Vcrpix.add(f.getHDU(0).getHeader().getDoubleValue("CRPIX3")); - Vcdelt.add(f.getHDU(0).getHeader().getDoubleValue("CDELT3")); - Vnaxis.add(f.getHDU(0).getHeader().getLongValue("NAXIS3")); - - - } catch (FitsException e) { - LOGGER.severe("regrid_vel2: " + e.getMessage()); - } catch (IOException e) { - LOGGER.severe("regrid_vel2: " + e.getMessage()); - } - - } - /*/ debug print - for( int ix = 0; ix < Vcrval.size() ; ix++ ) { - - LOGGER.finest(ix + - " " + Vcrval.get(ix) + - " " + Vcdelt.get(ix) + - " " + Vcrpix.get(ix) + - " " + Vnaxis.get(ix) - ); - - } - */ - - // - // 2, check if closeste-neighbour interpolation possible/needed: - // NAXIS3 must match - // max diff(CDELT3) << absvalue(CDELT3) - // max diff(CRVAL3) << absvalue(CDELT3) - // - long dnaxis = Collections.max(Vnaxis) - Collections.min(Vnaxis); - //LOGGER.finest("dNAXIS : " + dnaxis); - if( dnaxis != 0 ) { - return false; - } - double minCDELT = Collections.min(Vcdelt); - double maxCDELT = Collections.max(Vcdelt); - - double avgCDELT = average(Vcdelt); - double absavgCDELT = java.lang.Math.abs(avgCDELT); - - // FIXME use exceptions instead... - if(absavgCDELT == 0.0) { - LOGGER.finest("regrid: avg(CDELT3) == 0"); - return false; - } - - double dcdelt = java.lang.Math.abs(maxCDELT - minCDELT); - //LOGGER.finest("dCDELT : " + dcdelt - // + " ratio: " + - // String.format("%.1f",100.0*dcdelt/absavgCDELT) - // + " %" ); - if(dcdelt > absavgCDELT) { - return false; - } - - double minCRVAL = Collections.min(Vcrval); - double maxCRVAL = Collections.max(Vcrval); - double dcrval = java.lang.Math.abs(maxCRVAL - minCRVAL); - //LOGGER.finest("dCRVAL : " + dcrval + "|CDELT| : " + absavgCDELT - // + " ratio: " + - // String.format("%.1f",100.0*dcrval/absavgCDELT) - // + " %" ); - // if(dcrval > absavgCDELT) { - // return false; - // } - // if we've got here all conditions for interpolation satisfied - - // exact match, interpolation not needed - // ?? FIXME would be better check exact match by comparing card values as strings ?? - // to avoid string -> double conversion machine architecture dependencies (any?) - if((dcrval == 0.0) && (dcdelt == 0.0)){ - return false; - } - - // - // 3, interpolation possible and needed: update fits file headers - // with new values - // - - // interpolate closest neighbour: simply set the grid to average of all - // they differ by less then a pixel (=CDELT3) - double newCDELT = avgCDELT; - double newCRVAL = average(Vcrval); - - for(String fitsfile : fitsfiles) { - - // check if empty filename (;; typed on input) - if(fitsfile.length() == 0) - continue; - - try { - - Fits f = new Fits(fitsfile); - - double origCDELT = f.getHDU(0).getHeader().getDoubleValue("CDELT3"); - String commentCDELT = "VLKB OrigVal: " + origCDELT; - f.getHDU(0).getHeader().addValue("CDELT3",newCDELT, commentCDELT); - - double origCRVAL = f.getHDU(0).getHeader().getDoubleValue("CRVAL3"); - String commentCRVAL = "VLKB OrigVal: " + origCRVAL; - f.getHDU(0).getHeader().addValue("CRVAL3",newCRVAL,commentCRVAL); - - double origCRPIX = f.getHDU(0).getHeader().getDoubleValue("CRPIX3"); - String commentCRPIX = "VLKB OrigVal: " + origCRPIX; - double newCRPIX = origCRPIX - ((newCRVAL - origCRVAL) / newCDELT); - f.getHDU(0).getHeader().addValue("CRPIX3",newCRPIX,commentCRPIX); - - f.getHDU(0).rewrite(); - - } catch(Exception e) { - // FIXME do error handling properly... - LOGGER.severe("regrid_vel2: " + e.getMessage()); - } - - } - - return true; - } - -} - diff --git a/data-access/servlet/src/main/java/vlkb/Reproject.java b/data-access/servlet/src/main/java/vlkb/Reproject.java deleted file mode 100644 index 0e1cb009621087ae3aea119a25b54f9b2c2aeb41..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/Reproject.java +++ /dev/null @@ -1,32 +0,0 @@ - - - - -class Reproject implements Runnable -{ - String id; - String prefix; - String fileName; - String[] response; - VlkbAmqp datasets; - - public Reproject(VlkbAmqp datasets, String id, String prefix, String fileName) - { - this.datasets = datasets; - this.id = id; - this.prefix = prefix; - this.fileName = fileName; - } - - - @Override - public void run() - { - String name = Thread.currentThread().getName(); - VlkbAmqp.LOGGER.fine("Start of " + name); - response = datasets.mergefiles_reproject(id, prefix, fileName); - VlkbAmqp.LOGGER.fine("End of " + name); - } - -} - diff --git a/data-access/servlet/src/main/java/vlkb/VlkbAmqp.java b/data-access/servlet/src/main/java/vlkb/VlkbAmqp.java deleted file mode 100644 index 065c1cbd825d3e42a7f7c56e4a9e527d97d63e5a..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/VlkbAmqp.java +++ /dev/null @@ -1,533 +0,0 @@ - -import java.util.logging.Logger; -import java.util.logging.Level; -import java.util.List; -import java.util.ArrayList; -import java.util.Arrays; - -import java.time.Instant; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.nio.file.StandardOpenOption; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - -import java.time.*;// Timestamp in cut-filename -import java.io.ByteArrayOutputStream; // for SODA direct streaming doSubimgStream - -import vo.parameter.*; - -class VlkbAmqp implements Vlkb -{ - static final Logger LOGGER = Logger.getLogger(VlkbAmqp.class.getName()); - - private Settings settings = null; - private Subsurvey[] subsurveys = null; - private Resolver resolver = null; - - public VlkbAmqp() - { - LOGGER.fine("trace VlkbAmqp()"); - this.settings = Settings.getInstance(); - this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys) - : new ResolverByObsCore(settings.dbConn, subsurveys)); - } - - - public VlkbAmqp(Settings settings) - { - LOGGER.fine("trace VlkbAmqp(settings)"); - this.settings = settings; - this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys) - : new ResolverByObsCore(settings.dbConn, subsurveys)); - } - - - - public VlkbAmqp(Settings settings, Subsurvey[] subsurveys) - { - LOGGER.fine("trace VlkbAmqp(settings, subsurveys)"); - this.settings = settings; - this.subsurveys = subsurveys; - this.resolver = (settings.dbConn.isDbUriEmpty() ? new ResolverFromId(subsurveys) - : new ResolverByObsCore(settings.dbConn, subsurveys)); - } - - - public NullValueCount doCountNullValues(String absPathname, int hdunum) - throws IOException, InterruptedException - { - LOGGER.fine("trace: not implemented; TB deprecated"); - return new NullValueCount(); - } - - - public CutResult doMerge(String[] idArr, Coord coord, boolean countNullValues) - throws FileNotFoundException, IOException - { - LOGGER.fine("trace"); - - return merge(idArr, coord, countNullValues); - } - - - /////////////////////////////////////////////////////////////////////////////////// - public void doFile(String relPathname, int hdunum, - Pos pos, Band band, Time time, Pol pol, String dummyCutAbsPathname) - throws IOException, InterruptedException - { - ;// only placehoder for compatibility with Vlkb.java interface, - //Amqp support is deprecated - } - - public void doFile(String relPathname, int hdunum, - String pixels, String dummyCutAbsPathname) - throws IOException, InterruptedException - { - ;// only placehoder for compatibility with Vlkb.java interface, - //Amqp support is deprecated - } - - - public CutResult doFileAmqp(String relPathname, int hdunum, - Pos pos, Band band, Time time, Pol pol, - boolean countNullValues, FitsCard[] extraCards, String dummyCutAbsPathname) - throws IOException, InterruptedException - { - LOGGER.fine("trace: " + pos.toString() ); - - CutResult cutResult = new CutResult(); - - LOGGER.finer("Using AMQP"); - - JsonEncoder jReq = new JsonEncoder(); - jReq.add(relPathname, hdunum); - jReq.add(pos); - jReq.add(band); - jReq.add(time); - jReq.add(pol); - - jReq.add(countNullValues); - jReq.add(extraCards); - - String outJson = RpcOverAmqp.doRpc( settings.amqpConn, jReq.toString() ); - - cutResult = JsonDecoder.responseFromCutoutJson( outJson ); - - return cutResult; - } - - - - private CutResult doFileById(String id, Pos pos, Band band, Time time, Pol pol, - boolean countNullValues, Subsurvey[] subsurveys) - throws IOException, InterruptedException - { - LOGGER.fine("trace"); - - FitsCard[] extraCards = null; - - this.resolver.resolve(id); - String relPathname = this.resolver.relPathname(); - int hdunum = this.resolver.hdunum(); - String subsurveyId = this.resolver.obsCollection(); - - if(subsurveyId != null) - { - extraCards = Subsurvey.subsurveysFindCards(subsurveys, subsurveyId); - } - else - { - LOGGER.finer("Resolver returns subsurveyId null: no extraCards loaded."); - } - - final String DEFAULT_TIME_SYSTEM = "MJD_UTC"; // FIXME take from confif file - - CutResult cutResult = doFileAmqp(relPathname, hdunum, pos, band, time, pol, - countNullValues, extraCards, null); - - return cutResult; - } - - - - - /////////////////////////////////////////////////////////////////////////////////// - - - public MCutResult doMCutout(String jdlJson, String workDir) - throws IOException - { - LOGGER.fine("trace"); - - MCutResult mCutResult; - - LOGGER.finer("doMCutout over AMQP"); - String updatedJsonString = JdlMCutout.resolveAndUpdateJsonRequest(jdlJson, settings, subsurveys); - LOGGER.finest("doMCutout over AMQP : " + updatedJsonString); - String outJson = RpcOverAmqp.doRpc(settings.amqpConn, JdlMCutout.mcutoutToJson(updatedJsonString) ); - mCutResult = JdlMCutout.responseFromMCutoutJson(outJson); - - return mCutResult; - } - - - /* ================= MERGE =============================== */ - - private String generateSubimgPathname(String relPathname, int hdunum) - { - String cutfitsname = "vlkb-cutout"; - - Instant instant = Instant.now() ; - String timestamp = instant.toString().replace(":","-").replace(".","_"); - - String tempPathname1 = relPathname.replaceAll("/","-"); - String tempPathname2 = tempPathname1.replaceAll(" ","_"); - - if(hdunum == 1) - { - return cutfitsname + "_" + timestamp + "_" + tempPathname2; - } - else - { - String extnum = "EXT" + String.valueOf(hdunum-1); - return cutfitsname + "_" + timestamp + "_" + extnum + "_" + tempPathname2; - } - } - - - - private CutResult cutout( - String publisherDid, Coord coord, - boolean countNullValues) - { - // ResolverByObsCore rsl = new ResolverByObsCore(settings.dbConn, subsurveys); - Resolver rsl = resolver;//new ResolverFromId();//settings.dbConn, subsurveys); - rsl.resolve(publisherDid); - - FitsCard[] extraCards = null; - //Subsurvey.subsurveysFindCards(subsurveys, rsl.obsCollection());//rsl.subsurveyId); - String absSubimgPathname = settings.fitsPaths.cutouts() + "/" - + generateSubimgPathname(rsl.relPathname(), rsl.hdunum()); - LOGGER.finest("absSubimgPathname: " + absSubimgPathname); - - LOGGER.finer("Using AMQP"); - - JsonEncoder jReq = new JsonEncoder(); - jReq.add(rsl.relPathname(), rsl.hdunum()); - jReq.add(coord.pos); - jReq.add(coord.band); - jReq.add(coord.time); - jReq.add(coord.pol); - - jReq.add(countNullValues); - jReq.add(extraCards); - - String inJson = jReq.toString(); - - return JsonDecoder.responseFromCutoutJson( RpcOverAmqp.doRpc(settings.amqpConn, inJson) ); - } - - - protected CutResult merge(String[] pubdids, Coord coord, Boolean countNullValues) - { - LOGGER.fine("trace"); - - ArrayList<CutResult> allresults = new ArrayList<CutResult>(); - - // 1. Decode pubdid's from inputs.pubdid and cutout on each - - CutResult[] allCutResults = do_cutouts( - pubdids, coord, - countNullValues); - - allresults.addAll(Arrays.asList(allCutResults)); - - - String[] allCutPathnames = selectCutPathnames(allCutResults); - - if( allCutPathnames.length <= 0 ){ - LOGGER.warning("No cutout created."); - return null; - } - if( allCutPathnames.length != pubdids.length ) { - LOGGER.warning("Number of cubes found and number of cutouts created do not match."); - } - - try - { - // 2. regridding (closest neighbour interpolation) for files to be merged - - Regrid grid = new Regrid(); - int dim = grid.dimensions(allCutPathnames); - if( dim > 2 ) { - Boolean changed = grid.regrid_vel2(allCutPathnames); - if(changed){ - //allresults.add("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding."); - LOGGER.finer("MSG Keywords CDELT3, CRVAL3 were adjusted for merge regridding."); - } - } - - // 3. Merge cut-files - - //String[] strar_results = mergefiles_parallel(id, logFileName, // logfileName - //String[] strar_results = mergefiles_split_execution(id, logFileName, // logfileName - CutResult strar_results = mergefiles( - String.valueOf(dim), // prefix: "2D" or "3D" - allCutPathnames); // files to merge - - allresults.addAll(Arrays.asList(strar_results)); - - } - catch(Exception e) - { - LOGGER.log(Level.SEVERE, "merge:",e); - //allresults.add( - // "MSG System error. Report time, your IP-number, and the exact request-URL string to service administrator."); - } - - CutResult[] dlkArr = allresults.toArray(new CutResult[allresults.size()]); - return dlkArr[0]; // FIXME should return only datalink for the merged file not all cutout files? - } - - - protected CutResult[] do_cutouts( - String[] publisherDids, Coord coord, - Boolean countNullValues) - { - ArrayList<CutResult> allresults = new ArrayList<CutResult>(); - if(publisherDids.length <= 0) - return null; // no cube found - - for(String publisherDid : publisherDids) - { - CutResult cutout_results_table = cutout( - publisherDid, coord, - countNullValues); - - allresults.addAll(Arrays.asList(cutout_results_table)); - } - - return allresults.toArray(new CutResult[allresults.size()]); - } - - - protected CutResult mergefiles( - String prefix, // IN prefix added after filename-start-word - String[] filestomerge) // IN abs path with filenames to be merged - { - LOGGER.fine("trace"); - - String InJson = JsonEncoderMerge.mergefilesToJson( prefix, filestomerge); - String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); - return JsonDecoder.responseFromCutoutJson( OutJson ); - } - - - - // BEGIN parallel - - protected String[] mergefiles_parallel( - String jobId, // IN any identifier to be guaranteed distinct - String logfilename, // IN logfilename without path - String prefix, // IN prefix added after filename-start-word - String[] filestomerge) // IN abs path with filenames to be merged - { - LOGGER.fine("trace"); - - String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge); - for(String sentence : responseCH) VlkbAmqp.LOGGER.finest("responseCmnHdr: " + sentence); - // check if response errored -> abort with 500: Internal Server Error & log details - - int threadsCount = filestomerge.length; - Thread threadArr[] = new Thread[threadsCount]; - Reproject reprojectArr[] = new Reproject[threadsCount]; - int i; - for(i=0; i<threadsCount; i++) - //for(String file : filestomerge) - { - String file = filestomerge[i]; - reprojectArr[i] = new Reproject(this, jobId, prefix, file); - threadArr[i] = new Thread(reprojectArr[i], "reproject: " + String.valueOf(i)); - - threadArr[i].start(); - } - - // wait until all threads finished - - for(i=0; i<threadsCount; i++) - //for(Thread thread : threadArr) - { - try - { - threadArr[i].join(); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } - - - for(String sentence : reprojectArr[i].response) VlkbAmqp.LOGGER.finest("response[" + String.valueOf(i) + "]: " + sentence); - if(!isResponseOk(reprojectArr[i].response)) - { - ;// FIXME response incorrect -> abort merge-job, free resources - // if incorrect paarams -> respond HTTP.WRONG REQUEST - // if other error -> respond HTTP.INTRNAL ERRR & log - } - } - - String[] response = mergefiles_add_reprojected(jobId, prefix); - // check if response errored -> abort with 500: Internal Server Error & log details - - return response; - } - - private boolean isResponseOk(String[] response) - { - // FIXME implement! - return true; - } - - - - - - protected String[] mergefiles_split_execution( - String jobId, // IN any identifier to be guaranteed distinct - String logfilename, // IN logfilename without path - String prefix, // IN prefix added after filename-start-word - String[] filestomerge) // IN abs path with filenames to be merged - { - LOGGER.fine("trace"); - - String[] responseCH = mergefiles_common_header(jobId, logfilename, prefix, filestomerge); - // check if response errored -> abort with 500: Internal Server Error & log details - - for(String file : filestomerge)// FIXME parallelize on threads & join - { - String[] response = mergefiles_reproject(jobId, prefix, file); - // check if response errored -> abort with: 500: Internal Server Error & log details - } - - String[] response = mergefiles_add_reprojected(jobId, prefix); - // check if response errored -> abort with 500: Internal Server Error & log details - - return response; - } - - protected String[] mergefiles_common_header( - String jobId, // IN jobId to distinguish parallel executed requests - String logfilename, // IN logfilename without path - String prefix, // IN prefix added after filename-start-word - String[] filestomerge) // IN abs path with filenames to be merged - { - LOGGER.fine("trace"); - - String InJson = JsonEncoderMerge.mergefilesCommonHeaderToJson(jobId, prefix, filestomerge); - String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); - String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); - - return results; - } - - - protected String[] mergefiles_reproject( - String jobId, // IN jobId to distinguish parallel executed requests - String prefix, // IN prefix added after filename-start-word - String fitsfilename) // IN logfilename without path - { - LOGGER.fine("trace"); - - String InJson = JsonEncoderMerge.mergefilesReprojectToJson(jobId, prefix, fitsfilename); - String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); - String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); - - return results; - } - - - protected String[] mergefiles_add_reprojected( - String jobId, // IN jobId to distinguish parallel executed requests - String prefix) // IN prefix added after filename-start-word - { - LOGGER.fine("trace"); - - String InJson = JsonEncoderMerge.mergefilesAddReprojectedToJson(jobId, prefix); - String OutJson = RpcOverAmqp.doRpc(settings.amqpConn, InJson); - String[] results = null; // FIXME JsonDecoder.responseFromJson(OutJson); - - return results; - } - - // END parallel - - - - - - // returns selected data in list of strings: - // -- from cutout: the cutout filename (server local) - private String[] selectCutPathnames(CutResult[] results) { - - LOGGER.fine("trace"); - - // return only data (without MSG's LOG's etc) - ArrayList<String> data = new ArrayList<String>(); - - // sanity check - move after doFunc call (here covered by exception) - // FIXME consider catch null-pointer-exception - if(results == null) { - LOGGER.finest("selectCutPathnames: results-table is null."); - return null; - } - - for (CutResult res : results) { - - /*/ protect substring() calls below; - // FIXME consider catch exception index-out-of-bounds - if(res.length() < 3) { - LOGGER.warning("Assert(Results.toXML): results msg shorter then 3 chars : " + res); - continue; - } - - // decode msg type - switch(res.substring(0,3)){ - case "URL": // from cutout: the cutout filename for download - String localfname = res.substring(4);//.replaceAll(FITScutpath, ""); - String[] ssfn = localfname.split(":"); - //String[] ssfn = localfname.substring(4).split(":"); - LOGGER.finest("ssfn[0]: " + ssfn[0]); - LOGGER.finest("ssfn[1]: " + ssfn[1]); - data.add(ssfn[1]); - //data.add(localfname); - break; - case "NVS": // from cutout : NVS_nn:nn:nn - case "MSG": - case "LOG": - case "CUT": // from cutout: the file which was cut - case "HID": // from search - // no data, do nothing - break; - default: - LOGGER.severe("Assert(Results.toXML): results msg has unhandled msgtype code : " + res); - }*/ - data.add(res.fileName); - } - - return data.toArray(new String[data.size()]); - } - - -} - diff --git a/data-access/servlet/src/main/java/vlkb/amqp/JsonDecoder.java b/data-access/servlet/src/main/java/vlkb/amqp/JsonDecoder.java deleted file mode 100644 index 2d435a66052804acdb6c4b7e82a7c71e57f6d39d..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/amqp/JsonDecoder.java +++ /dev/null @@ -1,87 +0,0 @@ - -import java.util.logging.Logger; -import java.util.logging.Level; -import java.util.Iterator; -/* 'JSON-Simple' library */ -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; - - - -// Engine --> Servlet -// -// 1. exception (decoded inside 2 3) -// -// 2. response from cutout: struct cutout_res_s --> CutResult -// .filesize -// .filename -// .nullval_count : {fill_ratio null_count total_count} -// -// NOTE: MCutout moved to -> dacc/JdlMCutout.java -// 3. response from mcutout: struct mcutout_res_s --> MCutResult -// .filesize -// .tgz_filename -// .cut_resp_s[] : {cut_param_s content_type content} -// -// cut_param_s : {pubdid, coordonates, bool-countNullVals, filename, hdunum, cards[]} - - - -public class JsonDecoder -{ - static final Logger LOGGER = Logger.getLogger(JsonDecoder.class.getName()); - - public static CutResult responseFromCutoutJson(String response) - { - CutResult cut = new CutResult(); - - try - { - JSONParser parser = new JSONParser(); - Object jsonObj = parser.parse(response); - JSONObject jsonObject = (JSONObject) jsonObj; - - JSONObject jexcept = (JSONObject) jsonObject.get("exception"); - if(jexcept != null) - { - String type = (String)jexcept.get("type"); - String msg = (String)jexcept.get("msg"); - if(type.equals("INVALID_PARAM")) - { - throw new IllegalArgumentException(msg); - } - else if(type.equals("SYSTEM_ERROR")) - { - throw new IllegalStateException("Internal system error."); - } - } - else - { - long fileSize = (long) jsonObject.get("filesize"); - String fileName = (String) jsonObject.get("filename"); - - JSONObject jnvc = (JSONObject)jsonObject.get("nullvals_count"); - double fillRatio = (double) jnvc.get("fillratio"); - long null_count = (long) jnvc.get("nullcount"); - long total_count = (long) jnvc.get("totalcount"); - - cut.fileSize = fileSize; - cut.fileName = fileName; - cut.nullValueCount.percent = fillRatio; - cut.nullValueCount.nullCount = null_count; - cut.nullValueCount.totalCount = total_count; - } - } - catch (ParseException e) - { - LOGGER.severe(e.getMessage()); - e.printStackTrace(); - throw new IllegalStateException("Internal system error."); - } - - return cut; - } -} - diff --git a/data-access/servlet/src/main/java/vlkb/amqp/JsonEncoderMerge.java b/data-access/servlet/src/main/java/vlkb/amqp/JsonEncoderMerge.java deleted file mode 100644 index 454d6248ce4d7bd29a66ec42556772a24ee21275..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/amqp/JsonEncoderMerge.java +++ /dev/null @@ -1,258 +0,0 @@ - -import java.util.Iterator; -/* 'JSON-Simple' library */ -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; - -import vo.parameter.*; - -public class JsonEncoderMerge -{ - - - @SuppressWarnings("unchecked") - private static JSONObject coordinatesToJsonObj(Coord coord) - { - JSONObject obj = new JSONObject(); - - obj.put("skysystem", coord.skySystem); - obj.put("shape", coord.pos.shape.toString()); - obj.put("specsystem", coord.specSystem); - - /* SODA */ - - if(coord.pos != null) - { - obj.put("pos", objJPos(coord.pos)); - } - - if(coord.band != null) - { - obj.put("band", arrJBand(coord.band)); - } - - if(coord.time != null) - { - obj.put("time", genTimeJsonArr(coord.time) ); - } - - if(coord.pol != null) - { - obj.put("pol", genPolJsonArr(coord.pol) ); - } - - return obj; - } - - - private static JSONObject objJCircle(Circle circle) - { - JSONObject obj = new JSONObject(); - obj.put("lon", circle.lon); - obj.put("lat", circle.lat); - obj.put("radius", circle.radius); - return obj; - } - - private static JSONObject objJRange(Range range) - { - JSONObject obj = new JSONObject(); - obj.put("lon1", range.lon1); - obj.put("lon2", range.lon2); - obj.put("lat1", range.lat1); - obj.put("lat2", range.lat2); - return obj; - } - - private static JSONObject objJPolygon(Polygon poly) - { - JSONObject obj = new JSONObject(); - obj.put("lon", genPolyLonJsonArr(poly)); - obj.put("lat", genPolyLatJsonArr(poly)); - return obj; - } - - private static JSONObject objJPos(Pos pos) - { - JSONObject obj = new JSONObject(); - if(pos.circle != null) obj.put("circle", objJCircle(pos.circle)); - if(pos.range != null) obj.put("range", objJRange(pos.range)); - if(pos.polygon != null) obj.put("polygon", objJPolygon(pos.polygon)); - return obj; - } - - private static JSONArray arrJBand(Band band) - { - JSONArray arr = new JSONArray(); - arr.add(band.min); - arr.add(band.max); - return arr; - } - - private static JSONArray genTimeJsonArr(Time time) - { - JSONArray arr = new JSONArray(); - arr.add(time.min); - arr.add(time.max); - return arr; - } - - private static JSONArray genPolyLonJsonArr(Polygon polygon) - { - JSONArray jarr = new JSONArray(); - for(double dbl : polygon.lon) jarr.add(dbl); - return jarr; - } - private static JSONArray genPolyLatJsonArr(Polygon polygon) - { - JSONArray jarr = new JSONArray(); - for(double dbl : polygon.lat) jarr.add(dbl); - return jarr; - } - - - - private static JSONArray genPolJsonArr(Pol pol) - { - JSONArray jarr = new JSONArray(); - for(String str : pol.states) jarr.add(str); - return jarr; - } - - - private static JSONArray extraCardsToJson(FitsCard[] extraCards) - { - JSONArray jcards = new JSONArray(); - for(FitsCard card : extraCards) - { - //jcards.add(card); FIXME check what would this add; compiler did not complain - - JSONObject jcard = new JSONObject(); - jcard.put("key", card.key); - jcard.put("value", card.value); - jcard.put("comment", card.comment); - - jcards.add(jcard); - } - return jcards; - } - - - - - - @SuppressWarnings("unchecked") - public static String subimgToJson( - String imgPathname, - int imgHdunum, - Coord coord, - String subimgFilename, - FitsCard[] extraCards, - boolean countNullValues) - { - JSONObject obj = new JSONObject(); - - obj.put("service", "SUBIMG"); - - obj.put("img_pathname", imgPathname); - obj.put("img_hdunum", imgHdunum); - obj.put("coordinates", coordinatesToJsonObj(coord)); - obj.put("subimg_filename", subimgFilename); - - if((extraCards != null) && (extraCards.length > 0)) - { - obj.put("extra_cards", extraCardsToJson(extraCards)); - } - - obj.put("count_null_values", countNullValues); - - return obj.toJSONString(); - } - - - - @SuppressWarnings("unchecked") - public static String mergefilesToJson( - String dimensionality, - String[] filestomerge ) - { - JSONArray fnames = new JSONArray(); - for(String fn : filestomerge){ - fnames.add(fn); - } - - JSONObject obj = new JSONObject(); - obj.put("service", "MERGEF"); - obj.put("dimensionality", dimensionality); - obj.put("files_to_merge", fnames); - - return obj.toJSONString(); - } - - - - // BEGIN merge-parallel - - @SuppressWarnings("unchecked") - public static String mergefilesCommonHeaderToJson( - String jobId, - String dimensionality, - String[] filestomerge ) - { - JSONObject objParameters = new JSONObject(); - objParameters.put("merge_id", jobId); - objParameters.put("dimensionality", dimensionality); - - JSONArray fnames = new JSONArray(); - for(String fn : filestomerge){ - fnames.add(fn); - } - objParameters.put("files_to_merge", fnames); - - JSONObject obj = new JSONObject(); - obj.put("service", "MERGE1"); // MERGE phase 1: create common header - obj.put("parameters", objParameters); - - - return obj.toJSONString(); - } - - @SuppressWarnings("unchecked") - public static String mergefilesReprojectToJson( - String jobId, - String dimensionality, - String fitsFileName) - { - JSONObject objParameters = new JSONObject(); - objParameters.put("merge_id", jobId); - objParameters.put("dimensionality", dimensionality); - objParameters.put("fits_filename", fitsFileName); - - JSONObject obj = new JSONObject(); - obj.put("service", "MERGE2"); // MERGE phase 2: reproject one fitsfile - obj.put("parameters", objParameters); - - return obj.toJSONString(); - } - - @SuppressWarnings("unchecked") - public static String mergefilesAddReprojectedToJson( - String jobId, - String dimensionality ) - { - JSONObject objParameters = new JSONObject(); - objParameters.put("merge_id", jobId); - objParameters.put("dimensionality", dimensionality); - - JSONObject obj = new JSONObject(); - obj.put("service", "MERGE3"); // MERGE phase 3: add all reprojected files - obj.put("parameters", objParameters); - - return obj.toJSONString(); - } - - // END merge-parallel - - -} - diff --git a/data-access/servlet/src/main/java/vlkb/amqp/RpcOverAmqp.java b/data-access/servlet/src/main/java/vlkb/amqp/RpcOverAmqp.java deleted file mode 100644 index 8aa1b1ad2144f8a8561a0a381ecd62657f4612cf..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/amqp/RpcOverAmqp.java +++ /dev/null @@ -1,177 +0,0 @@ - -// At each vlkb-request: -// establish "connection" to RabbitMQ-broker (host:port) on autogenerated "channel" as user ??? -// then using this connection:channel do: -// * create a reply queue with autogenerated name -// * start a consumer on this queue -// generate a message with properties: corrId & reply-queue -// * publish the message to the pre-defined "amq.direct" exchange with routingKey from config -// * start waiting on reply-queue for next delivery -// -// It is admins responsibility to configure routingKey in Java-client (see Settings) to the -// same value as queuename used starting vlkbd to ensure delivery -// of vlkb-requests from Exchange to the correct queue - - -import java.util.logging.Logger; - -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.QueueingConsumer; -import com.rabbitmq.client.AMQP.BasicProperties; -import java.util.UUID; - - -public class RpcOverAmqp -{ - private static final Logger LOGGER = Logger.getLogger(RpcOverAmqp.class.getName()); - - private final boolean NO_ACK = true; - // affects message consume from queue: - // broker will remove msg right after delivery without waiting for confirmation - // improves performance on expense of reliability: - - private String userName = "guest"; - private String password = "guest"; - private String hostName; - private int portNumber; - private String routingKey; - - private Connection connection; - private Channel channel; - private String replyQueueName; - private QueueingConsumer consumer; - - private int channelNumber; - - - public static String doRpc(Settings.AmqpConn amqpConn, String InStr) - { - LOGGER.fine("trace"); - - final String userName = "guest"; - final String password = "guest"; - // FIXME move these to Settings - - RpcOverAmqp rpc = new RpcOverAmqp( - userName, password, - amqpConn.hostName(), - amqpConn.portNumber(), - amqpConn.routingKey()); - - rpc.initConnectionAndReplyQueue(); - - String OutStr = null; - - try - { - LOGGER.finer("Sent request : " + InStr); - OutStr = rpc.callAndWaitReply(InStr); - LOGGER.finer("Got response : " + OutStr); - } - catch (Exception e) - { - LOGGER.severe("Exception: " + e.getMessage()); - } - finally - { - try - { - rpc.close(); - } - catch (Exception ignore) - { - LOGGER.finer("ignoring exception on rpc.close():" + ignore.getMessage()); - } - } - - return OutStr; - } - - RpcOverAmqp(String userName, String password, String hostName, int portNumber, String routingKey) - { - this.userName = userName; - this.password = password; - this.hostName = hostName; - this.portNumber = portNumber; - this.routingKey = routingKey; - } - - - - public void initConnectionAndReplyQueue() - { - try - { - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(hostName); - factory.setPort(portNumber); - - factory.setUsername(userName); - factory.setPassword(password); - - connection = factory.newConnection(); - channel = connection.createChannel(); - - channelNumber = channel.getChannelNumber(); - - replyQueueName = channel.queueDeclare().getQueue(); - consumer = new QueueingConsumer(channel); - - // Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag. - channel.basicConsume(replyQueueName, NO_ACK, consumer); - } - catch(Exception e) - { - LOGGER.severe("Exception: " + e.getMessage()); - } - } - - - - public String callAndWaitReply(String message) throws Exception { - String response = null; - String corrId = UUID.randomUUID().toString(); - - BasicProperties props = new BasicProperties - .Builder() - .correlationId(corrId) - .replyTo(replyQueueName) - .build(); - - // send rpc params and where to reply (reply-queue & corrId) - - channel.basicPublish("", routingKey, props, message.getBytes("UTF-8")); - //channel.basicPublish("amq.direct", routingKey, props, message.getBytes("UTF-8")); - - // wait for reply msg and return if corrId matches - - while (true) - { - - QueueingConsumer.Delivery delivery = consumer.nextDelivery(); - - LOGGER.finest("CorrId sent[" + channelNumber + "]: " + delivery.getProperties().getCorrelationId() - + "\nCorrId recv: " + corrId - + "\nreplyQueueName: " + replyQueueName); - - if (delivery.getProperties().getCorrelationId().equals(corrId)) - { - response = new String(delivery.getBody(),"UTF-8"); - break; - } - } - - return response; - } - - - - public void close() throws Exception - { - connection.close(); - } - -} - diff --git a/data-access/servlet/src/main/java/vlkb/webapi/UWSMerge.java b/data-access/servlet/src/main/java/vlkb/webapi/UWSMerge.java deleted file mode 100644 index 3e4eae098e1005e8ef2e6dbdcc0f3140704e248e..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/webapi/UWSMerge.java +++ /dev/null @@ -1,99 +0,0 @@ -import java.io.IOException; -import java.io.PrintWriter; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import uws.UWSException; -import uws.job.ErrorType; -import uws.job.JobList; -import uws.job.JobThread; -import uws.job.UWSJob; -import uws.job.parameters.InputParamController; -import uws.job.parameters.NumericParamController; -import uws.job.parameters.StringParamController; -import uws.job.user.JobOwner; -import uws.service.UWSServlet; -import uws.service.UWSUrl; - -public class UWSMerge extends UWSServlet { - private static final long serialVersionUID = 1L; - - public static final Settings settings = Settings.getInstance(); - - /* REQUIRED - * Initialize your UWS. At least, you should create one jobs list. */ - @Override - public void initUWS() throws UWSException{ - addJobList(new JobList("merges")); - - addExpectedAdditionalParameter("surveyname"); - addExpectedAdditionalParameter("species"); - addExpectedAdditionalParameter("transition"); - - addExpectedAdditionalParameter("pubdid"); - //setInputParamController("pubdid", new StringParamController("pubdid")); - - addExpectedAdditionalParameter("l"); - addExpectedAdditionalParameter("b"); - addExpectedAdditionalParameter("r"); - addExpectedAdditionalParameter("dl"); - addExpectedAdditionalParameter("db"); - - addExpectedAdditionalParameter("vl"); - addExpectedAdditionalParameter("vu"); - addExpectedAdditionalParameter("vt"); - - addExpectedAdditionalParameter("BAND"); - addExpectedAdditionalParameter("BANDSYS"); - addExpectedAdditionalParameter("POS"); - addExpectedAdditionalParameter("POSSYS"); - addExpectedAdditionalParameter("TIME"); - addExpectedAdditionalParameter("POL"); - addExpectedAdditionalParameter("PIXELS"); - - /* setInputParamController("l", new NumericParamController()); - setInputParamController("b", new NumericParamController()); - setInputParamController("r", new NumericParamController()); - setInputParamController("dl", new NumericParamController()); - setInputParamController("db", new NumericParamController()); - setInputParamController("vu", new NumericParamController()); - setInputParamController("vl", new NumericParamController()); - setInputParamController("vt", new StringParamController("1", "1", new String[]{"1","2"}, false)); - */ // FIXME replace "1" "2" with proper spectral axis names - } - - /* - * REQUIRED - * Create instances of jobs, but only the "work" part. The "work" and the description of the job (and all the provided parameters) - * are now separated and only kept in the UWSJob given in parameter. This one is created automatically by the API. - * You just have to provide the "work" part. - */ - @Override - public JobThread createJobThread(UWSJob job) throws UWSException{ - if (job.getJobList().getName().equals("merges")) - return new UWSMergeWork(job); - else - throw new UWSException("Impossible to create a job inside the jobs list \"" + job.getJobList().getName() + "\" !"); - } - - /* OPTIONAL - * By overriding this function, you can customize the root of your UWS. - * If this function is not overridden an XML document which lists all registered jobs lists is returned. */ - @Override - protected void writeHomePage(UWSUrl requestUrl, HttpServletRequest req, HttpServletResponse resp, JobOwner user) throws UWSException, ServletException, IOException{ - PrintWriter out = resp.getWriter(); - - out.println("<html><head><title>VLKB Merge service (by UWS version 4)</title></head><body>"); - out.println("<h1>VLKB Merge service</h1"); - out.println("<p>Available job lists:</p>"); - - out.println("<ul>"); - for(JobList jl : this){ - out.println("<li>" + jl.getName() + " - " + jl.getNbJobs() + " jobs - <a href=\"" + requestUrl.listJobs(jl.getName()) + "\">" + requestUrl.listJobs(jl.getName()) + "</a></li>"); - } - out.println("</ul>"); - } - -} diff --git a/data-access/servlet/src/main/java/vlkb/webapi/UWSMergeWork.java b/data-access/servlet/src/main/java/vlkb/webapi/UWSMergeWork.java deleted file mode 100644 index c8d7b916dfacf42e7d9ee3b0971d84cdd7fc81bf..0000000000000000000000000000000000000000 --- a/data-access/servlet/src/main/java/vlkb/webapi/UWSMergeWork.java +++ /dev/null @@ -1,184 +0,0 @@ -import java.io.PrintWriter; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.BufferedOutputStream; -import javax.servlet.ServletOutputStream; -import java.io.IOException; -import java.io.FileNotFoundException; -import java.io.File; -import java.io.FileInputStream; - -import uws.UWSException; -import uws.job.ErrorType; -import uws.job.JobThread; -import uws.job.Result; -import uws.job.UWSJob; -import uws.service.UWSUrl; - -/* for datasets::doAction */ -import java.security.Principal; -import java.util.Map; -import java.util.HashMap; -import java.util.Set; -import java.util.List; -import java.util.ArrayList; - -import vo.parameter.*; - -public class UWSMergeWork extends JobThread -{ - private Settings settings = UWSMerge.settings; - - final String RESPONSE_ENCODING = "utf-8"; - - protected Vlkb vlkb = ( settings.amqpConn.isHostnameEmpty() ? new VlkbCli(settings): new VlkbAmqp(settings) ); - - /* NOTE needed if cutouts dir served by vlkb-datasets */ - private String webappRootRequestUrl = null; - - public UWSMergeWork(UWSJob j) throws UWSException - { - super(j); - UWSUrl url = j.getUrl(); - webappRootRequestUrl = url.getUrlHeader(); - } - - - /* FIXME in UWS howto result.setSize(size); */ - - @Override - protected void jobWork() throws UWSException, InterruptedException - { - try - { - long startTime_msec = System.currentTimeMillis(); - boolean showDuration = true; - - Map<String, String[]> params = collectSodaParams(job); - - String id = SingleStringParam.parseSingleStringParam(params, "ID"); - Pos pos = Pos.parsePos(params); - Band band = Band.parseBand(params); - Time time = Time.parseTime(params); - Pol pol = null;// FIXME Pol.parsePol(params); - String pixels = SingleStringParam.parseSingleStringParam(params, "PIXELS"); - - /* if(parser.sodaReq_hasSodaId()) - { - id = parser.sodaReq_getId(); - pos = parser.sodaReq_getPosCirclePolygon(); - band = parser.sodaReq_getBand(); - time = parser.sodaReq_getTime(); - pol = parser.sodaReq_getPol(); - } - else - { - id = parser.vlkbReq_getPubdid(); - pos = parser.vlkbReq_getCircleRect(); - band = parser.vlkbReq_getVelocity(); - } - */ - Coord coord = new Coord(pos, band, time, pol, pixels); - - CutResult cutResult = vlkb.doMerge(parseLegacyPubdidArr(id), coord, false);//countNullValues); - - final String respFormat = "text/xml";// FIXME read from param RESPONSEFORMAT ? - - - String contentType = respFormat; - String respEncoding = RESPONSE_ENCODING; - Result result = createResult("Report"); - result.setMimeType(respFormat); - OutputStream respOutputStream = getResultOutput(result); - - if(contentType.equals("text/xml") || contentType.equals("application/xml")) - { - PrintWriter writer = new PrintWriter(new OutputStreamWriter(respOutputStream, respEncoding)); - - String accessUrl = convertLocalPathnameToRemoteUrl(cutResult.fileName, - settings.fitsPaths.cutouts(), - settings.fitsPaths.cutoutsUrl()); - - XmlSerializer.serializeToLegacyCutResult(writer, RESPONSE_ENCODING, - cutResult, accessUrl, - id, pos, band, time, pol, pixels, false,//countNullValues, - showDuration, startTime_msec); - - writer.close(); - } - else if(contentType.equals("application/fits")) - { - String absCutPathname = cutResult.fileName; - File downloadFile = new File(absCutPathname); - FileInputStream input = new FileInputStream(downloadFile); - input.transferTo(respOutputStream); - downloadFile.delete(); - } - else - { - throw new AssertionError("Unsupported contentType for output: " + contentType); - } - - /* publishResult(result);*/ - respOutputStream.close(); - } - catch(IllegalArgumentException ex) - { - throw new UWSException(UWSException.BAD_REQUEST, ex.getMessage()); - } - catch(FileNotFoundException ex) - { - throw new UWSException(UWSException.BAD_REQUEST, ex.getMessage()); - } - catch(IOException ex) - { - throw new UWSException(UWSException.BAD_REQUEST, ex.getMessage()); - } - } - - - - - - private String convertLocalPathnameToRemoteUrl(String localPathname, - String FITScutpath, String FITSRemoteUrlCutouts) - { - //LOGGER.fine("trace " + localPathname); - String fileName = localPathname.replaceAll(FITScutpath + "/", ""); - //LOGGER.finer("local filename: " + fileName); - String remotefname = FITSRemoteUrlCutouts + "/" + fileName; - //LOGGER.finer("remote url : " + remotefname); - return remotefname; - } - - - - - /* semi-colon separated list of pudids convert to arra */ - private String[] parseLegacyPubdidArr(String pubdids) - { - List<String> pubdidList = new ArrayList<String>(); - String[] pdArr = pubdids.split(";"); - for(String pd : pdArr) - if(pd.length() > 0) pubdidList.add(pd); - - String[] pubdidArr = new String[pubdidList.size()]; - - return pubdidList.toArray(pubdidArr); - } - - private Map<String, String[]> collectSodaParams(UWSJob job) - { - Map<String, String[]> params = new HashMap<String, String[]>(); - String[] paraTokens = {"skysystem","specsystem","pubdid","l","b","r","dl","db","vl","vu","vt","ID","POSSYS","BANDSYS","POS", "BAND", "TIME", "POL", "PIXELS"}; - for(String paramToken : paraTokens) - { - String[] paramValue = new String[1]; - paramValue[0] = (String)job.getAdditionalParameterValue(paramToken); - params.put(paramToken, paramValue); - } - return params; - } - -} - diff --git a/docker/Makefile b/docker/Makefile index ee4707e9e4377829dbd3954cdf1573b5dbd3514f..50366e0d46f5b846ae62aae5529f582203e60cc7 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -62,12 +62,12 @@ publish-remotely-to-ska-harbor: # @echo "REMOTE_SODA_IMAGE_NAME : "$(REMOTE_SODA_IMAGE_NAME) ############################################################################### -download-all: vlkb-soda vlkbd vlkb-obscore vlkb +download-all: vlkb-soda vlkb-obscore vlkb vlkb-soda: make download PACK_FILE=$@-$(VERSION).war -vlkbd vlkb-obscore vlkb: +vlkb-obscore vlkb: make download PACK_FILE=$@-$(VERSION).rpm make download PACK_FILE=$@-$(VERSION).deb