Multithreaded queue class

From DreamsteepWiki

Jump to: navigation, search

This is a rewrite of code I found on the net

http://code.activestate.com/recipes/496982-render-farm-script/?in=user-126911

This is a very cool piece of Python, and I learned a lot going through it.



import os
import threading
import time




class Worker(threading.Thread):
    """Thread that pulls jobs from a shared queue and runs them on one machine.

    Each worker grabs a new job as soon as it finishes the previous one.
    This allows mixing of slower and faster worker machines; each works
    at capacity.

    The module-level flag ``all_workers_stop`` must be defined (and false)
    before a worker is started: every worker reads it between jobs and
    sets it to True when one of its jobs raises.
    """

    def __init__(self, jobqueue, machine, workdir, job_pause=5):
        """Bind the worker to a queue and a target machine.

        jobqueue  -- object whose ``get()`` returns the next job, or None
                     when no jobs are left.
        machine   -- host name passed to ``job.go()``.
        workdir   -- working directory passed to ``job.go()``.
        job_pause -- seconds to sleep after each job (default 5).
        """
        threading.Thread.__init__(self)
        self.machine = machine
        self.jobqueue = jobqueue
        self.workdir = workdir
        self.job_pause = job_pause
        self.busy = True

    def run(self):
        """Process jobs until the queue is empty or a stop is requested.

        Any exception raised by a job sets ``all_workers_stop`` so the
        other workers finish after their current job, then propagates.
        """
        global all_workers_stop
        try:
            while not all_workers_stop:
                job = self.jobqueue.get()
                if job is None:
                    # no jobs left in the queue, we're finished
                    return
                try:
                    # Emit the whole status line before the job starts so
                    # the machine name is logged even if the job fails.
                    print('#running render job %s on machine %s %s'
                          % (job.smajobname, self.machine, self.workdir))
                    job.go(self.machine, self.workdir)
                    time.sleep(self.job_pause)
                except:
                    # Deliberately broad: whatever went wrong, tell the
                    # other workers to stop, then re-raise unchanged.
                    all_workers_stop = True
                    raise
        finally:
            # The thread is ending whichever way the loop was left, so it
            # must no longer be counted as busy.
            self.busy = False



class JobQueue:
    """FIFO list of jobs shared by a pool of Worker threads.

    Set the class attribute ``worker_list`` to a sequence of
    ``(machine, workdir)`` pairs before instantiating; one Worker is
    created for each pair.
    """

    # (machine, workdir) pairs used to build the worker pool.
    worker_list = []

    def __init__(self):
        """Create an empty queue and one Worker per ``worker_list`` entry."""
        self.worker_pool = []
        self.jobqueue = []
        self._lock = threading.Lock()

        if not self.worker_list:
            print('JobQueue:: warn no workers in list')
        print(self.worker_list)
        for machine, workdir in self.worker_list:
            self.worker_pool.append(Worker(self, machine, workdir))

    # SUBMIT`EM (send whole RJOB folder to single host)
    def append(self, job):
        """Add *job* to the end of the queue."""
        with self._lock:  # thread safety
            self.jobqueue.append(job)

    def get(self):
        """Remove and return the oldest job, or None if the queue is empty."""
        with self._lock:  # thread safety
            if self.jobqueue:
                return self.jobqueue.pop(0)
            return None

    def start(self):
        """Start every worker thread in the pool."""
        for worker in self.worker_pool:
            worker.start()

    def wait(self, poll_interval=2):
        """Block until no worker is busy.

        poll_interval -- seconds to sleep before each check (default 2).

        Raises Exception if the module-level ``all_workers_stop`` flag is
        set, which a Worker does when one of its jobs fails.
        """
        while True:
            time.sleep(poll_interval)
            busy_workers = sum(1 for worker in self.worker_pool if worker.busy)
            if all_workers_stop:
                raise Exception('a worker failed; all workers were told to stop')
            if busy_workers == 0:
                return
                

    ##START THE QUEUE RUNNING
   # print '#ACTIVATING QUEUE... BEGIN RENDERING ... '
    #QUEUEOBJECT.start()
   # QUEUEOBJECT.wait()
      



class smaJob:
    """A named render job that a Worker can dispatch to a machine."""

    def __init__(self, smajobname):
        """Store the job name (path and name, e.g. /foo/etc/name)."""
        self.smajobname = smajobname

    def go(self, machine, workdir):
        """Prepare to run this job on *machine*.

        machine -- host name; 'localhost' and '127.0.0.1' are treated as
                   the local machine.
        workdir -- working directory for the job (not used here).

        As written, this method only defines the ``worker_do`` helper and
        never calls it, so no command is issued and None is returned.
        NOTE(review): presumably the actual command sequence was trimmed
        from this listing -- confirm against the original script.
        """
        local = machine in ('localhost', '127.0.0.1')

        def worker_do(cmd):
            # Reads the module-level flags DEBUG and OVERRIDE_SSH and the
            # module-level function do().
            prefix = '[[%s]] ' % machine if DEBUG else ''
            if local:
                # Local execution is currently log-only.
                print(prefix + '#debbug worker exe local command ' + cmd)
            else:
                print(prefix + '#worker exe remote command ' + cmd)
                if OVERRIDE_SSH == 0:
                    # NOTE(review): cmd is interpolated inside double
                    # quotes without escaping; a command containing
                    # quotes will break the ssh command line.
                    do('ssh %s "%s"' % (machine, cmd), howfarback=1)
                elif OVERRIDE_SSH:
                    print('#call to ssh skipped . to turn it back on set OVERRIDE_SSH to 0')
                 



def do(cmd, howfarback=0):
    """Run *cmd* through the shell, honouring the DEBUG and DRYRUN flags.

    cmd        -- shell command line passed to ``os.system``.
    howfarback -- accepted for backward compatibility only; the
                  caller-frame tracing it used to control was permanently
                  disabled, so the value is ignored.

    Reads the module-level flags DEBUG (echo the command) and DRYRUN
    (print the command instead of executing it).

    Raises Exception(cmd) when the command exits with a non-zero status.
    """
    if DEBUG:
        print(cmd)

    if DRYRUN:
        print('#>>>>DO DRY RUN  %s' % (cmd,))
    elif DRYRUN == 0:
        try:
            # os.system returns the command's exit status, not a pid.
            status = os.system(cmd)
        except ValueError:
            print('ERROR THREAD CRASHED   ')
        else:
            print('DEBUG PID IS ')
            # Raised outside the try on purpose: the ValueError handler
            # above could never catch this failure anyway.
            if status != 0:
                raise Exception(cmd)


TO USE


# Build the queue; its worker pool is created from JobQueue.worker_list.
QUEUEOBJECT = JobQueue()
# Wrap the job name in a job object and add it to the queue.  Worker
# threads are not started here; that is done by QUEUEOBJECT.start().
job = smaJob( 'foobar')
QUEUEOBJECT.append(job)

Personal tools