Skip to content
JDBCConnection.java 130 KiB
Newer Older
	 * <p>Load into the corresponding tables all columns listed in TAP_SCHEMA.columns.</p>
	 * 
	 * <p><i>Note:
	 * 	Tables are searched in the given list by their ADQL name and case sensitively.
	 * 	If they can not be found a {@link DBException} is thrown.
	 * </i></p>
	 * 
	 * @param tableDef		Definition of the table TAP_SCHEMA.columns.
	 * @param lstTables		List of all published tables (= all tables listed in TAP_SCHEMA.tables).
	 * @param stmt			Statement to use in order to interact with the database.
	 * 
	 * @throws DBException	If a table can not be found, or if any other error occurs while interacting with the database. 
	 */
	protected void loadColumns(final TAPTable tableDef, final List<TAPTable> lstTables, final Statement stmt) throws DBException{
		ResultSet rs = null;
			// Determine whether the dbName column exists:
			boolean hasDBName = isColumnExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), DB_NAME_COLUMN, connection.getMetaData());

			// Build the SQL query:
			StringBuffer sqlBuf = new StringBuffer("SELECT ");
			sqlBuf.append(translator.getColumnName(tableDef.getColumn("table_name")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("column_name")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("description")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("unit")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("ucd")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("utype")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("datatype")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("size")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("principal")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("indexed")));
			sqlBuf.append(", ").append(translator.getColumnName(tableDef.getColumn("std")));
			if (hasDBName)
				sqlBuf.append(", ").append(DB_NAME_COLUMN);
			sqlBuf.append(" FROM ").append(translator.getTableName(tableDef, supportsSchema)).append(';');

			// Execute the query:
			rs = stmt.executeQuery(sqlBuf.toString());

			// Create all tables:
			while(rs.next()){
				String tableName = rs.getString(1), columnName = rs.getString(2), description = rs.getString(3), unit = rs.getString(4), ucd = rs.getString(5), utype = rs.getString(6), datatype = rs.getString(7), dbName = (hasDBName ? rs.getString(12) : null);
				int size = rs.getInt(8);
				boolean principal = toBoolean(rs.getObject(9)), indexed = toBoolean(rs.getObject(10)), std = toBoolean(rs.getObject(11));

				// get the table:
				TAPTable table = searchTable(tableName, lstTables.iterator());
				if (table == null){
					if (logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to find the table of the column \"" + columnName + "\": \"" + tableName + "\"!", null);
					throw new DBException("Impossible to find the table of the column \"" + columnName + "\": \"" + tableName + "\"!");
				}

				// resolve the column type (if any) ; by default, it will be "VARCHAR" if unknown or missing:
				// ...try to resolve the datatype in function of all datatypes declared by the TAP standard.
				if (datatype != null){
					try{
						tapDatatype = DBDatatype.valueOf(datatype.toUpperCase());
					}catch(IllegalArgumentException iae){}
				}
				// ...build the column type:
					type = new DBType(DBDatatype.UNKNOWN);
					type = new DBType(tapDatatype, size);

				// create the new column:
				TAPColumn newColumn = new TAPColumn(columnName, type, nullifyIfNeeded(description), nullifyIfNeeded(unit), nullifyIfNeeded(ucd), nullifyIfNeeded(utype));
				newColumn.setPrincipal(principal);
				newColumn.setIndexed(indexed);
				newColumn.setStd(std);

				// add the new column inside its corresponding table:
				table.addColumn(newColumn);
			}
		}catch(SQLException se){
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load columns from TAP_SCHEMA.columns!", se);
			throw new DBException("Impossible to load columns from TAP_SCHEMA.columns!", se);
		}finally{
			close(rs);
	/**
	 * <p>Load into the corresponding tables all keys listed in TAP_SCHEMA.keys and detailed in TAP_SCHEMA.key_columns.</p>
	 * 
	 * <p><i>Note:
	 * 	Tables and columns are searched in the given list by their ADQL name and case sensitively.
	 * 	If they can not be found a {@link DBException} is thrown.
	 * </i></p>
	 * 
	 * @param keysDef		Definition of the table TAP_SCHEMA.keys.
	 * @param keyColumnsDef	Definition of the table TAP_SCHEMA.key_columns.
	 * @param lstTables		List of all published tables (= all tables listed in TAP_SCHEMA.tables).
	 * @param stmt			Statement to use in order to interact with the database.
	 * 
	 * @throws DBException	If a table or a column can not be found, or if any other error occurs while interacting with the database. 
	 */
	protected void loadKeys(final TAPTable keysDef, final TAPTable keyColumnsDef, final List<TAPTable> lstTables, final Statement stmt) throws DBException{
		ResultSet rs = null;
		PreparedStatement keyColumnsStmt = null;
			// Prepare the query to get the columns of each key:
			StringBuffer sqlBuf = new StringBuffer("SELECT ");
			sqlBuf.append(translator.getColumnName(keyColumnsDef.getColumn("from_column")));
			sqlBuf.append(", ").append(translator.getColumnName(keyColumnsDef.getColumn("target_column")));
			sqlBuf.append(" FROM ").append(translator.getTableName(keyColumnsDef, supportsSchema));
			sqlBuf.append(" WHERE ").append(translator.getColumnName(keyColumnsDef.getColumn("key_id"))).append(" = ?").append(';');
			keyColumnsStmt = connection.prepareStatement(sqlBuf.toString());

			// Build the SQL query to get the keys:
			sqlBuf.delete(0, sqlBuf.length());
			sqlBuf.append("SELECT ").append(translator.getColumnName(keysDef.getColumn("key_id")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("from_table")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("target_table")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("description")));
			sqlBuf.append(", ").append(translator.getColumnName(keysDef.getColumn("utype")));
			sqlBuf.append(" FROM ").append(translator.getTableName(keysDef, supportsSchema)).append(';');

			// Execute the query:
			rs = stmt.executeQuery(sqlBuf.toString());

			// Create all foreign keys:
			while(rs.next()){
				String key_id = rs.getString(1), from_table = rs.getString(2), target_table = rs.getString(3), description = rs.getString(4), utype = rs.getString(5);

				// get the two tables (source and target):
				TAPTable sourceTable = searchTable(from_table, lstTables.iterator());
				if (sourceTable == null){
					if (logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to find the source table of the foreign key \"" + key_id + "\": \"" + from_table + "\"!", null);
					throw new DBException("Impossible to find the source table of the foreign key \"" + key_id + "\": \"" + from_table + "\"!");
				}
				TAPTable targetTable = searchTable(target_table, lstTables.iterator());
				if (targetTable == null){
					if (logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to find the target table of the foreign key \"" + key_id + "\": \"" + target_table + "\"!", null);
					throw new DBException("Impossible to find the target table of the foreign key \"" + key_id + "\": \"" + target_table + "\"!");
				}

				// get the list of columns joining the two tables of the foreign key:
				HashMap<String,String> columns = new HashMap<String,String>();
				ResultSet rsKeyCols = null;
				try{
					keyColumnsStmt.setString(1, key_id);
					rsKeyCols = keyColumnsStmt.executeQuery();
					while(rsKeyCols.next())
						columns.put(rsKeyCols.getString(1), rsKeyCols.getString(2));
				}catch(SQLException se){
					if (!isCancelled() && logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load key columns from TAP_SCHEMA.key_columns for the foreign key: \"" + key_id + "\"!", se);
					throw new DBException("Impossible to load key columns from TAP_SCHEMA.key_columns for the foreign key: \"" + key_id + "\"!", se);
				}finally{
					close(rsKeyCols);
				}

				// create and add the new foreign key inside the source table:
				try{
					sourceTable.addForeignKey(key_id, targetTable, columns, nullifyIfNeeded(description), nullifyIfNeeded(utype));
				}catch(Exception ex){
					if ((ex instanceof SQLException && !isCancelled()) && logger != null)
						logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to create the foreign key \"" + key_id + "\" because: " + ex.getMessage(), ex);
					throw new DBException("Impossible to create the foreign key \"" + key_id + "\" because: " + ex.getMessage(), ex);
				}
			}
		}catch(SQLException se){
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "LOAD_TAP_SCHEMA", "Impossible to load columns from TAP_SCHEMA.columns!", se);
			throw new DBException("Impossible to load columns from TAP_SCHEMA.columns!", se);
		}finally{
			close(rs);
			close(keyColumnsStmt);
	/* ********************************** */
	/* SETTING TAP_SCHEMA IN THE DATABASE */
	/* ********************************** */

	/**
	 * <p>This function is just calling the following functions:</p>
	 * <ol>
	 * 	<li>{@link #mergeTAPSchemaDefs(TAPMetadata)}</li>
	 * 	<li>{@link #startTransaction()}</li>
	 * 	<li>{@link #resetTAPSchema(Statement, TAPTable[])}</li>
	 * 	<li>{@link #createTAPSchemaTable(TAPTable, Statement)} for each standard TAP_SCHEMA table</li>
	 * 	<li>{@link #fillTAPSchema(TAPMetadata)}</li>
	 * 	<li>{@link #createTAPTableIndexes(TAPTable, Statement)} for each standard TA_SCHEMA table</li>
	 * 	<li>{@link #commit()} or {@link #rollback()}</li>
	 * 	<li>{@link #endTransaction()}</li>
	 * </ol>
	 * 
	 * <p><i><b>Important note:
	 * 	If the connection does not support transactions, then there will be merely no transaction.
	 * 	Consequently, any failure (exception/error) will not clean the partial modifications done by this function.
	 * </i></p>
	 * 
	 * @see tap.db.DBConnection#setTAPSchema(tap.metadata.TAPMetadata)
	 */
	public synchronized void setTAPSchema(final TAPMetadata metadata) throws DBException{
		// Starting of new query execution => disable the cancel flag: 
		resetCancel();
			// A. GET THE DEFINITION OF ALL STANDARD TAP TABLES:
			TAPTable[] stdTables = mergeTAPSchemaDefs(metadata);

			startTransaction();

			// B. RE-CREATE THE STANDARD TAP_SCHEMA TABLES:

			// 1. Ensure TAP_SCHEMA exists and drop all its standard TAP tables:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CLEAN_TAP_SCHEMA", "Cleaning TAP_SCHEMA.", null);
			resetTAPSchema(stmt, stdTables);

			// 2. Create all standard TAP tables:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CREATE_TAP_SCHEMA", "Creating TAP_SCHEMA tables.", null);
			for(TAPTable table : stdTables)
				createTAPSchemaTable(table, stmt);

			// C. FILL THE NEW TABLE USING THE GIVEN DATA ITERATOR:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CREATE_TAP_SCHEMA", "Filling TAP_SCHEMA tables.", null);
			fillTAPSchema(metadata);

			// D. CREATE THE INDEXES OF ALL STANDARD TAP TABLES:
			if (logger != null)
				logger.logDB(LogLevel.INFO, this, "CREATE_TAP_SCHEMA", "Creating TAP_SCHEMA tables' indexes.", null);
			for(TAPTable table : stdTables)
				createTAPTableIndexes(table, stmt);

			commit();
		}catch(SQLException se){
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.ERROR, this, "CREATE_TAP_SCHEMA", "Impossible to SET TAP_SCHEMA in DB!", se);
			rollback();
			throw new DBException("Impossible to SET TAP_SCHEMA in DB!", se);
		}finally{
	 * <p>Merge the definition of TAP_SCHEMA tables given in parameter with the definition provided in the TAP standard.</p>
	 * 
	 * <p>
	 * 	The goal is to get in output the list of all standard TAP_SCHEMA tables. But it must take into account the customized
	 * 	definition given in parameter if there is one. Indeed, if a part of TAP_SCHEMA is not provided, it will be completed here by the
	 * 	definition provided in the TAP standard. And so, if the whole TAP_SCHEMA is not provided at all, the returned tables will be those
	 * 	of the IVOA standard.
	 * </p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	If the TAP_SCHEMA definition is missing or incomplete in the given metadata, it will be added or completed automatically
	 * 	by this function with the definition provided in the IVOA TAP standard.
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	Only the standard tables of TAP_SCHEMA are considered. The others are skipped (that's to say: never returned by this function ;
	 *  however, they will stay in the given metadata).
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	If schemas are not supported by this DBMS connection, the DB name of schemas is set to NULL and
	 * 	the DB name of tables is prefixed by the schema name.
	 * @param metadata	Metadata (with or without TAP_SCHEMA schema or some of its table). <i>Must not be NULL</i>
	 * @return	The list of all standard TAP_SCHEMA tables, ordered by creation order (see {@link #getCreationOrder(tap.metadata.TAPMetadata.STDTable)}).
	 * 
	 * @see TAPMetadata#resolveStdTable(String)
	 * @see TAPMetadata#getStdSchema(boolean)
	 * @see TAPMetadata#getStdTable(STDTable)
	protected TAPTable[] mergeTAPSchemaDefs(final TAPMetadata metadata){
		// 1. Get the TAP_SCHEMA schema from the given metadata:
		TAPSchema tapSchema = null;
		Iterator<TAPSchema> itSchema = metadata.iterator();
		while(tapSchema == null && itSchema.hasNext()){
			TAPSchema schema = itSchema.next();
			if (schema.getADQLName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label))
				tapSchema = schema;
		}

		// 2. Get the provided definition of the standard TAP tables:
		TAPTable[] customStdTables = new TAPTable[5];
		if (tapSchema != null){

			/* if the schemas are not supported with this DBMS,
			 * remove its DB name: */
			if (!supportsSchema)
				tapSchema.setDBName(null);
			// retrieve only the standard TAP tables:
			Iterator<TAPTable> itTable = tapSchema.iterator();
			while(itTable.hasNext()){
				TAPTable table = itTable.next();
				int indStdTable = getCreationOrder(TAPMetadata.resolveStdTable(table.getADQLName()));
				if (indStdTable > -1)
					customStdTables[indStdTable] = table;
			}
		// 3. Build a common TAPSchema, if needed:
		if (tapSchema == null){

			// build a new TAP_SCHEMA definition based on the standard definition:
			tapSchema = TAPMetadata.getStdSchema(supportsSchema);

			// add the new TAP_SCHEMA definition in the given metadata object:
			metadata.addSchema(tapSchema);
		}

		// 4. Finally, build the join between the standard tables and the custom ones:
		TAPTable[] stdTables = new TAPTable[]{TAPMetadata.getStdTable(STDTable.SCHEMAS),TAPMetadata.getStdTable(STDTable.TABLES),TAPMetadata.getStdTable(STDTable.COLUMNS),TAPMetadata.getStdTable(STDTable.KEYS),TAPMetadata.getStdTable(STDTable.KEY_COLUMNS)};
		for(int i = 0; i < stdTables.length; i++){

			// CASE: no custom definition:
			if (customStdTables[i] == null){
				if (!supportsSchema)
					stdTables[i].setDBName(STDSchema.TAPSCHEMA.label + "_" + stdTables[i].getADQLName());
				// add the table to the fetched or built-in schema:
				tapSchema.addTable(stdTables[i]);
			}
			// CASE: custom definition
			else
				stdTables[i] = customStdTables[i];
	/**
	 * <p>Ensure the TAP_SCHEMA schema exists in the database AND it must especially drop all of its standard tables
	 * (schemas, tables, columns, keys and key_columns), if they exist.</p>
	 * 
	 * <p><i><b>Important note</b>:
	 * 	If TAP_SCHEMA already exists and contains other tables than the standard ones, they will not be dropped and they will stay in place.
	 * </i></p>
	 * 
	 * @param stmt			The statement to use in order to interact with the database.
	 * @param stdTables		List of all standard tables that must be (re-)created.
	 *                      They will be used just to know the name of the standard tables that should be dropped here. 
	 * 
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void resetTAPSchema(final Statement stmt, final TAPTable[] stdTables) throws SQLException{
		DatabaseMetaData dbMeta = connection.getMetaData();

		// 1. Get the qualified DB schema name:
		String dbSchemaName = (supportsSchema ? stdTables[0].getDBSchemaName() : null);

		/* 2. Test whether the schema TAP_SCHEMA exists
		 *    and if it does not, create it: */
		if (dbSchemaName != null){
			// test whether the schema TAP_SCHEMA exists:
			boolean hasTAPSchema = isSchemaExisting(dbSchemaName, dbMeta);

			// create TAP_SCHEMA if it does not exist:
			if (!hasTAPSchema)
				stmt.executeUpdate("CREATE SCHEMA " + translator.getQualifiedSchemaName(stdTables[0]) + ";");
		}

		// 2-bis. Drop all its standard tables:
		dropTAPSchemaTables(stdTables, stmt, dbMeta);
	}

	/**
	 * <p>Remove/Drop all standard TAP_SCHEMA tables given in parameter.</p>
	 * 
	 * <p><i>Note:
	 * 	To test the existence of tables to drop, {@link DatabaseMetaData#getTables(String, String, String, String[])} is called.
	 * 	Then the schema and table names are compared with the case sensitivity defined by the translator.
	 * 	Only tables matching with these comparisons will be dropped.
	 * </i></p>
	 * 
	 * @param stdTables	Tables to drop. (they should be provided ordered by their creation order (see {@link #getCreationOrder(STDTable)})).
	 * @param stmt		Statement to use in order to interact with the database.
	 * @param dbMeta	Database metadata. Used to list all existing tables.
	 * 
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 * 
	 * @see JDBCTranslator#isCaseSensitive(IdentifierField)
	 */
	private void dropTAPSchemaTables(final TAPTable[] stdTables, final Statement stmt, final DatabaseMetaData dbMeta) throws SQLException{
		String[] stdTablesToDrop = new String[]{null,null,null,null,null};

		ResultSet rs = null;
			// Retrieve only the schema name and determine whether the search should be case sensitive:
			String tapSchemaName = stdTables[0].getDBSchemaName();
			boolean schemaCaseSensitive = translator.isCaseSensitive(IdentifierField.SCHEMA);
			boolean tableCaseSensitive = translator.isCaseSensitive(IdentifierField.TABLE);

			// Identify which standard TAP tables must be dropped:
			rs = dbMeta.getTables(null, null, null, null);
			while(rs.next()){
				String rsSchema = nullifyIfNeeded(rs.getString(2)), rsTable = rs.getString(3);
				if (!supportsSchema || (tapSchemaName == null && rsSchema == null) || equals(rsSchema, tapSchemaName, schemaCaseSensitive)){
					int indStdTable;
					indStdTable = getCreationOrder(isStdTable(rsTable, stdTables, tableCaseSensitive));
					if (indStdTable > -1){
						stdTablesToDrop[indStdTable] = (rsSchema != null ? "\"" + rsSchema + "\"." : "") + "\"" + rsTable + "\"";
					}
				}
			}
		}finally{
			close(rs);
		}

		// Drop the existing tables (in the reverse order of creation):
		for(int i = stdTablesToDrop.length - 1; i >= 0; i--){
			if (stdTablesToDrop[i] != null)
				stmt.executeUpdate("DROP TABLE " + stdTablesToDrop[i] + ";");
	/**
	 * <p>Create the specified standard TAP_SCHEMA tables into the database.</p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	Only standard TAP_SCHEMA tables (schemas, tables, columns, keys and key_columns) can be created here.
	 * 	If the given table is not part of the schema TAP_SCHEMA (comparison done on the ADQL name case-sensitively)
	 * 	and is not a standard TAP_SCHEMA table (comparison done on the ADQL name case-sensitively),
	 * 	this function will do nothing and will throw an exception.
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	An extra column is added in TAP_SCHEMA.schemas, TAP_SCHEMA.tables and TAP_SCHEMA.columns: {@value #DB_NAME_COLUMN}.
	 * 	This column is particularly used when getting the TAP metadata from the database to alias some schema, table and/or column names in ADQL. 
	 * </i></p>
	 * 
	 * @param table	Table to create.
	 * @param stmt	Statement to use in order to interact with the database.
	 * 
	 * @throws DBException	If the given table is not a standard TAP_SCHEMA table.
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void createTAPSchemaTable(final TAPTable table, final Statement stmt) throws DBException, SQLException{
		// 1. ENSURE THE GIVEN TABLE IS REALLY A TAP_SCHEMA TABLE (according to the ADQL names):
		if (!table.getADQLSchemaName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label) || TAPMetadata.resolveStdTable(table.getADQLName()) == null)
			throw new DBException("Forbidden table creation: " + table + " is not a standard table of TAP_SCHEMA!");

		// 2. BUILD THE SQL QUERY TO CREATE THE TABLE:
		StringBuffer sql = new StringBuffer("CREATE TABLE ");
		// a. Write the fully qualified table name:
		sql.append(translator.getTableName(table, supportsSchema));

		// b. List all the columns:
		sql.append('(');
		Iterator<TAPColumn> it = table.getColumns();
		while(it.hasNext()){
			TAPColumn col = it.next();

			// column name:
			sql.append(translator.getColumnName(col));

			// column type:
			sql.append(' ').append(convertTypeToDB(col.getDatatype()));
		// b bis. Add the extra dbName column (giving the database name of a schema, table or column): 
		if ((supportsSchema && table.getADQLName().equalsIgnoreCase(STDTable.SCHEMAS.label)) || table.getADQLName().equalsIgnoreCase(STDTable.TABLES.label) || table.getADQLName().equalsIgnoreCase(STDTable.COLUMNS.label))
			sql.append(',').append(DB_NAME_COLUMN).append(" VARCHAR");

		// c. Append the primary key definition, if needed:
		String primaryKey = getPrimaryKeyDef(table.getADQLName());
		if (primaryKey != null)
			sql.append(',').append(primaryKey);

		// d. End the query:
		sql.append(')').append(';');

		// 3. FINALLY CREATE THE TABLE:
		stmt.executeUpdate(sql.toString());
	}

	/**
	 * <p>Get the primary key corresponding to the specified table.</p>
	 * 
	 * <p>If the specified table is not a standard TAP_SCHEMA table, NULL will be returned.</p>
	 * 
	 * @param tableName	ADQL table name.
	 * 
	 * @return	The primary key definition (prefixed by a space) corresponding to the specified table (ex: " PRIMARY KEY(schema_name)"),
	 *          or NULL if the specified table is not a standard TAP_SCHEMA table.
	 */
	private String getPrimaryKeyDef(final String tableName){
		STDTable stdTable = TAPMetadata.resolveStdTable(tableName);
		if (stdTable == null)
			return null;

		boolean caseSensitive = translator.isCaseSensitive(IdentifierField.COLUMN);
		switch(stdTable){
			case SCHEMAS:
				return " PRIMARY KEY(" + (caseSensitive ? "\"schema_name\"" : "schema_name") + ")";
			case TABLES:
				return " PRIMARY KEY(" + (caseSensitive ? "\"table_name\"" : "table_name") + ")";
			case COLUMNS:
				return " PRIMARY KEY(" + (caseSensitive ? "\"table_name\"" : "table_name") + ", " + (caseSensitive ? "\"column_name\"" : "column_name") + ")";
			case KEYS:
			case KEY_COLUMNS:
				return " PRIMARY KEY(" + (caseSensitive ? "\"key_id\"" : "key_id") + ")";
			default:
				return null;
		}
	}

	/**
	 * <p>Create the DB indexes corresponding to the given TAP_SCHEMA table.</p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	Only standard TAP_SCHEMA tables (schemas, tables, columns, keys and key_columns) can be created here.
	 * 	If the given table is not part of the schema TAP_SCHEMA (comparison done on the ADQL name case-sensitively)
	 * 	and is not a standard TAP_SCHEMA table (comparison done on the ADQL name case-sensitively),
	 * 	this function will do nothing and will throw an exception.
	 * </i></p>
	 * 
	 * @param table	Table whose indexes must be created here.
	 * @param stmt	Statement to use in order to interact with the database.
	 * 
	 * @throws DBException	If the given table is not a standard TAP_SCHEMA table.
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void createTAPTableIndexes(final TAPTable table, final Statement stmt) throws DBException, SQLException{
		// 1. Ensure the given table is really a TAP_SCHEMA table (according to the ADQL names):
		if (!table.getADQLSchemaName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label) || TAPMetadata.resolveStdTable(table.getADQLName()) == null)
			throw new DBException("Forbidden index creation: " + table + " is not a standard table of TAP_SCHEMA!");

		// Build the fully qualified DB name of the table: 
		final String dbTableName = translator.getTableName(table, supportsSchema);

		// Build the name prefix of all the indexes to create: 
		final String indexNamePrefix = "INDEX_" + ((table.getADQLSchemaName() != null) ? (table.getADQLSchemaName() + "_") : "") + table.getADQLName() + "_";

		Iterator<TAPColumn> it = table.getColumns();
		while(it.hasNext()){
			TAPColumn col = it.next();
			// Create an index only for columns that have the 'indexed' flag:
			if (col.isIndexed() && !isPartOfPrimaryKey(col.getADQLName()))
				stmt.executeUpdate("CREATE INDEX " + indexNamePrefix + col.getADQLName() + " ON " + dbTableName + "(" + translator.getColumnName(col) + ");");
		}
	}

	/**
	 * Tell whether the specified column is part of the primary key of its table.
	 * 
	 * @param adqlName	ADQL name of a column.
	 * 
	 * @return	<i>true</i> if the specified column is part of the primary key,
	 *          <i>false</i> otherwise.
	 */
	private boolean isPartOfPrimaryKey(final String adqlName){
		if (adqlName == null)
			return false;
		else
			return (adqlName.equalsIgnoreCase("schema_name") || adqlName.equalsIgnoreCase("table_name") || adqlName.equalsIgnoreCase("column_name") || adqlName.equalsIgnoreCase("key_id"));
	}

	/**
	 * <p>Fill all the standard tables of TAP_SCHEMA (schemas, tables, columns, keys and key_columns).</p>
	 * 
	 * <p>This function just call the following functions:</p>
	 * <ol>
	 * 	<li>{@link #fillSchemas(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillTables(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillColumns(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillKeys(TAPTable, TAPTable, Iterator)}</li>
	 * </ol>
	 * 
	 * @param meta	All schemas and tables to list inside the TAP_SCHEMA tables.
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	protected void fillTAPSchema(final TAPMetadata meta) throws SQLException, DBException{
		TAPTable metaTable;

		// 1. Fill SCHEMAS:
		metaTable = meta.getTable(STDSchema.TAPSCHEMA.label, STDTable.SCHEMAS.label);
		Iterator<TAPTable> allTables = fillSchemas(metaTable, meta.iterator());

		// 2. Fill TABLES:
		metaTable = meta.getTable(STDSchema.TAPSCHEMA.label, STDTable.TABLES.label);
		Iterator<TAPColumn> allColumns = fillTables(metaTable, allTables);
		allTables = null;

		// Fill COLUMNS:
		metaTable = meta.getTable(STDSchema.TAPSCHEMA.label, STDTable.COLUMNS.label);
		Iterator<TAPForeignKey> allKeys = fillColumns(metaTable, allColumns);
		allColumns = null;

		// Fill KEYS and KEY_COLUMNS:
		metaTable = meta.getTable(STDSchema.TAPSCHEMA.label, STDTable.KEYS.label);
		TAPTable metaTable2 = meta.getTable(STDSchema.TAPSCHEMA.label, STDTable.KEY_COLUMNS.label);
		fillKeys(metaTable, metaTable2, allKeys);
	}

	/**
	 * <p>Fill the standard table TAP_SCHEMA.schemas with the list of all published schemas.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if its supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaTable	Description of TAP_SCHEMA.schemas.
	 * @param itSchemas	Iterator over the list of schemas.
	 * 
	 * @return	Iterator over the full list of all tables (whatever is their schema).
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private Iterator<TAPTable> fillSchemas(final TAPTable metaTable, final Iterator<TAPSchema> itSchemas) throws SQLException, DBException{
		List<TAPTable> allTables = new ArrayList<TAPTable>();

		// Build the SQL update query:
		StringBuffer sql = new StringBuffer("INSERT INTO ");
		sql.append(translator.getTableName(metaTable, supportsSchema)).append(" (");
		sql.append(translator.getColumnName(metaTable.getColumn("schema_name")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("description")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("utype")));
		if (supportsSchema){
			sql.append(", ").append(DB_NAME_COLUMN);
			sql.append(") VALUES (?, ?, ?, ?);");
		}else
			sql.append(") VALUES (?, ?, ?);");

		// Prepare the statement:
		PreparedStatement stmt = null;
		try{
			stmt = connection.prepareStatement(sql.toString());

			// Execute the query for each schema:
			int nbRows = 0;
			while(itSchemas.hasNext()){
				TAPSchema schema = itSchemas.next();
				nbRows++;

				// list all tables of this schema:
				appendAllInto(allTables, schema.iterator());

				// add the schema entry into the DB:
				stmt.setString(1, schema.getADQLName());
				stmt.setString(2, schema.getDescription());
				stmt.setString(3, schema.getUtype());
				if (supportsSchema)
					stmt.setString(4, (schema.getDBName() == null || schema.getDBName().equals(schema.getADQLName())) ? null : schema.getDBName());
				executeUpdate(stmt, nbRows);
			}
			executeBatchUpdates(stmt, nbRows);
		}finally{
			close(stmt);
		}

		return allTables.iterator();
	}

	/**
	 * <p>Fill the standard table TAP_SCHEMA.tables with the list of all published tables.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if its supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaTable	Description of TAP_SCHEMA.tables.
	 * @param itTables	Iterator over the list of tables.
	 * 
	 * @return	Iterator over the full list of all columns (whatever is their table).
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private Iterator<TAPColumn> fillTables(final TAPTable metaTable, final Iterator<TAPTable> itTables) throws SQLException, DBException{
		List<TAPColumn> allColumns = new ArrayList<TAPColumn>();

		// Build the SQL update query:
		StringBuffer sql = new StringBuffer("INSERT INTO ");
		sql.append(translator.getTableName(metaTable, supportsSchema)).append(" (");
		sql.append(translator.getColumnName(metaTable.getColumn("schema_name")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("table_name")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("table_type")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("description")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("utype")));
		sql.append(", ").append(DB_NAME_COLUMN);
		sql.append(") VALUES (?, ?, ?, ?, ?, ?);");

		// Prepare the statement:
		PreparedStatement stmt = null;
		try{
			stmt = connection.prepareStatement(sql.toString());

			// Execute the query for each table:
			int nbRows = 0;
			while(itTables.hasNext()){
				TAPTable table = itTables.next();
				nbRows++;

				// list all columns of this table:
				appendAllInto(allColumns, table.getColumns());

				// add the table entry into the DB:
				stmt.setString(1, table.getADQLSchemaName());
				if (table.isInitiallyQualified())
					stmt.setString(2, table.getADQLSchemaName() + "." + table.getADQLName());
				else
					stmt.setString(2, table.getADQLName());
				stmt.setString(3, table.getType().toString());
				stmt.setString(4, table.getDescription());
				stmt.setString(5, table.getUtype());
				stmt.setString(6, (table.getDBName() == null || table.getDBName().equals(table.getADQLName())) ? null : table.getDBName());
				executeUpdate(stmt, nbRows);
			}
			executeBatchUpdates(stmt, nbRows);
		}finally{
			close(stmt);
		}

		return allColumns.iterator();
	}

	/**
	 * <p>Fill the standard table TAP_SCHEMA.columns with the list of all published columns.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if its supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaTable	Description of TAP_SCHEMA.columns.
	 * @param itColumns	Iterator over the list of columns.
	 * 
	 * @return	Iterator over the full list of all foreign keys.
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private Iterator<TAPForeignKey> fillColumns(final TAPTable metaTable, final Iterator<TAPColumn> itColumns) throws SQLException, DBException{
		List<TAPForeignKey> allKeys = new ArrayList<TAPForeignKey>();

		// Build the SQL update query:
		StringBuffer sql = new StringBuffer("INSERT INTO ");
		sql.append(translator.getTableName(metaTable, supportsSchema)).append(" (");
		sql.append(translator.getColumnName(metaTable.getColumn("table_name")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("column_name")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("description")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("unit")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("ucd")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("utype")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("datatype")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("size")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("principal")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("indexed")));
		sql.append(", ").append(translator.getColumnName(metaTable.getColumn("std")));
		sql.append(", ").append(DB_NAME_COLUMN);
		sql.append(") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");

		// Prepare the statement:
		PreparedStatement stmt = null;
		try{
			stmt = connection.prepareStatement(sql.toString());

			// Execute the query for each column:
			int nbRows = 0;
			while(itColumns.hasNext()){
				TAPColumn col = itColumns.next();
				nbRows++;

				// list all foreign keys of this column:
				appendAllInto(allKeys, col.getTargets());

				// add the column entry into the DB:
				if (!(col.getTable() instanceof TAPTable) || ((TAPTable)col.getTable()).isInitiallyQualified())
					stmt.setString(1, col.getTable().getADQLSchemaName() + "." + col.getTable().getADQLName());
				else
					stmt.setString(1, col.getTable().getADQLName());
				stmt.setString(2, col.getADQLName());
				stmt.setString(3, col.getDescription());
				stmt.setString(4, col.getUnit());
				stmt.setString(5, col.getUcd());
				stmt.setString(6, col.getUtype());
				stmt.setString(7, col.getDatatype().type.toString());
				stmt.setInt(8, col.getDatatype().length);
				stmt.setInt(9, col.isPrincipal() ? 1 : 0);
				stmt.setInt(10, col.isIndexed() ? 1 : 0);
				stmt.setInt(11, col.isStd() ? 1 : 0);
				stmt.setString(12, (col.getDBName() == null || col.getDBName().equals(col.getADQLName())) ? null : col.getDBName());
				executeUpdate(stmt, nbRows);
			}
			executeBatchUpdates(stmt, nbRows);
		}finally{
			close(stmt);
		}

		return allKeys.iterator();
	}

	/**
	 * <p>Fill the standard tables TAP_SCHEMA.keys and TAP_SCHEMA.key_columns with the list of all published foreign keys.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if its supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaKeys			Description of TAP_SCHEMA.keys.
	 * @param metaKeyColumns	Description of TAP_SCHEMA.key_columns.
	 * @param itKeys			Iterator over the list of foreign keys.
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private void fillKeys(final TAPTable metaKeys, final TAPTable metaKeyColumns, final Iterator<TAPForeignKey> itKeys) throws SQLException, DBException{
		// Build the SQL update query for KEYS:
		StringBuffer sqlKeys = new StringBuffer("INSERT INTO ");
		sqlKeys.append(translator.getTableName(metaKeys, supportsSchema)).append(" (");
		sqlKeys.append(translator.getColumnName(metaKeys.getColumn("key_id")));
		sqlKeys.append(", ").append(translator.getColumnName(metaKeys.getColumn("from_table")));
		sqlKeys.append(", ").append(translator.getColumnName(metaKeys.getColumn("target_table")));
		sqlKeys.append(", ").append(translator.getColumnName(metaKeys.getColumn("description")));
		sqlKeys.append(", ").append(translator.getColumnName(metaKeys.getColumn("utype")));
		sqlKeys.append(") VALUES (?, ?, ?, ?, ?);");

		PreparedStatement stmtKeys = null, stmtKeyCols = null;
		try{
			// Prepare the statement for KEYS:
			stmtKeys = connection.prepareStatement(sqlKeys.toString());

			// Build the SQL update query for KEY_COLUMNS:
			StringBuffer sqlKeyCols = new StringBuffer("INSERT INTO ");
			sqlKeyCols.append(translator.getTableName(metaKeyColumns, supportsSchema)).append(" (");
			sqlKeyCols.append(translator.getColumnName(metaKeyColumns.getColumn("key_id")));
			sqlKeyCols.append(", ").append(translator.getColumnName(metaKeyColumns.getColumn("from_column")));
			sqlKeyCols.append(", ").append(translator.getColumnName(metaKeyColumns.getColumn("target_column")));
			sqlKeyCols.append(") VALUES (?, ?, ?);");

			// Prepare the statement for KEY_COLUMNS:
			stmtKeyCols = connection.prepareStatement(sqlKeyCols.toString());

			// Execute the query for each column:
			int nbKeys = 0, nbKeyColumns = 0;
			while(itKeys.hasNext()){
				TAPForeignKey key = itKeys.next();
				nbKeys++;

				// add the key entry into KEYS:
				stmtKeys.setString(1, key.getKeyId());
				if (key.getFromTable().isInitiallyQualified())
					stmtKeys.setString(2, key.getFromTable().getADQLSchemaName() + "." + key.getFromTable().getADQLName());
				else
					stmtKeys.setString(2, key.getFromTable().getADQLName());
				if (key.getTargetTable().isInitiallyQualified())
					stmtKeys.setString(3, key.getTargetTable().getADQLSchemaName() + "." + key.getTargetTable().getADQLName());
				else
					stmtKeys.setString(3, key.getTargetTable().getADQLName());
				stmtKeys.setString(4, key.getDescription());
				stmtKeys.setString(5, key.getUtype());
				executeUpdate(stmtKeys, nbKeys);

				// add the key columns into KEY_COLUMNS:
				Iterator<Map.Entry<String,String>> itAssoc = key.iterator();
				while(itAssoc.hasNext()){
					nbKeyColumns++;
					Map.Entry<String,String> assoc = itAssoc.next();
					stmtKeyCols.setString(1, key.getKeyId());
					stmtKeyCols.setString(2, assoc.getKey());
					stmtKeyCols.setString(3, assoc.getValue());
					executeUpdate(stmtKeyCols, nbKeyColumns);
				}
			}

			executeBatchUpdates(stmtKeys, nbKeys);
			executeBatchUpdates(stmtKeyCols, nbKeyColumns);
		}finally{
			close(stmtKeys);
			close(stmtKeyCols);
		}
	}

	/* ***************** */
	/* UPLOAD MANAGEMENT */
	/* ***************** */

	/**
	 * <p><i><b>Important note:</b>
	 * 	Only tables uploaded by users can be created in the database. To ensure that, the schema name of this table MUST be {@link STDSchema#UPLOADSCHEMA} ("TAP_UPLOAD") in ADQL.
	 * 	If it has another ADQL name, an exception will be thrown. Of course, the DB name of this schema MAY be different.
	 * </i></p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	This function may modify the given {@link TAPTable} object if schemas are not supported by this connection.
	 * 	In this case, this function will prefix the table's DB name by the schema's DB name directly inside the given
	 * 	{@link TAPTable} object. Then the DB name of the schema will be set to NULL.
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	If the upload schema does not already exist in the database, it will be created.
	 * </i></p>
	 * 
	 * @see tap.db.DBConnection#addUploadedTable(tap.metadata.TAPTable, tap.data.TableIterator)
	 * @see #checkUploadedTableDef(TAPTable)
	 */
	@Override
	public synchronized boolean addUploadedTable(TAPTable tableDef, TableIterator data) throws DBException, DataReadException{
		// If no table to upload, consider it has been dropped and return TRUE:
		if (tableDef == null)
			return true;

		// Starting of new query execution => disable the cancel flag: 
		resetCancel();

		// Check the table is well defined (and particularly the schema is well set with an ADQL name = TAP_UPLOAD):
		checkUploadedTableDef(tableDef);

		try{

			// Start a transaction:
			startTransaction();
			// ...create a statement:

			DatabaseMetaData dbMeta = connection.getMetaData();

			// 1. Create the upload schema, if it does not already exist:
			if (!isSchemaExisting(tableDef.getDBSchemaName(), dbMeta)){
				stmt.executeUpdate("CREATE SCHEMA " + translator.getQualifiedSchemaName(tableDef) + ";");
				if (logger != null)
					logger.logDB(LogLevel.INFO, this, "SCHEMA_CREATED", "Schema \"" + tableDef.getADQLSchemaName() + "\" (in DB: " + translator.getQualifiedSchemaName(tableDef) + ") created.", null);
			// 1bis. Ensure the table does not already exist and if it is the case, throw an understandable exception:
			else if (isTableExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), dbMeta)){
				DBException de = new DBException("Impossible to create the user uploaded table in the database: " + translator.getTableName(tableDef, supportsSchema) + "! This table already exists.");
				if (logger != null)
					logger.logDB(LogLevel.ERROR, this, "ADD_UPLOAD_TABLE", de.getMessage(), de);

			// 2. Create the table:
			// ...build the SQL query:
			StringBuffer sqlBuf = new StringBuffer("CREATE TABLE ");
			sqlBuf.append(translator.getTableName(tableDef, supportsSchema)).append(" (");
			Iterator<TAPColumn> it = tableDef.getColumns();
			while(it.hasNext()){
				TAPColumn col = it.next();
				// column name:
				sqlBuf.append(translator.getColumnName(col));
				// column type:
				sqlBuf.append(' ').append(convertTypeToDB(col.getDatatype()));
				// last column ?
				if (it.hasNext())
					sqlBuf.append(',');
			}
			sqlBuf.append(");");
			// ...execute the update query:
			stmt.executeUpdate(sqlBuf.toString());

			// 3. Fill the table:
			fillUploadedTable(tableDef, data);

			// Commit the transaction:
			commit();

				logger.logDB(LogLevel.INFO, this, "TABLE_CREATED", "Table \"" + tableDef.getADQLName() + "\" (in DB: " + translator.getTableName(tableDef, supportsSchema) + ") created.", null);
			return true;

		}catch(SQLException se){
			rollback();
			if (!isCancelled() && logger != null)
				logger.logDB(LogLevel.WARNING, this, "ADD_UPLOAD_TABLE", "Impossible to create the uploaded table: " + translator.getTableName(tableDef, supportsSchema) + "!", se);
			throw new DBException("Impossible to create the uploaded table: " + translator.getTableName(tableDef, supportsSchema) + "!", se);
		}catch(DBException de){
			rollback();
			throw de;
		}catch(DataReadException dre){
			rollback();
			throw dre;
		}finally{