diff --git a/src/main/java/it/inaf/oats/vospace/persistence/JobDAO.java b/src/main/java/it/inaf/oats/vospace/persistence/JobDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..b3781a8d1be2456da9c4ba5669c34558df5de307 --- /dev/null +++ b/src/main/java/it/inaf/oats/vospace/persistence/JobDAO.java @@ -0,0 +1,148 @@ +package it.inaf.oats.vospace.persistence; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; +import java.util.Optional; +import javax.sql.DataSource; +import net.ivoa.xml.uws.v1.ExecutionPhase; +import net.ivoa.xml.uws.v1.JobSummary; +import net.ivoa.xml.uws.v1.ResultReference; +import net.ivoa.xml.vospace.v2.Transfer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +@Repository +public class JobDAO { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final JdbcTemplate jdbcTemplate; + + @Autowired + public JobDAO(DataSource dataSource) { + jdbcTemplate = new JdbcTemplate(dataSource); + } + + public void createJob(JobSummary jobSummary) { + + Object jobPayload = getJobPayload(jobSummary); + + String sql = "INSERT INTO job(job_id, owner_id, job_type, phase, job_info) VALUES (?, ?, ?, ?, ?)"; + + jdbcTemplate.update(sql, ps -> { + int i = 0; + ps.setString(++i, jobSummary.getJobId()); + ps.setString(++i, jobSummary.getOwnerId()); + ps.setObject(++i, getJobType(jobPayload), Types.OTHER); + ps.setObject(++i, jobSummary.getPhase().value(), Types.OTHER); + ps.setObject(++i, toJson(jobPayload), Types.OTHER); + }); + } + + private Object getJobPayload(JobSummary jobSummary) { + + List payload = jobSummary.getJobInfo().getAny(); + if (payload.isEmpty()) { + throw new IllegalArgumentException("Empty JobInfo"); + } + if (payload.size() > 1) { + throw new UnsupportedOperationException("JobInfo as list not supported"); + } + + return payload.get(0); + } + + private String getJobType(Object jobPayload) { + + if (jobPayload instanceof Transfer) { + Transfer transfer = (Transfer) jobPayload; + return transfer.getDirection(); + } + + throw new UnsupportedOperationException("JobInfo of type " + jobPayload + " not supported"); + } + + public Optional getJob(String jobId) { + + String sql = "SELECT * FROM job WHERE job_id = ?"; + + JobSummary jobSummary = jdbcTemplate.query(sql, + ps -> { + ps.setString(1, jobId); + }, rs -> { + if (rs.next()) { + return getJobSummaryFromResultSet(rs); + } + return null; + }); + + return Optional.ofNullable(jobSummary); + } + + private JobSummary getJobSummaryFromResultSet(ResultSet rs) throws SQLException { + + JobSummary jobSummary = new JobSummary(); + + jobSummary.setJobId(rs.getString("job_id")); + jobSummary.setOwnerId(rs.getString("owner_id")); + jobSummary.setPhase(ExecutionPhase.fromValue(rs.getString("phase"))); + + String jobType = rs.getString("job_type"); + + Object jobPayload = getJobPayload(jobType, rs.getString("job_info")); + JobSummary.JobInfo jobInfo = new JobSummary.JobInfo(); + jobInfo.getAny().add(jobPayload); + jobSummary.setJobInfo(jobInfo); + + jobSummary.setResults(getResults(rs.getString("results"))); + + return jobSummary; + } + + private Object getJobPayload(String jobType, String json) { + try { + // TODO: switch on jobType + return MAPPER.readValue(json, Transfer.class); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + + private List getResults(String json) { + if (json == null) { + return null; + } + try { + JavaType type = MAPPER.getTypeFactory().constructCollectionType(List.class, ResultReference.class); + return MAPPER.readValue(json, type); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + + public void updateJob(JobSummary job) { + + String sql = "UPDATE job SET phase = ?, results = ? WHERE job_id = ?"; + + jdbcTemplate.update(sql, ps -> { + int i = 0; + ps.setObject(++i, job.getPhase().name(), Types.OTHER); + ps.setObject(++i, toJson(job.getResults()), Types.OTHER); + ps.setString(++i, job.getJobId()); + }); + } + + private String toJson(Object data) { + try { + return MAPPER.writeValueAsString(data); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } +}