Skip to content
JDBCConnection.java 115 KiB
Newer Older
	/**
	 * <p>Ensure the TAP_SCHEMA schema exists in the database AND it must especially drop all of its standard tables
	 * (schemas, tables, columns, keys and key_columns), if they exist.</p>
	 * 
	 * <p><i><b>Important note</b>:
	 * 	If TAP_SCHEMA already exists and contains other tables than the standard ones, they will not be dropped and they will stay in place.
	 * </i></p>
	 * 
	 * @param stmt			The statement to use in order to interact with the database.
	 * @param stdTables		List of all standard tables that must be (re-)created.
	 *                      They will be used just to know the name of the standard tables that should be dropped here. 
	 * 
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void resetTAPSchema(final Statement stmt, final TAPTable[] stdTables) throws SQLException{
		final DatabaseMetaData metadata = connection.getMetaData();

		/* DB name of the schema TAP_SCHEMA, read from the first standard table.
		 * It stays NULL when this connection does not support schemas. */
		String schemaInDB = null;
		if (supportsSchema)
			schemaInDB = stdTables[0].getDBSchemaName();

		// Create the schema only when its name is known and it is not yet in the database:
		if (schemaInDB != null && !isSchemaExisting(schemaInDB, metadata))
			stmt.executeUpdate("CREATE SCHEMA " + translator.getQualifiedSchemaName(stdTables[0]) + ";");

		// In any case, drop the standard tables which already exist:
		dropTAPSchemaTables(stdTables, stmt, metadata);
	}

	/**
	 * <p>Remove/Drop all standard TAP_SCHEMA tables given in parameter.</p>
	 * 
	 * <p><i>Note:
	 * 	To test the existence of tables to drop, {@link DatabaseMetaData#getTables(String, String, String, String[])} is called.
	 * 	Then the schema and table names are compared with the case sensitivity defined by the translator.
	 * 	Only tables matching with these comparisons will be dropped.
	 * </i></p>
	 * 
	 * @param stdTables	Tables to drop. (they should be provided ordered by their creation order (see {@link #getCreationOrder(STDTable)})).
	 * @param stmt		Statement to use in order to interact with the database.
	 * @param dbMeta	Database metadata. Used to list all existing tables.
	 * 
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 * 
	 * @see ADQLTranslator#isCaseSensitive(IdentifierField)
	 */
	private void dropTAPSchemaTables(final TAPTable[] stdTables, final Statement stmt, final DatabaseMetaData dbMeta) throws SQLException{
		String[] stdTablesToDrop = new String[]{null,null,null,null,null};

		ResultSet rs = null;
			// Retrieve only the schema name and determine whether the search should be case sensitive:
			String tapSchemaName = stdTables[0].getDBSchemaName();
			boolean schemaCaseSensitive = translator.isCaseSensitive(IdentifierField.SCHEMA);
			boolean tableCaseSensitive = translator.isCaseSensitive(IdentifierField.TABLE);

			// Identify which standard TAP tables must be dropped:
			rs = dbMeta.getTables(null, null, null, null);
			while(rs.next()){
				String rsSchema = nullifyIfNeeded(rs.getString(2)), rsTable = rs.getString(3);
				if (!supportsSchema || (tapSchemaName == null && rsSchema == null) || equals(rsSchema, tapSchemaName, schemaCaseSensitive)){
					int indStdTable;
					indStdTable = getCreationOrder(isStdTable(rsTable, stdTables, tableCaseSensitive));
					if (indStdTable > -1){
						stdTablesToDrop[indStdTable] = (rsSchema != null ? "\"" + rsSchema + "\"." : "") + "\"" + rsTable + "\"";
					}
				}
			}
		}finally{
			close(rs);
		}

		// Drop the existing tables (in the reverse order of creation):
		for(int i = stdTablesToDrop.length - 1; i >= 0; i--){
			if (stdTablesToDrop[i] != null)
				stmt.executeUpdate("DROP TABLE " + stdTablesToDrop[i] + ";");
	/**
	 * <p>Create the specified standard TAP_SCHEMA tables into the database.</p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	Only standard TAP_SCHEMA tables (schemas, tables, columns, keys and key_columns) can be created here.
	 * 	If the given table is not part of the schema TAP_SCHEMA (comparison done on the ADQL schema name, ignoring case)
	 * 	or is not a standard TAP_SCHEMA table (according to its ADQL name),
	 * 	this function will do nothing and will throw an exception.
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	An extra column is added in TAP_SCHEMA.schemas, TAP_SCHEMA.tables and TAP_SCHEMA.columns: {@value #DB_NAME_COLUMN}.
	 * 	This column is particularly used when getting the TAP metadata from the database to alias some schema, table and/or column names in ADQL. 
	 * </i></p>
	 * 
	 * @param table	Table to create.
	 * @param stmt	Statement to use in order to interact with the database.
	 * 
	 * @throws DBException	If the given table is not a standard TAP_SCHEMA table.
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void createTAPSchemaTable(final TAPTable table, final Statement stmt) throws DBException, SQLException{
		// 1. ENSURE THE GIVEN TABLE IS REALLY A TAP_SCHEMA TABLE (according to the ADQL names):
		if (!table.getADQLSchemaName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label) || TAPMetadata.resolveStdTable(table.getADQLName()) == null)
			throw new DBException("Forbidden table creation: " + table + " is not a standard table of TAP_SCHEMA!");

		// 2. BUILD THE SQL QUERY TO CREATE THE TABLE:
		StringBuffer sql = new StringBuffer("CREATE TABLE ");
		// a. Write the fully qualified table name:
		sql.append(translator.getTableName(table, supportsSchema));

		// b. List all the columns:
		sql.append('(');
		Iterator<TAPColumn> it = table.getColumns();
		while(it.hasNext()){
			TAPColumn col = it.next();

			// column name:
			sql.append(translator.getColumnName(col));

			// column type:
			sql.append(' ').append(convertTypeToDB(col.getDatatype()));
		// b bis. Add the extra dbName column (giving the database name of a schema, table or column): 
		if ((supportsSchema && table.getADQLName().equalsIgnoreCase(STDTable.SCHEMAS.label)) || table.getADQLName().equalsIgnoreCase(STDTable.TABLES.label) || table.getADQLName().equalsIgnoreCase(STDTable.COLUMNS.label))
			sql.append(',').append(DB_NAME_COLUMN).append(" VARCHAR");

		// c. Append the primary key definition, if needed:
		String primaryKey = getPrimaryKeyDef(table.getADQLName());
		if (primaryKey != null)
			sql.append(',').append(primaryKey);

		// d. End the query:
		sql.append(')').append(';');

		// 3. FINALLY CREATE THE TABLE:
		stmt.executeUpdate(sql.toString());
	}

	/**
	 * <p>Get the primary key corresponding to the specified table.</p>
	 * 
	 * <p>If the specified table is not a standard TAP_SCHEMA table, NULL will be returned.</p>
	 * 
	 * @param tableName	ADQL table name.
	 * 
	 * @return	The primary key definition (prefixed by a space) corresponding to the specified table (ex: " PRIMARY KEY(schema_name)"),
	 *          or NULL if the specified table is not a standard TAP_SCHEMA table.
	 */
	private String getPrimaryKeyDef(final String tableName){
		// Only standard TAP_SCHEMA tables have a primary key definition:
		final STDTable std = TAPMetadata.resolveStdTable(tableName);
		if (std == null)
			return null;

		// Column names are double-quoted only if the translator treats column names case sensitively:
		final String quote = translator.isCaseSensitive(IdentifierField.COLUMN) ? "\"" : "";

		// List the column(s) composing the primary key of the resolved table:
		final String keyColumns;
		switch(std){
			case SCHEMAS:
				keyColumns = quote + "schema_name" + quote;
				break;
			case TABLES:
				keyColumns = quote + "table_name" + quote;
				break;
			case COLUMNS:
				keyColumns = quote + "table_name" + quote + ", " + quote + "column_name" + quote;
				break;
			// NOTE(review): KEY_COLUMNS shares the single-column key of KEYS - confirm this is intended for foreign keys with several columns.
			case KEYS:
			case KEY_COLUMNS:
				keyColumns = quote + "key_id" + quote;
				break;
			default:
				return null;
		}

		return " PRIMARY KEY(" + keyColumns + ")";
	}

	/**
	 * <p>Create the DB indexes corresponding to the given TAP_SCHEMA table.</p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	Only standard TAP_SCHEMA tables (schemas, tables, columns, keys and key_columns) can be indexed here.
	 * 	If the given table is not part of the schema TAP_SCHEMA (comparison done on the ADQL schema name, ignoring case)
	 * 	or is not a standard TAP_SCHEMA table (according to its ADQL name),
	 * 	this function will do nothing and will throw an exception.
	 * </i></p>
	 * 
	 * @param table	Table whose indexes must be created here.
	 * @param stmt	Statement to use in order to interact with the database.
	 * 
	 * @throws DBException	If the given table is not a standard TAP_SCHEMA table.
	 * @throws SQLException	If any error occurs while querying or updating the database.
	 */
	protected void createTAPTableIndexes(final TAPTable table, final Statement stmt) throws DBException, SQLException{
		// Refuse any table which is not a standard table of TAP_SCHEMA (check based on the ADQL names):
		final boolean inTapSchema = table.getADQLSchemaName().equalsIgnoreCase(STDSchema.TAPSCHEMA.label);
		if (!inTapSchema || TAPMetadata.resolveStdTable(table.getADQLName()) == null)
			throw new DBException("Forbidden index creation: " + table + " is not a standard table of TAP_SCHEMA!");

		// Fully qualified name of the table in the database:
		final String qualifiedTable = translator.getTableName(table, supportsSchema);

		// Common prefix of all index names: INDEX_[<schema>_]<table>_
		final StringBuffer prefix = new StringBuffer("INDEX_");
		if (table.getADQLSchemaName() != null)
			prefix.append(table.getADQLSchemaName()).append("_");
		prefix.append(table.getADQLName()).append("_");
		final String indexPrefix = prefix.toString();

		// One index per column flagged as 'indexed', except for the columns considered as part of a primary key:
		for(Iterator<TAPColumn> columns = table.getColumns(); columns.hasNext();){
			final TAPColumn column = columns.next();
			if (!column.isIndexed() || isPartOfPrimaryKey(column.getADQLName()))
				continue;
			stmt.executeUpdate("CREATE INDEX " + indexPrefix + column.getADQLName() + " ON " + qualifiedTable + "(" + translator.getColumnName(column) + ");");
		}
	}

	/**
	 * Tell whether the specified column is part of the primary key of its table.
	 * 
	 * @param adqlName	ADQL name of a column.
	 * 
	 * @return	<i>true</i> if the specified column is part of the primary key,
	 *          <i>false</i> otherwise.
	 */
	private boolean isPartOfPrimaryKey(final String adqlName){
		// A missing name can not match any primary key column:
		if (adqlName != null){
			// Compare (ignoring case) with every column name used in a primary key of a standard table:
			for(String pkColumn : new String[]{"schema_name","table_name","column_name","key_id"}){
				if (pkColumn.equalsIgnoreCase(adqlName))
					return true;
			}
		}
		return false;
	}

	/**
	 * <p>Fill all the standard tables of TAP_SCHEMA (schemas, tables, columns, keys and key_columns).</p>
	 * 
	 * <p>This function just calls the following functions:</p>
	 * <ol>
	 * 	<li>{@link #fillSchemas(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillTables(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillColumns(TAPTable, Iterator)}</li>
	 * 	<li>{@link #fillKeys(TAPTable, TAPTable, Iterator)}</li>
	 * </ol>
	 * 
	 * @param meta	All schemas and tables to list inside the TAP_SCHEMA tables.
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	protected void fillTAPSchema(final TAPMetadata meta) throws SQLException, DBException{
		final String tapSchema = STDSchema.TAPSCHEMA.label;

		// 1. Fill SCHEMAS (and get back all the tables to list):
		Iterator<TAPTable> tables = fillSchemas(meta.getTable(tapSchema, STDTable.SCHEMAS.label), meta.iterator());

		// 2. Fill TABLES (and get back all the columns to list):
		Iterator<TAPColumn> columns = fillTables(meta.getTable(tapSchema, STDTable.TABLES.label), tables);

		// 3. Fill COLUMNS (and get back all the foreign keys to list):
		Iterator<TAPForeignKey> foreignKeys = fillColumns(meta.getTable(tapSchema, STDTable.COLUMNS.label), columns);

		// 4. Fill KEYS and KEY_COLUMNS:
		final TAPTable keysDesc = meta.getTable(tapSchema, STDTable.KEYS.label);
		final TAPTable keyColumnsDesc = meta.getTable(tapSchema, STDTable.KEY_COLUMNS.label);
		fillKeys(keysDesc, keyColumnsDesc, foreignKeys);
	}

	/**
	 * <p>Fill the standard table TAP_SCHEMA.schemas with the list of all published schemas.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if it is supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaTable	Description of TAP_SCHEMA.schemas.
	 * @param itSchemas	Iterator over the list of schemas.
	 * 
	 * @return	Iterator over the full list of all tables (whatever is their schema).
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private Iterator<TAPTable> fillSchemas(final TAPTable metaTable, final Iterator<TAPSchema> itSchemas) throws SQLException, DBException{
		// Write the insertion query (the extra DB name column is added only if schemas are supported):
		StringBuffer query = new StringBuffer("INSERT INTO ");
		query.append(translator.getTableName(metaTable, supportsSchema)).append(" (");
		query.append(translator.getColumnName(metaTable.getColumn("schema_name")));
		for(String stdCol : new String[]{"description","utype"})
			query.append(", ").append(translator.getColumnName(metaTable.getColumn(stdCol)));
		if (supportsSchema)
			query.append(", ").append(DB_NAME_COLUMN).append(") VALUES (?, ?, ?, ?);");
		else
			query.append(") VALUES (?, ?, ?);");

		// Tables of all the listed schemas, gathered while iterating:
		final List<TAPTable> gatheredTables = new ArrayList<TAPTable>();

		PreparedStatement insertStmt = null;
		try{
			insertStmt = connection.prepareStatement(query.toString());

			// Insert one row per schema:
			int rowCount = 0;
			while(itSchemas.hasNext()){
				final TAPSchema current = itSchemas.next();
				rowCount++;

				// remember all the tables of this schema:
				appendAllInto(gatheredTables, current.iterator());

				// set the values of the row to insert:
				insertStmt.setString(1, current.getADQLName());
				insertStmt.setString(2, current.getDescription());
				insertStmt.setString(3, current.getUtype());
				if (supportsSchema){
					// the DB name is stored only if it differs from the ADQL name:
					String dbName = current.getDBName();
					if (dbName != null && dbName.equals(current.getADQLName()))
						dbName = null;
					insertStmt.setString(4, dbName);
				}
				executeUpdate(insertStmt, rowCount);
			}
			executeBatchUpdates(insertStmt, rowCount);
		}finally{
			close(insertStmt);
		}

		return gatheredTables.iterator();
	}

	/**
	 * <p>Fill the standard table TAP_SCHEMA.tables with the list of all published tables.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if it is supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaTable	Description of TAP_SCHEMA.tables.
	 * @param itTables	Iterator over the list of tables.
	 * 
	 * @return	Iterator over the full list of all columns (whatever is their table).
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private Iterator<TAPColumn> fillTables(final TAPTable metaTable, final Iterator<TAPTable> itTables) throws SQLException, DBException{
		// Standard columns to fill, in the order of the query parameters:
		final String[] stdColumns = new String[]{"schema_name","table_name","table_type","description","utype"};

		// Write the insertion query (standard columns first, then the extra DB name column):
		StringBuffer query = new StringBuffer("INSERT INTO ");
		query.append(translator.getTableName(metaTable, supportsSchema)).append(" (");
		for(String stdCol : stdColumns)
			query.append(translator.getColumnName(metaTable.getColumn(stdCol))).append(", ");
		query.append(DB_NAME_COLUMN).append(") VALUES (?, ?, ?, ?, ?, ?);");

		// Columns of all the listed tables, gathered while iterating:
		final List<TAPColumn> gatheredColumns = new ArrayList<TAPColumn>();

		PreparedStatement insertStmt = null;
		try{
			insertStmt = connection.prepareStatement(query.toString());

			// Insert one row per table:
			int rowCount = 0;
			while(itTables.hasNext()){
				final TAPTable current = itTables.next();
				rowCount++;

				// remember all the columns of this table:
				appendAllInto(gatheredColumns, current.getColumns());

				// the table name is prefixed by its schema name only if the table is flagged as initially qualified:
				final String tableName = current.isInitiallyQualified() ? (current.getADQLSchemaName() + "." + current.getADQLName()) : current.getADQLName();

				// the DB name is stored only if it differs from the ADQL name:
				String dbName = current.getDBName();
				if (dbName != null && dbName.equals(current.getADQLName()))
					dbName = null;

				// set the values of the row to insert:
				insertStmt.setString(1, current.getADQLSchemaName());
				insertStmt.setString(2, tableName);
				insertStmt.setString(3, current.getType().toString());
				insertStmt.setString(4, current.getDescription());
				insertStmt.setString(5, current.getUtype());
				insertStmt.setString(6, dbName);
				executeUpdate(insertStmt, rowCount);
			}
			executeBatchUpdates(insertStmt, rowCount);
		}finally{
			close(insertStmt);
		}

		return gatheredColumns.iterator();
	}

	/**
	 * <p>Fill the standard table TAP_SCHEMA.columns with the list of all published columns.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if it is supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaTable	Description of TAP_SCHEMA.columns.
	 * @param itColumns	Iterator over the list of columns.
	 * 
	 * @return	Iterator over the full list of all foreign keys.
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private Iterator<TAPForeignKey> fillColumns(final TAPTable metaTable, final Iterator<TAPColumn> itColumns) throws SQLException, DBException{
		// Standard columns to fill, in the order of the query parameters:
		final String[] stdColumns = new String[]{"table_name","column_name","description","unit","ucd","utype","datatype","size","principal","indexed","std"};

		// Write the insertion query (standard columns first, then the extra DB name column):
		StringBuffer query = new StringBuffer("INSERT INTO ");
		query.append(translator.getTableName(metaTable, supportsSchema)).append(" (");
		for(String stdCol : stdColumns)
			query.append(translator.getColumnName(metaTable.getColumn(stdCol))).append(", ");
		query.append(DB_NAME_COLUMN).append(") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");

		// Foreign keys of all the listed columns, gathered while iterating:
		final List<TAPForeignKey> gatheredKeys = new ArrayList<TAPForeignKey>();

		PreparedStatement insertStmt = null;
		try{
			insertStmt = connection.prepareStatement(query.toString());

			// Insert one row per column:
			int rowCount = 0;
			while(itColumns.hasNext()){
				final TAPColumn current = itColumns.next();
				rowCount++;

				// remember all the foreign keys starting from this column:
				appendAllInto(gatheredKeys, current.getTargets());

				// the table name is qualified if the table is not a TAPTable or is flagged as initially qualified:
				final boolean qualified = !(current.getTable() instanceof TAPTable) || ((TAPTable)current.getTable()).isInitiallyQualified();
				String tableName = current.getTable().getADQLName();
				if (qualified)
					tableName = current.getTable().getADQLSchemaName() + "." + tableName;

				// the DB name is stored only if it differs from the ADQL name:
				String dbName = current.getDBName();
				if (dbName != null && dbName.equals(current.getADQLName()))
					dbName = null;

				// set the values of the row to insert (flags are stored as 1 or 0):
				insertStmt.setString(1, tableName);
				insertStmt.setString(2, current.getADQLName());
				insertStmt.setString(3, current.getDescription());
				insertStmt.setString(4, current.getUnit());
				insertStmt.setString(5, current.getUcd());
				insertStmt.setString(6, current.getUtype());
				insertStmt.setString(7, current.getDatatype().type.toString());
				insertStmt.setInt(8, current.getDatatype().length);
				insertStmt.setInt(9, current.isPrincipal() ? 1 : 0);
				insertStmt.setInt(10, current.isIndexed() ? 1 : 0);
				insertStmt.setInt(11, current.isStd() ? 1 : 0);
				insertStmt.setString(12, dbName);
				executeUpdate(insertStmt, rowCount);
			}
			executeBatchUpdates(insertStmt, rowCount);
		}finally{
			close(insertStmt);
		}

		return gatheredKeys.iterator();
	}

	/**
	 * <p>Fill the standard tables TAP_SCHEMA.keys and TAP_SCHEMA.key_columns with the list of all published foreign keys.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if it is supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * @param metaKeys			Description of TAP_SCHEMA.keys.
	 * @param metaKeyColumns	Description of TAP_SCHEMA.key_columns.
	 * @param itKeys			Iterator over the list of foreign keys.
	 * 
	 * @throws DBException	If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException	If any other SQL exception occurs.
	 */
	private void fillKeys(final TAPTable metaKeys, final TAPTable metaKeyColumns, final Iterator<TAPForeignKey> itKeys) throws SQLException, DBException{
		// Write the insertion query for KEYS:
		final String[] keysCols = new String[]{"key_id","from_table","target_table","description","utype"};
		StringBuffer keysQuery = new StringBuffer("INSERT INTO ");
		keysQuery.append(translator.getTableName(metaKeys, supportsSchema)).append(" (");
		for(int i = 0; i < keysCols.length; i++){
			if (i > 0)
				keysQuery.append(", ");
			keysQuery.append(translator.getColumnName(metaKeys.getColumn(keysCols[i])));
		}
		keysQuery.append(") VALUES (?, ?, ?, ?, ?);");

		PreparedStatement insertKey = null, insertKeyColumn = null;
		try{
			// Prepare the statement for KEYS:
			insertKey = connection.prepareStatement(keysQuery.toString());

			// Write the insertion query for KEY_COLUMNS:
			final String[] keyColumnsCols = new String[]{"key_id","from_column","target_column"};
			StringBuffer keyColumnsQuery = new StringBuffer("INSERT INTO ");
			keyColumnsQuery.append(translator.getTableName(metaKeyColumns, supportsSchema)).append(" (");
			for(int i = 0; i < keyColumnsCols.length; i++){
				if (i > 0)
					keyColumnsQuery.append(", ");
				keyColumnsQuery.append(translator.getColumnName(metaKeyColumns.getColumn(keyColumnsCols[i])));
			}
			keyColumnsQuery.append(") VALUES (?, ?, ?);");

			// Prepare the statement for KEY_COLUMNS:
			insertKeyColumn = connection.prepareStatement(keyColumnsQuery.toString());

			// Insert one row in KEYS per foreign key and one row in KEY_COLUMNS per association of columns:
			int keyCount = 0, keyColumnCount = 0;
			while(itKeys.hasNext()){
				final TAPForeignKey foreignKey = itKeys.next();
				keyCount++;

				// table names are prefixed by their schema name only if the table is flagged as initially qualified:
				final String fromTable = foreignKey.getFromTable().isInitiallyQualified() ? (foreignKey.getFromTable().getADQLSchemaName() + "." + foreignKey.getFromTable().getADQLName()) : foreignKey.getFromTable().getADQLName();
				final String targetTable = foreignKey.getTargetTable().isInitiallyQualified() ? (foreignKey.getTargetTable().getADQLSchemaName() + "." + foreignKey.getTargetTable().getADQLName()) : foreignKey.getTargetTable().getADQLName();

				// row of KEYS:
				insertKey.setString(1, foreignKey.getKeyId());
				insertKey.setString(2, fromTable);
				insertKey.setString(3, targetTable);
				insertKey.setString(4, foreignKey.getDescription());
				insertKey.setString(5, foreignKey.getUtype());
				executeUpdate(insertKey, keyCount);

				// rows of KEY_COLUMNS (source column -> target column):
				for(Iterator<Map.Entry<String,String>> pairs = foreignKey.iterator(); pairs.hasNext();){
					keyColumnCount++;
					final Map.Entry<String,String> pair = pairs.next();
					insertKeyColumn.setString(1, foreignKey.getKeyId());
					insertKeyColumn.setString(2, pair.getKey());
					insertKeyColumn.setString(3, pair.getValue());
					executeUpdate(insertKeyColumn, keyColumnCount);
				}
			}

			executeBatchUpdates(insertKey, keyCount);
			executeBatchUpdates(insertKeyColumn, keyColumnCount);
		}finally{
			close(insertKey);
			close(insertKeyColumn);
		}
	}

	/* ***************** */
	/* UPLOAD MANAGEMENT */
	/* ***************** */

	/**
	 * <p><i><b>Important note:</b>
	 * 	Only tables uploaded by users can be created in the database. To ensure that, the schema name of this table MUST be {@link STDSchema#UPLOADSCHEMA} ("TAP_UPLOAD") in ADQL.
	 * 	If it has another ADQL name, an exception will be thrown. Of course, the DB name of this schema MAY be different.
	 * </i></p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	This function may modify the given {@link TAPTable} object if schemas are not supported by this connection.
	 * 	In this case, this function will prefix the table's DB name by the schema's DB name directly inside the given {@link TAPTable} object
	 * 	(building the prefix with {@link #getTablePrefix(String)}). Then the DB name of the schema will be set to NULL.
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	If the upload schema does not already exist in the database, it will be created.
	 * </i></p>
	 * 
	 * @see tap.db.DBConnection#addUploadedTable(tap.metadata.TAPTable, tap.data.TableIterator)
	 * @see #checkUploadedTableDef(TAPTable)
	 */
	@Override
	public boolean addUploadedTable(TAPTable tableDef, TableIterator data) throws DBException, DataReadException{
		// If no table to upload, consider it has been dropped and return TRUE:
		if (tableDef == null)
			return true;

		// Check the table is well defined (and particularly the schema is well set with an ADQL name = TAP_UPLOAD):
		checkUploadedTableDef(tableDef);

		Statement stmt = null;
		try{

			// Start a transaction:
			startTransaction();
			// ...create a statement:
			stmt = connection.createStatement();

			DatabaseMetaData dbMeta = connection.getMetaData();

			// 1. Create the upload schema, if it does not already exist:
			if (!isSchemaExisting(tableDef.getDBSchemaName(), dbMeta)){
				stmt.executeUpdate("CREATE SCHEMA " + translator.getQualifiedSchemaName(tableDef) + ";");
				if (logger != null)
					logger.logDB(LogLevel.INFO, this, "SCHEMA_CREATED", "Schema \"" + tableDef.getADQLSchemaName() + "\" (in DB: " + translator.getQualifiedSchemaName(tableDef) + ") created.", null);
			// 1bis. Ensure the table does not already exist and if it is the case, throw an understandable exception:
			else if (isTableExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), dbMeta)){
				DBException de = new DBException("Impossible to create the user uploaded table in the database: " + translator.getTableName(tableDef, supportsSchema) + "! This table already exists.");
				if (logger != null)
					logger.logDB(LogLevel.ERROR, this, "ADD_UPLOAD_TABLE", de.getMessage(), de);

			// 2. Create the table:
			// ...build the SQL query:
			StringBuffer sqlBuf = new StringBuffer("CREATE TABLE ");
			sqlBuf.append(translator.getTableName(tableDef, supportsSchema)).append(" (");
			Iterator<TAPColumn> it = tableDef.getColumns();
			while(it.hasNext()){
				TAPColumn col = it.next();
				// column name:
				sqlBuf.append(translator.getColumnName(col));
				// column type:
				sqlBuf.append(' ').append(convertTypeToDB(col.getDatatype()));
				// last column ?
				if (it.hasNext())
					sqlBuf.append(',');
			}
			sqlBuf.append(");");
			// ...execute the update query:
			stmt.executeUpdate(sqlBuf.toString());

			// 3. Fill the table:
			fillUploadedTable(tableDef, data);

			// Commit the transaction:
			commit();

				logger.logDB(LogLevel.INFO, this, "TABLE_CREATED", "Table \"" + tableDef.getADQLName() + "\" (in DB: " + translator.getTableName(tableDef, supportsSchema) + ") created.", null);
			return true;

		}catch(SQLException se){
			rollback();
				logger.logDB(LogLevel.WARNING, this, "ADD_UPLOAD_TABLE", "Impossible to create the uploaded table: " + translator.getTableName(tableDef, supportsSchema) + "!", se);
			throw new DBException("Impossible to create the uploaded table: " + translator.getTableName(tableDef, supportsSchema) + "!", se);
		}catch(DBException de){
			rollback();
			throw de;
		}catch(DataReadException dre){
			rollback();
			throw dre;
		}finally{
			close(stmt);
			endTransaction();
		}
	}

	/**
	 * <p>Fill the table uploaded by the user with the given data.</p>
	 * 
	 * <p><i>Note:
	 * 	Batch updates may be done here if it is supported by the DBMS connection.
	 * 	In case of any failure while using this feature, it will be flagged as unsupported and one-by-one updates will be processed.
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	This function proceeds to a formatting of TIMESTAMP and GEOMETRY (point, circle, box, polygon) values.
	 * </i></p>
	 * 
	 * @param metaTable	Description of the updated table.
	 * @param data		Iterator over the rows to insert.
	 * 
	 * @return	Number of inserted rows.
	 * 
	 * @throws DBException			If rows can not be inserted because the SQL update query has failed.
	 * @throws SQLException			If any other SQL exception occurs.
	 * @throws DataReadException	If there is any error while reading the data from the given {@link TableIterator} (and particularly if a limit - in byte or row - has been reached).
	 */
	protected int fillUploadedTable(final TAPTable metaTable, final TableIterator data) throws SQLException, DBException, DataReadException{
		// 1. Build the SQL update query:
		StringBuffer sql = new StringBuffer("INSERT INTO ");
		StringBuffer varParam = new StringBuffer();
		// ...table name:
		sql.append(translator.getTableName(metaTable, supportsSchema)).append(" (");
		// ...list of columns:
		TAPColumn[] cols = data.getMetadata();
		for(int c = 0; c < cols.length; c++){
			if (c > 0){
				sql.append(", ");
				varParam.append(", ");
			}
			sql.append(translator.getColumnName(cols[c]));
			varParam.append('?');
		}
		// ...values pattern:
		sql.append(") VALUES (").append(varParam).append(");");

		// 2. Prepare the statement:
		PreparedStatement stmt = null;
		int nbRows = 0;
		try{
			stmt = connection.prepareStatement(sql.toString());

			// 3. Execute the query for each given row:
			while(data.nextRow()){
				nbRows++;
				int c = 1;
				while(data.hasNextCol()){
					Object val = data.nextCol();
					if (val != null && cols[c - 1] != null){
						/* TIMESTAMP FORMATTING */
						if (cols[c - 1].getDatatype().type == DBDatatype.TIMESTAMP){
							try{
								val = new Timestamp(ISO8601Format.parse(val.toString()));
							}catch(ParseException pe){
								if (logger != null)
									logger.logDB(LogLevel.ERROR, this, "UPLOAD", "Unexpected date format for the " + c + "-th column (" + val + ")! A date formatted in ISO8601 was expected.", pe);
								throw new DBException("Unexpected date format for the " + c + "-th column (" + val + ")! A date formatted in ISO8601 was expected.", pe);
							}
						}
						/* GEOMETRY FORMATTING */
						else if (cols[c - 1].getDatatype().type == DBDatatype.POINT || cols[c - 1].getDatatype().type == DBDatatype.REGION){
							Region region;
							try{
								// parse the region as an STC-S expression:
								region = STCS.parseRegion(val.toString());
								// translate this STC region into the corresponding column value:
								val = translator.translateGeometryToDB(region);
							}catch(adql.parser.ParseException e){
								if (logger != null)
									logger.logDB(LogLevel.ERROR, this, "UPLOAD", "Incorrect STC-S syntax for the geometrical value \"" + val + "\"!", e);
								throw new DataReadException("Incorrect STC-S syntax for the geometrical value \"" + val + "\"!", e);
							}
						/* BOOLEAN CASE (more generally, type incompatibility) */
						else if (val != null && cols[c - 1].getDatatype().type == DBDatatype.SMALLINT && val instanceof Boolean)
							val = ((Boolean)val) ? (short)1 : (short)0;
				executeUpdate(stmt, nbRows);
			}
			executeBatchUpdates(stmt, nbRows);

			return nbRows;

		}finally{
			close(stmt);
		}
	}

	/**
	 * <p><i><b>Important note:</b>
	 * 	Only tables uploaded by users can be dropped from the database. To ensure that, the schema name of this table MUST be {@link STDSchema#UPLOADSCHEMA} ("TAP_UPLOAD") in ADQL.
	 * 	If it has another ADQL name, an exception will be thrown. Of course, the DB name of this schema MAY be different.
	 * </i></p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	This function may modify the given {@link TAPTable} object if schemas are not supported by this connection.
	 * 	In this case, this function will prefix the table's DB name by the schema's DB name directly inside the given {@link TAPTable} object
	 * 	(building the prefix with {@link #getTablePrefix(String)}). Then the DB name of the schema will be set to NULL.
	 * </i></p>
	 * 
	 * <p><i>Note:
	 * 	This implementation is able to drop only one uploaded table. So if this function finds more than one table matching to the given one,
	 * 	an exception will be thrown and no table will be dropped.
	 * </i></p>
	 * 
	 * @see tap.db.DBConnection#dropUploadedTable(tap.metadata.TAPTable)
	 * @see #checkUploadedTableDef(TAPTable)
	 */
	@Override
	public boolean dropUploadedTable(final TAPTable tableDef) throws DBException{
		// If no table to upload, consider it has been dropped and return TRUE:
		if (tableDef == null)
			return true;

		// Check the table is well defined (and particularly the schema is well set with an ADQL name = TAP_UPLOAD):
		checkUploadedTableDef(tableDef);

		Statement stmt = null;
		try{

			// Check the existence of the table to drop (nothing to do if it is already gone):
			if (!isTableExisting(tableDef.getDBSchemaName(), tableDef.getDBName(), connection.getMetaData()))
				return true;

			// Execute the update:
			stmt = connection.createStatement();
			int cnt = stmt.executeUpdate("DROP TABLE " + translator.getTableName(tableDef, supportsSchema) + ";");

			// Log the end (exactly one message: success OR failure, and only if a logger is available):
			if (logger != null){
				if (cnt >= 0)
					logger.logDB(LogLevel.INFO, this, "TABLE_DROPPED", "Table \"" + tableDef.getADQLName() + "\" (in DB: " + translator.getTableName(tableDef, supportsSchema) + ") dropped.", null);
				else
					logger.logDB(LogLevel.ERROR, this, "TABLE_DROPPED", "Table \"" + tableDef.getADQLName() + "\" (in DB: " + translator.getTableName(tableDef, supportsSchema) + ") NOT dropped.", null);
			}

			// Ensure the update is successful:
			return (cnt >= 0);

		}catch(SQLException se){
			// Any SQL failure is logged (if possible) and reported to the caller as a DBException keeping the original cause:
			if (logger != null)
				logger.logDB(LogLevel.WARNING, this, "DROP_UPLOAD_TABLE", "Impossible to drop the uploaded table: " + translator.getTableName(tableDef, supportsSchema) + "!", se);
			throw new DBException("Impossible to drop the uploaded table: " + translator.getTableName(tableDef, supportsSchema) + "!", se);
		}finally{
			// Always release the statement, whatever the outcome:
			close(stmt);
		}
	}

	/**
	 * <p>Ensures that the given table MUST be inside the upload schema in ADQL.</p>
	 * 
	 * <p>Thus, the following cases are taken into account:</p>
	 * <ul>
	 * 	<li>
	 * 		The schema name of the given table MUST be {@link STDSchema#UPLOADSCHEMA} ("TAP_UPLOAD") in ADQL.
	 * 		If it has another ADQL name, an exception will be thrown. Of course, the DB name of this schema MAY be different.
	 * 	</li>
	 * 	<li>
	 * 		If schemas are not supported by this connection, this function will prefix the table DB name by the schema DB name directly
	 * 		inside the given {@link TAPTable} object (building the prefix with {@link #getTablePrefix(String)}). Then the DB name
	 * 		of the schema will be set to NULL.
	 * 	</li>
	 * </ul>
	 * 
	 * @param tableDef	Definition of the table to create/drop.
	 * 
	 * @throws DBException	If the given table is not in a schema
	 *                    	or if the ADQL name of this schema is not {@link STDSchema#UPLOADSCHEMA} ("TAP_UPLOAD").
	 */
	protected void checkUploadedTableDef(final TAPTable tableDef) throws DBException{
		// Reject any table which is not attached to a schema named TAP_UPLOAD in ADQL:
		final boolean inUploadSchema = (tableDef.getSchema() != null) && tableDef.getSchema().getADQLName().equals(STDSchema.UPLOADSCHEMA.label);
		if (!inUploadSchema)
			throw new DBException("Missing upload schema! An uploaded table must be inside a schema whose the ADQL name is strictly equals to \"" + STDSchema.UPLOADSCHEMA.label + "\" (but the DB name may be different).");

		// Nothing more to adapt when the connection supports schemas:
		if (supportsSchema)
			return;

		// Without schema support, the ADQL schema name becomes a prefix of the table DB name (unless it is already there):
		final String adqlSchemaName = tableDef.getADQLSchemaName();
		if (adqlSchemaName != null && adqlSchemaName.trim().length() > 0){
			final String prefix = adqlSchemaName + "_";
			if (!tableDef.getDBName().startsWith(prefix))
				tableDef.setDBName(prefix + tableDef.getDBName());
		}

		// ...and the schema itself must not have any DB name any more:
		if (tableDef.getSchema() != null)
			tableDef.getSchema().setDBName(null);
	}

	/* ************** */
	/* TOOL FUNCTIONS */
	/* ************** */

	/**
	 * <p>Convert the given TAP type into the corresponding DBMS column type.</p>
	 * 
	 * <p>
	 * 	This function tries first the type conversion using the translator ({@link JDBCTranslator#convertTypeToDB(DBType)}).
	 * 	If it fails, a default conversion is done considering all the known types of the following DBMS:
	 * 	PostgreSQL, SQLite, MySQL, Oracle and JavaDB/Derby.
	 * </p>
	 * 
	 * @param type	TAP type to convert.
	 * 
	 * @return	The corresponding DBMS type.
	 * 
	 * @see JDBCTranslator#convertTypeToDB(DBType)
	 * @see #defaultTypeConversion(DBType)
	 */
	protected String convertTypeToDB(final DBType type){
		// Give the translator the first chance to convert the type:
		final String translated = translator.convertTypeToDB(type);
		if (translated != null)
			return translated;

		// The translator does not know this type => fall back on the default conversion:
		return defaultTypeConversion(type);
	}

	/**
	 * <p>Get the DBMS compatible datatype corresponding to the given column {@link DBType}.</p>
	 * 
	 * <p><i>Note 1:
	 * 	This function is able to generate a DB datatype compatible with the currently used DBMS.
	 * 	In this current implementation, only Postgresql, Oracle, SQLite, MySQL and Java DB/Derby have been considered.
	 * 	Most of the TAP types have been tested only with Postgresql and SQLite without any problem.
	 * 	If the DBMS you are using has not been considered, note that this function will return the TAP type expression by default.
	 * </i></p>
	 * 
	 * <p><i>Note 2:
	 * 	In case the given datatype is NULL or not managed here, the DBMS type corresponding to "VARCHAR" will be returned.
	 * </i></p>
	 * 
	 * <p><i>Note 3:
	 * 	The special TAP types POINT and REGION are converted into the DBMS type corresponding to "VARCHAR".
	 * </i></p>
	 * 
	 * @param datatype	Column TAP type.
	 * 
	 * @return	The corresponding DB type. It is never NULL: the DBMS type corresponding to "VARCHAR" is returned if the given type is NULL or not managed.
	 */
	protected String defaultTypeConversion(DBType datatype){
		// No type provided => VARCHAR by default (the given type must be replaced ONLY in this case):
		if (datatype == null)
			datatype = new DBType(DBDatatype.VARCHAR);

		/* Note: all DBMS tests below are written "<name>".equals(dbms) so that
		 *       an unidentified (NULL) DBMS falls into the generic branch instead of raising a NullPointerException. */
		switch(datatype.type){

			case SMALLINT:
				// SQLite has a single integer type:
				return "sqlite".equals(dbms) ? "INTEGER" : "SMALLINT";

			case INTEGER:
			case REAL:
				// Same name in all the considered DBMS:
				return datatype.type.toString();

			case BIGINT:
				if ("oracle".equals(dbms))
					return "NUMBER(19,0)";
				else if ("sqlite".equals(dbms))
					return "INTEGER";
				else
					return "BIGINT";

			case DOUBLE:
				if ("postgresql".equals(dbms) || "oracle".equals(dbms))
					return "DOUBLE PRECISION";
				else if ("sqlite".equals(dbms))
					return "REAL";
				else
					return "DOUBLE";

			case BINARY:
				// The length is appended only when it is strictly positive:
				if ("postgresql".equals(dbms))
					return "bytea";
				else if ("sqlite".equals(dbms))
					return "BLOB";
				else if ("oracle".equals(dbms))
					return "RAW" + (datatype.length > 0 ? "(" + datatype.length + ")" : "");
				else if ("derby".equals(dbms))
					return "CHAR" + (datatype.length > 0 ? "(" + datatype.length + ")" : "") + " FOR BIT DATA";
				else
					return datatype.type.toString();

			case VARBINARY:
				// The length is appended only when it is strictly positive:
				if ("postgresql".equals(dbms))
					return "bytea";
				else if ("sqlite".equals(dbms))
					return "BLOB";
				else if ("oracle".equals(dbms))
					return "LONG RAW" + (datatype.length > 0 ? "(" + datatype.length + ")" : "");
				else if ("derby".equals(dbms))
					return "VARCHAR" + (datatype.length > 0 ? "(" + datatype.length + ")" : "") + " FOR BIT DATA";
				else
					return datatype.type.toString();

			case CHAR:
				if ("sqlite".equals(dbms))
					return "TEXT";
				else
					return "CHAR";

			case BLOB:
				if ("postgresql".equals(dbms))
					return "bytea";
				else
					return "BLOB";

			case CLOB:
				if ("postgresql".equals(dbms) || "mysql".equals(dbms) || "sqlite".equals(dbms))
					return "TEXT";
				else
					return "CLOB";

			case TIMESTAMP:
				if ("sqlite".equals(dbms))
					return "TEXT";
				else
					return "TIMESTAMP";

			// The geometrical types and any type not managed above are stored as character strings:
			case POINT:
			case REGION:
			case VARCHAR:
			default:
				if ("sqlite".equals(dbms))
					return "TEXT";
				else
					return "VARCHAR";
		}
	}

	/**
	 * <p>Start a transaction.</p>
	 * 
	 * <p>
	 * 	Basically, if transactions are supported by this connection, the flag AutoCommit is just turned off.
	 * 	It will be turned on again when {@link #endTransaction()} is called.
	 * </p>
	 * 
	 * <p>If transactions are not supported by this connection, nothing is done.</p>
	 * 
	 * <p><i><b>Important note:</b>
	 * 	If any error interrupts the START TRANSACTION operation, transactions will be afterwards considered as not supported by this connection.
	 * 	So, subsequent calls to this function (and any other transaction related function) will never do anything.
	 * </i></p>
	 * 
	 * @throws DBException	If it is impossible to start a transaction though transactions are supported by this connection.
	 *                    	If these are not supported, this error can never be thrown.
	 */