diff --git a/src/tap/data/LimitedTableIterator.java b/src/tap/data/LimitedTableIterator.java index 97f82e6661285782af2ed190f65e3b07c51f4912..4adeb237d2a6318a591171dfbcce3ca64d1ca228 100644 --- a/src/tap/data/LimitedTableIterator.java +++ b/src/tap/data/LimitedTableIterator.java @@ -55,7 +55,7 @@ import com.oreilly.servlet.multipart.ExceededSizeException; * </p> * * @author Grégory Mantelet (ARI) - * @version 2.0 (08/2014) + * @version 2.0 (12/2014) * @since 2.0 */ public class LimitedTableIterator implements TableIterator { @@ -171,7 +171,7 @@ public class LimitedTableIterator implements TableIterator { } @Override - public TAPColumn[] getMetadata(){ + public TAPColumn[] getMetadata() throws DataReadException{ return innerIt.getMetadata(); } diff --git a/src/tap/data/TableIterator.java b/src/tap/data/TableIterator.java index 17d8445d1e3e7a681685e0bd73b6254d0ba63cb4..cdc16de18754ddca928626b0bc4946e4496d98a8 100644 --- a/src/tap/data/TableIterator.java +++ b/src/tap/data/TableIterator.java @@ -21,8 +21,8 @@ package tap.data; import java.util.NoSuchElementException; -import adql.db.DBType; import tap.metadata.TAPColumn; +import adql.db.DBType; /** * <p>Let's iterate on each row and then on each column over a table dataset.</p> @@ -50,8 +50,8 @@ import tap.metadata.TAPColumn; * } * </pre> * - * @author Grégory Mantelet (ARI) - gmantele@ari.uni-heidelberg.de - * @version 2.0 (08/2014) + * @author Grégory Mantelet (ARI) + * @version 2.0 (12/2014) * @since 2.0 */ public interface TableIterator { @@ -70,7 +70,7 @@ public interface TableIterator { * * @see #getColType() */ - public TAPColumn[] getMetadata(); + public TAPColumn[] getMetadata() throws DataReadException; /** * <p>Go to the next row if there is one.</p> diff --git a/src/tap/data/VOTableIterator.java b/src/tap/data/VOTableIterator.java index c79dd05eeb33ffd48169d84174f420e5aa476b42..c7bd03bc290e22dbd330154dde18ed9ebb66b775 100644 --- a/src/tap/data/VOTableIterator.java +++ b/src/tap/data/VOTableIterator.java @@ 
-1,40 +1,21 @@ package tap.data; -/* - * This file is part of TAPLibrary. - * - * TAPLibrary is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * TAPLibrary is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with TAPLibrary. If not, see <http://www.gnu.org/licenses/>. - * - * Copyright 2014 - Astronomisches Rechen Institut (ARI) - */ - import java.io.IOException; import java.io.InputStream; import java.util.NoSuchElementException; -import adql.db.DBType; import tap.TAPException; import tap.metadata.TAPColumn; import tap.metadata.VotType; import tap.metadata.VotType.VotDatatype; import uk.ac.starlink.table.ColumnInfo; import uk.ac.starlink.table.DescribedValue; -import uk.ac.starlink.table.OnceRowPipe; -import uk.ac.starlink.table.RowSequence; import uk.ac.starlink.table.StarTable; import uk.ac.starlink.table.StarTableFactory; import uk.ac.starlink.table.TableBuilder; +import uk.ac.starlink.table.TableFormatException; +import uk.ac.starlink.table.TableSink; +import adql.db.DBType; /** * <p>{@link TableIterator} which lets iterate over a VOTable input stream using STIL.</p> @@ -42,25 +23,310 @@ import uk.ac.starlink.table.TableBuilder; * <p>{@link #getColType()} will return TAP type based on the type declared in the VOTable metadata part.</p> * * @author Grégory Mantelet (ARI) - * @version 2.0 (07/2014) + * @version 2.0 (12/2014) * @since 2.0 */ public class VOTableIterator implements TableIterator { - /** Metadata of all columns identified before the iteration. 
*/ - private final TAPColumn[] colMeta; - /** Number of columns to read. */ - private final int nbColumns; - /** Sequence of rows over which we must iterate. */ - private final RowSequence rowSeq; + /** + * <p>This class consumes the metadata and rows of a VOTable document.</p> + * + * <p> + * Contrary to a usual TableSink, this one blocks after each row until that row has been fetched by {@link VOTableIterator}. + * </p> + * + * <p> + * Besides, the metadata returned by StarTable are immediately converted into TAP metadata. If this conversion fails, the error is kept + * in metaError, so that the VOTable reading can continue if the fact that metadata are missing is not a problem for the class using the + * {@link VOTableIterator}. + * </p> + * + * @author Grégory Mantelet (ARI) + * @version 2.0 (12/2014) + * @since 2.0 + */ + protected static class StreamVOTableSink implements TableSink { + + /** <p>The accepted VOTable metadata, after conversion from StarTable metadata.</p> + * <p><i>Note: this may be NULL after the metadata has been read if an error occurred while performing the conversion. + * In this case, metaError contains this error.</i></p> */ + private TAPColumn[] meta = null; + + /** The error which happened while converting the StarTable metadata into TAP metadata. */ + private DataReadException metaError = null; + + /** The last accepted row. */ + private Object[] pendingRow = null; + + /** Flag meaning that the end of the stream has been reached + * OR that the VOTable reading should be stopped before reading more rows. */ + private boolean endReached = false; + + /** + * <p>Stop nicely reading the VOTable.</p> + * + * <p> + * An exception will be thrown to the STILTS class using this TableSink, + * but no exception should be thrown to VOTableIterator. 
+ * </p> + */ + public synchronized void stop(){ + endReached = true; + notifyAll(); + } + + @Override + public synchronized void acceptMetadata(final StarTable metaTable) throws TableFormatException { + try{ + // Convert the StarTable metadata into TAP metadata: + meta = extractColMeta(metaTable); + + }catch(DataReadException dre){ + // Save the error; it will be thrown when getMetadata() is called: + metaError = dre; + + }finally{ + // Free all waiting threads: + notifyAll(); + } + } + + @Override + public synchronized void acceptRow(final Object[] row) throws IOException { + try{ + // Wait until the last accepted row has been consumed: + while(!endReached && pendingRow != null) + wait(); + + /* If the end has been reached, this is not normal + * (because endRows() is always called after acceptRow()...so, it means the iteration has been aborted before the end) + * and so the stream reading should be interrupted: */ + if (endReached) + throw new IOException("Streaming aborted!"); + + // Otherwise, keep the given row: + pendingRow = row; + + /* Security for the cases where a row to accept is NULL. + * In such a case, pendingRow will be set to NULL and the function getRow() would wait forever. + * This case is not supposed to happen because the caller of acceptRow(...) should not give a NULL row... + * ...which should then mean that the end of the stream has been reached. 
*/ + if (pendingRow == null) + endReached = true; + + }catch(InterruptedException ie){ + /* If the thread has been interrupted, set this TableSink in a state similar to + * when the end of the stream has been reached: */ + pendingRow = null; + endReached = true; + + }finally{ + // In all cases, all waiting threads must be freed: + notifyAll(); + } + } + + @Override + public synchronized void endRows() throws IOException { + // No more rows are available: + pendingRow = null; + // Set the END flag: + endReached = true; + // Notify all waiting threads that the end has been reached: + notifyAll(); + } + + /** + * <p>Get the metadata found in the VOTable.</p> + * + * <p><i>Note: + * This method is blocking until metadata are fully available by this TableSink + * or if an error occurred while converting them in TAP metadata. + * A Thread interruption will also make this function returning. + * </i></p> + * + * @return The metadata found in the VOTable header. + * + * @throws DataReadException If the metadata can not be interpreted correctly. 
+ */ + public synchronized TAPColumn[] getMeta() throws DataReadException{ + try{ + // Wait until metadata are available, or if an error has occurred while accepting them: + while(metaError == null && meta == null) + wait(); + + // If there was an error while interpreting the accepted metadata, throw it: + if (metaError != null) + throw metaError; + + // Otherwise, just return the metadata: + return meta; + + }catch(InterruptedException ie){ + /* If the thread has been interrupted, set this TableSink in a state similar to + * when the end of the stream has been reached: */ + endReached = true; + /* Return the metadata ; + * NULL will be returned if the interruption has occurred before the real reading of the VOTable metadata: */ + return meta; + + }finally{ + // In all cases, the waiting threads must be freed: + notifyAll(); + } + } + + /** + * <p>Get the last accepted row.</p> + * + * <p><i>Note: + * This function is blocking until a row has been accepted or the end of the stream has been reached. + * A Thread interruption will also make this function returning. 
+ * </i></p> + * + * @return The last accepted row, or NULL if the end of the stream has been reached or the waiting thread has been interrupted. + */ + public synchronized Object[] getRow() { + try{ + // Wait until a row has been accepted or the end has been reached: + while(!endReached && pendingRow == null) + wait(); + + // If there are no more rows, just return NULL (meaning "end of stream" for the caller): + if (endReached) + return null; + + /* Otherwise, reset pendingRow to NULL in order to enable the reading of the next row, + * and finally return the last accepted row: */ + Object[] row = pendingRow; + pendingRow = null; + return row; + + }catch(InterruptedException ie){ + /* If the thread has been interrupted, set this TableSink in a state similar to + * when the end of the stream has been reached: */ + endReached = true; + // Return NULL, meaning the end of the stream has been reached: + return null; + + }finally { + // In all cases, the waiting threads must be freed: + notifyAll(); + } + } + + /** + * Extract an array of {@link TAPColumn} objects. Each corresponds to one of the columns listed in the given table, + * and so corresponds to the metadata of a column. + * + * @param table {@link StarTable} which contains only the columns' information. + * + * @return The corresponding list of {@link TAPColumn} objects. + * + * @throws DataReadException If there is a problem while resolving the field datatype (for instance: unknown datatype, a multi-dimensional array is provided, a bad number format for the arraysize). 
+ */ + protected TAPColumn[] extractColMeta(final StarTable table) throws DataReadException{ + // Count the number columns and initialize the array: + TAPColumn[] columns = new TAPColumn[table.getColumnCount()]; + + // Add all columns meta: + for(int i = 0; i < columns.length; i++){ + // get the field: + ColumnInfo colInfo = table.getColumnInfo(i); + + // get the datatype: + String datatype = getAuxDatumValue(colInfo, "Datatype"); + + // get the arraysize: + String arraysize = ColumnInfo.formatShape(colInfo.getShape()); + + // get the xtype: + String xtype = getAuxDatumValue(colInfo, "xtype"); + + // Resolve the field type: + DBType type; + try{ + type = resolveVotType(datatype, arraysize, xtype).toTAPType(); + }catch(TAPException te){ + if (te instanceof DataReadException) + throw (DataReadException)te; + else + throw new DataReadException(te.getMessage(), te); + } + + // build the TAPColumn object: + TAPColumn col = new TAPColumn(colInfo.getName(), type, colInfo.getDescription(), colInfo.getUnitString(), colInfo.getUCD(), colInfo.getUtype()); + col.setPrincipal(false); + col.setIndexed(false); + col.setStd(false); + + // append it to the array: + columns[i] = col; + } + + return columns; + } + + /** + * Extract the specified auxiliary datum value from the given {@link ColumnInfo}. + * + * @param colInfo {@link ColumnInfo} from which the auxiliary datum must be extracted. + * @param auxDatumName The name of the datum to extract. + * + * @return The extracted value as String. + */ + protected String getAuxDatumValue(final ColumnInfo colInfo, final String auxDatumName){ + DescribedValue value = colInfo.getAuxDatumByName(auxDatumName); + return (value != null) ? value.getValue().toString() : null; + } + + /** + * Resolve a VOTable field type by using the datatype, arraysize and xtype strings as specified in a VOTable document. + * + * @param datatype Attribute value of VOTable corresponding to the datatype. 
+ * @param arraysize Attribute value of VOTable corresponding to the arraysize. + * @param xtype Attribute value of VOTable corresponding to the xtype. + * + * @return The resolved VOTable field type, or a CHAR(*) type if the specified type can not be resolved. + * + * @throws DataReadException If a field datatype is unknown. + */ + protected VotType resolveVotType(final String datatype, final String arraysize, final String xtype) throws DataReadException{ + // If no datatype is specified, return immediately a CHAR(*) type: + if (datatype == null || datatype.trim().length() == 0) + return new VotType(VotDatatype.CHAR, "*"); + + // Identify the specified datatype: + VotDatatype votdatatype; + try{ + votdatatype = VotDatatype.valueOf(datatype.toUpperCase()); + }catch(IllegalArgumentException iae){ + throw new DataReadException("unknown field datatype: \"" + datatype + "\""); + } + + // Build the VOTable type: + return new VotType(votdatatype, arraysize, xtype); + } + + } + + /** Stream containing the VOTable on which this {@link TableIterator} is iterating. */ + protected final InputStream input; + /** The StarTable consumer which is used to iterate on each row. */ + protected final StreamVOTableSink sink; /** Indicate whether the row iteration has already started. */ - private boolean iterationStarted = false; + protected boolean iterationStarted = false; /** Indicate whether the last row has already been reached. */ - private boolean endReached = false; + protected boolean endReached = false; + + /** The last read row. Column iteration is done on this array. */ + protected Object[] row; /** Index of the last read column (=0 just after {@link #nextRow()} and before {@link #nextCol()}, ={@link #nbColumns} after the last column has been read). */ - private int colIndex; - + protected int indCol = -1; + /** Number of columns available according to the metadata. 
*/ + protected int nbCol = 0; + /** * Build a TableIterator able to read rows and columns inside the given VOTable input stream. * @@ -73,200 +339,114 @@ public class VOTableIterator implements TableIterator { // An input stream MUST BE provided: if (input == null) throw new NullPointerException("Missing VOTable document input stream over which to iterate!"); - + this.input = input; + try{ // Set the VOTable builder/interpreter: - TableBuilder tb = (new StarTableFactory()).getTableBuilder("votable"); - - // Set the TableSink to use in order to stream the data: - OnceRowPipe rowPipe = new OnceRowPipe(); + final TableBuilder tb = (new StarTableFactory()).getTableBuilder("votable"); + // Build the TableSink to use: + sink = new StreamVOTableSink(); + // Initiate the stream process: - tb.streamStarTable(input, rowPipe, null); - - // Start by reading just the metadata: - StarTable table = rowPipe.waitForStarTable(); + Thread streamThread = new Thread() { + public void run() { + try{ + tb.streamStarTable(input, sink, null); + }catch(IOException e) { + if (e.getMessage() != null && !e.getMessage().equals("Reading interrupted!")) + e.printStackTrace(); + } + } + }; + streamThread.start(); - // Convert columns' information into TAPColumn object: - colMeta = extractColMeta(table); - nbColumns = colMeta.length; - - // Set the sequence of rows on which this iterator will iterate: - rowSeq = table.getRowSequence(); - - }catch(TAPException te){ - throw new DataReadException("Unexpected field datatype: " + te.getMessage(), te); }catch(Exception ex){ throw new DataReadException("Unable to parse/read the given VOTable input stream!", ex); } } - /** - * Extract an array of {@link TAPColumn} objects. Each corresponds to one of the columns listed in the given table, - * and so corresponds to the metadata of a column. - * - * @param table {@link StarTable} which contains only the columns' information. - * - * @return The corresponding list of {@link TAPColumn} objects. 
- * - * @throws TAPException If there is a problem while resolving the field datatype (for instance: unknown datatype, a multi-dimensional array is provided, a bad number format for the arraysize). - */ - private static final TAPColumn[] extractColMeta(final StarTable table) throws TAPException{ - // Count the number columns and initialize the array: - TAPColumn[] columns = new TAPColumn[table.getColumnCount()]; - - // Add all columns meta: - for(int i = 0; i < columns.length; i++){ - // get the field: - ColumnInfo colInfo = table.getColumnInfo(i); - - // get the datatype: - String datatype = getAuxDatumValue(colInfo, "Datatype"); - - // get the arraysize: - String arraysize = ColumnInfo.formatShape(colInfo.getShape()); - - // get the xtype: - String xtype = getAuxDatumValue(colInfo, "xtype"); - - // Resolve the field type: - DBType type = resolveVotType(datatype, arraysize, xtype).toTAPType(); - - // build the TAPColumn object: - TAPColumn col = new TAPColumn(colInfo.getName(), type, colInfo.getDescription(), colInfo.getUnitString(), colInfo.getUCD(), colInfo.getUtype()); - col.setPrincipal(false); - col.setIndexed(false); - col.setStd(false); - - // append it to the array: - columns[i] = col; - } - - return columns; - } - - /** - * Extract the specified auxiliary datum value from the given {@link ColumnInfo}. - * - * @param colInfo {@link ColumnInfo} from which the auxiliary datum must be extracted. - * @param auxDatumName The name of the datum to extract. - * - * @return The extracted value as String. - */ - private static final String getAuxDatumValue(final ColumnInfo colInfo, final String auxDatumName){ - DescribedValue value = colInfo.getAuxDatumByName(auxDatumName); - return (value != null) ? value.getValue().toString() : null; - } - - /** - * Resolve a VOTable field type by using the datatype, arraysize and xtype strings as specified in a VOTable document. - * - * @param datatype Attribute value of VOTable corresponding to the datatype. 
- * @param arraysize Attribute value of VOTable corresponding to the arraysize. - * @param xtype Attribute value of VOTable corresponding to the xtype. - * - * @return The resolved VOTable field type, or a CHAR(*) type if the specified type can not be resolved. - * - * @throws TAPException If a field datatype is unknown. - */ - private static VotType resolveVotType(final String datatype, final String arraysize, final String xtype) throws TAPException{ - // If no datatype is specified, return immediately a CHAR(*) type: - if (datatype == null || datatype.trim().length() == 0) - return new VotType(VotDatatype.CHAR, "*"); - - // Identify the specified datatype: - VotDatatype votdatatype; - try{ - votdatatype = VotDatatype.valueOf(datatype.toUpperCase()); - }catch(IllegalArgumentException iae){ - throw new TAPException("unknown field datatype: \"" + datatype + "\""); - } - - // Build the VOTable type: - return new VotType(votdatatype, arraysize, xtype); - } - - /** - * <p>Check the row iteration state. 
That's to say whether:</p> - * <ul> - * <li>the row iteration has started = the first row has been read = a first call of {@link #nextRow()} has been done</li> - * <li>AND the row iteration is not finished = the last row has been read.</li> - * </ul> - * @throws IllegalStateException - */ - private void checkReadState() throws IllegalStateException{ - if (!iterationStarted) - throw new IllegalStateException("No row has yet been read!"); - else if (endReached) - throw new IllegalStateException("End of VOTable file already reached!"); - } - @Override - public void close() throws DataReadException{ - try{ - rowSeq.close(); - }catch(IOException ioe){ - throw new DataReadException("Can not close the iterated VOTable!", ioe); - } + public TAPColumn[] getMetadata() throws DataReadException { + return sink.getMeta(); } @Override - public TAPColumn[] getMetadata(){ - return colMeta; - } - - @Override - public boolean nextRow() throws DataReadException{ - try{ - // go to the next row: - boolean rowFetched = rowSeq.next(); - endReached = !rowFetched; - // prepare the iteration over its columns: - colIndex = 0; + public boolean nextRow() throws DataReadException { + // If no more rows, return false directly: + if (endReached) + return false; + + // Fetch the row: + row = sink.getRow(); + + // Reset the column iteration: + if (!iterationStarted){ iterationStarted = true; - return rowFetched; - }catch(IOException e){ - throw new DataReadException("Unable to read the next VOTable row!", e); + nbCol = sink.getMeta().length; } + indCol = 0; + + // Tells whether there is more rows or not: + endReached = (row == null); + return !endReached; } @Override - public boolean hasNextCol() throws IllegalStateException, DataReadException{ + public boolean hasNextCol() throws IllegalStateException, DataReadException { // Check the read state: checkReadState(); // Determine whether the last column has been reached or not: - return (colIndex < nbColumns); + return (indCol < nbCol); } @Override - 
public Object nextCol() throws NoSuchElementException, IllegalStateException, DataReadException{ + public Object nextCol() throws NoSuchElementException, IllegalStateException, DataReadException { // Check the read state and ensure there is still at least one column to read: if (!hasNextCol()) throw new NoSuchElementException("No more field to read!"); // Get the column value: - try{ - return rowSeq.getCell(colIndex++); - }catch(IOException se){ - throw new DataReadException("Can not read the value of the " + colIndex + "-th field!", se); - } + return row[indCol++]; } @Override - public DBType getColType() throws IllegalStateException, DataReadException{ + public DBType getColType() throws IllegalStateException, DataReadException { // Basically check the read state (for rows iteration): checkReadState(); // Check deeper the read state (for columns iteration): - if (colIndex <= 0) + if (indCol <= 0) throw new IllegalStateException("No field has yet been read!"); - else if (colIndex > nbColumns) + else if (indCol > nbCol) throw new IllegalStateException("All fields have already been read!"); // Return the column type: - return colMeta[colIndex - 1].getDatatype(); + return sink.getMeta()[indCol - 1].getDatatype(); + } + + @Override + public void close() throws DataReadException { + endReached = true; + sink.stop(); + // input.close(); // in case sink.stop() is not enough to stop the VOTable reading! + } + + /** + * <p>Check the row iteration state. 
That's to say whether:</p> + * <ul> + * <li>the row iteration has started = the first row has been read = a first call of {@link #nextRow()} has been done</li> + * <li>AND the row iteration is not finished = the last row has been read.</li> + * </ul> + * @throws IllegalStateException + */ + protected void checkReadState() throws IllegalStateException{ + if (!iterationStarted) + throw new IllegalStateException("No row has yet been read!"); + else if (endReached) + throw new IllegalStateException("End of VOTable file already reached!"); } }