Skip to content
Snippets Groups Projects
Select Git revision
  • 168b7958219b4be34a1922fa27c92a50a1f9d8d2
  • master default protected
  • rocky_8
  • rocky_9
  • pasture
  • testing
  • query
  • v0.2.9
  • v0.2.8
  • v0.2.7
  • v0.2.6
  • v0.2.5
  • v0.2.4
  • v0.2.3
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.2
  • v0.1.1
  • v0.1
20 results

task_executor.py

Blame
  • task_executor.py 1.37 KiB
    #!/usr/bin/env python
    #
    # This file is part of vospace-transfer-service
    # Copyright (C) 2021 Istituto Nazionale di Astrofisica
    # SPDX-License-Identifier: GPL-3.0-or-later
    #
    
    import time
    
    from multiprocessing import Process
    
    from config import Config
    from job_queue import JobQueue
    
    
    class TaskExecutor(Process):
    
        def __init__(self):
            config = Config("/etc/vos_ts/vos_ts.conf")
            params = config.loadSection("scheduling")
            self.maxPendingJobs = params.getint("max_pending_jobs")
            self.maxReadyJobs = params.getint("max_ready_jobs")
            self.maxTerminatedJobs = params.getint("max_terminated_jobs")
            self.maxCleanJobs = params.getint("max_clean_jobs")
            self.execWaitTime = params.getint("exec_wait_time")
            if self.execWaitTime < 10:
                self.execWaitTime = 10
            self.srcQueue = None
            self.destQueue = None
            super(TaskExecutor, self).__init__()
    
        def wait(self, timeout = None):
            if timeout is None:
                time.sleep(self.execWaitTime)
            else:
                time.sleep(timeout)
    
        def setSourceQueueName(self, srcQueueName):
            self.srcQueue = JobQueue(srcQueueName)
    
        def setDestinationQueueName(self, destQueueName):
            self.destQueue = JobQueue(destQueueName)
    
        def run(self):
            """
            This method must be implemented by
            inherited classes
            """
            pass