Skip to content
Snippets Groups Projects
Commit fb2a84d4 authored by Robert Butora's avatar Robert Butora
Browse files

soda: removes Amqp & merge

parent 4f76dc9e
No related branches found
No related tags found
No related merge requests found
Showing
with 0 additions and 1609 deletions
......@@ -33,5 +33,4 @@ upload-war-deb:
curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkb-soda-$(VERSION).war https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/vlkb-soda-$(VERSION).war
curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkb-$(VERSION).deb https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/ubuntu22/vlkb-$(VERSION).deb
curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkb-obscore-$(VERSION).deb https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/ubuntu22/vlkb-obscore-$(VERSION).deb
curl --header "PRIVATE-TOKEN: glpat-JhqpFhEGvxuVzHqxjwqx" --upload-file vlkbd-$(VERSION).deb https://ict.inaf.it/gitlab/api/v4/projects/1780/packages/generic/vlkb-soda/1.7/ubuntu22/vlkbd-$(VERSION).deb
PREFIX ?= /usr/local
AMQP_QUEUE ?= vlkbdevel
INST_PATH ?= $(PREFIX)
VERSION ?= $(shell git describe)
......@@ -13,14 +12,12 @@ all : build
.PHONY: build
build:
make -C src/common VERSION=$(VERSION)
make -C src/vlkbd VERSION=$(VERSION)
make -C src/vlkb-obscore VERSION=$(VERSION)
make -C src/vlkb VERSION=$(VERSION)
.PHONY: clean
clean:
make -C src/common clean
make -C src/vlkbd clean
make -C src/vlkb-obscore clean
make -C src/vlkb clean
......@@ -31,15 +28,11 @@ install:
mkdir -p $(INST_PATH)/bin
install ./src/vlkb/bin/vlkb $(INST_PATH)/bin
install ./src/vlkb-obscore/bin/vlkb-obscore $(INST_PATH)/bin
install ./src/vlkbd/bin/vlkbd $(INST_PATH)/bin
install vlkbd_exec.sh $(INST_PATH)/bin
.PHONY: uninstall
uninstall:
rm -f $(INST_PATH)/bin/vlkb
rm -f $(INST_PATH)/bin/vlkb-obscore
rm -f $(INST_PATH)/bin/vlkbd
rm -f $(INST_PATH)/bin/vlkbd_exec.sh
......@@ -53,25 +46,8 @@ vlkb-devel: stop uninstall clean build install config start
.PHONY: config
config:
mkdir -p $(INST_PATH)/etc/vlkb-obscore
mkdir -p $(INST_PATH)/etc/vlkbd
cp config/vlkb-obscore.datasets.conf $(INST_PATH)/etc/vlkb-obscore/datasets.conf
cp config/vlkbd.datasets.conf $(INST_PATH)/etc/vlkbd/datasets.conf
.PHONY: start
start:
vlkbd_exec.sh localhost $(AMQP_QUEUE) $(INST_PATH)/etc/vlkbd/datasets.conf
.PHONY: stop
stop:
pkill -f 'vlkbd.* $(AMQP_QUEUE).*$(INST_PATH)/etc/vlkbd/datasets.conf'
.PHONY: status
status:
ps ax | grep vlkbd
.PHONY: reload
reload: stop start
#================================================================================
EXEC_NAME = vlkbd
VERSION ?= $(shell git describe)
BUILD_ ?= $(shell LANG=us_US date; hostname)
#================================================================================
DEPS_DIR := ../common ../../ext/aria-csv ../../ext/nlohmann-json
DEPS_INC := $(foreach d, $(DEPS_DIR), $d/include)
DEPS_LIB := $(foreach d, $(DEPS_DIR), $d/lib)
#================================================================================
COMMON_DIR=../common
COMMON_LIB = $(COMMON_DIR)/lib/libvlkbcommon.a
#================================================================================
INC_DIR=src $(DEPS_INC) $(COMMON_DIR)/include ../../ext \
/usr/include/cfitsio \
/usr/include/postgresql
LIB_DIR= $(DEPS_LIB) $(COMMON_DIR)/lib /usr/lib64/ast /usr/local/lib
#================================================================================
CC=g++
CXX_DEBUG_FLAGS=-g -DFDB_DEBUG
CXX_RELEASE_FLAGS=-O2
CXX_DEFAULT_FLAGS=-c -x c++ -std=c++11 -fPIC -Wall -Wextra -Wconversion -fno-common -DVERSIONSTR='"$(VERSION)"' -DBUILD='"$(BUILD_)"'
# FIXME: -last_pal missing in some builds (not realluy needed only for linker)
LDFLAGS = -Wall -lvlkbcommon -lpq -lpqxx -lcfitsio -lrabbitmq -last -last_grf_2.0 -last_grf_3.2 -last_grf_5.6 -last_grf3d -last_err -pthread -lstdc++ -lm
INC_PARM=$(foreach d, $(INC_DIR), -I$d)
LIB_PARM=$(foreach d, $(LIB_DIR), -L$d)
#================================================================================
SRC_DIR=src
OBJ_DIR=obj
BIN_DIR=bin
#================================================================================
EXECUTABLE := $(BIN_DIR)/$(EXEC_NAME)
CPP_FILES := $(wildcard $(SRC_DIR)/*.cpp)
OBJ_FILES := $(addprefix $(OBJ_DIR)/,$(notdir $(CPP_FILES:.cpp=.o)))
#================================================================================
NPROCS = $(shell grep -c 'processor' /proc/cpuinfo)
MAKEFLAGS += -j$(NPROCS)
#================================================================================
.PHONY: all
all : debug
.PHONY: release
release: CXXFLAGS+=$(CXX_RELEASE_FLAGS) $(CXX_DEFAULT_FLAGS)
release: $(EXECUTABLE)
.PHONY: debug
debug: CXXFLAGS+=$(CXX_DEBUG_FLAGS) $(CXX_DEFAULT_FLAGS)
debug: $(EXECUTABLE)
$(EXECUTABLE) : $(COMMON_LIB) makedir $(OBJ_FILES)
$(CC) $(OBJ_FILES) $(LIB_PARM) $(LDFLAGS) -o $@
$(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp
$(CC) $(CXXFLAGS) $(INC_PARM) -o $@ $<
.PHONY: $(COMMON_LIB)
$(COMMON_LIB) :
make -C $(COMMON_DIR)
.PHONY: makedir
makedir:
-mkdir -p $(OBJ_DIR) $(BIN_DIR)
.PHONY: clean
clean :
-rm -fr $(OBJ_DIR) $(BIN_DIR)
.PHONY: test
test :
@tabs 20
@echo -e "EXEC_NAME:\t" $(EXEC_NAME)
@echo -e "VERSION:\t" $(VERSION)
@echo -e "CPP_FILES:\t" $(CPP_FILES)
@echo -e "OBJ_FILES:\t" $(OBJ_FILES)
@echo -e "C_FILES:\t" $(C_FILES)
@echo -e "C_OBJ_FILES:\t" $(C_OBJ_FILES)
@echo -e "INC_PARM:\t" $(INC_PARM)
@echo -e "LIB_PARM:\t" $(LIB_PARM)
#include "io.hpp"
#include "config.hpp"
#include <iostream>
#include <fstream>
#include <sstream>
#include <map>
/*/ C
#include <stdio.h>
#include <stdlib.h> // atoi needed
#include <string.h>
*/
using namespace std;
void config::read_config(const std::string & settings_path)
{
std::ifstream settings_file(settings_path);
std::string line;
LOG_STREAM << "config::read_config()" << endl;
if (settings_file.fail())
{
LOG_STREAM << "config file does not exist. Default options used." << endl;
return;
}
while (std::getline(settings_file, line))
{
std::istringstream iss(line);
std::string id, eq, val;
if (std::getline(iss, id, '='))
{
if (std::getline(iss, val))
{
if (m_settings.find(id) != m_settings.end())
{
if (val.empty())
{
LOG_STREAM << "config " << id.c_str()
<< " is empty. Keeping default " << m_settings[id].c_str() << endl;
}
else
{
m_settings[id] = val;
LOG_STREAM << "config " << id.c_str()
<<" read as " << m_settings[id].c_str() << endl;
}
}
else
{
//Not present in map
LOG_STREAM << "Setting "<< id.c_str() << " not defined, ignoring it" << endl;
continue;
}
}
else
{
// Comment line, skiping it
continue;
}
}
else
{
//Empty line, skipping it
continue;
}
}
}
#ifndef CONFIG_HPP
#define CONFIG_HPP
#include <string>
#include <map>
class config
{
public:
void read_config(const std::string & settings_path);
std::string getLogDir() const {return m_settings.at(log_dir);}
std::string getLogFileName() const {return m_settings.at(log_filename);}
// std::string getAuthority() const {return m_settings.at(ivoid_authority);}
// std::string getResourceKey() const {return m_settings.at(ivoid_resource_key);}
// std::string getObsCorePublisher() const {return std::string{"ivo://"} + getAuthority() + std::string{"/"} + getResourceKey();}
// std::string getObsCoreAccessFormat() const {return m_settings.at(obscore_access_format);}
// std::string getRemoteFitsDir() const {return m_settings.at(fits_url);}
std::string getFitsDir() const {return m_settings.at(fits_dir);}
std::string getFitsCutDir() const {return m_settings.at(fits_cutdir);}
/*
std::string getDbms() const {return m_settings.at(db_dbms);}
std::string getDbHostName() const {return m_settings.at(db_host_name);}
std::string getDbPort() const {return m_settings.at(db_port);}
std::string getDbSchema() const {return m_settings.at(db_schema);}
std::string getDbName() const {return m_settings.at(db_name);}
std::string getDbUserName() const {return m_settings.at(db_user_name);}
std::string getDbPassword() const {return m_settings.at(db_password);}
std::string getDbUri(bool with_password = false) const
{return
m_settings.at(db_dbms)
+ "://"
+ m_settings.at(db_user_name)
+ (with_password ? ":"+m_settings.at(db_password) : "")
+ "@"
+ m_settings.at(db_host_name)
+ ":"
+ m_settings.at(db_port)
+ "/"
+ m_settings.at(db_name);
}
std::string getDbPostgresConnectString(bool with_password = false) const
{return
"dbname = " + m_settings.at(db_name)
+ " port = " + m_settings.at(db_port)
+ " host = " + m_settings.at(db_host_name)
+ " user = " + m_settings.at(db_user_name)
+ (with_password ? " password = " + m_settings.at(db_password) : "")
+ " options=\'-c search_path=" + m_settings.at(db_schema) + "\'";
}
*/
private:
std::string value(std::string key) {return m_settings.at(key);}
const std::string fits_dir{"fits_path_surveys"};
const std::string fits_cutdir{"fits_path_cutouts"};
const std::string log_dir{"log_dir"};
const std::string log_filename{"log_filename"};
// const std::string fits_url{"fits_url_surveys"};
/* const std::string ivoid_authority{"ivoid_authority"};
const std::string ivoid_resource_key{"ivoid_resource_key"};
const std::string obscore_access_format{"obscore_access_format"};
const std::string db_dbms{"db_dbms"};
const std::string db_host_name{"db_host_name"};
const std::string db_port{"db_port"};
const std::string db_user_name{"db_user_name"};
const std::string db_password{"db_password"};
const std::string db_name{"db_name"};
const std::string db_schema{"db_schema"};
*/
//-------------------------------------------------
// defaults
//-------------------------------------------------
const std::string empty_string;
std::map<const std::string, std::string> m_settings
{
{fits_dir, "/srv/surveys"},
{fits_cutdir, "/srv/cutouts"},
{log_dir, "/tmp"},
{log_filename, "vlkbd.log"},
/*
{ivoid_authority, empty_string},
{ivoid_resource_key, empty_string},
{obscore_access_format, "application/fits"},
{fits_url, empty_string},
{db_dbms, empty_string},
{db_host_name, empty_string},
{db_port, empty_string},
{db_user_name, empty_string},
{db_password, empty_string},
{db_name, empty_string},
{db_schema, empty_string}
*/
};
};
#endif
#include "json_reply.hpp"
#include "json.hpp"
#include "io.hpp"
#include <vector>
#include <string>
#include <stdexcept>
#include <stdlib.h>
#include <string.h>
#include <sched.h> // sched_getcpu()
using namespace std;
const string ENGINE_VERSION{"engine version " + string(VERSIONSTR) + " " + string(BUILD) + " on CPU#"+to_string(sched_getcpu())};
#include "cutout.hpp"
#include "cutout_nljson.hpp"
#include "mcutout.hpp"
#include "mcutout_nljson.hpp"
#include "json.hpp"
#include <vector>
#include <string>
/* All nlohmann-json exception are json::exception <- std::exception.
* So let them be caught by std::excpetion as 'Internal errors' in rpc-call's infinite loop,
* assuming all API syntactic errors were caught in servlet API parser */
class json_reply
{
public:
json_reply() {j = nlohmann::json::object();};
void put_cutout_result(cutout_res_s res) { j = res; };
void put_mcutout_result(mcutout_res_s res) { j = res; };
std::string json_str() { return j.dump();};
private:
nlohmann::json j;
};
#include "json_request.hpp"
#include "io.hpp"
#include "cutout.hpp"
#include "cutout_nljson.hpp"
#include "mcutout.hpp"
#include "mcutout_nljson.hpp"
#include <stdexcept>
#include "json.hpp"
#include <iostream>
using json = nlohmann::json;
const bool ASSERTS = true;
using namespace std;
NLOHMANN_JSON_SERIALIZE_ENUM( service, {
{MCUTOUT,"MCUTOUT"},
{MERGEF, "MERGEF"},
{MERGE1, "MERGE1"},
{MERGE2, "MERGE2"},
{MERGE3, "MERGE3"},
{SUBIMG, "SUBIMG"}
});
json_request::json_request(string request_json)
{
LOG_trace(__func__);
m_jservice = json::parse(request_json, nullptr, ASSERTS);
m_service = m_jservice.at("service");
LOG_STREAM << m_jservice.dump() << endl;
}
struct coordinates json_request::coordinates() {return m_jservice.at("coordinates");}
vector<struct cut_param_s> json_request::cut_params()
{
LOG_trace(__func__);
vector<struct cut_param_s> cut_pars;
json cuts = m_jservice.at("cuts");
cut_pars = cuts.get<std::vector<struct cut_param_s>>();
return cut_pars;
}
std::vector<struct fits_card> json_request::extra_cards()
{
vector<struct fits_card> cards;
if(m_jservice.contains("extra_cards"))
{
json jcards = m_jservice.at("extra_cards");
cards = jcards.get<std::vector<struct fits_card>>();
}
return cards;
}
std::vector<std::string> json_request::get_pol()
{
vector<string> str;
if(m_jservice.contains("pol"))
{
json j = m_jservice.at("pol");
str = j.get<std::vector<string>>();
}
return str;
}
#include "cutout.hpp"
#include "cutout_nljson.hpp"
#include "mcutout.hpp"
#include "mcutout_nljson.hpp"
#include "json.hpp"
#include <string>
using json = nlohmann::json;
/* All nlohmann-json exception are json::exception <- std::exception.
* So let them be caught by std::excpetion as 'Internal errors' in rpc-call's infinite loop,
* assuming all API syntactic errors were caught in servlet API parser */
enum service {SEARCH, CUTOUT, MCUTOUT, MERGEF, MERGE1, MERGE2, MERGE3, SUBIMG};
class json_request
{
public:
json_request(std::string request_json);
bool is_search() {return m_service == SEARCH;}
bool is_cutout() {return m_service == CUTOUT;}
bool is_mcutout() {return m_service == MCUTOUT;}
bool is_mergefiles() {return m_service == MERGEF;}
bool is_mergefiles_common_header() {return m_service == MERGE1;}
bool is_mergefiles_reproject() {return m_service == MERGE2;}
bool is_mergefiles_add_reprojected() {return m_service == MERGE3;}
bool is_subimg() {return m_service == SUBIMG;}
std::string pubdid() {return m_jservice.at("pubdid");}
struct coordinates coordinates();
bool count_null_values(){return m_jservice.at("count_null_values");}
std::vector<struct cut_param_s> cut_params();
std::string merge_id() {return m_jservice.at("merge_id");}
std::string dimensionality() {return m_jservice.at("dimensionality");}
std::vector<std::string> files_to_merge() {return m_jservice.at("files_to_merge");}
std::string fitsfilename() {return m_jservice.at("fits_filename");}
/* SUBIMG */
std::string abs_subimg_pathname() {return m_jservice.at("subimg_filename");}
std::string img_pathname() {return m_jservice.at("img_pathname");}
int img_hdunum() {return m_jservice.at("img_hdunum");}
std::vector<struct fits_card> extra_cards();
/* new: no coordinates instead separate pos band time pol */
position get_pos() { return (m_jservice.contains("pos") ? (position) m_jservice.at("pos") : pos_none ); }
band get_band() { return (m_jservice.contains("band") ? (band) m_jservice.at("band") : band_none); }
time_axis get_time() { return (m_jservice.contains("time") ? (time_axis) m_jservice.at("time") : time_none); }
std::vector<std::string> get_pol();
private:
json m_jservice;
service m_service;
};
// convert parameters to/from JSON and call vlkb services
// NOTE merge_id
// jobId exists if we run under UWS (MERGE1,2,3)
// create merge-id = amqp-queuename + jobId
// such id is uniq when running MERGE parallel and more deploys
// of vlkb-dtasets present against the same amqp-broker
#include "json_service_call.hpp"
#include "config.hpp"
#include "io.hpp"
#include "cutout.hpp"
#include "cutout_nljson.hpp"
#include "mcutout.hpp"
#include "mcutout_nljson.hpp"
#include "json_request.hpp"
#include "json_reply.hpp"
#include "fitsfiles.hpp" // calc_nullvals
#include <stdexcept>
#include <vector>
#include <string>
using namespace std;
string to_string(service_error serr)
{
string str;
switch(serr)
{
case service_error::INVALID_PARAM: return "INVALID_PARAM"; break;
case service_error::SYSTEM_ERROR: return "SYSTEM_ERROR"; break;
}
LOG_STREAM << string(__FILE__) << ":" << to_string(__LINE__) << "unrecognized value in service_error type" << endl;
return str;
}
string service_exception(service_error error, const string what)
{
LOG_trace(__func__);
json jreply;
switch(error)
{
case service_error::INVALID_PARAM:
jreply["exception"] = { {"type","INVALID_PARAM"}, {"msg",what} };
break;
case service_error::SYSTEM_ERROR:
jreply["exception"] = { {"type","SYSTEM_ERROR"}, {"msg",what} };
break;
}
LOG_STREAM << to_string(error) + ": " + what << endl;
return jreply.dump();
}
string service_call(string request_json, string queuename, config conf)
{
LOG_trace(__func__);
const string setts_fitsdir{conf.getFitsDir()};
const string setts_fitscutdir{conf.getFitsCutDir()};
LOG_STREAM << request_json << endl;
json_request req(request_json);
json_reply reply;
if(req.is_subimg())
{
cutout_res_s cutres = do_cutout_file(
req.img_pathname(), req.img_hdunum(),
req.get_pos(), req.get_band(), req.get_time(), req.get_pol(),
req.count_null_values(),
req.extra_cards(),
setts_fitsdir,
setts_fitscutdir);
reply.put_cutout_result(cutres);
}
else if(req.is_mcutout())
{
struct mcutout_res_s mres = mcutout(req.cut_params(), setts_fitsdir, setts_fitscutdir);
mres.tgz_filename = setts_fitscutdir + "/" + mres.tgz_filename;
mres.filesize = fitsfiles::fileSize(mres.tgz_filename);
reply.put_mcutout_result(mres);
}
else if(req.is_mergefiles())
{
string mergedfile_pathname;
unsigned long fsize = xmergefiles(
req.files_to_merge(),
req.dimensionality(),
setts_fitscutdir, setts_fitscutdir,
mergedfile_pathname);
cutout_res_s cutres{ fsize, mergedfile_pathname, {-1.0, 0, 0} };
reply.put_cutout_result(cutres);
}
else if(req.is_mergefiles_common_header())
{
string merge_id(queuename + "_" + req.merge_id());
xmergefiles_common_header(
merge_id,
req.files_to_merge(),
req.dimensionality(),//FIXME convert to int: dimensionslity
setts_fitscutdir, setts_fitscutdir);
}
else if(req.is_mergefiles_reproject())
{
string merge_id(queuename + "_" + req.merge_id());
xmergefiles_reproject(
merge_id,
req.fitsfilename(),
req.dimensionality(),//FIXME convert to int: dimensionslity
setts_fitscutdir, setts_fitscutdir);
}
else if(req.is_mergefiles_add_reprojected())
{
string merge_id(queuename + "_" + req.merge_id());
string mergedfile_pathname;
unsigned long fsize = xmergefiles_add_reprojected(
merge_id,
req.dimensionality(),
setts_fitscutdir, setts_fitscutdir,
mergedfile_pathname);
cutout_res_s cutres{ fsize, mergedfile_pathname, {-1.0, 0, 0} };
reply.put_cutout_result(cutres);
}
else
{
throw std::runtime_error("unrecognized vlkb service");
}
return reply.json_str();
}
// in service_call():
// queuename serves as identifier for merge working-dirs, to distinguish
// more instances of VLKB running on the same broker: each instance
// needs separate queue for rpc
// FIXME qname should come from config file
#ifndef JSON_SERVICE_CALL_HPP
#define JSON_SERVICE_CALL_HPP
#include "config.hpp"
#include <string>
std::string service_call(std::string request_json, std::string queuename, config conf);
enum class service_error {INVALID_PARAM, SYSTEM_ERROR};
std::string service_exception(enum service_error error, std::string what);
#endif
// RPC over AMQP
#include "rpc_amqp.hpp"
#include "config.hpp"
#include "rpc_amqp_utils.hpp"
#include "json_service_call.hpp"
#include <stdexcept>
#include <string>
#include <stdio.h>
#include <syslog.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "io.hpp"
using namespace std;
// error handling
void throw_ex_on_amqp_error(amqp_rpc_reply_t rc, amqp_connection_state_t conn, amqp_channel_t channel, char const *context)
{
std::string ct(context);
if(rc.reply_type != AMQP_RESPONSE_NORMAL)
{
amqp_rpc_reply_t rc_ch_close = amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
if(rc_ch_close.reply_type != AMQP_RESPONSE_NORMAL)
throw std::runtime_error("cannot close channel after unsuccessful " + ct);
amqp_rpc_reply_t rc_conn_close = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
if(rc_conn_close.reply_type != AMQP_RESPONSE_NORMAL)
throw std::runtime_error("cannot close connection after unsuccessful " + ct);
if(AMQP_STATUS_OK != amqp_destroy_connection(conn))
throw std::runtime_error("cannot end connection after unsuccessful " + ct);
else
throw std::runtime_error(ct + " failed");
}
}
void syslog_on_amqp_error(amqp_rpc_reply_t x, char const *context)
{
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
syslog(LOG_ERR, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
syslog(LOG_ERR, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD:
{
amqp_connection_close_t *m =
(amqp_connection_close_t *)x.reply.decoded;
syslog(LOG_ERR, "%s: server connection error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD:
{
amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded;
syslog(LOG_ERR, "%s: server channel error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
break;
}
default:
syslog(LOG_ERR, "%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
break;
}
break;
}
}
// AMQP RPC
//
// establish connection to RabbitMQ-broker on "conn" and channel=1
// use this connection [conn,channel] to:
// * create queue where Java-vlkb-client will put messages (queuename must match routingKey of Java-client config file)
// * bind the queue to pre-defined exchange "amq.direct"
// * ask the broker to start basic-consumer on that queue
// WAIT: Consume message from "conn"
// Create new reply-message with CorrdId from received message
// * publish the reply-msg to reply-to queue
// return to WAIT: ... loop forever
amqp_connection_state_t login_to_broker(const string user_name, const string password,
const string hostname, int port)
{
// allocate new conn and initialize
// NOTE: must destroy conn at exit
amqp_connection_state_t conn = amqp_new_connection();
if(conn == NULL)
throw std::runtime_error("cannot create new connection");
{ // open new TCP-socket and store in conn
amqp_socket_t *socket = NULL;
socket = amqp_tcp_socket_new(conn);
if (socket == NULL)
{
if(AMQP_STATUS_OK != amqp_destroy_connection(conn))
throw std::runtime_error("cannot end connection after unsuccessful new TCP socket");
else
throw std::runtime_error("error creating TCP socket");
}
int status;
status = amqp_socket_open(socket, hostname.c_str(), port);
if (status != 0)
{
if(AMQP_STATUS_OK != amqp_destroy_connection(conn))
throw std::runtime_error("cannot end connection after unsuccessful socket open");
else
throw std::runtime_error("error opening TCP socket");// FIXME add status to msg
}
}
amqp_rpc_reply_t rc;
rc = amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user_name.c_str(), password.c_str());
if(rc.reply_type != AMQP_RESPONSE_NORMAL)
{
amqp_rpc_reply_t rc_close = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
if(rc_close.reply_type != AMQP_RESPONSE_NORMAL)
throw std::runtime_error("cannot close connection after unsuccessful amqp login");
else if(AMQP_STATUS_OK != amqp_destroy_connection(conn))
throw std::runtime_error("cannot end connection after unsuccessful amqp login");
else
throw std::runtime_error("amqp_login failed");
}
return conn;
}
// RPC-loop
int channel_open(amqp_connection_state_t conn, amqp_channel_t channel)
{
amqp_channel_open(conn, channel);
amqp_rpc_reply_t rep = amqp_get_rpc_reply(conn);
return (rep.reply_type != AMQP_RESPONSE_NORMAL);
}
void declare_nondurable_autodelete_queue(
amqp_connection_state_t conn, amqp_channel_t channel,
amqp_bytes_t queuename)
{
amqp_queue_declare(conn, channel,
queuename,
0, // 'passive' guarantees that this client sees the queue which was created already
0, // 'durable' queue survives broker restarts
0, // 'exclusive' to current connection (queue deleted when conn closes)
1, // 'auto_delete' the queue when not used
amqp_empty_table); // amqp_table_t arguments specific for AMQP broker implementation (none in RabbitMQ)
}
// start a queue consumer (e.g. start delivering msgs from the queue to this client)
// broker-implementation should support at least 16 consumers per queue
void start_basic_consumer_noack(
amqp_connection_state_t conn, amqp_channel_t channel,
amqp_bytes_t queuename)
{
amqp_basic_consume(conn, channel,
queuename,
amqp_empty_bytes, // consumer_tag amqp_bytes_t: consumer-identifier (if empty, server generates a tag)
0, // no_local amqp_boolean_t: broker will not send msgs to connection which published them
1, // no_ack amqp_boolean_t: broker does not expect acknowledgement for delivered msgs
0, // exclusive amqp_boolean_t : only this consumer can access the queue
amqp_empty_table); // arguments amqp_table_t: implementation specific args (not used in RabbitMQ)
}
int consume_message_wait_forever(amqp_connection_state_t conn, amqp_envelope_t *envelope)
{
// release memory associated with all channels
amqp_maybe_release_buffers(conn);
amqp_rpc_reply_t res = amqp_consume_message(conn,
envelope, // message in envelope
NULL, // timeout (struct *)
0); // flags (int) AMQP_UNUSED
if (AMQP_RESPONSE_NORMAL != res.reply_type)
{
syslog_on_amqp_error(res, "amqp_consume_message");
}
return (AMQP_RESPONSE_NORMAL != res.reply_type);
}
void basic_publish_on_queue_or_drop(amqp_connection_state_t conn, amqp_channel_t channel,
amqp_bytes_t queuename,
amqp_bytes_t correlation_id,
const char * msg_buff)
{
amqp_basic_properties_t props;
props._flags =
AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes("application/json");// FIXME make sure encoding is UTF-8
//props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2;
// 1: non-persistent
// 2: persistent (delivered even if broker re-boots - msg held on hard disk)
props.correlation_id = correlation_id;
int rc = amqp_basic_publish(conn, channel,
amqp_empty_bytes, // exchange amqp_bytes_t: empty = default-exchange
queuename, // routingKey := queuename amqp_bytes_t
0, // mandatory amqp_boolean_t 0: drop the msg if cannot be routed (1: return msg)
0, // immediate amqp_boolean_t 0: queue the msg if cannot be routed immediately (1: -"-)
&props, // amqp_basic_properties_t
amqp_cstring_bytes(msg_buff)); // body
if (rc < 0)
{
syslog(LOG_ERR, "%s: basic publish failed with %uh, message: %s\n",__func__, rc, amqp_error_string2(rc));
}
}
// run RPC-loop
// even if error happens on consume-request or publish-response
void rpc_loop_forever(
amqp_connection_state_t conn, amqp_channel_t channel,
const string queuename,
const string settings_pathname)
{
if(channel_open(conn, channel))
{
amqp_rpc_reply_t rep_close = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
if(rep_close.reply_type != AMQP_RESPONSE_NORMAL)
throw std::runtime_error("cannot close connection after unsuccessful channel open");
else if(AMQP_STATUS_OK != amqp_destroy_connection(conn))
throw std::runtime_error("cannot end connection after unsuccessful channel open");
else
throw std::runtime_error("channel open failed");
}
declare_nondurable_autodelete_queue(conn, channel, amqp_cstring_bytes(queuename.c_str()));
throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), conn, channel, "amqp queue declare");
start_basic_consumer_noack(conn, channel, amqp_cstring_bytes(queuename.c_str()));
throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), conn, channel, "amqp basic consume");
syslog(LOG_INFO,"AMQP initialized. Run RPC loop.");
config conf;
conf.read_config(settings_pathname);
syslog(LOG_INFO, string("Will log to " + conf.getLogDir()).c_str());
for (;;)
{
amqp_envelope_t envelope;
if(consume_message_wait_forever(conn, &envelope))
{
continue;
}
string request_json((const char*)envelope.message.body.bytes, envelope.message.body.len);
// RPC call
LOG_open(conf.getLogDir(), conf.getLogFileName());
string reply_json;
try
{
reply_json = service_call(request_json, queuename, conf);
}
catch(const invalid_argument& ex)
{
reply_json = service_exception(service_error::INVALID_PARAM, ex.what());
}
catch(const exception& ex)
{
reply_json = service_exception(service_error::SYSTEM_ERROR, ex.what());
}
LOG_close();
basic_publish_on_queue_or_drop(conn, channel,
envelope.message.properties.reply_to,
envelope.message.properties.correlation_id,
reply_json.c_str());
amqp_destroy_envelope(&envelope);
}
// Function never returns. Terminate with signal.
}
void do_cleanup(amqp_connection_state_t conn, amqp_channel_t channel)
{
die_on_amqp_error(amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
LOG_close();
}
// interfaces
// global to make accessible from signal_handler FIXME
amqp_connection_state_t conn;
amqp_channel_t channel;
void rpc_run(const string user_name, const string password,
const string hostname, int port,
const string rpc_queuename,
const string settings_pathname)
{
conn = login_to_broker(user_name, password, hostname, port);
channel = 1; // only single AMQP-channel per connection needed, use channel no. 1
rpc_loop_forever(conn, channel, rpc_queuename, settings_pathname); // func never returns
}
void rpc_cleanup(void)
{
do_cleanup(conn, channel);
}
///////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
// NOTE:
// this was in rpc_run_loop AFTER queue_declare and BEFORE basic_consume :
#ifdef usedefaultexchange
// bind queue to exchange
amqp_queue_bind(conn, channel,
queuename,
amqp_cstring_bytes("amq.direct"), // exchange
queuename, // routingKey := queuename
amqp_empty_table); // empty arguments
throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), "amqp queue bind");
// better load balancing
amqp_basic_qos_ok_t * qok = amqp_basic_qos(conn, channel,
0, // prefetch_size uint32_t
1, // prefetch_count uint16_t
0);// global amqp_boolean_t :
// =0 prefetch_count applies seperatly to each consumer
// =1 prefetch_count applices to all consumers
throw_ex_on_amqp_error(amqp_get_rpc_reply(conn), "amqp basic QoS");
#endif
// ask the broker to start a basic-consumer on queue "queuename"
// serves all channels in connection (envelope.channel) -> always reply to
// queue whos name is in reply-to field (no need to ditinguish channels,
// reply-to queues were created by that channel)
// no_ack affects message consume from queue:
// broker will remove msg right after delivery without waiting for confirmation from connected peer
// improves performance on expense of reliability
/* util
void print_envelope_if(int condition, amqp_envelope_t * envelope)
{
if(condition){
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
(unsigned) envelope->delivery_tag,
(int) envelope->exchange.len, (char *) envelope->exchange.bytes,
(int) envelope->routing_key.len, (char *) envelope->routing_key.bytes);
if (envelope->message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
(int) envelope->message.properties.content_type.len,
(char *) envelope->message.properties.content_type.bytes);
}
printf("----\n");
// amqp_dump(envelope->message.body.bytes, envelope->message.body.len);
}
}
*/
#ifndef RPC_AMQP_HPP
#define RPC_AMQP_HPP
#include <string>
void rpc_run(
const std::string user_name, const std::string password,
const std::string hostname, int port,
const std::string rpc_queuename,
const std::string settings_pathname);
void rpc_cleanup(void);
#endif
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
#include "rpc_amqp_utils.hpp"
#include <string>
#include <stdexcept>
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <ctype.h>
#include <syslog.h>
#include <amqp.h>
#include <amqp_framing.h>
void die(const char *fmt, ...)
{
const size_t BUFF_SIZE = 256;
char buff[BUFF_SIZE];
buff[BUFF_SIZE] = 0;
va_list ap;
va_start(ap, fmt);
vsnprintf(buff, BUFF_SIZE-1, fmt, ap);
va_end(ap);
throw std::runtime_error(buff);
}
void die_on_error(int x, char const *context)
{
if (x < 0) {
die("%s: %s\n", context, amqp_error_string2(x));
}
}
// If synchronous AMQP API methods fail (return NULL) use this.
// Get the last global amqp_rpc_reply (per-connection-global amqp_rpc_reply_t)
// normal operation: AMQP_RESPONSE_NORMAL -> RPC completed successfully.
// error: AMQP_RESPONSE_SERVER_EXCEPTION (conn closed, channel closed, library exception)
void die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
{
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
die("%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
die("%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
die("%s: server connection error %uh, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
die("%s: server channel error %uh, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
default:
die("%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
break;
}
break;
default:
die("%s: unknown server error, reply_type 0x%08X\n", context, x.reply_type);
break;
}
// code never reaches here
}
void throw_ex_on_amqp_error(amqp_connection_state_t conn, amqp_rpc_reply_t x, char const *context)
{
const size_t buff_size = 255;
char buff[buff_size+1];
buff[buff_size+1] = 0;
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
snprintf(buff, buff_size,"%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
snprintf(buff, buff_size,"%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
snprintf(buff, buff_size,"%s: server connection error %uh, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
snprintf(buff, buff_size,"%s: server channel error %uh, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
default:
snprintf(buff, buff_size,"%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
break;
}
break;
default:
snprintf(buff, buff_size,"%s: unknown server error, reply_type 0x%08X\n", context, x.reply_type);
break;
}
if (AMQP_RESPONSE_NORMAL != x.reply_type)
{
die_on_error(amqp_destroy_connection(conn), "Ending connection before throw runtime exception");
throw std::runtime_error(buff);
}
}
// output
static void dump_row(long count, int numinrow, int *chs)
{
int i;
printf("%08lX:", count - numinrow);
if (numinrow > 0) {
for (i = 0; i < numinrow; i++) {
if (i == 8) {
printf(" :");
}
printf(" %02X", chs[i]);
}
for (i = numinrow; i < 16; i++) {
if (i == 8) {
printf(" :");
}
printf(" ");
}
printf(" ");
for (i = 0; i < numinrow; i++) {
if (isprint(chs[i])) {
printf("%c", chs[i]);
} else {
printf(".");
}
}
}
printf("\n");
}
static int rows_eq(int *a, int *b)
{
int i;
for (i=0; i<16; i++)
if (a[i] != b[i]) {
return 0;
}
return 1;
}
void amqp_dump(void const *buffer, size_t len)
{
unsigned char *buf = (unsigned char *) buffer;
long count = 0;
int numinrow = 0;
int chs[16];
int oldchs[16] = {0};
int showed_dots = 0;
size_t i;
for (i = 0; i < len; i++) {
int ch = buf[i];
if (numinrow == 16) {
int j;
if (rows_eq(oldchs, chs)) {
if (!showed_dots) {
showed_dots = 1;
printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
}
} else {
showed_dots = 0;
dump_row(count, numinrow, chs);
}
for (j=0; j<16; j++) {
oldchs[j] = chs[j];
}
numinrow = 0;
}
count++;
chs[numinrow++] = ch;
}
dump_row(count, numinrow, chs);
if (numinrow != 0) {
printf("%08lX:\n", count);
}
}
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
#ifndef RPC_AMQP_UTILS_HPP
#define RPC_AMQP_UTILS_HPP
#include <amqp.h>
void die(const char *fmt, ...);
extern void die_on_error(int x, char const *context);
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
extern void amqp_dump(void const *buffer, size_t len);
extern uint64_t now_microseconds(void);
extern void microsleep(int usec);
void throw_ex_on_amqp_error(amqp_connection_state_t conn, amqp_rpc_reply_t x, char const *context);
#endif
// vlkbd
// daemon for VLKB services
#include "rpc_amqp.hpp"
#include <iostream>
#include <stdexcept>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
// daemon related START
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/signal.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <syslog.h>
// daemon related END
#define DAEMON_NAME "vlkbd"
char dname[256];
using namespace std;
void signal_handler(int sig) {
switch(sig){
case SIGTERM:
syslog(LOG_INFO,"received SIGTERM signal. Exiting ...");
rpc_cleanup();
exit(EXIT_SUCCESS);
break;
default:
syslog(LOG_WARNING,"received unhandled signal (%d) %s",
sig,strsignal(sig));
break;
}
}
void daemonize(void) {
/* Our process ID and Session ID */
pid_t pid, sid;
/* Fork off the parent process */
pid = fork();
if (pid < 0) {
exit(EXIT_FAILURE);
}
/* If we got a good PID, then
we can exit the parent process. */
if (pid > 0) {
exit(EXIT_SUCCESS);
}
// insert signal handler here...
signal(SIGTERM, signal_handler);
/* Change the file mode mask */
umask(0);
// Open any logs here ...
setlogmask(LOG_UPTO(LOG_INFO));
sprintf(dname,"%s[%d]",DAEMON_NAME,getpid());
openlog(dname, LOG_CONS, LOG_USER);
syslog(LOG_INFO,"service started.");
/* Create a new SID for the child process */
sid = setsid();
if (sid < 0) {
/* Log the failure */
exit(EXIT_FAILURE);
}
/* Change the current working directory */
if ((chdir("/")) < 0) {
/* Log the failure */
exit(EXIT_FAILURE);
}
/* Keep the standard file descriptors 0, 1, 2 occupied
so as no socket can have it; to guard the socket
against 'forgotten' printf's */
freopen ("/dev/null", "r", stdin);
freopen ("/dev/null", "w", stdout);
freopen ("/dev/null", "w", stderr);
/* Daemon-specific initialization goes here */
}
std::string base_name(std::string path)
{
return path.substr(path.find_last_of("//") + 1);
// FIXME replace with basename
}
void usage(const string progname)
{
cerr
<< "Usage: " << progname << " host port queuename conf-pathname [-t]" << endl
<< endl
<< " -t : stay on terminal, don't run as daemon" << endl
<< endl
<< "Version: " << VERSIONSTR << " " << BUILD << endl;
}
int main(int argc, char *argv[])
{
const std::string progname = base_name(argv[0]);
if ((argc != 5) && (argc != 6))
{
usage(progname);
return 1;
}
int daemon = (argc == 5);
if(daemon)
daemonize();
// now stdin/out/err are redirected
// start AMQP-consumer
string hostname(argv[1]);
int port = atoi(argv[2]);
string queuename(argv[3]);
string user_name("guest");
string password("guest");
string settings_pathname(argv[4]);
//string settings_pathname("/etc/vlkb/datasets.conf");
// FIXME uname passwd put to conf file or args - cannot start without them
try
{
rpc_run(user_name, password, hostname, port, queuename, settings_pathname);
}
catch (std::runtime_error const& e)
{
if(daemon)
syslog(LOG_ERR, e.what());
else
std::cerr << "Runtime error: " << e.what() << std::endl;
return 2;
}
syslog(LOG_INFO,"service stoped. Exiting...");
return 0;
}
.\" Hey, EMACS: -*- nroff -*-
.\" (C) Copyright 2023 ...
.\"
.TH vlkbd 1
.SH NAME
vlkbd \- vlkbd application
.SH SYNOPSIS
.B vlkbd
.SH DESCRIPTION
The
.B vlkbd
is an engine to perform data access (cutout, multicutout and demosaicing) of FITS files.
List of actual sub-commands is printed by --help.
.SH SEE ALSO
.BR vlkb, vlkb-obscore, vlkbd (1).
.SH AUTHORS
The
.B vlkbd
was written by
RBu <rbu@ia2.inaf.it>
.PP
This document was written by RBu <rbu@ia2.inaf.it> for Debian.
vlkbd (1.4.8) stable; urgency=low
[ VLKB ]
* First release via deb and rpm packages.
-- INAF <RBu@ia2.inaf.com> Thu, 23 Dec 2023 11:30:00 +0100
vlkbd (1.4.7) stable; urgency=low
[ INAF ]
* Adds support for SODA parameters (http://ivoa.net/documents).
-- INAF <RBu@ia2.inaf.org> Wed, 4 Oct 2023 11:00:00 +0100
Package: vlkbd
Version:
Section: utils
Priority: optional
Architecture: all
Maintainer: VLKB <RBu@ia2.vlkb.org>
Description: This is vlkbd engine to perform data access (cutout, multi-cutout, demosaicing) of FITS-files. List of commands is printed in help.
vlkbd
Copyright: 2023 INAF <ia2@inaf.com>
2023-10-30
The entire code base may be distributed under the terms of the GNU General
Public License (GPL), which appears immediately below. Alternatively, all
of the source code as any code derived from that code may instead be
distributed under the GNU Lesser General Public License (LGPL), at the
choice of the distributor. The complete text of the LGPL appears at the
bottom of this file.
See /usr/share/common-licenses/(GPL|LGPL)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment