#!/usr/bin/env python # Improved build dispatcher. Invoked on server-side from dopackages. # We try to build leaf packages (those # which can be built immediately without requiring additional # dependencies to be built) in the order such that the ones required # by the longest dependency chains are built first. # # This has the effect of favouring deep parts of the package tree and # evening out the depth over time, hopefully avoiding the situation # where the entire cluster waits for a deep part of the tree to # build on a small number of machines # # We can dynamically respond to changes in build machine availability, # since the queue manager will block jobs that cannot be immediately # satisfied and will unblock us when a job slot becomes available. # # When a package build fails, it is requeued with a lower priority # such that it will rebuild again as soon as no "phase 1" packages # are available to build. This prevents the cluster staying idle # until the last phase 1 package builds. # # Other advantages are that this system is easily customizable and in # the future will let us customize things like the matching policy of # jobs to machines. For example, we could avoid dispatching multiple # openoffice builds to the same system. # # TODO: # * Combine build prep stages? # - initial check for file up-to-date # * check mtime for package staleness (cf make) # * option to skip phase 2 import os import sys pbc = os.getenv('PORTBUILD_CHECKOUT') \ if os.getenv('PORTBUILD_CHECKOUT') else "/a/portbuild" pbd = os.getenv('PORTBUILD_DATA') \ if os.getenv('PORTBUILD_DATA') else "/a/portbuild" sys.path.insert(0, '%s/admin/lib/python' % pbc) from qmanagerclient import * from freebsd_config import * import string, threading, time, subprocess from itertools import chain from stat import * from Queue import Queue from heapq import * CONFIG_SUBDIR="conf" CONFIG_FILENAME="server.conf" config = getConfig( pbc, CONFIG_SUBDIR, CONFIG_FILENAME ) QMANAGER_MAX_JOB_ATTEMPTS = int( \ config.get( 'QMANAGER_MAX_JOB_ATTEMPTS' ) ) QMANAGER_PRIORITY_PACKAGES = string.split( \ config.get( 'QMANAGER_PRIORITY_PACKAGES' ) ) QMANAGER_RUNAWAY_PERCENTAGE = float( \ config.get( 'QMANAGER_RUNAWAY_PERCENTAGE' ) ) QMANAGER_RUNAWAY_THRESHOLD = int( \ config.get( 'QMANAGER_RUNAWAY_THRESHOLD' ) ) # debug controls DEBUG_PACKAGEBUILD = True DEBUG_RETCODE = True # argument handling ARG_UNLIMITED_ERRORS = "unlimited-errors" # global variables categories = {} ports = {} pkg_sufx = None # When a build fails we requeue it with a lower priority such that it # will never preempt a phase 1 build but will build when spare # capacity is available. PHASE2_BASE_PRIO=1000 # Process success quickly so other jobs are started SUCCESS_PRIO = -1000 # Failure should be a less common event :) FAILURE_PRIO = -900 # Port status codes PENDING = 1 # Yet to build PHASE2 = 2 # Failed once class PriorityQueue(Queue): """Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data) This class can be found at: Python-2.6a3/Lib/Queue.py """ maxsize = 0 def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item, heappush=heappush): heappush(self.queue, item) def _get(self, heappop=heappop): return heappop(self.queue) class Index(object): def __init__(self, indexfile): self.indexfile = indexfile def parse(self, targets = None): print "[MASTER] Read index" f = file(self.indexfile) index = f.readlines() f.close() f = None del f lines=[] print "[MASTER] Phase 1" for i in index: try: (name, path, prefix, comment, descr, maintainer, categories, bdep, rdep, www, edep, pdep, fdep) = i.rstrip().split("|") if path == "/usr/ports/ports-mgmt/pkg": pkgdep = name if targets is None or name in targets: lines.append((name, bdep, rdep, edep, pdep, fdep)) Port(name, path, "", "", "", "", categories, "") except Exception, e: print "packagebuild.parse: bad line in INDEX: " print i index = None del index print "[MASTER] Phase 2" for (name, bdep, rdep, edep, pdep, fdep) in lines: if not pkgng: ports[name].setdeps(bdep, rdep, edep, pdep, fdep, "") continue if name == pkgdep: ports[name].setdeps(bdep, rdep, edep, pdep, fdep, "") else: ports[name].setdeps(bdep, rdep, edep, pdep, fdep, pkgdep) lines = None del lines print "[MASTER] Done" def depthindex(targets): """ Initial population of depth tree """ for i in targets: i.depth_recursive() class Port(object): def __init__(self, name, path, prefix, comment, descr, maintainer, cats, www): __slots__ = ["name", "path", "prefix", "comment", "descr", "maintainer", "www", "bdep", "rdep", "edep", "pdep", "fdep", "pkgdep", "alldep", "parents", "depth", "categories"] self.name = name self.path = path self.prefix = prefix self.comment = comment self.descr = descr self.maintainer = maintainer self.www = www self.sufx = pkg_sufx # Populated later self.bdep = [] self.rdep = [] self.edep = [] self.pdep = [] self.fdep = [] self.pkgdep = [] self.alldep = [] self.parents = [] self.id = None # XXX self.status = PENDING self.attempts = 0 # Whether the package build has completed and is hanging around # to resolve dependencies for others XXX use status self.done = False # Depth is the maximum length of the dependency chain of this port self.depth = None self.categories=[] scats = cats.split() if len(scats) != len(set(scats)): print "[MASTER] Warning: port %s includes duplicated categories: %s" % (name, cats) for c in set(scats): try: cat = categories[c] except KeyError: cat = Category(c) self.categories.append(cat) cat.add(self) ports[name] = self def remove(self): """ Clean ourselves up but don't touch references in other objects; they still need to know about us as dependencies etc """ self.fdep = None self.edep = None self.pdep = None self.bdep = None self.rdep = None self.pkgdep = None self.alldep = None self.parents = None for cat in self.categories: cat.remove(self) ports[self.name] = None del ports[self.name] del self def destroy(self): """ Remove a package and all references to it """ for pkg in self.alldep: if pkg.parents is not None: # Already removed but not destroyed try: pkg.parents.remove(self) except ValueError: continue for pkg in self.parents: try: pkg.fdep.remove(self) except ValueError: pass try: pkg.edep.remove(self) except ValueError: pass try: pkg.pdep.remove(self) except ValueError: pass try: pkg.bdep.remove(self) except ValueError: pass try: pkg.rdep.remove(self) except ValueError: pass try: pkg.pkgdep.remove(self) except ValueError: pass pkg.alldep.remove(self) sys.exc_clear() self.remove() def setdeps(self, bdep, rdep, edep, pdep, fdep, pkgdep): self.pkgdep = [ports[p] for p in pkgdep.split()] self.fdep = [ports[p] for p in fdep.split()] self.edep = [ports[p] for p in edep.split()] self.pdep = [ports[p] for p in pdep.split()] self.bdep = [ports[p] for p in bdep.split()] self.rdep = [ports[p] for p in rdep.split()] self.alldep = list(set(chain(self.fdep, self.edep, self.pdep, self.bdep, self.rdep, self.pkgdep))) for p in self.alldep: p.parents.append(self) def depth_recursive(self): """ Recursively populate the depth tree up from a given package through dependencies, assuming empty values on entries not yet visited """ if self.depth is None: if len(self.parents) > 0: max = 0 for i in self.parents: w = i.depth_recursive() if w > max: max = w self.depth = max + 1 else: self.depth = 1 for port in QMANAGER_PRIORITY_PACKAGES: if self.name.startswith(port): # Artificial boost to try and get it building earlier self.depth = 100 return self.depth def destroy_recursive(self): """ Remove a port and everything that depends on it """ parents=set([self]) while len(parents) > 0: pkg = parents.pop() assert pkg.depth is not None parents.update(pkg.parents) pkg.destroy() def success(self): """ Build succeeded and possibly uncovered some new leaves """ parents = self.parents[:] self.done = True self.remove() newleafs = [p for p in parents if all(c.done for c in p.alldep)] return newleafs def failure(self): """ Build failed """ self.destroy_recursive() def packagename(self, arch, branch, buildid): """ Return the path where a package may be found""" return "%s/%s/%s/builds/%s/packages/All/%s%s" \ % (pbd, arch, branch, buildid, self.name, self.sufx) def is_stale(self, arch, branch, buildid): """ Does a package need to be (re)-built? Returns: False: if it exists and has newer mtime than all of its dependencies. True: otherwise """ my_pkgname = self.packagename(arch, branch, buildid) pkg_exists = os.path.exists(my_pkgname) if pkg_exists: my_mtime = os.stat(my_pkgname)[ST_MTIME] dep_packages = [pkg.packagename(arch, branch, buildid) for pkg in self.alldep] deps_exist = all(os.path.exists(pkg) for pkg in dep_packages) return not (pkg_exists and deps_exist and all(os.stat(pkg)[ST_MTIME] <= my_mtime for pkg in dep_packages)) class Category(object): def __init__(self, name): self.name = name self.ports = {} categories[name] = self def add(self, port): self.ports[port] = port def remove(self, port): self.ports[port]=None del self.ports[port] def gettargets(targets): """ split command line arguments into list of packages to build. Returns set or iterable of all ports that will be built including dependencies """ plist = set() if len(targets) == 0: targets = ["all"] for i in targets: if i == "all": return ports.itervalues() if i.endswith("-all"): cat = i.rpartition("-")[0] plist.update(p.name for p in categories[cat].ports) elif i.rstrip(pkg_sufx) in ports: plist.update([ports[i.rstrip(pkg_sufx)].name]) else: raise KeyError, i # Compute transitive closure of all dependencies pleft=plist.copy() while len(pleft) > 0: pkg = pleft.pop() new = [p.name for p in ports[pkg].alldep] plist.update(new) pleft.update(new) for p in set(ports.keys()).difference(plist): ports[p].destroy() return [ports[p] for p in plist] class worker(threading.Thread): # Protects threads lock = threading.Lock() # Running threads, used for collecting status threads = {} def __init__(self, mach, job, arch, branch, buildid, queue): threading.Thread.__init__(self) self.machine = mach self.job = job self.arch = arch self.branch = branch self.buildid = buildid self.queue = queue self.setDaemon(True) # NOTE 20130211: only use ${pbc}/scripts here, *not* ${pbc}/admin/scripts, # for privilege separation purposes. def run(self): pkg = self.job print "[MASTER] Running job %s" % (pkg.name), if pkg.status == PHASE2: print " (phase 2)" else: print try: runenv={'HOME':"/root", 'PATH':'/sbin:/bin:/usr/sbin:/usr/bin:/usr/games:/usr/local/sbin:/usr/local/bin:%s/scripts' + pbc, 'FD':" ".join([p.name + p.sufx for p in pkg.fdep]), 'ED':" ".join([p.name + p.sufx for p in pkg.edep]), 'PD':" ".join([p.name + p.sufx for p in pkg.pdep]), 'BD':" ".join([p.name + p.sufx for p in pkg.bdep]), 'RD':" ".join([p.name + p.sufx for p in pkg.rdep]), 'PKGD':" ".join([p.name + p.sufx for p in pkg.pkgdep])} for var in ["NOCLEAN", "NO_RESTRICTED", "NOPLISTCHECK", "NO_DISTFILES", "FETCH_ORIGINAL", "TRYBROKEN", "PORTBUILD_CHECKOUT", "PORTBUILD_DATA" ]: if var in os.environ: runenv[var] = os.environ.get(var) build = subprocess.Popen( ["/bin/sh", "%s/scripts/pdispatch" % pbc, self.arch, self.branch, self.buildid, self.machine, "/tmp/%s/scripts/portbuild" % self.buildid, pkg.name + pkg.sufx, pkg.path], env=runenv, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, bufsize=0) except OSError, e: print >>sys.stderr, "[%s:%s]: Execution failed: %s" % \ (pkg.id, pkg.name, e) while True: try: line = build.stdout.readline() except: print "[%s:%s]: Failed reading from build script" % \ (pkg.id, pkg.name) break if line == "": break print "[%s:%s] %s" % (pkg.id, pkg.name, line.rstrip()) retcode = build.wait() # time.sleep(random.randint(0,60)) # # r = random.random() # if r < 0.1: # retcode = 1 # elif r < 0.15: # retcode = 254 # else: # retcode = 0 conn = QManagerClientConn(stderr = sys.stderr) timeout = 1 try: (code, vars) = conn.command("release", {'id':pkg.id}) except RequestError, e: print "[MASTER] Error releasing job %s (%s): %s" % (pkg.name, pkg.id, e.value) if DEBUG_RETCODE: print "[MASTER] got retcode %d from pkg %s" % (retcode, pkg.name) if retcode == 254: # Requeue soft failure at original priority # XXX exponential backoff? time.sleep(60) # print "Requeueing %s" % pkg.id self.queue.put((-pkg.depth, pkg)) elif retcode == 253: # setting up a machine, we should immediately retry self.queue.put((-pkg.depth, pkg)) elif retcode == 0: self.queue.put((SUCCESS_PRIO, pkg)) else: self.queue.put((FAILURE_PRIO, pkg)) # Clean up worker.lock.acquire() worker.threads[self]=None del worker.threads[self] worker.lock.release() @staticmethod def dispatch(mach, job, arch, branch, buildid, queue): wrk = worker(mach, job, arch, branch, buildid, queue) worker.lock.acquire() worker.threads[wrk] = wrk worker.lock.release() wrk.start() def main(arch, branch, buildid, args): global index, pkg_sufx, pkgng pkgng = False basedir=os.path.realpath(pbd+"/"+arch+"/"+branch+"/builds/"+buildid) buildid=basedir.split("/")[-1] portsdir=basedir+"/ports" # get the major branch number. branchbase = branch.split("-")[ 0 ] # XXX ERWLA - Ugly hack branchbase = branchbase.split(".")[ 0 ] indexfile=portsdir+"/INDEX-"+branchbase archconfig = getConfig(pbd, arch, "portbuild.conf") try: branchconfig = getConfig(pbd, "%s/%s" % (arch, branch), "portbuild.conf") archconfig.merge(branchconfig) except: pass pkg_sufx = archconfig.get('pkg_sufx') if not pkg_sufx: print "error: pkg_sufx not defined in portbuild.conf" sys.exit(1) if pkg_sufx == ".txz": pkgng = True print "[MASTER] Parsing INDEX..." index = Index(indexfile) index.parse() print "[MASTER] length = %s" % len(ports) unlimited_errors = False print "[MASTER] Parsing arguments..." targets = [] for arg in args: if arg.find( "-" ) == 0: if ( arg[ 1 : ] == ARG_UNLIMITED_ERRORS): unlimited_errors = True else: print "[MASTER] arg %s not recognized, ignoring." % arg[ 1 : ] else: targets.append( arg ) print "[MASTER] Finding targets..." targets = gettargets(targets) print "[MASTER] Calculating depth..." depthindex(targets) print "[MASTER] Pruning duds..." dudsfile=basedir+"/duds" for line in file(dudsfile): try: dud = ports[line.rstrip()] except KeyError: continue print "[MASTER] Skipping %s (duds)" % dud.name dud.destroy_recursive() queue = PriorityQueue() # XXX can do this while parsing index if we prune targets/duds # first for pkg in ports.itervalues(): if len(pkg.alldep) == 0: queue.put((-pkg.depth, pkg)) # XXX check osversion, pool mdl=["arch = %s" % arch] # Main work loop completed_jobs = 0 failed_jobs = 0 while len(ports) > 0: print "[MASTER] Ports remaining=%s, Queue length=%s" % (len(ports), queue.qsize()) if len(ports) < 10: print "[MASTER] Remaining ports: %s" % ports.keys() (prio, job) = queue.get() if DEBUG_PACKAGEBUILD: print "[MASTER] Job %s pulled from queue with prio %d" % ( job.name, prio ) if prio == SUCCESS_PRIO: print "[MASTER] Job %s succeeded" % job.name for new in job.success(): queue.put((-new.depth, new)) completed_jobs = completed_jobs + 1 continue elif prio == FAILURE_PRIO: if job.status == PHASE2: print "[MASTER] Job %s failed" % job.name job.failure() continue else: completed_jobs = completed_jobs + 1 failed_jobs = failed_jobs + 1 if DEBUG_PACKAGEBUILD: print "[MASTER] jobs: %d failed jobs out of %d:" % \ ( failed_jobs, completed_jobs ) if not unlimited_errors and \ completed_jobs > QMANAGER_RUNAWAY_THRESHOLD and \ float( failed_jobs ) / completed_jobs > QMANAGER_RUNAWAY_PERCENTAGE: print "[MASTER] ERROR: runaway build detected: %d failed jobs out of %d:" % \ ( failed_jobs, completed_jobs ) print "[MASTER] RUN TERMINATED." break job.attempts = job.attempts + 1 # XXX MCL in theory, if all this code worked correctly, # this condition would never trigger. In practice, # however, it does, so bomb out before filling someone's # mbox. # XXX MCL 20110422 perhaps this code has been fixed now; # XXX it did not use to work: if job.attempts > QMANAGER_MAX_JOB_ATTEMPTS: print "[MASTER] Job %s failed %d times; RUN TERMINATED." % ( job.name, job.attempts ) break else: # Requeue at low priority print "[MASTER] Job %s failed (requeued for phase 2)" % job.name job.status = PHASE2 queue.put((PHASE2_BASE_PRIO-job.depth, job)) continue elif job.status == PHASE2: depth = -(prio - PHASE2_BASE_PRIO) else: depth = -prio print "[MASTER] Working on job %s, depth %d" % (job.name, depth) if job.is_stale(arch, branch, buildid): conn = QManagerClientConn(stderr = sys.stderr) (code, vars) = conn.command("acquire", {"name":job.name, "type":"%s/%s/%s package" % \ (arch, branch, buildid), "priority":10, "mdl":mdl}) if code[0] == "2": machine=vars['machine'] job.id=vars['id'] # print "Got ID %s" % job.id worker.dispatch(machine, job, arch, branch, buildid, queue) else: print "[MASTER] Error acquiring job %s: %s" % (pkg.name, code) else: print "[MASTER] Skipping %s since it already exists" % job.name for new in job.success(): queue.put((-new.depth, new)) print "[MASTER] Waiting for threads" threads = worker.threads.copy() for t in threads: print "[MASTER] Outstanding thread: %s" % t.job.name for t in threads: print "[MASTER] Waiting for thread %s" % t.job.name t.join() print "[MASTER] Finished" if __name__ == "__main__": try: main(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4:]) sys.exit( 0 ) except Exception, e: # XXX MCL TODO move this above print "packagebuild: Exception:" try: print str( e ) except: pass sys.exit( 1 )