package uws.job; /* * This file is part of UWSLibrary. * * UWSLibrary is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * UWSLibrary is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with UWSLibrary. If not, see . * * Copyright 2012-2014 - UDS/Centre de Données astronomiques de Strasbourg (CDS), * Astronomisches Rechen Institut (ARI) */ import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Vector; import javax.servlet.ServletOutputStream; import uws.ISO8601Format; import uws.UWSException; import uws.UWSExceptionFactory; import uws.UWSToolBox; import uws.job.manager.ExecutionManager; import uws.job.parameters.UWSParameters; import uws.job.serializer.UWSSerializer; import uws.job.user.JobOwner; import uws.service.UWS; import uws.service.UWSFactory; import uws.service.UWSUrl; import uws.service.file.UWSFileManager; import uws.service.log.UWSLog; import uws.service.log.UWSLog.LogLevel; import uws.service.request.UploadFile; /** *

Brief description

* *

Default implementation of a job of the UWS pattern.

* *

Some attributes comments

* * * *

More details

* * * * @author Grégory Mantelet (CDS;ARI) * @version 4.1 (12/2014) */ public class UWSJob extends SerializableUWSObject { private static final long serialVersionUID = 1L; /* ********* */ /* CONSTANTS */ /* ********* */ /** Name of the parameter ACTION. */ public static final String PARAM_ACTION = "ACTION"; /** Name of the DELETE action. */ public static final String ACTION_DELETE = "DELETE"; /** Name of the parameter jobId. */ public static final String PARAM_JOB_ID = "jobId"; /** Name of the parameter runId. */ public static final String PARAM_RUN_ID = "runId"; /** Name of the parameter owner. */ public static final String PARAM_OWNER = "owner"; /** Name of the parameter phase. */ public static final String PARAM_PHASE = "phase"; /** Value of the parameter phase which starts the job. */ public static final String PHASE_RUN = "RUN"; /** Value of the parameter phase which aborts the job. */ public static final String PHASE_ABORT = "ABORT"; /** Name of the parameter quote. */ public static final String PARAM_QUOTE = "quote"; /** Name of the parameter startTime. */ public static final String PARAM_START_TIME = "startTime"; /** Name of the parameter endTime. */ public static final String PARAM_END_TIME = "endTime"; /** Name of the parameter executionDuration. */ public static final String PARAM_EXECUTION_DURATION = "executionDuration"; /** Name of the parameter destructionTime. */ public static final String PARAM_DESTRUCTION_TIME = "destruction"; /** Name of the parameter errorSummary. */ public static final String PARAM_ERROR_SUMMARY = "error"; /** Name of the parameter otherParameters. */ public static final String PARAM_PARAMETERS = "parameters"; /** Name of the parameter results. */ public static final String PARAM_RESULTS = "results"; /** Default value of {@link #owner} if no ID are given at the job creation. */ public final static String ANONYMOUS_OWNER = "anonymous"; /** Default date format pattern. * @deprecated Replaced by {@link ISO8601Format}.*/ @Deprecated public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; /** The quote value that indicates the quote of this job is not known. */ public static final long QUOTE_NOT_KNOWN = -1; /** The duration that implies an unlimited execution duration. */ public final static long UNLIMITED_DURATION = 0; /* ********* */ /* VARIABLES */ /* ********* */ /** The last generated job ID. It SHOULD be used ONLY by the function {@link #generateJobId()} ! */ protected static String lastId = System.currentTimeMillis() + "A"; /** The identifier of the job (it MUST be different from any other job).
* Note: It is assigned automatically at the job creation in any job constructor * by the function {@link #generateJobId()}. * To change the way this ID is generated or its format you must override this function. */ protected final String jobId; /** The identifier of the creator of this job.
* Note: This object will not exist for all invocations of the UWS conformant protocol, * but only in cases where the access to the service is authenticated. */ protected final JobOwner owner; /** The jobs list which is supposed to managed this job. */ private JobList myJobList = null; /** *

The current phase of the job.

* Remember: A job is treated as a state machine thanks to this attribute. * */ private JobPhase phase; /** The used date formatter. * @deprecated Replaced by {@link ISO8601Format}. */ @Deprecated public static final DateFormat dateFormat = new SimpleDateFormat(DEFAULT_DATE_FORMAT); /** * This time (in seconds) predicts when the job is likely to complete.
* It CAN NOT be changed after the job creation !
* By default if no ID is given, {@link #quote} is set to {@link #QUOTE_NOT_KNOWN} (= {@value #QUOTE_NOT_KNOWN}).
*/ private long quote = QUOTE_NOT_KNOWN; /** The time at which the job execution started. */ private Date startTime = null; /** The time at which the job execution ended. */ private Date endTime = null; /**

This error summary gives a human-readable error message for the underlying job.

* Note: This object is intended to be a detailed error message, and consequently, * might be a large piece of text such as a stack trace. */ protected ErrorSummary errorSummary = null; /** This is a list of all results of this job. */ protected Map results; /** List of all input parameters (UWS standard and non-standard parameters). */ protected final UWSParameters inputParams; /** The thread to start for executing the job. */ protected transient JobThread thread = null; /** The time (in ms) to wait the end of the thread after an interruption. */ protected long waitForStop = 1000; /** Objects which want to be notified at each modification of the execution phase of this job. */ private Vector observers = new Vector(); /** If this job has been restored, this attribute should be set with the date of its restoration. */ private final Date restorationDate; /* ************ */ /* CONSTRUCTORS */ /* ************ */ /** *

Builds a job with no owner from a map of all parameters (UWS and additional parameters).

* *

Note: if the parameter {@link UWSJob#PARAM_PHASE} (phase) is given with the value {@link UWSJob#PHASE_RUN} * the job execution starts immediately after the job has been added to a job list or after {@link #applyPhaseParam(JobOwner)} is called.

* * @param params UWS standard and non-standard parameters. * * @see UWSJob#UWSJob(JobOwner, UWSParameters) */ public UWSJob(final UWSParameters params){ this(null, params); } /** *

Builds a job of the given owner and from a map of all parameters (UWS and additional parameters).

* *

Note: if the parameter {@link #PARAM_PHASE} (phase) is given with the value {@link #PHASE_RUN} * the job execution starts immediately after the job has been added to a job list or after {@link #applyPhaseParam(JobOwner)} is called.

* * @param owner Job.owner ({@link #PARAM_OWNER}). * @param params UWS standard and non-standard parameters. * * @see UWSParameters#init() */ public UWSJob(JobOwner owner, final UWSParameters params){ this.owner = owner; phase = new JobPhase(this); results = new HashMap(); inputParams = params; inputParams.init(); jobId = generateJobId(); restorationDate = null; // Move all uploaded files in a location related with this job: Iterator files = inputParams.getFiles(); while(files.hasNext()){ try{ files.next().move(this); }catch(IOException ioe){} } } /** *

CONSTRUCTOR TO USE TO RESTORE A JOB whatever is its phase.

* *

Builds a job of the given owner with all the given parameter.

* *

* Note: The job phase is automatically set in function of the last parameters (startTime, endTime, results and error). * Only the following execution phase are possible: PENDING, ABORTED, ERROR and COMPLETED. *

* * @param jobID The ID of this job (NOT NULL). * @param owner Its owner. * @param params UWS standard and non-standard parameters. * @param quote Its quote (in seconds). * @param startTime Its start time if it has already been started. * @param endTime Its end time if it is already finished. * @param results Its results (if phase=COMPLETED). * @param error Its error (if phase=ERROR). * * @throws NullPointerException If the given ID is NULL. */ public UWSJob(final String jobID, final JobOwner owner, final UWSParameters params, final long quote, final long startTime, final long endTime, final List results, final ErrorSummary error) throws NullPointerException{ if (jobID == null) throw new NullPointerException("Missing job ID => impossible to build a Job without a valid ID!"); this.jobId = jobID; this.owner = owner; this.quote = quote; if (startTime > 0) this.startTime = new Date(startTime); if (endTime > 0) this.endTime = new Date(endTime); this.results = new HashMap(); if (results != null){ for(Result r : results){ if (r != null) this.results.put(r.getId(), r); } } errorSummary = error; this.phase = new JobPhase(this); inputParams = params; params.init(); ExecutionPhase p = ExecutionPhase.PENDING; if (startTime > 0 && endTime > 0){ if (this.results.isEmpty() && this.errorSummary == null) p = ExecutionPhase.ABORTED; else if (!this.results.isEmpty()) p = ExecutionPhase.COMPLETED; else if (this.errorSummary != null) p = ExecutionPhase.ERROR; } if (phase != null){ try{ setPhase(p, true); }catch(UWSException ue){ // Can never append because the "force" parameter is true! } } restorationDate = new Date(); } /** *

This function lets generating a unique ID.

* *

By default: System.currentTimeMillis()+UpperCharacter (UpperCharacter: one upper-case character: A, B, C, ....)

* *

note: DO NOT USE in this function any of the following functions: {@link #getLogger()}, * {@link #getFileManager()} and {@link #getFactory()}. All of them will return NULL, because this job does not * yet know its jobs list (which is needed to know the UWS and so, all of the objects returned by these functions).

* * @return A unique job identifier. */ protected String generateJobId(){ synchronized(lastId){ String generatedId = System.currentTimeMillis() + "A"; if (lastId != null){ while(lastId.equals(generatedId)) generatedId = generatedId.substring(0, generatedId.length() - 1) + (char)(generatedId.charAt(generatedId.length() - 1) + 1); } lastId = generatedId; return generatedId; } } /** *

Gets the value of the specified parameter.

* *

note: No case sensitivity for the UWS parameters ON THE CONTRARY TO the names of the additional parameters (which are case sensitive).

* * @param name Name of the parameter to get. * * @return Its value or null if there is no parameter with the given name or if the value is null. * * @see UWSParameters#get(String) */ public Object getParameter(String name){ if (name == null || name.trim().isEmpty()) return null; name = name.trim(); if (name.equalsIgnoreCase(PARAM_JOB_ID)) return jobId; else if (name.equalsIgnoreCase(PARAM_OWNER)) return owner; else if (name.equalsIgnoreCase(PARAM_PHASE)) return phase.getPhase(); else if (name.equalsIgnoreCase(PARAM_QUOTE)) return quote; else if (name.equalsIgnoreCase(PARAM_START_TIME)) return startTime; else if (name.equalsIgnoreCase(PARAM_END_TIME)) return endTime; else return inputParams.get(name); } /** *

Looks for an additional parameters which corresponds to the Execution Phase. If it exists and:

*
    *
  • is equals to {@link UWSJob#PHASE_RUN RUN} => remove it from the attribute {@link #inputParams} and start the job.
  • *
  • is equals to {@link UWSJob#PHASE_ABORT ABORT} => remove it from the attribute {@link #inputParams} and abort the job.
  • *
  • is another value => the attribute is though removed from the attribute {@link #inputParams} but nothing is done.
  • *
* * @param user The user who asks to apply the phase parameter (start/abort). (may be NULL) * * @throws UWSException If it is impossible the state of this job (into EXECUTING or ABORTED) * or if the given user is not allowed to execute this job. * * @see UWSParameters#hasInputPhase() * @see UWSParameters#getInputPhase() * @see #start() * @see #abort() */ public void applyPhaseParam(final JobOwner user) throws UWSException{ synchronized(inputParams){ if (inputParams.hasInputPhase()){ String inputPhase = inputParams.getInputPhase(); if (inputPhase.equalsIgnoreCase(PHASE_RUN)){ // Forbids the execution if the user has not the required permission: if (user != null && !user.equals(owner) && !user.hasExecutePermission(this)) throw new UWSException(UWSException.PERMISSION_DENIED, UWSExceptionFactory.executePermissionDenied(user, jobId)); start(); }else if (inputPhase.equalsIgnoreCase(PHASE_ABORT)){ // Forbids the execution if the user has not the required permission: if (user != null && !user.equals(owner) && !user.hasExecutePermission(this)) throw new UWSException(UWSException.PERMISSION_DENIED, UWSExceptionFactory.executePermissionDenied(user, jobId)); abort(); } } } } /* ***************** */ /* GETTERS & SETTERS */ /* ***************** */ /** * Gets the file manager used in this job. * * @return Its file manager or null if this job is not into a {@link JobList} or if this jobs list is not into a {@link UWS}. * * @see JobList#getUWS() * @see uws.service.UWS#getFileManager() */ public final UWSFileManager getFileManager(){ if (myJobList != null && myJobList.getUWS() != null) return myJobList.getUWS().getFileManager(); else return null; } /** * Gets the logger of its UWS or a default one if the job list or the UWS is unknown. * * @return A logger. * * @see JobList#getUWS() * @see uws.service.UWS#getLogger() * @see UWSToolBox#getDefaultLogger() */ public UWSLog getLogger(){ if (myJobList != null && myJobList.getUWS() != null) return myJobList.getUWS().getLogger(); else return UWSToolBox.getDefaultLogger(); } /** * Gets the factory to use to create the thread to execute when this job starts. * * @return The factory to use to create a {@link JobThread}. */ public final UWSFactory getFactory(){ if (myJobList != null && myJobList.getUWS() != null) return myJobList.getUWS().getFactory(); else return null; } /** * Gets the date of the restoration of this job. * * @return Date of its restoration. */ public final Date getRestorationDate(){ return restorationDate; } /** * Gets the phase in which this job is now. * * @return The current phase of this job. * * @see JobPhase#getPhase() */ public final ExecutionPhase getPhase(){ return phase.getPhase(); } /** *

Sets the current phase of this job.

* *

IMPORTANT: *

  • The order of all phases must be respected: BY DEFAULT
    {@link ExecutionPhase#PENDING PENDING} ---> {@link ExecutionPhase#QUEUED QUEUED} ---> {@link ExecutionPhase#EXECUTING EXECUTING} ---> {@link ExecutionPhase#COMPLETED COMPLETED}.
  • *
  • The only way to go to the {@link ExecutionPhase#EXECUTING EXECUTING} phase is by sending a POST query with the value {@link UWSJob#PHASE_RUN RUN} for the parameter {@link UWSJob#PARAM_PHASE PHASE}.
  • *
  • The only way to go to the {@link ExecutionPhase#ABORTED ABORTED} phase is by sending a POST query with the value {@link UWSJob#PHASE_ABORT ABORT} for the parameter {@link UWSJob#PARAM_PHASE PHASE}.
  • *
  • The start time and the end time are set automatically when the phase is set to {@link ExecutionPhase#EXECUTING EXECUTING} and {@link ExecutionPhase#COMPLETED COMPLETED}, {@link ExecutionPhase#ABORTED ABORTED} or {@link ExecutionPhase#ERROR ERROR}
  • *

* * @param p The phase to set for this job. * * @throws UWSException If the given phase does not respect the job's phases order. * * @see #setPhase(ExecutionPhase, boolean) */ public final void setPhase(ExecutionPhase p) throws UWSException{ setPhase(p, false); } /** *

Sets the current phase of this job, respecting or not the imposed order.

* *

IMPORTANT: *

  • If the parameter force is false, the order of all phases must be respected:
    {@link ExecutionPhase#PENDING PENDING} ---> {@link ExecutionPhase#QUEUED QUEUED} ---> {@link ExecutionPhase#EXECUTING EXECUTING} ---> {@link ExecutionPhase#COMPLETED COMPLETED}.
  • *
  • The only way to go to the {@link ExecutionPhase#EXECUTING EXECUTING} phase is by sending a POST query with the value {@link UWSJob#PHASE_RUN RUN} for the parameter {@link UWSJob#PARAM_PHASE PARAM_PHASE}.
  • *
  • The only way to go to the {@link ExecutionPhase#ABORTED ABORTED} phase is by sending a POST query with the value {@link UWSJob#PHASE_ABORT ABORT} for the parameter {@link UWSJob#PARAM_PHASE PARAM_PHASE}.
  • *
  • The start time and the end time are set automatically when the phase is set to {@link ExecutionPhase#EXECUTING EXECUTING} and {@link ExecutionPhase#COMPLETED COMPLETED}, {@link ExecutionPhase#ABORTED ABORTED} or {@link ExecutionPhase#ERROR ERROR}
  • *

* * @param p The phase to set for this job. * @param force true to impose the given execution phase, false to take into account the order of all phases. * * @throws UWSException If the given phase does not respect the job's phases order. * * @see JobPhase#setPhase(ExecutionPhase, boolean) * @see JobPhase#isFinished() * @see ExecutionManager#remove(UWSJob) * @see #notifyObservers(ExecutionPhase) */ public final void setPhase(ExecutionPhase p, boolean force) throws UWSException{ synchronized(phase){ ExecutionPhase oldPhase = phase.getPhase(); phase.setPhase(p, force); if (!force) getLogger().logJob(LogLevel.INFO, this, "CHANGE_PHASE", "The job \"" + getJobId() + "\" goes from " + oldPhase + " to " + p, null); // Notify the execution manager: if (phase.isFinished() && getJobList() != null) getJobList().getExecutionManager().remove(this); // Notify all the observers: notifyObservers(oldPhase); } } /** *

Gets the phase manager of this job.

* *

Note: The phase manager manages all the transitions between all the execution phases.

* * @return Its phase manager. */ public final JobPhase getPhaseManager(){ return phase; } /** *

Sets the phase manager of this job.

* *

Note: The phase manager manages all the transitions between all the execution phases.

* * @param jobPhase Its new phase manager (if null this function does nothing). */ public final void setPhaseManager(JobPhase jobPhase){ if (jobPhase != null){ synchronized(phase){ phase = jobPhase; } } } /** * Gets the time at which the job execution has started. * * @return The start time of the execution of this job. */ public final Date getStartTime(){ return startTime; } /** * Sets the time at which the job execution has started. * * @param newDateTime The start time of the execution of this job. */ protected final void setStartTime(Date newDateTime){ startTime = newDateTime; } /** * Gets the time at which the job execution has finished. * * @return The end time of the execution of this job. */ public final Date getEndTime(){ return endTime; } /** * Sets the time at which the job execution has finished. * * @param newDateTime The end time of the execution of this job. */ protected final void setEndTime(Date newDateTime){ endTime = newDateTime; // Save the owner jobs list: if (phase.isFinished() && owner != null && getJobList() != null && getJobList().getUWS() != null && getJobList().getUWS().getBackupManager() != null) getJobList().getUWS().getBackupManager().saveOwner(owner); // Log the end of this job: getLogger().logJob(LogLevel.INFO, this, "END", "Job \"" + jobId + "\" ended with the status " + phase, null); } /** * Gets the duration (in seconds) for which this job shall run. * * @return The execution duration of this job. * * @see UWSParameters#getExecutionDuration() */ public final long getExecutionDuration(){ return inputParams.getExecutionDuration(); } /** *

Sets the duration (in seconds) for which this job shall run ONLY IF the job can updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}).

* *

Note: A duration of 0 (or less) implies unlimited execution duration.

* * @param executionDuration The execution duration of this job. * * @see UWSParameters#set(String, Object) */ public final void setExecutionDuration(long executionDuration){ if (phase.isJobUpdatable()){ try{ inputParams.set(PARAM_EXECUTION_DURATION, executionDuration); }catch(UWSException ue){ ; } } } /** * Gets the instant when the job shall be destroyed. * * @return The destruction time of this job. * * @see UWSParameters#getDestructionTime() */ public final Date getDestructionTime(){ return inputParams.getDestructionTime(); } /** *

* Sets the instant when the job shall be destroyed ONLY IF the job can updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}). * If known the jobs list is notify of this destruction time update. *

* * @param destructionTime The destruction time of this job. MUST NOT be NULL * * @see JobList#updateDestruction(UWSJob) * @see UWSParameters#set(String, Object) */ public final void setDestructionTime(Date destructionTime){ if (destructionTime != null && phase.isJobUpdatable()){ try{ inputParams.set(PARAM_DESTRUCTION_TIME, destructionTime); if (myJobList != null) myJobList.updateDestruction(this); }catch(UWSException ue){ getLogger().logJob(LogLevel.WARNING, this, "SET_DESTRUCTION", "Can not set the destruction time of the job \"" + getJobId() + "\" to \"" + destructionTime + "\"!", ue); } } } /** * Gets the error that occurs during the execution of this job. * * @return A summary of the error. */ public final ErrorSummary getErrorSummary(){ return errorSummary; } /** *

Sets the error that occurs during the execution of this job.

* *

IMPORTANT: This function will have no effect if the job is finished, that is to say if the current phase is * {@link ExecutionPhase#ABORTED ABORTED}, {@link ExecutionPhase#ERROR ERROR} or {@link ExecutionPhase#COMPLETED COMPLETED}..

* * @param errorSummary A summary of the error. MUST NOT be NULL * * @throws UWSException If the job execution is finished that is to say if the phase is ABORTED, ERROR or COMPLETED. * * @see #isFinished() */ public final void setErrorSummary(ErrorSummary errorSummary) throws UWSException{ if (errorSummary == null) return; else if (!isFinished()) this.errorSummary = errorSummary; else{ getLogger().logJob(LogLevel.ERROR, this, "SET_ERROR", "Can not set an error summary when the job is finished (or not yet started)! The current phase is: " + getPhase() + " ; the summary of the error to set is: \"" + errorSummary.message + "\".", null); throw new UWSException(UWSException.NOT_ALLOWED, UWSExceptionFactory.jobModificationForbidden(jobId, getPhase(), "ERROR SUMMARY")); } } /** * Gets the ID of this job (this ID MUST be unique). * * @return The job ID (unique). */ public final String getJobId(){ return jobId; } /** *

Gets the RunID of this job given by the UWS user (presumed to be the owner of this job). * This ID isn't the one used to access to this job thanks to the jobs list: it is more likely a label/name than an ID => it is not unique.

* *

Warning: This ID may be used by other jobs BUT their job id (cf {@link UWSJob#getJobId()}) must be different.

* * @return The Run ID (a kind of job name/label). * * @see UWSParameters#getRunId() */ public final String getRunId(){ return inputParams.getRunId(); } /** *

Sets the RunID of this job ONLY IF the job can updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}).

* * @param name Its name/label. * * @see JobPhase#isJobUpdatable() * * @see UWSParameters#set(String, Object) */ public final void setRunId(String name){ if (!phase.isFinished()){ try{ inputParams.set(PARAM_RUN_ID, name); }catch(UWSException ue){ ; } } } /** * Gets the owner of this job. * * @return The owner. */ public final JobOwner getOwner(){ return owner; } /** * Get the quote attribute of this job. * * @return The estimated duration of the job execution (in seconds). */ public final long getQuote(){ return quote; } /** *

Sets the quote attribute of this job ONLY IF the job can updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}).

* * @param nbSeconds The estimated duration of the job execution (in seconds). * * @see JobPhase#isJobUpdatable() */ public final void setQuote(long nbSeconds){ if (!phase.isFinished()) quote = nbSeconds; } /** * Gets the list of parameters' name. * * @return The additional parameters of this job. * * @see UWSParameters#getNames() */ public final Set getAdditionalParameters(){ return inputParams.getAdditionalParameters().keySet(); } /** * Gets the number of additional parameters. * * @return Number of additional parameters. */ public final int getNbAdditionalParameters(){ return inputParams.getAdditionalParameters().size(); } /** * Gets the value of the specified additional parameter. * * @param paramName The name of the parameter whose the value is wanted. * @return The value of the specified parameter or null if it doesn't exist. */ public final Object getAdditionalParameterValue(String paramName){ return inputParams.getAdditionalParameters().get(paramName); } /** * Adds or updates the specified parameter with the given value ONLY IF the job can be updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}). * * @param paramName The name of the parameter to add or to update. * @param paramValue The (new) value of the specified parameter. * * @return
  • true if the parameter has been successfully added/updated,
  • *
  • false otherwise (particularly if paramName=null or paramName="" or paramValue=null).
* * @throws UWSException If a parameter value is incorrect. * * @see JobPhase#isJobUpdatable() */ public final boolean addOrUpdateParameter(String paramName, Object paramValue) throws UWSException{ return addOrUpdateParameter(paramName, paramValue, null); } /** * Adds or updates the specified parameter with the given value ONLY IF the job can be updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}). * * @param paramName The name of the parameter to add or to update. * @param paramValue The (new) value of the specified parameter. * @param user The user who asks for this update. * * @return
  • true if the parameter has been successfully added/updated,
  • *
  • false otherwise (particularly if paramName=null or paramName="" or paramValue=null).
* * @throws UWSException If a parameter value is incorrect. * * @since 4.1 * * @see JobPhase#isJobUpdatable() */ public final boolean addOrUpdateParameter(String paramName, Object paramValue, final JobOwner user) throws UWSException{ if (paramValue != null && !phase.isFinished()){ // Set the parameter: inputParams.set(paramName, paramValue); // If it is a file or an array containing files, they must be moved in a location related to this job: try{ if (paramValue instanceof UploadFile) ((UploadFile)paramValue).move(this); else if (paramValue.getClass().isArray()){ for(Object o : (Object[])paramValue){ if (o != null && o instanceof UploadFile) ((UploadFile)o).move(this); } } }catch(IOException ioe){ getLogger().logJob(LogLevel.WARNING, this, "MOVE_UPLOAD", "Can not move an uploaded file in the job \"" + jobId + "\"!", ioe); return false; } // Apply the retrieved phase: applyPhaseParam(user); return true; }else return false; } /** *

Adds or updates the given parameters ONLY IF the job can be updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}).

* *

At the end of this function, the method {@link #applyPhaseParam(JobOwner)} is called so that if there is an additional parameter {@link #PARAM_PHASE} with the value: *

    *
  • {@link UWSJob#PHASE_RUN RUN} then the job is starting and the phase goes to {@link ExecutionPhase#EXECUTING EXECUTING}.
  • *
  • {@link UWSJob#PHASE_ABORT ABORT} then the job is aborting.
  • *
  • otherwise the parameter {@link UWSJob#PARAM_PHASE PARAM_PHASE} is removed from {@link UWSJob#inputParams inputParams} and nothing is done.
  • *

* * @param params A list of parameters to add/update. * @return
  • true if all the given parameters have been successfully added/updated,
  • *
  • false if some parameters have not been managed.
* * @throws UWSException If a parameter value is incorrect. * * @see #addOrUpdateParameters(UWSParameters, JobOwner) */ public boolean addOrUpdateParameters(UWSParameters params) throws UWSException{ return addOrUpdateParameters(params, null); } /** *

Adds or updates the given parameters ONLY IF the job can be updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}).

* *

At the end of this function, the method {@link #applyPhaseParam(JobOwner)} is called so that if there is an additional parameter {@link #PARAM_PHASE} with the value: *

    *
  • {@link UWSJob#PHASE_RUN RUN} then the job is starting and the phase goes to {@link ExecutionPhase#EXECUTING EXECUTING}.
  • *
  • {@link UWSJob#PHASE_ABORT ABORT} then the job is aborting.
  • *
  • otherwise the parameter {@link UWSJob#PARAM_PHASE PARAM_PHASE} is removed from {@link UWSJob#inputParams inputParams} and nothing is done.
  • *

* * @param params The UWS parameters to update. * @param user The user who asks for this update. * * @return
  • true if all the given parameters have been successfully added/updated,
  • *
  • false if some parameters have not been managed.
* * @throws UWSException If a parameter value is incorrect or if the given user can not update or execute this job. * * @see JobPhase#isJobUpdatable() * @see #applyPhaseParam(JobOwner) */ public boolean addOrUpdateParameters(UWSParameters params, final JobOwner user) throws UWSException{ // The job can be modified ONLY IF in PENDING phase: if (!phase.isJobUpdatable()) throw new UWSException(UWSException.FORBIDDEN, "Forbidden parameters modification: the job is not any more in the PENDING phase!"); // Forbids the update if the user has not the required permission: if (user != null && !user.equals(owner) && !user.hasWritePermission(this)) throw new UWSException(UWSException.PERMISSION_DENIED, UWSExceptionFactory.writePermissionDenied(user, false, getJobId())); // Load all parameters: String[] updated = inputParams.update(params); // If the destruction time has been updated, the modification must be propagated to the jobs list: Object newValue; for(String updatedParam : updated){ // CASE DESTRUCTION_TIME: update the thread dedicated to the destruction: if (updatedParam.equals(PARAM_DESTRUCTION_TIME)){ if (myJobList != null) myJobList.updateDestruction(this); } // DEFAULT: test whether the parameter is a file, and if yes, move it in a location related to this job: else{ newValue = inputParams.get(updatedParam); if (newValue != null && newValue instanceof UploadFile){ try{ ((UploadFile)newValue).move(this); }catch(IOException ioe){ getLogger().logJob(LogLevel.WARNING, this, "MOVE_UPLOAD", "Can not move an uploaded file in the job \"" + jobId + "\"!", ioe); inputParams.remove(updatedParam); } } } } // Apply the retrieved phase: applyPhaseParam(user); return (updated.length == params.size()); } /** * Removes the specified additional parameter ONLY IF the job can be updated (considering its current execution phase, see {@link JobPhase#isJobUpdatable()}). * * @param paramName The name of the parameter to remove. * * @return true if the parameter has been successfully removed, false otherwise. * * @see JobPhase#isJobUpdatable() * @see UWSParameters#remove(String) */ public final boolean removeAdditionalParameter(String paramName){ if (phase.isFinished() || paramName == null) return false; else{ // Remove the parameter from the map: Object removed = inputParams.remove(paramName); // If the parameter value was an uploaded file, delete it physically: if (removed != null && removed instanceof UploadFile){ try{ ((UploadFile)removed).deleteFile(); }catch(IOException ioe){ getLogger().logJob(LogLevel.WARNING, this, "MOVE_UPLOAD", "Can not delete the uploaded file \"" + paramName + "\" of the job \"" + jobId + "\"!", ioe); } } return true; } } /** * Gets the results list of this job. * * @return An iterator on the results list. */ public final Iterator getResults(){ return results.values().iterator(); } /** * Gets the specified result. * * @param resultId ID of the result to return. * * @return The corresponding result. */ public final Result getResult(String resultId){ return results.get(resultId); } /** * Gets the total number of results. * * @return The number of results. */ public final int getNbResults(){ return results.size(); } /** *

Adds the given result in the results list of this job.

* *

IMPORTANT: This function will throw an error if the job is finished.

* * @param res The result to add (not null). * * @return true if the result has been successfully added, false otherwise (for instance, if a result has the same ID). * * @throws UWSException If the job execution is finished that is to say if the phase is ABORTED, ERROR or COMPLETED. * * @see #isFinished() */ public boolean addResult(Result res) throws UWSException{ if (res == null) return false; else if (isFinished()){ UWSException ue = new UWSException(UWSException.NOT_ALLOWED, UWSExceptionFactory.jobModificationForbidden(getJobId(), getPhase(), "RESULT")); getLogger().logJob(LogLevel.ERROR, this, "ADD_RESULT", "Can not add the result \"" + res.getId() + "\" to the job \"" + getJobId() + "\": this job is already finished (or not yet started). Current phase: " + getPhase(), ue); throw ue; }else{ synchronized(results){ if (results.containsKey(res.getId())) return false; else{ results.put(res.getId(), res); return true; } } } } /** * Gets the execution manager of this job, if any. * * @return Its execution manager (may be null). */ public final ExecutionManager getExecutionManager(){ return getJobList().getExecutionManager(); } /** * Gets its jobs list, if known. * * @return Its jobs list (may be null). */ public final JobList getJobList(){ return myJobList; } /** *

Sets its jobs list.

* *

note 1: a job can change its jobs list ONLY WHILE PENDING !

*

note 2: this job is removed from its previous job list, if there is one.

*

note 3: this job is NOT automatically added into the new jobs list. Indeed, this function should be called by {@link JobList#addNewJob(UWSJob)}.

* * @param jobList Its new jobs list. note: if NULL, nothing is done ! * * @throws IllegalStateException If this job is not PENDING. * * @see JobList#removeJob(String) * @see JobList#getJob(String) */ protected final void setJobList(final JobList jobList) throws IllegalStateException{ if (jobList == null) return; else if (myJobList != null && jobList.equals(myJobList)) return; else if (myJobList == null || phase.getPhase() == ExecutionPhase.PENDING){ if (myJobList != null && myJobList.getJob(jobId) != null) myJobList.removeJob(jobId); myJobList = jobList; }else throw new IllegalStateException("Impossible to move a job (here: " + jobId + ") from a jobs list (here: " + ((myJobList == null) ? "null" : myJobList.getName()) + ") to another (here: " + ((jobList == null) ? "null" : jobList.getName()) + ") if the job is not PENDING !"); } /** * Gets the UWS URL of this job in function of its jobs list. * * @return Its corresponding UWSUrl. * * @see JobList#getUrl() * @see UWSUrl#jobSummary(String, String) */ public final UWSUrl getUrl(){ if (myJobList != null){ UWSUrl url = myJobList.getUrl(); if (url != null) return url.jobSummary(myJobList.getName(), jobId); } return null; } /* ******************** */ /* EXECUTION MANAGEMENT */ /* ******************** */ /** * Gets the time to wait for the end of the thread after an interruption. * * @return The time to wait for the end of the thread (a negative or null value means no wait for the end of the thread). */ public final long getTimeToWaitForEnd(){ return waitForStop; } /** * Sets the time to wait for the end of the thread after an interruption. * * @param timeToWait The new time to wait for the end of the thread (a negative or null value means no wait for the end of the thread). */ public final void setTimeToWaitForEnd(long timeToWait){ waitForStop = timeToWait; } /** *

Starts the job by using the execution manager if any.

* * @throws UWSException */ public final void start() throws UWSException{ start(getJobList() != null); } /** *

Starts the job.

* *

Note: This function does nothing if the job is already running !

* * @param useManager true to let the execution manager deciding whether the job starts immediately or whether it must be put in a queue until enough resources are available, false to start the execution immediately. * * @throws NullPointerException If this job is not associated with a job list or the associated job list is not part of a UWS service or if no thread is created. * @throws UWSException If there is an error while changing the execution phase or when starting the corresponding thread. * * @see #isRunning() * @see UWSFactory#createJobThread(UWSJob) * @see ExecutionManager#execute(UWSJob) * @see #setPhase(ExecutionPhase) * @see #isFinished() * @see #startTime */ public void start(boolean useManager) throws UWSException{ // This job must know its jobs list and this jobs list must know its UWS: if (myJobList == null || myJobList.getUWS() == null) throw new IllegalStateException("A UWSJob can not start if it is not linked to a job list or if its job list is not linked to a UWS."); // If already running do nothing: else if (isRunning()) return; // If asked propagate this request to the execution manager: else if (useManager){ getJobList().getExecutionManager().execute(this); }// Otherwise start directly the execution: else{ // Create its corresponding thread: thread = getFactory().createJobThread(this); if (thread == null) throw new NullPointerException("Missing job work! The thread created by the factory is NULL => The job can't be executed!"); // Change the job phase: setPhase(ExecutionPhase.EXECUTING); // Set the start time: setStartTime(new Date()); // Run the job: thread.start(); (new JobTimeOut()).start(); // Log the start of this job: getLogger().logJob(LogLevel.INFO, this, "START", "Job \"" + jobId + "\" started.", null); } } /** * Stop/Cancel this job when its maximum execution duration has been reached. * * @author Grégory Mantelet (CDS;ARI) * @version 4.1 (09/2014) */ protected final class JobTimeOut extends Thread { public JobTimeOut(){ super(JobThread.tg, "TimeOut_" + jobId); } @Override public void run(){ long maxDuration = getExecutionDuration(); if (thread != null && thread.isAlive() && maxDuration != UNLIMITED_DURATION && maxDuration > 0){ try{ thread.join(maxDuration * 1000); if (!isFinished()) UWSJob.this.abort(); }catch(InterruptedException ie){ /* Not needed to report any interruption while waiting. */ }catch(UWSException ue){ getLogger().logJob(LogLevel.WARNING, UWSJob.this, "EXECUTING", "Unexpected error while waiting the end of the execution of the job \"" + jobId + "\" (thread ID: " + thread.getId() + ")!", ue); } } } } /** *

Tells whether the job is still running.

* *

Note: This function tests the execution phase (see {@link JobPhase#isExecuting()}) AND the status of the thread (see {@link #isStopped()}).

* * @return true if the job is still running, false otherwise. * * @see JobPhase#isExecuting() * @see #isStopped() */ public final boolean isRunning(){ return phase.isExecuting() && !isStopped(); } /** *

Tells whether the job is already finished (completed, aborted, error, ...).

* *

Note: This function test the execution phase (see {@link JobPhase#isFinished()}) AND the status of the thread (see {@link #isStopped()})

* * @return true if the job is finished, false otherwise. * * @see JobPhase#isFinished() * @see #isStopped() */ public final boolean isFinished(){ return phase.isFinished() && isStopped(); } /** *

Stops immediately the job, sets its phase to {@link ExecutionPhase#ABORTED ABORTED} and sets its end time.

* *

IMPORTANT: If the thread does not stop immediately the phase and the end time are not modified. However it can be done by calling one more time {@link #abort()}. * Besides you should check that you test regularly the interrupted flag of the thread in {@link JobThread#jobWork()} !

* * @throws UWSException If there is an error while changing the execution phase. * * @see #stop() * @see #isStopped() * @see #setPhase(ExecutionPhase) * @see #setEndTime(Date) */ public void abort() throws UWSException{ // Interrupt the corresponding thread: stop(); if (isStopped()){ if (!phase.isFinished()){ // Try to change the phase: setPhase(ExecutionPhase.ABORTED); // Set the end time: setEndTime(new Date()); }else if (thread == null || (thread != null && !thread.isAlive())) throw new UWSException(UWSException.BAD_REQUEST, UWSExceptionFactory.incorrectPhaseTransition(getJobId(), phase.getPhase(), ExecutionPhase.ABORTED)); }else getLogger().logJob(LogLevel.WARNING, this, "ABORT", "Abortion of the job \"" + getJobId() + "\" asked but not yet effective (after having waited " + waitForStop + "ms)!", null); } /** *

Stops immediately the job, sets its error summary, sets its phase to {@link ExecutionPhase#ERROR} and sets its end time.

* *

IMPORTANT: If the thread does not stop immediately the phase, the error summary and the end time are not modified. * However it can be done by calling one more time {@link #error(ErrorSummary)}. * Besides you should check that you test regularly the interrupted flag of the thread in {@link JobThread#jobWork()} !

* * @param error The error that has interrupted this job. * * @throws UWSException If there is an error while setting the error summary or while changing the phase. * * @see #stop() * @see #isStopped() * @see JobPhase#isFinished() * @see #setErrorSummary(ErrorSummary) * @see #setPhase(ExecutionPhase) * @see #setEndTime(Date) */ public void error(ErrorSummary error) throws UWSException{ // Interrupt the corresponding thread: stop(); if (isStopped()){ if (!phase.isFinished()){ // Set the error summary: setErrorSummary(error); // Try to change phase: setPhase(ExecutionPhase.ERROR); // Set the end time: setEndTime(new Date()); }else if (thread != null && !thread.isAlive()) throw new UWSException(UWSException.BAD_REQUEST, UWSExceptionFactory.incorrectPhaseTransition(jobId, phase.getPhase(), ExecutionPhase.ERROR)); }else getLogger().logJob(LogLevel.WARNING, this, "ERROR", "Stopping of the job \"" + getJobId() + "\" with error asked but not yet effective (after having waited " + waitForStop + "ms)!", null); } /** Used by the thread to known whether the {@link #stop()} method has already been called, and so, that the job is stopping. */ protected boolean stopping = false; /** * Stops the thread that executes the work of this job. */ protected void stop(){ if (!isStopped()){ synchronized(thread){ stopping = true; // Interrupts the thread: thread.interrupt(); // Wait a little for its end: if (waitForStop > 0){ try{ thread.join(waitForStop); }catch(InterruptedException ie){ getLogger().logJob(LogLevel.WARNING, this, "END", "Unexpected InterruptedException while waiting for the end of the execution of the job \"" + jobId + "\" (thread ID: " + thread.getId() + ")!", ie); } } } } } /** * Tells whether the thread is different from null, is not alive, is interrupted or is finished (see {@link JobThread#isFinished()}). * * @return true if the thread is not still running, false otherwise. */ protected final boolean isStopped(){ return thread == null || !thread.isAlive() || thread.isInterrupted() || thread.isFinished(); } /** *

Stops the job if running, removes the job from the execution manager, stops the timer for the execution duration * and may clear all files or any other resources associated to this job.

* *

By default the job is aborted, the {@link UWSJob#thread} attribute is set to null, the timers are stopped and uploaded files, results and the error summary are deleted.

*/ public void clearResources(){ // If still running, abort/stop the job: if (isRunning()){ try{ abort(); }catch(UWSException e){ getLogger().logJob(LogLevel.WARNING, this, "CLEAR_RESOURCES", "Impossible to abort the job \"" + jobId + "\" => trying to stop it...", e); stop(); } } // Remove this job from its execution manager: if (getJobList() != null) getJobList().getExecutionManager().remove(this); thread = null; // Clear all uploaded files: Iterator files = inputParams.getFiles(); UploadFile upl; while(files.hasNext()){ upl = files.next(); try{ upl.deleteFile(); }catch(IOException ioe){ getLogger().logJob(LogLevel.ERROR, this, "CLEAR_RESOURCES", "Impossible to delete the file uploaded as parameter \"" + upl.paramName + "\" (" + upl.getLocation() + ") of the job \"" + jobId + "\"!", null); } } // Clear all results file: for(Result r : results.values()){ try{ getFileManager().deleteResult(r, this); }catch(IOException ioe){ getLogger().logJob(LogLevel.ERROR, this, "CLEAR_RESOURCES", "Impossible to delete the file associated with the result '" + r.getId() + "' of the job \"" + jobId + "\"!", ioe); } } // Clear the error file: if (errorSummary != null && errorSummary.hasDetail()){ try{ getFileManager().deleteError(errorSummary, this); }catch(IOException ioe){ getLogger().logJob(LogLevel.ERROR, this, "CLEAR_RESOURCES", "Impossible to delete the file associated with the error '" + errorSummary.message + "' of the job \"" + jobId + "\"!", ioe); } } getLogger().logJob(LogLevel.INFO, this, "CLEAR_RESOURCES", "Resources associated with the job \"" + getJobId() + "\" have been successfully freed.", null); } /* ******************* */ /* OBSERVER MANAGEMENT */ /* ******************* */ /** * Lets adding an observer of this job. The observer will be notified each time the execution phase changes. * * @param observer A new observer of this job. * * @return true if the given object has been successfully added as observer of this job, false otherwise. */ public final boolean addObserver(JobObserver observer){ if (observer != null && !observers.contains(observer)){ observers.add(observer); return true; }else return false; } /** * Gets the total number of observers this job has. * * @return Number of its observers. */ public final int getNbObservers(){ return observers.size(); } /** * Gets the observers of this job. * * @return An iterator on the list of its observers. */ public final Iterator getObservers(){ return observers.iterator(); } /** * Lets removing the given object from the list of observers of this job. * * @param observer The object which must not be considered as observer of this job. * * @return true if the given object is not any more an observer of this job, false otherwise. */ public final boolean removeObserver(JobObserver observer){ return observers.remove(observer); } /** * Lets removing all observers of this job. */ public final void removeAllObservers(){ observers.clear(); } /** * Notifies all the observer of this job that its phase has changed. * * @param oldPhase The former phase of this job. * @throws UWSException If at least one observer can not have been updated. */ public final void notifyObservers(ExecutionPhase oldPhase){ int i = 0; JobObserver observer = null; String errors = null; while(i < observers.size()){ // Gets the observer: if (i == 0 && observer == null) observer = observers.get(i); else if (observer.equals(observers.get(i))){ i++; if (i < observers.size()) observer = observers.get(i); else return; } // Update this observer: try{ observer.update(this, oldPhase, getPhase()); }catch(UWSException ex){ if (errors == null) errors = "\t* " + ex.getMessage(); else errors += "\n\t* " + ex.getMessage(); } } if (errors != null) getLogger().logJob(LogLevel.WARNING, this, "NOTIFY", "Some observers of the job \"" + jobId + "\" can not have been updated:\n" + errors, null); } /* **************** */ /* ERROR MANAGEMENT */ /* **************** */ /** *

Gets the error (if any) which has occurred during the job execution.

* *

Note: In the case an error summary can not have been published, the job has no error summary. * However the last {@link UWSException} caught during the execution of a {@link JobThread} is saved and is available thanks to {@link JobThread#getError()}. * In that case, the {@link UWSJob#getWorkError() getWorkError()} method can be used to get back the occurred error.

* * @return The error which interrupts the thread or null if there was no error or if the job is still running. */ public final UWSException getWorkError(){ return (thread == null || !thread.isAlive()) ? null : thread.getError(); } /* ************* */ /* SERIALIZATION */ /* ************* */ @Override public String serialize(UWSSerializer serializer, JobOwner user) throws UWSException, Exception{ if (user != null && !user.equals(getOwner()) && !user.hasReadPermission(this)) throw new UWSException(UWSException.PERMISSION_DENIED, UWSExceptionFactory.readPermissionDenied(user, false, getJobId())); return serializer.getJob(this, true); } /** * Serializes the specified attribute of this job by using the given serializer. * * @param attributes All the given attributes (may be null or empty). * @param serializer The serializer to use. * * @return The serialized job attribute (or the whole job if attributes is an empty array or is null). * * @throws Exception If there is an unexpected error during the serialization. * * @see UWSSerializer#getJob(UWSJob, String[], boolean) */ public String serialize(String[] attributes, UWSSerializer serializer) throws Exception{ return serializer.getJob(this, attributes, true); } /** * Serializes the specified attribute of this job in the given output stream by using the given serializer. * * @param output The output stream in which the job attribute must be serialized. * @param attributes The name of the attribute to serialize (if null, the whole job will be serialized). * @param serializer The serializer to use. * * @throws Exception If there is an unexpected error during the serialization. * * @see #serialize(String[], UWSSerializer) */ public void serialize(ServletOutputStream output, String[] attributes, UWSSerializer serializer) throws UWSException, IOException, Exception{ String errorMsgPart = null; if (attributes == null || attributes.length <= 0) errorMsgPart = "the job \"" + getJobId() + "\""; else errorMsgPart = "the given attribute \"" + attributes[0] + "\" of the job \"" + getJobId() + "\""; if (output == null) throw new NullPointerException("Missing serialization output stream when serializing " + errorMsgPart + "!"); String serialization = serialize(attributes, serializer); if (serialization == null){ getLogger().logJob(LogLevel.ERROR, this, "SERIALIZE", "Error while serializing " + errorMsgPart + ": NULL was returned.", null); throw new UWSException(UWSException.INTERNAL_SERVER_ERROR, "Incorrect serialization value (=NULL) ! => impossible to serialize " + errorMsgPart + "."); }else{ output.print(serialization); output.flush(); } } @Override public String toString(){ return "JOB {jobId: " + jobId + "; phase: " + phase + "; runId: " + getRunId() + "; ownerId: " + owner + "; executionDuration: " + getExecutionDuration() + "; destructionTime: " + getDestructionTime() + "; quote: " + quote + "; NbResults: " + results.size() + "; " + ((errorSummary != null) ? errorSummary.toString() : "No error") + " }"; } @Override public int hashCode(){ return jobId.hashCode(); } /** *

2 instances of AbstractJob are equals ONLY IF their ID are equals.

* *

Note: If the given object is not an AbstractJob, FALSE is returned.

* * @see java.lang.Object#equals(java.lang.Object) */ @Override public boolean equals(Object anotherJob){ if (anotherJob instanceof UWSJob) return jobId.equals(((UWSJob)anotherJob).jobId); else return false; } }