package tap;
/*
* This file is part of TAPLibrary.
*
* TAPLibrary is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* TAPLibrary is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with TAPLibrary. If not, see .
*
* Copyright 2012-2013 - UDS/Centre de Données astronomiques de Strasbourg (CDS),
* Astronomisches Rechen Institute (ARI)
*/
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import javax.servlet.http.HttpServletResponse;
import tap.db.DBConnection;
import tap.db.DBException;
import tap.formatter.OutputFormat;
import tap.log.TAPLog;
import tap.metadata.TAPSchema;
import tap.metadata.TAPTable;
import tap.parameters.TAPParameters;
import tap.upload.TableLoader;
import uws.UWSException;
import uws.job.JobThread;
import uws.job.Result;
import adql.parser.ADQLParser;
import adql.parser.ADQLQueryFactory;
import adql.parser.ParseException;
import adql.parser.QueryChecker;
import adql.query.ADQLQuery;
import adql.translator.ADQLTranslator;
import adql.translator.TranslationException;
/**
*
*
* @author Grégory Mantelet (CDS;ARI) - gmantele@ari.uni-heidelberg.de
* @version 1.1 (12/2013)
*
* @param
*/
public class ADQLExecutor< R > {
protected final ServiceConnection service;
protected final TAPLog logger;
protected Thread thread;
protected TAPParameters tapParams;
protected HttpServletResponse response;
protected TAPExecutionReport report;
private DBConnection dbConn = null;
protected TAPSchema uploadSchema = null;
public ADQLExecutor(final ServiceConnection service){
this.service = service;
this.logger = service.getLogger();
}
public final TAPLog getLogger(){
return logger;
}
public final TAPExecutionReport getExecReport(){
return report;
}
public boolean hasUploadedTables(){
return (uploadSchema != null) && (uploadSchema.getNbTables() > 0);
}
protected final DBConnection getDBConnection() throws TAPException{
return (dbConn != null) ? dbConn : (dbConn = service.getFactory().createDBConnection((report != null) ? report.jobID : null));
}
public final void closeDBConnection() throws TAPException{
if (dbConn != null){
dbConn.close();
dbConn = null;
}
}
private final void uploadTables() throws TAPException{
TableLoader[] tables = tapParams.getTableLoaders();
if (tables.length > 0){
logger.info("JOB " + report.jobID + "\tLoading uploaded tables (" + tables.length + ")...");
long start = System.currentTimeMillis();
try{
uploadSchema = service.getFactory().createUploader(getDBConnection()).upload(tables);
}finally{
TAPParameters.deleteUploadedTables(tables);
report.setDuration(ExecutionProgression.UPLOADING, System.currentTimeMillis() - start);
}
}
}
private final R executeADQL() throws ParseException, InterruptedException, TranslationException, SQLException, TAPException, UWSException{
long start;
tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.PARSING);
start = System.currentTimeMillis();
ADQLQuery adql = parseADQL();
report.setDuration(ExecutionProgression.PARSING, System.currentTimeMillis() - start);
if (thread.isInterrupted())
throw new InterruptedException();
report.resultingColumns = adql.getResultingColumns();
final int limit = adql.getSelect().getLimit();
final Integer maxRec = tapParams.getMaxRec();
if (maxRec != null && maxRec > -1){
if (limit <= -1 || limit > maxRec)
adql.getSelect().setLimit(maxRec + 1);
}
tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.TRANSLATING);
start = System.currentTimeMillis();
String sqlQuery = translateADQL(adql);
report.setDuration(ExecutionProgression.TRANSLATING, System.currentTimeMillis() - start);
report.sqlTranslation = sqlQuery;
if (thread.isInterrupted())
throw new InterruptedException();
tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.EXECUTING_SQL);
start = System.currentTimeMillis();
R result = executeQuery(sqlQuery, adql);
report.setDuration(ExecutionProgression.EXECUTING_SQL, System.currentTimeMillis() - start);
return result;
}
public final TAPExecutionReport start(final AsyncThread thread) throws TAPException, UWSException, InterruptedException, ParseException, TranslationException, SQLException{
if (this.thread != null || this.report != null)
throw new UWSException(UWSException.INTERNAL_SERVER_ERROR, "This ADQLExecutor has already been executed !");
this.thread = thread;
TAPJob tapJob = thread.getTAPJob();
this.tapParams = tapJob.getTapParams();
this.report = new TAPExecutionReport(tapJob.getJobId(), false, tapParams);
this.response = null;
return start();
}
public final TAPExecutionReport start(final Thread thread, final String jobId, final TAPParameters params, final HttpServletResponse response) throws TAPException, UWSException, InterruptedException, ParseException, TranslationException, SQLException{
if (this.thread != null || this.report != null)
throw new TAPException("This ADQLExecutor has already been executed !");
this.thread = thread;
this.tapParams = params;
this.report = new TAPExecutionReport(jobId, true, tapParams);
this.response = response;
return start();
}
protected final TAPExecutionReport start() throws TAPException, UWSException, InterruptedException, ParseException, TranslationException, SQLException{
long start = System.currentTimeMillis();
try{
// Upload tables if needed:
if (tapParams != null && tapParams.getTableLoaders() != null && tapParams.getTableLoaders().length > 0){
tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.UPLOADING);
uploadTables();
}
if (thread.isInterrupted())
throw new InterruptedException();
// Parse, translate in SQL and execute the ADQL query:
R queryResult = executeADQL();
if (queryResult == null || thread.isInterrupted())
throw new InterruptedException();
// Write the result:
tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.WRITING_RESULT);
writeResult(queryResult);
logger.info("JOB " + report.jobID + " COMPLETED");
tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.FINISHED);
report.success = true;
return report;
}catch(NullPointerException npe){
npe.printStackTrace();
throw npe;
}finally{
try{
dropUploadedTables();
}catch(TAPException e){
logger.error("JOB " + report.jobID + "\tCan not drop uploaded tables !", e);
}
try{
closeDBConnection();
}catch(TAPException e){
logger.error("JOB " + report.jobID + "\tCan not close the DB connection !", e);
}
report.setTotalDuration(System.currentTimeMillis() - start);
logger.queryFinished(report);
}
}
protected ADQLQuery parseADQL() throws ParseException, InterruptedException, TAPException{
ADQLQueryFactory queryFactory = service.getFactory().createQueryFactory();
QueryChecker queryChecker = service.getFactory().createQueryChecker(uploadSchema);
ADQLParser parser;
if (queryFactory == null)
parser = new ADQLParser(queryChecker);
else
parser = new ADQLParser(queryChecker, queryFactory);
parser.setCoordinateSystems(service.getCoordinateSystems());
parser.setDebug(false);
//logger.info("Job "+report.jobID+" - 1/5 Parsing ADQL....");
return parser.parseQuery(tapParams.getQuery());
}
protected String translateADQL(ADQLQuery query) throws TranslationException, InterruptedException, TAPException{
ADQLTranslator translator = service.getFactory().createADQLTranslator();
//logger.info("Job "+report.jobID+" - 2/5 Translating ADQL...");
return translator.translate(query);
}
protected R executeQuery(String sql, ADQLQuery adql) throws SQLException, InterruptedException, TAPException{
//logger.info("Job "+report.jobID+" - 3/5 Creating DBConnection....");
DBConnection dbConn = getDBConnection();
//logger.info("Job "+report.jobID+" - 4/5 Executing query...\n"+sql);
final long startTime = System.currentTimeMillis();
R result = dbConn.executeQuery(sql, adql);
if (result == null)
logger.info("JOB " + report.jobID + " - QUERY ABORTED AFTER " + (System.currentTimeMillis() - startTime) + " MS !");
else
logger.info("JOB " + report.jobID + " - QUERY SUCCESFULLY EXECUTED IN " + (System.currentTimeMillis() - startTime) + " MS !");
return result;
}
protected OutputFormat getFormatter() throws TAPException{
// Search for the corresponding formatter:
String format = tapParams.getFormat();
OutputFormat formatter = service.getOutputFormat((format == null) ? "votable" : format);
if (format != null && formatter == null)
formatter = service.getOutputFormat("votable");
// Format the result:
if (formatter == null)
throw new TAPException("Impossible to format the query result: no formatter has been found for the given MIME type \"" + format + "\" and for the default MIME type \"votable\" (short form) !");
return formatter;
}
protected final void writeResult(R queryResult) throws InterruptedException, TAPException, UWSException{
OutputFormat formatter = getFormatter();
// Synchronous case:
if (response != null){
long start = System.currentTimeMillis();
try{
response.setContentType(formatter.getMimeType());
writeResult(queryResult, formatter, response.getOutputStream());
}catch(IOException ioe){
throw new TAPException("Impossible to get the output stream of the HTTP request to write the result of the job " + report.jobID + " !", ioe);
}finally{
report.setDuration(ExecutionProgression.WRITING_RESULT, System.currentTimeMillis() - start);
}
}// Asynchronous case:
else{
long start = System.currentTimeMillis();
try{
JobThread jobThread = (JobThread)thread;
Result result = jobThread.createResult();
result.setMimeType(formatter.getMimeType());
writeResult(queryResult, formatter, jobThread.getResultOutput(result));
result.setSize(jobThread.getResultSize(result));
jobThread.publishResult(result);
}catch(IOException ioe){
throw new UWSException(UWSException.INTERNAL_SERVER_ERROR, ioe, "Impossible to get the output stream of the result file to write the result of the job " + report.jobID + " !");
}finally{
report.setDuration(ExecutionProgression.WRITING_RESULT, System.currentTimeMillis() - start);
}
}
}
protected void writeResult(R queryResult, OutputFormat formatter, OutputStream output) throws InterruptedException, TAPException{
//logger.info("Job "+report.jobID+" - 5/5 Writing result file...");
formatter.writeResult(queryResult, output, report, thread);
}
protected void dropUploadedTables() throws TAPException{
if (uploadSchema != null){
// Drop all uploaded tables:
DBConnection dbConn = getDBConnection();
for(TAPTable t : uploadSchema){
try{
dbConn.dropTable(t);
}catch(DBException dbe){
logger.error("JOB " + report.jobID + "\tCan not drop the table \"" + t.getDBName() + "\" (in adql \"" + t.getADQLName() + "\") from the database !", dbe);
}
}
closeDBConnection();
}
}
}