Multithreaded queue class
From DreamsteepWiki
This is a rewrite of code I found on the net
http://code.activestate.com/recipes/496982-render-farm-script/?in=user-126911
This is a very cool peice of python and I learned a lot going through it.
import os import threading import time
class Worker(threading.Thread):
def __init__(self, jobqueue, machine, workdir):
threading.Thread.__init__(self)
self.machine = machine
self.jobqueue = jobqueue
self.workdir = workdir
self.busy = True
# Each worker grabs a new job as soon as he finishes the previous
# one. This allows mixing of slower and faster worker machines; each
# works at capacity.
def run(self):
global all_workers_stop
while not all_workers_stop:
job = self.jobqueue.get()
if job is None:
# no jobs left in the queue, we're finished
self.busy = False
return
try:
print '#running render job '+job.smajobname+' on machine',
job.go(self.machine, self.workdir)
print self.machine, self.workdir
time.sleep(5)
except:
all_workers_stop = True
raise
class JobQueue:
worker_list = []
def __init__(self):
self.worker_pool = [ ]
self.jobqueue = [ ]
self._lock = threading.Lock()
if len(self.worker_list)==0:
print 'JobQueue:: warn no workers in list'
##
print self.worker_list
for machine, workdir in self.worker_list:
self.worker_pool.append(Worker(self, machine, workdir))
#SUBMIT`EM (send whole RJOB folder to single host )
def append(self, job):
self._lock.acquire() # thread safety
self.jobqueue.append(job)
self._lock.release()
def get(self):
self._lock.acquire() # thread safety
try:
r = self.jobqueue.pop(0)
except IndexError:
r = None
self._lock.release()
return r
def start(self):
for worker in self.worker_pool:
worker.start()
def wait(self):
busy_workers = 1
while busy_workers > 0:
time.sleep(2) #.5
busy_workers = 0
for worker in self.worker_pool:
if worker.busy:
busy_workers += 1
if all_workers_stop:
raise Exception
##START THE QUEUE RUNNING
# print '#ACTIVATING QUEUE... BEGIN RENDERING ... '
#QUEUEOBJECT.start()
# QUEUEOBJECT.wait()
class smaJob:
def __init__(self,smajobname ):
self.smajobname = smajobname #path and name /foo/etc/name
def go(self, machine, workdir):
local = machine in ('localhost', '127.0.0.1')
def worker_do(cmd):
if DEBUG: print '[[%s]]' % machine,
if local:
# do stuff on this machine
# do(cmd, howfarback=1)
print '#debbug worker exe local command '+cmd
#do(cmd, howfarback=1)
else:
# do stuff on a remote machine
#do('ssh %s "%s"' % (machine, cmd), howfarback=1)
print '#worker exe remote command '+cmd
if OVERRIDE_SSH ==0:
do('ssh %s "%s"' % (machine, cmd), howfarback=1)
if OVERRIDE_SSH:
print '#call to ssh skipped . to turn it back on set OVERRIDE_SSH to 0'
def do(cmd, howfarback=0):
if DEBUG:
if False:
try:
raise Exception
except:
tb = sys.exc_info()[2]
f = tb.tb_frame.f_back
for i in range(howfarback):
f = f.f_back
print f.f_code.co_filename, f.f_code.co_name, f.f_lineno
print cmd
if DRYRUN:
print '#>>>>DO DRY RUN ',
print cmd
if DRYRUN==0:
# if os.system(cmd) != 0:
# raise Exception(cmd)
try:
"""
#THIS WORKS !!
if os.system(cmd) != 0:
raise Exception(cmd)
"""
pid = os.system(cmd)
print 'DEBUG PID IS '
#print pid
if pid != 0:
raise Exception(cmd)
except ValueError:
print 'ERROR THREAD CRASHED '
TO USE
QUEUEOBJECT = JobQueue() job = smaJob( 'foobar') QUEUEOBJECT.append(job)

