From 7613f228501884d06f452be63c7a98815e3f0380 Mon Sep 17 00:00:00 2001
From: gmantele <>
Date: Tue, 9 Dec 2014 16:51:53 +0100
Subject: [PATCH] [TAP] Fix bug in VOTable reading. The STIL consumer must run
 in a different thread. OnceRowPipe has therefore been replaced by a new
 internal static class doing the same work plus some adaptations, particularly
 to properly stop the stream reading before reaching the end of the VOTable.

 src/tap/data/ |   4 +-
 src/tap/data/        |   8 +-
 src/tap/data/      | 548 ++++++++++++++++---------
 3 files changed, 370 insertions(+), 190 deletions(-)

diff --git a/src/tap/data/ b/src/tap/data/
index 97f82e6..4adeb23 100644
--- a/src/tap/data/
+++ b/src/tap/data/
@@ -55,7 +55,7 @@ import com.oreilly.servlet.multipart.ExceededSizeException;
  * </p>
  * @author Gr&eacute;gory Mantelet (ARI)
- * @version 2.0 (08/2014)
+ * @version 2.0 (12/2014)
  * @since 2.0
 public class LimitedTableIterator implements TableIterator {
@@ -171,7 +171,7 @@ public class LimitedTableIterator implements TableIterator {
-	public TAPColumn[] getMetadata(){
+	public TAPColumn[] getMetadata() throws DataReadException{
 		return innerIt.getMetadata();
diff --git a/src/tap/data/ b/src/tap/data/
index 17d8445..cdc16de 100644
--- a/src/tap/data/
+++ b/src/tap/data/
@@ -21,8 +21,8 @@ package;
 import java.util.NoSuchElementException;
-import adql.db.DBType;
 import tap.metadata.TAPColumn;
+import adql.db.DBType;
  * <p>Let's iterate on each row and then on each column over a table dataset.</p>
@@ -50,8 +50,8 @@ import tap.metadata.TAPColumn;
  * 	}
  * </pre>
- * @author Gr&eacute;gory Mantelet (ARI) -
- * @version 2.0 (08/2014)
+ * @author Gr&eacute;gory Mantelet (ARI)
+ * @version 2.0 (12/2014)
  * @since 2.0
 public interface TableIterator {
@@ -70,7 +70,7 @@ public interface TableIterator {
 	 * @see #getColType()
-	public TAPColumn[] getMetadata();
+	public TAPColumn[] getMetadata() throws DataReadException;
 	 * <p>Go to the next row if there is one.</p>
diff --git a/src/tap/data/ b/src/tap/data/
index c79dd05..c7bd03b 100644
--- a/src/tap/data/
+++ b/src/tap/data/
@@ -1,40 +1,21 @@
- * This file is part of TAPLibrary.
- * 
- * TAPLibrary is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- * 
- * TAPLibrary is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * GNU Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with TAPLibrary.  If not, see <>.
- * 
- * Copyright 2014 - Astronomisches Rechen Institut (ARI)
- */
 import java.util.NoSuchElementException;
-import adql.db.DBType;
 import tap.TAPException;
 import tap.metadata.TAPColumn;
 import tap.metadata.VotType;
 import tap.metadata.VotType.VotDatatype;
+import adql.db.DBType;
  * <p>{@link TableIterator} which lets iterate over a VOTable input stream using STIL.</p>
@@ -42,25 +23,310 @@ import;
  * <p>{@link #getColType()} will return TAP type based on the type declared in the VOTable metadata part.</p>
  * @author Gr&eacute;gory Mantelet (ARI)
- * @version 2.0 (07/2014)
+ * @version 2.0 (12/2014)
  * @since 2.0
 public class VOTableIterator implements TableIterator {
-	/** Metadata of all columns identified before the iteration. */
-	private final TAPColumn[] colMeta;
-	/** Number of columns to read. */
-	private final int nbColumns;
-	/** Sequence of rows over which we must iterate. */
-	private final RowSequence rowSeq;
+	/**
+	 * <p>This class consumes the metadata and rows of a VOTable document.</p>
+	 * 
+	 * <p>
+	 * 	Contrary to a usual TableSink, this one blocks after each accepted row until this row has been fetched by {@link VOTableIterator}.
+	 * </p>
+	 * 
+	 * <p>
+	 * 	Besides, the metadata returned by StarTable are immediately converted into TAP metadata. If this conversion fails, the error is kept
+	 * 	in metaError, so that the VOTable reading can continue when the missing metadata are not a problem for the class using the
+	 * 	{@link VOTableIterator}.
+	 * </p> 
+	 * 
+	 * @author Gr&eacute;gory Mantelet (ARI)
+	 * @version 2.0 (12/2014)
+	 * @since 2.0
+	 */
+	protected static class StreamVOTableSink implements TableSink {
+		/** <p>The accepted VOTable metadata, after conversion from StarTable metadata.</p>
+		 * <p><i>Note: this may be NULL after the metadata has been read if an error occurred while performing the conversion.
+		 * In this case, metaError contains this error.</i></p> */
+		private TAPColumn[] meta = null;
+		/** The error which happened while converting the StarTable metadata into TAP metadata. */
+		private DataReadException metaError = null;
+		/** The last accepted row. */
+		private Object[] pendingRow = null;
+		/** Flag meaning that the end of the stream has been reached
+		 * OR if the VOTable reading should be stopped before reading more rows. */
+		private boolean endReached = false;
+		/**
+		 * <p>Stop nicely reading the VOTable.</p>
+		 * 
+		 * <p>
+		 * 	This function sets the end flag and wakes up all the threads waiting on this TableSink.
+		 * 	From then on, {@link #acceptRow(Object[])} throws an {@link IOException} to the STIL class
+		 * 	feeding this TableSink, whereas {@link #getRow()} just returns NULL: no exception is thrown
+		 * 	to VOTableIterator.
+		 * </p>
+		 */
+		public synchronized void stop(){
+			// Set the END flag so that producer and consumer both leave their waiting loop:
+			endReached = true;
+			// Free all waiting threads:
+			notifyAll();
+		}
+		/**
+		 * <p>Accept the metadata of the VOTable and convert them immediately into TAP metadata.</p>
+		 * 
+		 * <p><i>Note:
+		 * 	Any failure of the conversion is kept in metaError instead of being thrown to the caller,
+		 * 	so that the VOTable reading can go on. This error is thrown later by {@link #getMeta()}.
+		 * </i></p>
+		 * 
+		 * @param metaTable	{@link StarTable} containing only the columns' information.
+		 */
+		@Override
+		public synchronized void acceptMetadata(final StarTable metaTable) throws TableFormatException {
+			try{
+				// Convert the StarTable metadata into TAP metadata:
+				meta = extractColMeta(metaTable);
+			}catch(DataReadException dre){
+				// Save the error ; this error will be thrown when a call to getMeta() will be done:
+				metaError = dre;
+			}catch(RuntimeException re){
+				/* An unchecked failure must be recorded too: otherwise meta and metaError would both stay NULL
+				 * and any thread waiting in getMeta() would never be released: */
+				metaError = new DataReadException("Unable to interpret the VOTable metadata!", re);
+			}finally{
+				// Free all waiting threads:
+				notifyAll();
+			}
+		}
+		/**
+		 * <p>Accept a new row: wait until the previously accepted row has been fetched, then keep the given one as pending row.</p>
+		 * 
+		 * @param row	The row to make available through {@link #getRow()}. NULL is interpreted as the end of the stream.
+		 * 
+		 * @throws IOException	If the end flag is already set when this row should be accepted (for instance after a call to {@link #stop()}).
+		 */
+		@Override
+		public synchronized void acceptRow(final Object[] row) throws IOException {
+			try{
+				// Wait until the last accepted row has been consumed:
+				while(!endReached && pendingRow != null)
+					wait();
+				/* If the end has been reached, this is not normal
+				 * (because endRows() is always called after acceptRow(), it means the iteration has been aborted before the end)
+				 * and so the stream reading should be interrupted: */
+				if (endReached)
+					throw new IOException("Streaming aborted!");
+				// Otherwise, keep the given row:
+				pendingRow = row;
+				/* Security for the cases where a row to accept is NULL.
+				 * In such case, pendingRow is NULL and the function getRow() would wait for ever.
+				 * This case is not supposed to happen because the caller of acceptRow(...) should not give a NULL row...
+				 * ...which should then mean that the end of the stream has been reached. */
+				if (pendingRow == null)
+					endReached = true;
+			}catch(InterruptedException ie){
+				/* If the thread has been interrupted, set this TableSink in a state similar to
+				 * when the end of the stream has been reached: */
+				pendingRow = null;
+				endReached = true;
+				// Restore the interrupt status so that the interrupted thread can still notice the interruption:
+				Thread.currentThread().interrupt();
+			}finally{
+				// In all cases, all waiting threads must be freed:
+				notifyAll();
+			}
+		}
+		/**
+		 * <p>Signal that no more rows will be accepted.</p>
+		 * 
+		 * <p><i>Note:
+		 * 	This function blocks until the last accepted row has been fetched with {@link #getRow()}
+		 * 	(or until the reading is stopped or the thread is interrupted). Without this wait, the end flag
+		 * 	could be set before the consumer has had a chance to fetch this row, which would then be silently lost.
+		 * </i></p>
+		 */
+		@Override
+		public synchronized void endRows() throws IOException {
+			try{
+				// Wait until the last accepted row has been consumed:
+				while(!endReached && pendingRow != null)
+					wait();
+			}catch(InterruptedException ie){
+				// The end of the stream is flagged anyway ; just restore the interrupt status:
+				Thread.currentThread().interrupt();
+			}finally{
+				// No more rows are available:
+				pendingRow = null;
+				// Set the END flag:
+				endReached = true;
+				// Notify all waiting threads that the end has been reached:
+				notifyAll();
+			}
+		}
+		/**
+		 * <p>Get the metadata found in the VOTable.</p>
+		 * 
+		 * <p><i>Note:
+		 * 	This method blocks until the metadata have been accepted by this TableSink
+		 * 	or until an error occurred while converting them into TAP metadata.
+		 * 	A thread interruption also makes this function return ; in this case the end flag is set
+		 * 	and the interrupt status of the calling thread is restored.
+		 * </i></p>
+		 * 
+		 * @return The metadata found in the VOTable header,
+		 *         or NULL if the thread has been interrupted before the metadata were available.
+		 * 
+		 * @throws DataReadException	If the metadata can not be interpreted correctly.
+		 */
+		public synchronized TAPColumn[] getMeta() throws DataReadException{
+			try{
+				// Wait until metadata are available, or if an error has occurred while accepting them:
+				while(metaError == null && meta == null)
+					wait();
+				// If there was an error while interpreting the accepted metadata, throw it:
+				if (metaError != null)
+					throw metaError;
+				// Otherwise, just return the metadata:
+				return meta;
+			}catch(InterruptedException ie){
+				/* If the thread has been interrupted, set this TableSink in a state similar to
+				 * when the end of the stream has been reached: */
+				endReached = true;
+				// Restore the interrupt status so that the caller can still notice the interruption:
+				Thread.currentThread().interrupt();
+				/* Return the metadata ;
+				 * NULL will be returned if the interruption has occurred before the real reading of the VOTable metadata: */
+				return meta;
+			}finally{
+				// In all cases, the waiting threads must be freed:
+				notifyAll();
+			}
+		}
+		/**
+		 * <p>Get the last accepted row.</p>
+		 * 
+		 * <p><i>Note:
+		 * 	This function blocks until a row has been accepted or the end of the stream has been reached.
+		 * 	A thread interruption also makes this function return ; in this case the end flag is set
+		 * 	and the interrupt status of the calling thread is restored.
+		 * </i></p>
+		 * 
+		 * @return The last accepted row, or NULL if the end flag is set (end of stream, stop or interruption).
+		 */
+		public synchronized Object[] getRow() {
+			try{
+				// Wait until a row has been accepted or the end has been reached:
+				while(!endReached && pendingRow == null)
+					wait();
+				// If there is no more rows, just return NULL (meaning for the caller "end of stream"):
+				if (endReached)
+					return null;
+				/* Otherwise, reset pendingRow to NULL in order to enable the reading of the next row,
+				 * and finally return the last accepted row: */
+				Object[] row = pendingRow;
+				pendingRow = null;
+				return row;
+			}catch(InterruptedException ie){
+				/* If the thread has been interrupted, set this TableSink in a state similar to
+				 * when the end of the stream has been reached: */
+				endReached = true;
+				// Restore the interrupt status so that the caller can still notice the interruption:
+				Thread.currentThread().interrupt();
+				// Return NULL, meaning the end of the stream has been reached:
+				return null;
+			}finally {
+				// In all cases, the waiting threads must be freed:
+				notifyAll();
+			}
+		}
+		/**
+		 * Build the TAP description of every column listed in the given table:
+		 * one {@link TAPColumn} per column, in the same order.
+		 * 
+		 * @param table		{@link StarTable} which contains only the columns' information.
+		 * 
+		 * @return			The array of {@link TAPColumn} objects matching the columns of the given table.
+		 * 
+		 * @throws DataReadException	If the datatype of a field can not be resolved or converted into a TAP type.
+		 */
+		protected TAPColumn[] extractColMeta(final StarTable table) throws DataReadException{
+			// One TAPColumn slot per column of the table:
+			TAPColumn[] result = new TAPColumn[table.getColumnCount()];
+			int idx = 0;
+			while(idx < result.length){
+				ColumnInfo info = table.getColumnInfo(idx);
+				// Gather the three VOTable attributes describing the field type:
+				String votDatatype = getAuxDatumValue(info, "Datatype");
+				String votArraysize = ColumnInfo.formatShape(info.getShape());
+				String votXtype = getAuxDatumValue(info, "xtype");
+				// Translate them into a TAP type ; any failure is reported as a DataReadException:
+				DBType tapType;
+				try{
+					tapType = resolveVotType(votDatatype, votArraysize, votXtype).toTAPType();
+				}catch(DataReadException dre){
+					throw dre;
+				}catch(TAPException te){
+					throw new DataReadException(te.getMessage(), te);
+				}
+				// Describe the column with all flags switched off:
+				TAPColumn tapColumn = new TAPColumn(info.getName(), tapType, info.getDescription(), info.getUnitString(), info.getUCD(), info.getUtype());
+				tapColumn.setPrincipal(false);
+				tapColumn.setIndexed(false);
+				tapColumn.setStd(false);
+				result[idx++] = tapColumn;
+			}
+			return result;
+		}
+		/**
+		 * Extract the specified auxiliary datum value from the given {@link ColumnInfo}.
+		 * 
+		 * @param colInfo			{@link ColumnInfo} from which the auxiliary datum must be extracted.
+		 * @param auxDatumName		The name of the datum to extract.
+		 * 
+		 * @return	The extracted value as String,
+		 *        	or NULL if there is no such auxiliary datum or if it has no value.
+		 */
+		protected String getAuxDatumValue(final ColumnInfo colInfo, final String auxDatumName){
+			DescribedValue value = colInfo.getAuxDatumByName(auxDatumName);
+			if (value == null)
+				return null;
+			// A described value may exist without any content ; do not fail on it:
+			Object datum = value.getValue();
+			return (datum != null) ? datum.toString() : null;
+		}
+		/**
+		 * Resolve a VOTable field type by using the datatype, arraysize and xtype strings as specified in a VOTable document.
+		 * 
+		 * @param datatype		Attribute value of VOTable corresponding to the datatype.
+		 * @param arraysize		Attribute value of VOTable corresponding to the arraysize.
+		 * @param xtype			Attribute value of VOTable corresponding to the xtype.
+		 * 
+		 * @return	The resolved VOTable field type, or a CHAR(*) type if no datatype is specified (NULL or blank string).
+		 * 
+		 * @throws DataReadException	If a field datatype is unknown.
+		 */
+		protected VotType resolveVotType(final String datatype, final String arraysize, final String xtype) throws DataReadException{
+			// If no datatype is specified, return immediately a CHAR(*) type:
+			final String trimmedDatatype = (datatype == null) ? "" : datatype.trim();
+			if (trimmedDatatype.length() == 0)
+				return new VotType(VotDatatype.CHAR, "*");
+			// Identify the specified datatype:
+			VotDatatype votdatatype;
+			try{
+				/* The upper-casing must not depend on the default locale
+				 * (for instance, in a Turkish locale "int" would not become "INT"): */
+				votdatatype = VotDatatype.valueOf(trimmedDatatype.toUpperCase(java.util.Locale.ROOT));
+			}catch(IllegalArgumentException iae){
+				throw new DataReadException("unknown field datatype: \"" + datatype + "\"", iae);
+			}
+			// Build the VOTable type:
+			return new VotType(votdatatype, arraysize, xtype);
+		}
+	}
+	/** Stream containing the VOTable on which this {@link TableIterator} is iterating. */
+	protected final InputStream input;
+	/** The StarTable consumer which is used to iterate on each row. */
+	protected final StreamVOTableSink sink;
 	/** Indicate whether the row iteration has already started. */
-	private boolean iterationStarted = false;
+	protected boolean iterationStarted = false;
 	/** Indicate whether the last row has already been reached. */
-	private boolean endReached = false;
+	protected boolean endReached = false;
+	/** The last read row. Column iteration is done on this array. */
+	protected Object[] row;
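-	/** Index of the last read column (=0 just after {@link #nextRow()} and before {@link #nextCol()}, ={@link #nbColumns} after the last column has been read). */
+	/** Index of the last read column (=0 just after {@link #nextRow()} and before {@link #nextCol()}, ={@link #nbCol} after the last column has been read). */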
-	private int colIndex;
+	protected int indCol = -1;
+	/** Number of columns available according to the metadata. */
+	protected int nbCol = 0;
 	 * Build a TableIterator able to read rows and columns inside the given VOTable input stream.
@@ -73,200 +339,114 @@ public class VOTableIterator implements TableIterator {
 		// An input stream MUST BE provided:
 		if (input == null)
 			throw new NullPointerException("Missing VOTable document input stream over which to iterate!");
+		this.input = input;
 			// Set the VOTable builder/interpreter:
-			TableBuilder tb = (new StarTableFactory()).getTableBuilder("votable");
-			// Set the TableSink to use in order to stream the data: 
-			OnceRowPipe rowPipe = new OnceRowPipe();
+			final TableBuilder tb = (new StarTableFactory()).getTableBuilder("votable");
+			// Build the TableSink to use:
+			sink = new StreamVOTableSink();
 			// Initiate the stream process:
-			tb.streamStarTable(input, rowPipe, null);
-			// Start by reading just the metadata:
-			StarTable table = rowPipe.waitForStarTable();
+			Thread streamThread = new Thread() {
+                /** Stream the whole VOTable into the sink ; an abortion requested through the sink is not reported as an error. */
+                @Override
+                public void run() {
+                    try{
+                        tb.streamStarTable(input, sink, null);
+                    }catch(IOException e) {
+                        /* "Streaming aborted!" is the message thrown by the sink when the reading is stopped on purpose ;
+                         * the formerly tested message "Reading interrupted!" is still ignored for backward compatibility.
+                         * NOTE(review): this assumes the exception message of the sink is forwarded unchanged - to confirm.
+                         * Any other error (even without message) must be reported: */
+                        final String msg = e.getMessage();
+                        if (!"Streaming aborted!".equals(msg) && !"Reading interrupted!".equals(msg))
+                            e.printStackTrace();
+                    }
+                }
+            };
+            streamThread.start();
-			// Convert columns' information into TAPColumn object:
-			colMeta = extractColMeta(table);
-			nbColumns = colMeta.length;
-			// Set the sequence of rows on which this iterator will iterate:
-			rowSeq = table.getRowSequence();
-		}catch(TAPException te){
-			throw new DataReadException("Unexpected field datatype: " + te.getMessage(), te);
 		}catch(Exception ex){
 			throw new DataReadException("Unable to parse/read the given VOTable input stream!", ex);
-	/**
-	 * Extract an array of {@link TAPColumn} objects. Each corresponds to one of the columns listed in the given table,
-	 * and so corresponds to the metadata of a column. 
-	 * 
-	 * @param table		{@link StarTable} which contains only the columns' information.
-	 * 
-	 * @return			The corresponding list of {@link TAPColumn} objects.
-	 * 
-	 * @throws TAPException	If there is a problem while resolving the field datatype (for instance: unknown datatype, a multi-dimensional array is provided, a bad number format for the arraysize).
-	 */
-	private static final TAPColumn[] extractColMeta(final StarTable table) throws TAPException{
-		// Count the number columns and initialize the array:
-		TAPColumn[] columns = new TAPColumn[table.getColumnCount()];
-		// Add all columns meta:
-		for(int i = 0; i < columns.length; i++){
-			// get the field:
-			ColumnInfo colInfo = table.getColumnInfo(i);
-			// get the datatype:
-			String datatype = getAuxDatumValue(colInfo, "Datatype");
-			// get the arraysize:
-			String arraysize = ColumnInfo.formatShape(colInfo.getShape());
-			// get the xtype:
-			String xtype = getAuxDatumValue(colInfo, "xtype");
-			// Resolve the field type:
-			DBType type = resolveVotType(datatype, arraysize, xtype).toTAPType();
-			// build the TAPColumn object:
-			TAPColumn col = new TAPColumn(colInfo.getName(), type, colInfo.getDescription(), colInfo.getUnitString(), colInfo.getUCD(), colInfo.getUtype());
-			col.setPrincipal(false);
-			col.setIndexed(false);
-			col.setStd(false);
-			// append it to the array:
-			columns[i] = col;
-		}
-		return columns;
-	}
-	/**
-	 * Extract the specified auxiliary datum value from the given {@link ColumnInfo}.
-	 * 
-	 * @param colInfo			{@link ColumnInfo} from which the auxiliary datum must be extracted.
-	 * @param auxDatumName		The name of the datum to extract.
-	 * 
-	 * @return	The extracted value as String.
-	 */
-	private static final String getAuxDatumValue(final ColumnInfo colInfo, final String auxDatumName){
-		DescribedValue value = colInfo.getAuxDatumByName(auxDatumName);
-		return (value != null) ? value.getValue().toString() : null;
-	}
-	/**
-	 * Resolve a VOTable field type by using the datatype, arraysize and xtype strings as specified in a VOTable document.
-	 * 
-	 * @param datatype		Attribute value of VOTable corresponding to the datatype.
-	 * @param arraysize		Attribute value of VOTable corresponding to the arraysize.
-	 * @param xtype			Attribute value of VOTable corresponding to the xtype.
-	 * 
-	 * @return	The resolved VOTable field type, or a CHAR(*) type if the specified type can not be resolved.
-	 * 
-	 * @throws TAPException	If a field datatype is unknown.
-	 */
-	private static VotType resolveVotType(final String datatype, final String arraysize, final String xtype) throws TAPException{
-		// If no datatype is specified, return immediately a CHAR(*) type:
-		if (datatype == null || datatype.trim().length() == 0)
-			return new VotType(VotDatatype.CHAR, "*");
-		// Identify the specified datatype:
-		VotDatatype votdatatype;
-		try{
-			votdatatype = VotDatatype.valueOf(datatype.toUpperCase());
-		}catch(IllegalArgumentException iae){
-			throw new TAPException("unknown field datatype: \"" + datatype + "\"");
-		}
-		// Build the VOTable type:
-		return new VotType(votdatatype, arraysize, xtype);
-	}
-	/**
-	 * <p>Check the row iteration state. That's to say whether:</p>
-	 * <ul>
-	 * 	<li>the row iteration has started = the first row has been read = a first call of {@link #nextRow()} has been done</li>
-	 * 	<li>AND the row iteration is not finished = the last row has been read.</li>
-	 * </ul>
-	 * @throws IllegalStateException
-	 */
-	private void checkReadState() throws IllegalStateException{
-		if (!iterationStarted)
-			throw new IllegalStateException("No row has yet been read!");
-		else if (endReached)
-			throw new IllegalStateException("End of VOTable file already reached!");
-	}
-	public void close() throws DataReadException{
-		try{
-			rowSeq.close();
-		}catch(IOException ioe){
-			throw new DataReadException("Can not close the iterated VOTable!", ioe);
-		}
+	public TAPColumn[] getMetadata() throws DataReadException {
+		return sink.getMeta();
-	public TAPColumn[] getMetadata(){
-		return colMeta;
-	}
-	@Override
-	public boolean nextRow() throws DataReadException{
-		try{
-			// go to the next row:
-			boolean rowFetched =;
-			endReached = !rowFetched;
-			// prepare the iteration over its columns:
-			colIndex = 0;
+	public boolean nextRow() throws DataReadException {
+		// If no more rows, return false directly:
+		if (endReached)
+			return false;
+		// Fetch the row:
+		row = sink.getRow();
+		// Reset the column iteration:
+		if (!iterationStarted){
 			iterationStarted = true;
-			return rowFetched;
-		}catch(IOException e){
-			throw new DataReadException("Unable to read the next VOTable row!", e);
+			nbCol = sink.getMeta().length;
+		indCol = 0;
+		// Tells whether there is more rows or not:
+		endReached = (row == null);
+		return !endReached;
-	public boolean hasNextCol() throws IllegalStateException, DataReadException{
+	public boolean hasNextCol() throws IllegalStateException, DataReadException {
 		// Check the read state:
 		// Determine whether the last column has been reached or not:
-		return (colIndex < nbColumns);
+		return (indCol < nbCol);
-	public Object nextCol() throws NoSuchElementException, IllegalStateException, DataReadException{
+	public Object nextCol() throws NoSuchElementException, IllegalStateException, DataReadException {
 		// Check the read state and ensure there is still at least one column to read:
 		if (!hasNextCol())
 			throw new NoSuchElementException("No more field to read!");
 		// Get the column value:
-		try{
-			return rowSeq.getCell(colIndex++);
-		}catch(IOException se){
-			throw new DataReadException("Can not read the value of the " + colIndex + "-th field!", se);
-		}
+		return row[indCol++];
-	public DBType getColType() throws IllegalStateException, DataReadException{
+	public DBType getColType() throws IllegalStateException, DataReadException {
 		// Basically check the read state (for rows iteration):
 		// Check deeper the read state (for columns iteration):
-		if (colIndex <= 0)
+		if (indCol <= 0)
 			throw new IllegalStateException("No field has yet been read!");
-		else if (colIndex > nbColumns)
+		else if (indCol > nbCol)
 			throw new IllegalStateException("All fields have already been read!");
 		// Return the column type:
-		return colMeta[colIndex - 1].getDatatype();
+		return sink.getMeta()[indCol - 1].getDatatype();
+	}
+	/**
+	 * <p>End the iteration.</p>
+	 * 
+	 * <p>
+	 * 	After this call, {@link #nextRow()} returns false and the sink is asked to stop, which wakes up
+	 * 	the threads waiting on it. The input stream is not closed by this function.
+	 * </p>
+	 */
+	@Override
+	public void close() throws DataReadException {
+		// No more rows must be returned by this iterator:
+		endReached = true;
+		// Ask the sink to stop, so that the threads waiting on it are released:
+		sink.stop();
+		// input.close(); // in case sink.stop() is not enough to stop the VOTable reading!
+	}
+	/**
+	 * <p>Check the row iteration state. That's to say whether:</p>
+	 * <ul>
+	 * 	<li>the row iteration has started = the first row has been read = a first call of {@link #nextRow()} has been done</li>
+	 * 	<li>AND the row iteration is not finished = the last row has been read.</li>
+	 * </ul>
+	 * @throws IllegalStateException
+	 */
+	protected void checkReadState() throws IllegalStateException{
+		if (!iterationStarted)
+			throw new IllegalStateException("No row has yet been read!");
+		else if (endReached)
+			throw new IllegalStateException("End of VOTable file already reached!");