From 1a698ff62facb9bd131811a9d525cbbee0f18fca Mon Sep 17 00:00:00 2001 From: gmantele <gmantele@ari.uni-heidelberg.de> Date: Wed, 20 Aug 2014 19:31:03 +0200 Subject: [PATCH] [TAP] Check ADQLExecutor behavior + Review/Add/Complete Javadoc in all classes of this commit except AsyncThread and TAPParameters + Remake the upload management --- src/tap/ADQLExecutor.java | 438 ++++++++++++++++++--- src/tap/AsyncThread.java | 13 - src/tap/data/LimitedTableIterator.java | 43 +- src/tap/parameters/TAPParameters.java | 2 +- src/tap/upload/LimitedSizeInputStream.java | 57 ++- src/tap/upload/TableLoader.java | 145 +++++-- src/tap/upload/Uploader.java | 57 ++- 7 files changed, 629 insertions(+), 126 deletions(-) diff --git a/src/tap/ADQLExecutor.java b/src/tap/ADQLExecutor.java index c478517..82c105a 100644 --- a/src/tap/ADQLExecutor.java +++ b/src/tap/ADQLExecutor.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import javax.servlet.http.HttpServletResponse; +import tap.data.DataReadException; import tap.data.TableIterator; import tap.db.DBConnection; import tap.db.DBException; @@ -35,6 +36,7 @@ import tap.metadata.TAPSchema; import tap.metadata.TAPTable; import tap.parameters.TAPParameters; import tap.upload.TableLoader; +import tap.upload.Uploader; import uws.UWSException; import uws.job.JobThread; import uws.job.Result; @@ -46,37 +48,138 @@ import adql.query.ADQLQuery; import adql.translator.TranslationException; /** + * <p>Let process completely an ADQL query.</p> * + * <p>Thus, this class aims to apply the following actions (in the given order):</p> + * <ol> + * <li>Upload the user tables, if any</li> + * <li>Parse the ADQL query (and so, transform it in an object tree)</li> + * <li>Execute it in the "database"</li> + * <li>Format and write the result</li> + * <li>Drop all uploaded tables from the "database"</li> + * </ol> + * + * <h3>Job execution mode</h3> + * + * <p> + * This executor is able to process queries coming from a synchronous job (the result must be written directly in the HTTP response) + * and from an asynchronous job (the result must be written, generally, in a file). Two start(...) functions let deal with + * the differences between the two job execution modes: {@link #start(AsyncThread)} for asynchronous jobs + * and {@link #start(Thread, String, TAPParameters, HttpServletResponse)} for synchronous jobs. + * </p> + * + * <h3>Input/Output formats</h3> + * + * <p>Uploaded tables must be provided in VOTable format.</p> + * + * <p> + * Query results must be formatted in the format specified by the user in the job parameters. A corresponding formatter ({@link OutputFormat}) + * is asked to the description of the TAP service ({@link ServiceConnection}). If none can be found, VOTable will be chosen by default. + * </p> + * + * <h3>Executor customization</h3> + * + * <p>It is totally possible to customize some parts of the ADQL query processing. However, the main algorithm must remain the same and is implemented + * by {@link #start()}. This function is final, like {@link #start(AsyncThread)} and {@link #start(Thread, String, TAPParameters, HttpServletResponse)}, + * which are just preparing the execution for {@link #start()} in function of the job execution mode (asynchronous or synchronous). + * </p> + * + * <p><i>Note: + * {@link #start()} is using the Template Method Design Pattern: it defines the skeleton/algorithm of the processing, and defers some steps + * to other functions. + * </i></p> + * + * <p> + * So, you are able to customize almost all individual steps of the ADQL query processing: {@link #parseADQL()}, {@link #executeADQL(ADQLQuery)} and + * {@link #writeResult(TableIterator, OutputFormat, OutputStream)}. + * </p> + * + * <p><i>Note: + * Note that the formatting of the result is done by an OutputFormat and that the executor is just calling the appropriate function of the formatter. + * </i></p> + * + * <p> + * There is no way in this executor to customize the upload. However, it does not mean it can not be customized. + * Indeed you can do it easily by extending {@link Uploader} and by providing the new class inside your {@link TAPFactory} implementation + * (see {@link TAPFactory#createUploader(DBConnection)}). + * </p> * * @author Grégory Mantelet (CDS;ARI) * @version 2.0 (08/2014) */ public class ADQLExecutor { + /** Description of the current TAP service. */ protected final ServiceConnection service; + /** The logger to use. */ protected final TAPLog logger; + /** The thread which is using this executor. */ protected Thread thread; + /** List of all TAP parameters needed for the query execution (and particularly the ADQL query itself). */ protected TAPParameters tapParams; + /** Description of the ADQL schema containing all the tables uploaded by the user for this specific query execution. + * <i>Note: This attribute is NULL before calling one of the start(...) function. It MAY be NULL also after if no table has been uploaded.</i> */ + protected TAPSchema uploadSchema = null; + + /** The HTTP response in which the query execution must be written. This attribute is NULL if the execution is asynchronous. */ protected HttpServletResponse response; + /** The execution report to fill gradually while the processing of the query. + * <i>Note: This attribute is NULL before calling one of the start(...) function, but it will never be after this call.</i> */ protected TAPExecutionReport report; + /** Connection to the "database". + * <i>Note: This attribute is NULL before and after the query processing (= call of a start(...) function).</i> */ private DBConnection dbConn = null; - protected TAPSchema uploadSchema = null; - + /** ID of the current query processing step (uploading, parsing, execution, writing result, ...). + * <i>Note: This attribute is NULL before and after the query processing (= call of a start(...) function).</i> */ + private ExecutionProgression progression = null; + /** Date/Time at which the current query processing step has started. */ + private long startStep = -1; + + /** + * Build an {@link ADQLExecutor}. + * + * @param service The description of the TAP service. + */ public ADQLExecutor(final ServiceConnection service){ this.service = service; this.logger = service.getLogger(); } + /** + * Get the logger used by this executor. + * + * @return The used logger. + */ public final TAPLog getLogger(){ return logger; } + /** + * <p>Get the report of the query execution. It helps indicating the execution progression and the duration of each step.</p> + * + * <p><i>Note: + * Before starting the execution (= before the call of a "start(...)" function), this function will return NULL. + * It is set when the query processing starts and remains not NULL after that (even when the execution is finished). + * </i></p> + * + * @return The execution report. + */ public final TAPExecutionReport getExecReport(){ return report; } + /** + * <p>Get the object to use in order to write the query result in the appropriate format + * (either the asked one, or else VOTable).</p> + * + * @return The appropriate result formatter to use. <i>Can not be NULL!</i> + * + * @throws TAPException If no format corresponds to the asked one and if no default format (for VOTable) can be found. + * + * @see ServiceConnection#getOutputFormat(String) + */ protected OutputFormat getFormatter() throws TAPException{ // Search for the corresponding formatter: String format = tapParams.getFormat(); @@ -91,9 +194,26 @@ public class ADQLExecutor { return formatter; } + /** + * <p>Start the asynchronous processing of the ADQL query.</p> + * + * <p> + * This function initialize the execution report, get the execution parameters (including the query to process) + * and then call {@link #start()}. + * </p> + * + * @param thread The asynchronous thread which asks the query processing. + * + * @return The resulting execution report. + * + * @throws UWSException + * @throws InterruptedException + * + * @see #start() + */ public final TAPExecutionReport start(final AsyncThread thread) throws UWSException, InterruptedException{ if (this.thread != null || this.report != null) - throw new UWSException(UWSException.INTERNAL_SERVER_ERROR, "This ADQLExecutor has already been executed !"); + throw new UWSException(UWSException.INTERNAL_SERVER_ERROR, "This ADQLExecutor has already been executed!"); this.thread = thread; @@ -105,9 +225,26 @@ public class ADQLExecutor { return start(); } + /** + * <p>Start the synchronous processing of the ADQL query.</p> + * + * <p>This function initialize the execution report and then call {@link #start()}.</p> + * + * @param thread The synchronous thread which asks the query processing. + * @param jobId ID of the corresponding job. + * @param params All execution parameters (including the query to process). + * @param response Object in which the result or the error must be written. + * + * @return The resulting execution report. + * + * @throws TAPException + * @throws InterruptedException + * + * @see #start() + */ public final TAPExecutionReport start(final Thread thread, final String jobId, final TAPParameters params, final HttpServletResponse response) throws TAPException, InterruptedException{ if (this.thread != null || this.report != null) - throw new TAPException("This ADQLExecutor has already been executed !"); + throw new TAPException("This ADQLExecutor has already been executed!"); this.thread = thread; this.tapParams = params; @@ -117,63 +254,180 @@ public class ADQLExecutor { return start(); } + /** + * <p>Process the ADQL query.</p> + * + * <p>This function calls the following function (in the same order):</p> + * <ol> + * <li>{@link TAPFactory#getConnection(String)}</li> + * <li>{@link #uploadTables()}</li> + * <li>{@link #parseADQL()}</li> + * <li>{@link #executeADQL(ADQLQuery)}</li> + * <li>{@link #writeResult(TableIterator)}</li> + * <li>{@link #dropUploadedTables()}</li> + * <li>{@link TAPFactory#freeConnection(DBConnection)}</li> + * </ol> + * + * <p> + * The execution report is updated gradually. Besides a job parameter - progression - is set at each step of the process in order to + * notify the user of the progression of the query execution. This parameter is removed at the end of the execution if it is successful. + * </p> + * + * <p>The "interrupted" flag of the associated thread is often tested so that stopping the execution as soon as possible.</p> + * + * @return The updated execution report. + * + * @throws TAPException + * @throws UWSException + * @throws InterruptedException + * @throws ParseException + * @throws TranslationException + * @throws SQLException + */ protected final TAPExecutionReport start() throws TAPException, UWSException, InterruptedException, ParseException, TranslationException, SQLException{ + // Save the start time (for reporting usage): long start = System.currentTimeMillis(); + + TableIterator queryResult = null; + try{ // Get a "database" connection: dbConn = service.getFactory().getConnection(report.jobID); - // 1. UPLOAD TABLES, if needed: - if (tapParams != null && tapParams.getTableLoaders() != null && tapParams.getTableLoaders().length > 0){ - tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.UPLOADING); + // 1. UPLOAD TABLES, if there is any: + if (tapParams.getTableLoaders() != null && tapParams.getTableLoaders().length > 0){ + startStep(ExecutionProgression.UPLOADING); uploadTables(); + endStep(); } if (thread.isInterrupted()) throw new InterruptedException(); // 2. PARSE THE ADQL QUERY: - tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.PARSING); + startStep(ExecutionProgression.PARSING); + // Parse the query: ADQLQuery adqlQuery = parseADQL(); + // List all resulting columns (it will be useful later to format the result): + report.resultingColumns = adqlQuery.getResultingColumns(); + endStep(); - if (adqlQuery == null || thread.isInterrupted()) + if (thread.isInterrupted()) throw new InterruptedException(); // 3. EXECUTE THE ADQL QUERY: - tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.EXECUTING_ADQL); - TableIterator queryResult = executeADQL(adqlQuery); + startStep(ExecutionProgression.EXECUTING_ADQL); + queryResult = executeADQL(adqlQuery); + endStep(); + + if (thread.isInterrupted()) + throw new InterruptedException(); // 4. WRITE RESULT: - tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.WRITING_RESULT); + startStep(ExecutionProgression.WRITING_RESULT); writeResult(queryResult); + endStep(); // Report the COMPLETED status: logger.info("JOB " + report.jobID + " COMPLETED"); - tapParams.set(TAPJob.PARAM_PROGRESSION, ExecutionProgression.FINISHED); + tapParams.remove(TAPJob.PARAM_PROGRESSION); report.success = true; return report; - }catch(NullPointerException npe){ - npe.printStackTrace(); - throw npe; }finally{ + // Close the result if any: + if (queryResult != null){ + try{ + queryResult.close(); + }catch(DataReadException dre){ + logger.error("JOB " + report.jobID + "\tCan not close the database query result!", dre); + } + } + // Drop all the uploaded tables (they are not supposed to exist after the query execution): try{ dropUploadedTables(); }catch(TAPException e){ - logger.error("JOB " + report.jobID + "\tCan not drop uploaded tables !", e); + logger.error("JOB " + report.jobID + "\tCan not drop the uploaded tables from the database!", e); } + // Free the connection (so that giving it back to a pool, if any, otherwise, just free resources): if (dbConn != null){ service.getFactory().freeConnection(dbConn); dbConn = null; } + // Set the total duration in the report: report.setTotalDuration(System.currentTimeMillis() - start); logger.queryFinished(report); } } + /** + * <p>Memorize the time at which the step starts, the step ID and update the job parameter "progression" + * (to notify the user about the progression of the query processing).</p> + * + * <p><i>Note: + * If for some reason the job parameter "progression" can not be updated, no error will be thrown. A WARNING message + * will be just written in the log. + * </i></p> + * + * <p><i>Note: + * This function is designed to work with {@link #endStep()}, which must be called after it, when the step is finished (successfully or not). + * </i></p> + * + * @param progression ID of the starting step. + * + * @see #endStep() + */ + private void startStep(final ExecutionProgression progression){ + // Save the start time (for report usage): + startStep = System.currentTimeMillis(); + // Memorize the current step: + this.progression = progression; + // Update the job parameter "progression", to notify the user about the progression of the query processing: + try{ + tapParams.set(TAPJob.PARAM_PROGRESSION, this.progression); + }catch(UWSException ue){ + logger.warning("Can not set/update the informative job parameter \"" + TAPJob.PARAM_PROGRESSION + "\" (this parameter would be just for notification purpose about the execution progression)! CAUSE: " + ue.getClass().getName() + " - " + ue.getMessage()); + } + } + + /** + * <p>Set the duration of the current step in the execution report.</p> + * + * <p><i>Note: + * The start time and the ID of the step are then forgotten. + * </i></p> + * + * <p><i>Note: + * This function is designed to work with {@link #startStep(ExecutionProgression)}, which must be called before it, when the step is starting. + * It marks the end of a step. + * </i></p> + * + * @see #startStep(ExecutionProgression) + */ + private void endStep(){ + if (progression != null){ + // Set the duration of this step in the execution report: + report.setDuration(progression, System.currentTimeMillis() - startStep); + // No start time: + startStep = -1; + // No step for the moment: + progression = null; + } + } + + /** + * <p>Create in the "database" all tables uploaded by the user (only for this specific query execution).</p> + * + * <p><i>Note: + * Obviously, nothing is done if no table has been uploaded. + * </i></p> + * + * @throws TAPException If any error occurs while reading the uploaded table + * or while importing them in the database. + */ private final void uploadTables() throws TAPException{ // Fetch the tables to upload: TableLoader[] tables = tapParams.getTableLoaders(); @@ -181,22 +435,45 @@ public class ADQLExecutor { // Upload them, if needed: if (tables.length > 0){ logger.info("JOB " + report.jobID + "\tLoading uploaded tables (" + tables.length + ")..."); - long start = System.currentTimeMillis(); try{ - /* TODO Problem with the DBConnection! One is created here for the Uploader (and dbConn is set) and closed by its uploadTables function (but dbConn is not set to null). - * Ideally, the connection should not be close, or at least dbConn should be set to null just after. */ uploadSchema = service.getFactory().createUploader(dbConn).upload(tables); }finally{ TAPParameters.deleteUploadedTables(tables); - report.setDuration(ExecutionProgression.UPLOADING, System.currentTimeMillis() - start); } } } + /** + * <p>Parse the ADQL query provided in the parameters by the user.</p> + * + * <p>The query factory and the query checker are got from the TAP factory.</p> + * + * <p> + * The configuration of this TAP service list all allowed coordinate systems. These are got here and provided to the query checker + * in order to ensure the coordinate systems used in the query are in this list. + * </p> + * + * <p> + * The row limit specified in the ADQL query (with TOP) is checked and adjusted (if needed). Indeed, this limit + * can not exceed MAXREC given in parameter and the maximum value specified in the configuration of this TAP service. + * In the case no row limit is specified in the query or the given value is greater than MAXREC, (MAXREC+1) is used by default. + * The "+1" aims to detect overflows. + * </p> + * + * @return The object representation of the ADQL query. + * + * @throws ParseException If the given ADQL query can not be parsed or if the construction of the object representation has failed. + * @throws InterruptedException If the thread has been interrupted. + * @throws TAPException If the TAP factory is unable to create the ADQL factory or the query checker. + */ protected ADQLQuery parseADQL() throws ParseException, InterruptedException, TAPException{ - long start = System.currentTimeMillis(); + // Get the ADQL factory: ADQLQueryFactory queryFactory = service.getFactory().createQueryFactory(); + + // Get the query checker: QueryChecker queryChecker = service.getFactory().createQueryChecker(uploadSchema); + + // Parse the ADQL query: ADQLParser parser; if (queryFactory == null) parser = new ADQLParser(queryChecker); @@ -205,77 +482,146 @@ public class ADQLExecutor { parser.setCoordinateSystems(service.getCoordinateSystems()); parser.setDebug(false); ADQLQuery query = parser.parseQuery(tapParams.getQuery()); + + // Set or check the row limit: final int limit = query.getSelect().getLimit(); final Integer maxRec = tapParams.getMaxRec(); if (maxRec != null && maxRec > -1){ if (limit <= -1 || limit > maxRec) query.getSelect().setLimit(maxRec + 1); } - report.setDuration(ExecutionProgression.PARSING, System.currentTimeMillis() - start); - report.resultingColumns = query.getResultingColumns(); + return query; } - protected TableIterator executeADQL(ADQLQuery adql) throws SQLException, InterruptedException, TAPException{ - final long startTime = System.currentTimeMillis(); + /** + * <p>Execute in "database" the given object representation of an ADQL query.</p> + * + * <p>By default, this function is just calling {@link DBConnection#executeQuery(ADQLQuery)} and then it returns the value returned by this call.</p> + * + * <p><i>Note: + * An INFO message is logged at the end of the query execution in order to report the result status (success or error) + * and the execution duration. + * </i></p> + * + * @param adql The object representation of the ADQL query to execute. + * + * @return The result of the query, + * or NULL if the query execution has failed. + * + * @throws SQLException If the query execution has failed ; the database is not able to execute this query. + * @throws InterruptedException If the thread has been interrupted. + * @throws TAPException If the {@link DBConnection} has failed to deal with the given ADQL query. + * + * @see {@link DBConnection#executeQuery(ADQLQuery)} + */ + protected TableIterator executeADQL(final ADQLQuery adql) throws SQLException, InterruptedException, TAPException{ TableIterator result = dbConn.executeQuery(adql); if (result == null) - logger.info("JOB " + report.jobID + " - QUERY ABORTED AFTER " + (System.currentTimeMillis() - startTime) + " MS !"); + logger.info("JOB " + report.jobID + " - QUERY ABORTED AFTER " + (System.currentTimeMillis() - startStep) + " MS !"); else - logger.info("JOB " + report.jobID + " - QUERY SUCCESFULLY EXECUTED IN " + (System.currentTimeMillis() - startTime) + " MS !"); + logger.info("JOB " + report.jobID + " - QUERY SUCCESFULLY EXECUTED IN " + (System.currentTimeMillis() - startStep) + " MS !"); return result; } - protected final void writeResult(TableIterator queryResult) throws InterruptedException, TAPException, UWSException{ + /** + * <p>Write the given query result into the appropriate format in the appropriate output + * (HTTP response for a synchronous execution, otherwise a file or any output provided by UWS).</p> + * + * <p>This function prepare the output in function of the execution type (synchronous or asynchronous). + * Once prepared, the result, the output and the formatter to use are given to {@link #writeResult(TableIterator, OutputFormat, OutputStream)} + * which will really process the result formatting and writing. + * </p> + * + * @param queryResult The result of the query execution in database. + * + * @throws InterruptedException If the thread has been interrupted. + * @throws TAPException If an error occurs while getting the appropriate formatter or while formatting or writing (synchronous execution) the result. + * @throws UWSException If an error occurs while getting the output stream or while writing (asynchronous execution) the result. + * + * @see #writeResult(TableIterator, OutputFormat, OutputStream) + */ + protected final void writeResult(final TableIterator queryResult) throws InterruptedException, TAPException, UWSException{ + // Get the appropriate result formatter: OutputFormat formatter = getFormatter(); - // Synchronous case: + // CASE SYNCHRONOUS: if (response != null){ - long start = System.currentTimeMillis(); try{ + // Set the HTTP content type to the MIME type of the result format: response.setContentType(formatter.getMimeType()); + + // Write the formatted result in the HTTP response output: writeResult(queryResult, formatter, response.getOutputStream()); + }catch(IOException ioe){ throw new TAPException("Impossible to get the output stream of the HTTP request to write the result of the job " + report.jobID + " !", ioe); - }finally{ - report.setDuration(ExecutionProgression.WRITING_RESULT, System.currentTimeMillis() - start); } - - }// Asynchronous case: + } + // CASE ASYNCHRONOUS: else{ - long start = System.currentTimeMillis(); try{ + // Create a UWS Result object to store the result + // (the result will be stored in a file and this object is the association between the job and the result file): JobThread jobThread = (JobThread)thread; Result result = jobThread.createResult(); + + // Set the MIME type of the result format in the result description: result.setMimeType(formatter.getMimeType()); + + // Write the formatted result in the file output: writeResult(queryResult, formatter, jobThread.getResultOutput(result)); + + // Set the size (in bytes) of the result in the result description: result.setSize(jobThread.getResultSize(result)); + + // Add the result description and link in the job description: jobThread.publishResult(result); + }catch(IOException ioe){ throw new UWSException(UWSException.INTERNAL_SERVER_ERROR, ioe, "Impossible to get the output stream of the result file to write the result of the job " + report.jobID + " !"); - }finally{ - report.setDuration(ExecutionProgression.WRITING_RESULT, System.currentTimeMillis() - start); } } } + /** + * <p>Format and write the given result in the given output with the given formatter.</p> + * + * <p>By default, this function is just calling {@link OutputFormat#writeResult(TableIterator, OutputStream, TAPExecutionReport, Thread)}.</p> + * + * <p><i>Note: + * {@link OutputFormat#writeResult(TableIterator, OutputStream, TAPExecutionReport, Thread)} is often testing the "interrupted" flag of the + * thread in order to stop as fast as possible if the user has cancelled the job or if the thread has been interrupted for another reason. + * </i></p> + * + * @param queryResult Query result to format and to output. + * @param formatter The object able to write the result in the appropriate format. + * @param output The stream in which the result must be written. + * + * @throws InterruptedException If the thread has been interrupted. + * @throws TAPException If there is an error while formatting the result. + */ protected void writeResult(TableIterator queryResult, OutputFormat formatter, OutputStream output) throws InterruptedException, TAPException{ - try{ - //logger.info("Job "+report.jobID+" - 5/5 Writing result file..."); - formatter.writeResult(queryResult, output, report, thread); - }finally{ - queryResult.close(); - } + formatter.writeResult(queryResult, output, report, thread); } + /** + * <p>Drop all tables uploaded by the user from the database.</p> + * + * <p><i>Note: + * By default, if an error occurs while dropping a table from the database, the error will just be logged ; it won't be thrown/propagated. + * </i></p> + * + * @throws TAPException If a grave error occurs. <i>By default, no exception is thrown ; they are just logged.</i> + */ protected void dropUploadedTables() throws TAPException{ if (uploadSchema != null){ // Drop all uploaded tables: for(TAPTable t : uploadSchema){ try{ - dbConn.dropUploadedTable(t.getDBName()); + dbConn.dropUploadedTable(t); }catch(DBException dbe){ - logger.error("JOB " + report.jobID + "\tCan not drop the table \"" + t.getDBName() + "\" (in adql \"" + t.getADQLName() + "\") from the database !", dbe); + logger.error("JOB " + report.jobID + "\tCan not drop the uploaded table \"" + t.getDBName() + "\" (in adql \"" + t.getADQLName() + "\") from the database!", dbe); } } } diff --git a/src/tap/AsyncThread.java b/src/tap/AsyncThread.java index 7d0d048..65c2c37 100644 --- a/src/tap/AsyncThread.java +++ b/src/tap/AsyncThread.java @@ -33,19 +33,6 @@ public class AsyncThread extends JobThread { this.executor = executor; } - @Override - public void interrupt(){ - if (isAlive()){ - try{ - executor.closeDBConnection(); - }catch(TAPException e){ - if (job != null && job.getLogger() != null) - job.getLogger().error("Can not close the DBConnection for the executing job \"" + job.getJobId() + "\" ! => the job will be probably not totally aborted.", e); - } - } - super.interrupt(); - } - @Override protected void jobWork() throws UWSException, InterruptedException{ try{ diff --git a/src/tap/data/LimitedTableIterator.java b/src/tap/data/LimitedTableIterator.java index 96c7c35..5703511 100644 --- a/src/tap/data/LimitedTableIterator.java +++ b/src/tap/data/LimitedTableIterator.java @@ -55,7 +55,7 @@ import com.oreilly.servlet.multipart.ExceededSizeException; * </p> * * @author Grégory Mantelet (ARI) - * @version 2.0 (07/2014) + * @version 2.0 (08/2014) * @since 2.0 */ public class LimitedTableIterator implements TableIterator { @@ -73,7 +73,7 @@ public class LimitedTableIterator implements TableIterator { private boolean overflow = false; /** - * Wrap the given {@link TableIterator} so that limiting the number of rows to read to the given value. + * Wrap the given {@link TableIterator} so that limiting the number of rows to read. * * @param it The iterator to wrap. <i>MUST NOT be NULL</i> * @param maxNbRows Maximum number of rows that can be read. There is overflow if more than this number of rows is asked. <i>A negative value means "no limit".</i> @@ -86,20 +86,45 @@ public class LimitedTableIterator implements TableIterator { } /** - * Wrap the given {@link TableIterator} so that limiting the number of rows to read to the given value. + * <p>Build the specified {@link TableIterator} instance and wrap it so that limiting the number of rows OR bytes to read.</p> * - * @param it The iterator to wrap. <i>MUST NOT be NULL</i> - * @param maxNbRows Maximum number of rows that can be read. There is overflow if more than this number of rows is asked. <i>A negative value means "no limit".</i> + * <p> + * If the limit is on the <b>number of bytes</b>, the given input stream will be first wrapped inside a {@link LimitedSizeInputStream}. + * Then, it will be given as only parameter of the constructor of the specified {@link TableIterator} instance. + * </p> + * + * <p>If the limit is on the <b>number of rows</b>, this {@link LimitedTableIterator} will count and limit itself the number of rows.</p> + * + * <p><i><b>IMPORTANT:</b> The specified class must:</i></p> + * <i><ul> + * <li>extend {@link TableIterator},</li> + * <li>be a concrete class,</li> + * <li>have at least one constructor with only one parameter of type {@link InputStream}.</li> + * </ul></i> + * + * <p><i>Note: + * If the given limit type is NULL (or different from ROWS and BYTES), or the limit value is <=0, no limit will be set. + * All rows and bytes will be read until the end of input is reached. + * </i></p> + * + * @param classIt Class of the {@link TableIterator} implementation to create and whose the output must be limited. + * @param input Input stream toward the table to read. + * @param type Type of the limit: ROWS or BYTES. <i>MAY be NULL</i> + * @param limit Limit in rows or bytes, depending of the "type" parameter. <i>MAY BE <=0</i> + * + * @throws DataReadException If no instance of the given class can be created, + * or if the {@link TableIterator} instance can not be initialized, + * or if the limit (in rows or bytes) has been reached. */ public < T extends TableIterator > LimitedTableIterator(final Class<T> classIt, final InputStream input, final LimitUnit type, final int limit) throws DataReadException{ try{ Constructor<T> construct = classIt.getConstructor(InputStream.class); - if (type == LimitUnit.bytes){ + if (type == LimitUnit.bytes && limit > 0){ maxNbRows = -1; innerIt = construct.newInstance(new LimitedSizeInputStream(input, limit)); }else{ innerIt = construct.newInstance(input); - maxNbRows = (type == null) ? -1 : limit; + maxNbRows = (type == null || type != LimitUnit.rows) ? -1 : limit; } }catch(InvocationTargetException ite){ Throwable t = ite.getCause(); @@ -154,7 +179,7 @@ public class LimitedTableIterator implements TableIterator { public boolean nextRow() throws DataReadException{ // Test the overflow flag and proceed only if not overflowed: if (overflow) - throw new DataReadException("Data read overflow: the limit has been reached! No more data can be read."); + throw new DataReadException("Data read overflow: the limit has already been reached! No more data can be read."); // Read the next row: boolean nextRow; @@ -206,7 +231,7 @@ public class LimitedTableIterator implements TableIterator { */ private void testOverflow() throws IllegalStateException{ if (overflow) - throw new IllegalStateException("Data read overflow: the limit has been reached! No more data can be read."); + throw new IllegalStateException("Data read overflow: the limit has already been reached! No more data can be read."); } /** diff --git a/src/tap/parameters/TAPParameters.java b/src/tap/parameters/TAPParameters.java index 697e37a..045e5af 100644 --- a/src/tap/parameters/TAPParameters.java +++ b/src/tap/parameters/TAPParameters.java @@ -288,7 +288,7 @@ public class TAPParameters extends UWSParameters { for(int i = 0; i < pairs.length; i++){ String[] table = pairs[i].split(","); if (table.length != 2) - throw new TAPException("Bad syntax ! An UPLOAD parameter must contain a list of pairs separated by a ';'. Each pair is composed of 2 parts, a table name and a URI separated by a ','."); + throw new TAPException("UPLOAD parameter incorrect: bad syntax! An UPLOAD parameter must contain a list of pairs separated by a ';'. Each pair is composed of 2 parts, a table name and a URI separated by a ','."); loaders[i] = new TableLoader(table[0], table[1], multipart); } diff --git a/src/tap/upload/LimitedSizeInputStream.java b/src/tap/upload/LimitedSizeInputStream.java index 2836545..60acbe8 100644 --- a/src/tap/upload/LimitedSizeInputStream.java +++ b/src/tap/upload/LimitedSizeInputStream.java @@ -16,7 +16,8 @@ package tap.upload; * You should have received a copy of the GNU Lesser General Public License * along with TAPLibrary. If not, see <http://www.gnu.org/licenses/>. * - * Copyright 2012 - UDS/Centre de Données astronomiques de Strasbourg (CDS) + * Copyright 2012,2014 - UDS/Centre de Données astronomiques de Strasbourg (CDS), + * Astronomisches Rechen Institut (ARI) */ import java.io.IOException; @@ -25,15 +26,34 @@ import java.security.InvalidParameterException; import com.oreilly.servlet.multipart.ExceededSizeException; +/** + * Let limit the number of bytes that can be read from a given input stream. + * + * @author Grégory Mantelet (CDS;ARI) + * @version 2.0 (08/2014) + */ public final class LimitedSizeInputStream extends InputStream { + /** Input stream whose the number of bytes that can be read must be limited. */ private final InputStream input; + /** Maximum number of bytes that can be read. */ private final long sizeLimit; + /** Number of bytes currently read. */ private long counter = 0; + /** Indicate whether the byte limit has already been reached. If <i>true</i> no more byte can be read ; + * all read(...) function will throw an {@link ExceededSizeException}. */ private boolean exceed = false; + /** + * Wrap the given input stream so that limiting the number of bytes that can be read. + * + * @param stream Stream to limit. + * @param sizeLimit Maximum number of bytes that can be read. <i>If <=0 an {@link InvalidParameterException} will be thrown.</i> + * + * @throws NullPointerException If the input stream is missing. + */ public LimitedSizeInputStream(final InputStream stream, final long sizeLimit) throws NullPointerException{ if (stream == null) throw new NullPointerException("The given input stream is NULL !"); @@ -44,6 +64,26 @@ public final class LimitedSizeInputStream extends InputStream { this.sizeLimit = sizeLimit; } + /** + * Get the input stream wrapped by this instance of {@link LimitedSizeInputStream}. + * + * @return The wrapped input stream. + * @since 2.0 + */ + public final InputStream getInnerStream(){ + return input; + } + + /** + * <p>Update the number of bytes currently read and them check whether the limit has been exceeded. + * If the limit has been exceeded, an {@link ExceededSizeException} is thrown.</p> + * + * <p>Besides, the flag {@link #exceed} is set to true in order to forbid the further reading of bytes.</p> + * + * @param nbReads Number of bytes read. + * + * @throws ExceededSizeException If, after update, the limit of bytes has been exceeded. + */ private void updateCounter(final long nbReads) throws ExceededSizeException{ if (nbReads > 0){ counter += nbReads; @@ -54,6 +94,15 @@ public final class LimitedSizeInputStream extends InputStream { } } + /** + * <p>Tell whether the limit has already been exceeded or not.</p> + * + * <p><i>Note: + * If <i>true</i> is returned, no more read will be allowed, and any attempt to read a byte will throw an {@link ExceededSizeException}. + * </i></p> + * + * @return <i>true</i> if the byte limit has been exceeded, <i>false</i> otherwise. + */ public final boolean sizeExceeded(){ return exceed; } @@ -98,17 +147,17 @@ public final class LimitedSizeInputStream extends InputStream { @Override public synchronized void mark(int readlimit) throws UnsupportedOperationException{ - throw new UnsupportedOperationException("mark() not supported in a LimitedSizeInputStream !"); + input.mark(readlimit); } @Override public boolean markSupported(){ - return false; + return input.markSupported(); } @Override public synchronized void reset() throws IOException, UnsupportedOperationException{ - throw new UnsupportedOperationException("mark() not supported in a LimitedSizeInputStream !"); + input.reset(); } } diff --git a/src/tap/upload/TableLoader.java b/src/tap/upload/TableLoader.java index e8b0502..ee3d621 100644 --- a/src/tap/upload/TableLoader.java +++ b/src/tap/upload/TableLoader.java @@ -16,53 +16,129 @@ package tap.upload; * You should have received a copy of the GNU Lesser General Public License * along with TAPLibrary. If not, see <http://www.gnu.org/licenses/>. * - * Copyright 2012 - UDS/Centre de Données astronomiques de Strasbourg (CDS) + * Copyright 2012,2014 - UDS/Centre de Données astronomiques de Strasbourg (CDS), + * Astronomisches Rechen Institut (ARI) */ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; - import java.net.MalformedURLException; import java.net.URL; - import java.security.InvalidParameterException; import java.util.Enumeration; +import tap.TAPException; +import tap.parameters.TAPParameters; + import com.oreilly.servlet.MultipartRequest; +/** + * <p>Represent an uploaded table in a {@link TAPParameters} object.</p> + * + * <p> + * This class is very useful to interpret the "upload" parameter and to get the ADQL name of the table + * and particularly to get easily an input stream on its data. Thus, it is able to open a stream on table data + * provided as a URL or inline (inside a multipart HTTP request). + * </p> + * + * <p>The syntax for the "upload" parameter is the following:</p> + * <ul> + * <li><b>Case tables provided as URL:</b> table_a,http://host_a/path;table_b,http://host_b/path;...</li> + * <li><b>Case tables provided inline:</b> table_c,param:table1;... + * and "table1" is the name of the parameter (a multipart item = a file) containing the table data. + * </li> + * </ul> + * + * @author Grégory Mantelet (CDS;ARI) + * @version 2.0 (08/2014) + */ public class TableLoader { + /** Regular expression of any acceptable URL for a table data source. */ private static final String URL_REGEXP = "^(https?|ftp)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]"; + /** Prefix of a multipart item name (when tables are included inline in a multipart HTTP request). */ private static final String PARAM_PREFIX = "param:"; + /** Name of the uploaded table. This name is the one used in the ADQL query. */ public final String tableName; + + /** URL at which the table data are. + * <i>Note: This attribute is NULL if the table is provided inline.</i> */ private final URL url; - private final String param; + /** Name of the multipart HTTP request parameter (a multipart item = a file) containing the table data. + * <i>Note: This attribute is NULL if the table is provided as a URL.</i> */ + private final String param; + /** File containing the table data. It points toward the multipart item/parameter whose the name matches the attribute {@link #param}. + * <i>Note: This attribute is NULL if the table is provided as a URL.</i> */ private final File file; - public TableLoader(final String name, final String value){ - this(name, value, (MultipartRequest)null); + /** + * <p>Build the object representation of an item of the UPLOAD parameter: a table.</p> + * + * <p> + * <b>This table MUST be provided as a URL!</b> Otherwise, a multipart request MUST be provided and in this case + * the other constructor ({@link #TableLoader(String, String, MultipartRequest)}) MUST be used. + * </p> + * + * @param name ADQL name of the table. <i>It is the key of an item inside the UPLOAD parameter value.</i> + * @param url URL at which the table data can be found. + * + * @throws TAPException If the given URI is malformed, + * or if no multipart request is provided whereas the table is provided as a request parameter, + * or if the name or value is missing. + * + * @see #TableLoader(String, String, MultipartRequest) + */ + public TableLoader(final String name, final String url) throws TAPException{ + this(name, url, (MultipartRequest)null); } + /** + * <p>Build the object representation of an item of the UPLOAD parameter: a table.</p> + * + * <p>This table can be provided either as a URL or inline ; the generated instance of {@link TableLoader} is able to deal with both.</p> + * + * <p><i>Note: + * The search of a parameter inside the multipart request is done case sensitively. + * </i></p> + * + * @param name ADQL name of the table. <i>It is the key of an item inside the UPLOAD parameter value.</i> + * @param value URL or "param:"+paramName (where paramName is the name of the multipart request parameter containing the table data). + * <i>It is the value of an item inside the UPLAOD parameter value.</i> + * @param multipart Request containing all parameters provided in multipart. <i>It MAY be NULL if the given "value" is an URL. Otherwise, it MUST NOT be NULL.</i> + * + * @throws TAPException If the given URI is malformed, + * or if no multipart request is provided whereas the table is provided as a request parameter, + * or if the name or value is missing. + */ @SuppressWarnings("unchecked") - public TableLoader(final String name, final String uri, final MultipartRequest multipart){ + public TableLoader(final String name, final String value, final MultipartRequest multipart) throws TAPException{ + // Get the ADQL table name: if (name == null || name.trim().isEmpty()) - throw new NullPointerException("A table name can not be NULL !"); - tableName = name.trim(); + throw new TAPException("UPLOAD parameter incorrect: missing table name!"); + else + tableName = name.trim(); - if (uri == null || uri.trim().isEmpty()) - throw new NullPointerException("The table URI can not be NULL !"); - String URI = uri.trim(); - if (URI.startsWith(PARAM_PREFIX)){ + // Get the item value (either URL or parameter name): + if (value == null || value.trim().isEmpty()) + throw new NullPointerException("UPLOAD parameter incorrect: missing table URI!"); + String tableId = value.trim(); + + // CASE MULTIPART PARAMETER: + if (tableId.startsWith(PARAM_PREFIX)){ + // Ensure the multipart request is provided and the parameter name is correct: if (multipart == null) - throw new InvalidParameterException("The URI scheme \"param\" can be used ONLY IF the VOTable is provided inside the HTTP request (multipart/form-data) !"); - else if (URI.length() <= PARAM_PREFIX.length()) - throw new InvalidParameterException("Incomplete URI (" + URI + "): empty parameter name !"); + throw new TAPException("UPLOAD parameter incorrect: incorrect table URI: \"" + tableId + "\"! The URI scheme \"" + PARAM_PREFIX + "\" can be used ONLY IF the VOTable is provided inside the HTTP request (multipart/form-data)!"); + else if (tableId.length() <= PARAM_PREFIX.length()) + throw new TAPException("UPLOAD parameter incorrect: missing parameter name in \"" + tableId + "\"!"); + + // Set the parameter name: url = null; - param = URI.substring(PARAM_PREFIX.length()).trim(); + param = tableId.substring(PARAM_PREFIX.length()).trim(); + // Get the corresponding file in the multipart request (search case sensitive): Enumeration<String> enumeration = multipart.getFileNames(); File foundFile = null; while(foundFile == null && enumeration.hasMoreElements()){ @@ -71,22 +147,34 @@ public class TableLoader { foundFile = multipart.getFile(fileName); } + // Set the file: if (foundFile == null) - throw new InvalidParameterException("Incorrect file reference (" + URI + "): the parameter \"" + param + "\" does not exist !"); + throw new TAPException("UPLOAD parameter incorrect: parameter not found: \"" + tableId + "\"!"); else file = foundFile; - }else if (URI.matches(URL_REGEXP)){ + } + // CASE URL: + else if (tableId.matches(URL_REGEXP)){ try{ - url = new URL(URI); + url = new URL(tableId); param = null; file = null; }catch(MalformedURLException mue){ throw new InvalidParameterException(mue.getMessage()); } - }else - throw new InvalidParameterException("Invalid table URI: \"" + URI + "\" !"); + } + // OTHER: + else + throw new TAPException("UPLOAD parameter incorrect: invalid table URI: \"" + tableId + "\"!"); } + /** + * Open a stream toward the table data (whatever is their source, a URL or a file). + * + * @return Input over the table data. + * + * @throws IOException If any error occurs while open the stream. + */ public InputStream openStream() throws IOException{ if (url != null) return url.openStream(); @@ -94,11 +182,22 @@ public class TableLoader { return new FileInputStream(file); } + /** + * <p>Delete the table data stored in the cache.</p> + * + * <p> + * This function will just delete the file in case the table data are coming from a multipart request. + * If the table data are provided as a URL, nothing is done (so we can consider that the cache does not contain any more the associated table data). + * </p> + * + * @return <i>true</i> if the file does not exist any more in the cache, + * <i>false</i> otherwise. + */ public boolean deleteFile(){ if (file != null && file.exists()) return file.delete(); else - return false; + return true; } } diff --git a/src/tap/upload/Uploader.java b/src/tap/upload/Uploader.java index 8cef0db..8286b96 100644 --- a/src/tap/upload/Uploader.java +++ b/src/tap/upload/Uploader.java @@ -27,6 +27,8 @@ import tap.ServiceConnection; import tap.ServiceConnection.LimitUnit; import tap.TAPException; import tap.data.DataReadException; +import tap.data.LimitedTableIterator; +import tap.data.TableIterator; import tap.data.VOTableIterator; import tap.db.DBConnection; import tap.metadata.TAPColumn; @@ -37,25 +39,28 @@ import tap.metadata.TAPTable; import com.oreilly.servlet.multipart.ExceededSizeException; /** - * <p>Let upload properly given VOTable inputs.</p> - * - * <p>This class manages particularly the upload limit in rows - * (thanks to {@link VOTableIterator}) and in bytes (thanks to a {@link LimitedSizeInputStream}).</p> + * <p>Let create properly given VOTable inputs in the "database".</p> * + * <p> + * This class manages particularly the upload limit in rows and in bytes by creating a {@link LimitedTableIterator} + * with a {@link VOTableIterator}. + * </p> * * @author Grégory Mantelet (CDS;ARI) - * @version 2.0 (07/2014) + * @version 2.0 (08/2014) + * + * @see LimitedTableIterator + * @see VOTableIterator */ public class Uploader { - /** Specification of the TAP service. */ protected final ServiceConnection service; /** Connection to the "database" (which lets upload the content of any given VOTable). */ protected final DBConnection dbConn; - /** Limit on the number of rows allowed to be uploaded in once (whatever is the number of tables). */ - protected final int nbRowsLimit; - /** Limit on the number of bytes allowed to be uploaded in once (whatever is the number of tables). */ - protected final int nbBytesLimit; + /** Type of limit to set: ROWS or BYTES. <i>MAY be NULL ; if NULL, no limit will be set.</i> */ + protected final LimitUnit limitUnit; + /** Limit on the number of rows or bytes (depending of {@link #limitUnit}) allowed to be uploaded in once (whatever is the number of tables). */ + protected final int limit; /** Number of rows already loaded. */ protected int nbRows = 0; @@ -82,12 +87,12 @@ public class Uploader { // Ensure UPLOAD is allowed by the TAP service specification... if (this.service.uploadEnabled()){ // ...and set the rows or bytes limit: - if (this.service.getUploadLimitType()[1] == LimitUnit.rows){ - nbRowsLimit = ((this.service.getUploadLimit()[1] > 0) ? this.service.getUploadLimit()[1] : -1); - nbBytesLimit = -1; + if (this.service.getUploadLimitType()[1] != null && this.service.getUploadLimit()[1] > 0){ + limit = this.service.getUploadLimit()[1]; + limitUnit = this.service.getUploadLimitType()[1]; }else{ - nbBytesLimit = ((this.service.getUploadLimit()[1] > 0) ? this.service.getUploadLimit()[1] : -1); - nbRowsLimit = -1; + limit = -1; + limitUnit = null; } }else throw new TAPException("Upload aborted: this functionality is disabled in this TAP service!"); @@ -105,22 +110,19 @@ public class Uploader { * @see DBConnection#addUploadedTable(TAPTable, tap.data.TableIterator) */ public TAPSchema upload(final TableLoader[] loaders) throws TAPException{ - TAPSchema uploadSchema = new TAPSchema(STDSchema.UPLOADSCHEMA.getLabel()); + TAPSchema uploadSchema = new TAPSchema(STDSchema.UPLOADSCHEMA.label); InputStream votable = null; String tableName = null; try{ + // Iterate over the full list of uploaded tables: for(TableLoader loader : loaders){ tableName = loader.tableName; // Open a stream toward the VOTable: votable = loader.openStream(); - // Set a byte limit if one is required: - if (nbBytesLimit > 0) - votable = new LimitedSizeInputStream(votable, nbBytesLimit); - - // Start reading the VOTable: - VOTableIterator dataIt = new VOTableIterator(votable); + // Start reading the VOTable (with the identified limit, if any): + TableIterator dataIt = new LimitedTableIterator(VOTableIterator.class, votable, limitUnit, limit); // Define the table to upload: TAPColumn[] columns = dataIt.getMetadata(); @@ -133,7 +135,7 @@ public class Uploader { uploadSchema.addTable(table); // Create and fill the corresponding table in the database: - dbConn.addUploadedTable(table, dataIt, nbRowsLimit); + dbConn.addUploadedTable(table, dataIt); // Close the VOTable stream: votable.close(); @@ -141,16 +143,11 @@ public class Uploader { } }catch(DataReadException dre){ if (dre.getCause() instanceof ExceededSizeException) - throw new TAPException("Upload limit exceeded ! You can upload at most " + ((nbBytesLimit > 0) ? (nbBytesLimit + " bytes.") : (nbRowsLimit + " rows."))); + throw dre; else throw new TAPException("Error while reading the VOTable \"" + tableName + "\": " + dre.getMessage(), dre); }catch(IOException ioe){ throw new TAPException("Error while reading the VOTable of \"" + tableName + "\"!", ioe); - }catch(NullPointerException npe){ - if (votable != null && votable instanceof LimitedSizeInputStream) - throw new TAPException("Upload limit exceeded ! You can upload at most " + ((nbBytesLimit > 0) ? (nbBytesLimit + " bytes.") : (nbRowsLimit + " rows."))); - else - throw new TAPException(npe); }finally{ try{ if (votable != null) @@ -160,7 +157,7 @@ public class Uploader { } } - // Return the TAP_UPLOAD schema (containing just the uploaded tables): + // Return the TAP_UPLOAD schema (containing just the description of the uploaded tables): return uploadSchema; } -- GitLab