From 57c527d630ce8230fd17f3cdb33d388ced34c63b Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Wed, 3 Jul 2019 16:22:15 +0100 Subject: bitbake: runqueue: Merge scenequeue and real task queue code together Merge the unique functions from the Tasks and Scenequeue Tasks classes into the common base class. (Bitbake rev: 7539fe22bc831bb835901e3aca77985ab4ebc4c7) Signed-off-by: Richard Purdie --- bitbake/lib/bb/runqueue.py | 728 ++++++++++++++++++++++----------------------- 1 file changed, 364 insertions(+), 364 deletions(-) diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 3cc804de45..80cc60ed7f 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -1839,66 +1839,6 @@ class RunQueueExecute: can_start = active < self.number_tasks return can_start -class RunQueueExecuteDummy(RunQueueExecute): - def __init__(self, rq): - self.rq = rq - self.stats = RunQueueStats(0) - - def finish(self): - self.rq.state = runQueueComplete - return - -class RunQueueExecuteTasks(RunQueueExecute): - def __init__(self, rq): - RunQueueExecute.__init__(self, rq) - - self.stampcache = {} - - # Mark initial buildable tasks - for tid in self.rqdata.runtaskentries: - if len(self.rqdata.runtaskentries[tid].depends) == 0: - self.runq_buildable.add(tid) - if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): - self.rq.scenequeue_covered.add(tid) - - found = True - while found: - found = False - for tid in self.rqdata.runtaskentries: - if tid in self.rq.scenequeue_covered: - continue - logger.debug(1, 'Considering %s: %s' % (tid, str(self.rqdata.runtaskentries[tid].revdeps))) - - if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): - if tid in self.rq.scenequeue_notcovered: - continue - found = True - self.rq.scenequeue_covered.add(tid) - - logger.debug(1, 'Skip list %s', sorted(self.rq.scenequeue_covered)) - - for task in self.rq.scenequeue_notcovered: - logger.debug(1, 'Not skipping task %s', task) - - for mc in self.rqdata.dataCaches: - target_pairs = [] - for tid in self.rqdata.target_tids: - (tidmc, fn, taskname, _) = split_tid_mcfn(tid) - if tidmc == mc: - target_pairs.append((fn, taskname)) - - event.fire(bb.event.StampUpdate(target_pairs, self.rqdata.dataCaches[mc].stamp), self.cfgData) - - schedulers = self.get_schedulers() - for scheduler in schedulers: - if self.scheduler == scheduler.name: - self.sched = scheduler(self, self.rqdata) - logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) - break - else: - bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % - (self.scheduler, ", ".join(obj.name for obj in schedulers))) - def get_schedulers(self): schedulers = set(obj for obj in globals().values() if type(obj) is type and @@ -2100,125 +2040,323 @@ class RunQueueExecuteTasks(RunQueueExecute): #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) return taskdepdata -class SQData(object): - def __init__(self): - # SceneQueue dependencies - self.sq_deps = {} - # SceneQueue reverse dependencies - self.sq_revdeps = {} - # Copy of reverse dependencies used by sq processing code - self.sq_revdeps2 = {} - # Injected inter-setscene task dependencies - self.sq_harddeps = {} - # Cache of stamp files so duplicates can't run in parallel - self.stamps = {} - # Setscene tasks directly depended upon by the build - self.unskippable = [] - # List of setscene tasks which aren't present - self.outrightfail = [] - # A list of normal tasks a setscene task covers - self.sq_covered_tasks = {} + def scenequeue_updatecounters(self, task, fail = False): + for dep in self.sqdata.sq_deps[task]: + if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: + logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) + self.scenequeue_updatecounters(dep, fail) + continue + if task not in self.sqdata.sq_revdeps2[dep]: + # May already have been removed by the fail case above + continue + self.sqdata.sq_revdeps2[dep].remove(task) + if len(self.sqdata.sq_revdeps2[dep]) == 0: + self.sq_buildable.add(dep) -def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): + def sq_task_completeoutright(self, task): + """ + Mark a task as completed + Look at the reverse dependencies and mark any task with + completed dependencies as buildable + """ - sq_revdeps = {} - sq_revdeps_new = {} - sq_revdeps_squash = {} + logger.debug(1, 'Found task %s which could be accelerated', task) + self.scenequeue_covered.add(task) + self.scenequeue_updatecounters(task) - # We need to construct a dependency graph for the setscene functions. Intermediate - # dependencies between the setscene tasks only complicate the code. This code - # therefore aims to collapse the huge runqueue dependency tree into a smaller one - # only containing the setscene functions. + def sq_check_taskfail(self, task): + if self.rqdata.setscenewhitelist is not None: + realtask = task.split('_setscene')[0] + (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) + pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] + if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): + logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) + self.rq.state = runQueueCleanUp - rqdata.init_progress_reporter.next_stage() + def sq_task_complete(self, task): + self.sq_stats.taskCompleted() + bb.event.fire(sceneQueueTaskCompleted(task, self.sq_stats, self.rq), self.cfgData) + self.sq_task_completeoutright(task) - # First process the chains up to the first setscene task. - endpoints = {} - for tid in rqdata.runtaskentries: - sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) - sq_revdeps_new[tid] = set() - if (len(sq_revdeps[tid]) == 0) and tid not in rqdata.runq_setscene_tids: - #bb.warn("Added endpoint %s" % (tid)) - endpoints[tid] = set() + def sq_task_fail(self, task, result): + self.sq_stats.taskFailed() + bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task, True) + self.sq_check_taskfail(task) - rqdata.init_progress_reporter.next_stage() + def sq_task_failoutright(self, task): + self.sq_running.add(task) + self.sq_buildable.add(task) + self.sq_stats.taskSkipped() + self.sq_stats.taskCompleted() + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task, True) - # Secondly process the chains between setscene tasks. - for tid in rqdata.runq_setscene_tids: - #bb.warn("Added endpoint 2 %s" % (tid)) - for dep in rqdata.runtaskentries[tid].depends: - if tid in sq_revdeps[dep]: - sq_revdeps[dep].remove(tid) - if dep not in endpoints: - endpoints[dep] = set() - #bb.warn(" Added endpoint 3 %s" % (dep)) - endpoints[dep].add(tid) + def sq_task_skip(self, task): + self.sq_running.add(task) + self.sq_buildable.add(task) + self.sq_task_completeoutright(task) + self.sq_stats.taskSkipped() + self.sq_stats.taskCompleted() - rqdata.init_progress_reporter.next_stage() + def sq_execute(self): + """ + Run the tasks in a queue prepared by prepare_runqueue + """ - def process_endpoints(endpoints): - newendpoints = {} - for point, task in endpoints.items(): - tasks = set() - if task: - tasks |= task - if sq_revdeps_new[point]: - tasks |= sq_revdeps_new[point] - sq_revdeps_new[point] = set() - if point in rqdata.runq_setscene_tids: - sq_revdeps_new[point] = tasks - tasks = set() - continue - for dep in rqdata.runtaskentries[point].depends: - if point in sq_revdeps[dep]: - sq_revdeps[dep].remove(point) - if tasks: - sq_revdeps_new[dep] |= tasks - if len(sq_revdeps[dep]) == 0 and dep not in rqdata.runq_setscene_tids: - newendpoints[dep] = task - if len(newendpoints) != 0: - process_endpoints(newendpoints) + self.rq.read_workers() - process_endpoints(endpoints) + task = None + if self.can_start_task(): + # Find the next setscene to run + for nexttask in self.rqdata.runq_setscene_tids: + if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): + if nexttask in self.sqdata.unskippable: + logger.debug(2, "Setscene task %s is unskippable" % nexttask) + if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): + fn = fn_from_tid(nexttask) + foundtarget = False - rqdata.init_progress_reporter.next_stage() + if nexttask in self.rqdata.target_tids: + foundtarget = True + if not foundtarget: + logger.debug(2, "Skipping setscene for task %s" % nexttask) + self.sq_task_skip(nexttask) + self.scenequeue_notneeded.add(nexttask) + return True + if nexttask in self.sqdata.outrightfail: + self.sq_task_failoutright(nexttask) + return True + task = nexttask + break + if task is not None: + (mc, fn, taskname, taskfn) = split_tid_mcfn(task) + taskname = taskname + "_setscene" + if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): + logger.debug(2, 'Stamp for underlying task %s is current, so skipping setscene variant', task) + self.sq_task_failoutright(task) + return True - # Build a list of setscene tasks which are "unskippable" - # These are direct endpoints referenced by the build - endpoints2 = {} - sq_revdeps2 = {} - sq_revdeps_new2 = {} - def process_endpoints2(endpoints): - newendpoints = {} - for point, task in endpoints.items(): - tasks = set([point]) - if task: - tasks |= task - if sq_revdeps_new2[point]: - tasks |= sq_revdeps_new2[point] - sq_revdeps_new2[point] = set() - if point in rqdata.runq_setscene_tids: - sq_revdeps_new2[point] = tasks - for dep in rqdata.runtaskentries[point].depends: - if point in sq_revdeps2[dep]: - sq_revdeps2[dep].remove(point) - if tasks: - sq_revdeps_new2[dep] |= tasks - if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in rqdata.runq_setscene_tids: - newendpoints[dep] = tasks - if len(newendpoints) != 0: - process_endpoints2(newendpoints) - for tid in rqdata.runtaskentries: - sq_revdeps2[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) - sq_revdeps_new2[tid] = set() - if (len(sq_revdeps2[tid]) == 0) and tid not in rqdata.runq_setscene_tids: - endpoints2[tid] = set() - process_endpoints2(endpoints2) - for tid in rqdata.runq_setscene_tids: - if sq_revdeps_new2[tid]: - sqdata.unskippable.append(tid) + if self.cooker.configuration.force: + if task in self.rqdata.target_tids: + self.sq_task_failoutright(task) + return True - rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) + if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): + logger.debug(2, 'Setscene stamp current task %s, so skip it and its dependencies', task) + self.sq_task_skip(task) + return True + + if self.cooker.configuration.skipsetscene: + logger.debug(2, 'No setscene tasks should be executed. Skipping %s', task) + self.sq_task_failoutright(task) + return True + + startevent = sceneQueueTaskStarted(task, self.sq_stats, self.rq) + bb.event.fire(startevent, self.cfgData) + + taskdepdata = self.sq_build_taskdepdata(task) + + taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] + taskhash = self.rqdata.get_task_hash(task) + unihash = self.rqdata.get_task_unihash(task) + if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: + if not mc in self.rq.fakeworker: + self.rq.start_fakeworker(self, mc) + self.rq.fakeworker[mc].process.stdin.write(b"" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"") + self.rq.fakeworker[mc].process.stdin.flush() + else: + self.rq.worker[mc].process.stdin.write(b"" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"") + self.rq.worker[mc].process.stdin.flush() + + self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) + self.build_stamps2.append(self.build_stamps[task]) + self.sq_running.add(task) + self.sq_live.add(task) + self.sq_stats.taskActive() + if self.can_start_task(): + return True + + if self.sq_stats.active > 0: + self.rq.read_workers() + return self.rq.active_fds() + + #for tid in self.sqdata.sq_revdeps: + # if tid not in self.sq_running: + # buildable = tid in self.sq_buildable + # revdeps = self.sqdata.sq_revdeps[tid] + # bb.warn("Found we didn't run %s %s %s" % (tid, buildable, str(revdeps))) + + self.rq.scenequeue_covered = self.scenequeue_covered + self.rq.scenequeue_notcovered = self.scenequeue_notcovered + + logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.rq.scenequeue_covered))) + + self.rq.state = runQueueRunInit + + completeevent = sceneQueueComplete(self.sq_stats, self.rq) + bb.event.fire(completeevent, self.cfgData) + + return True + + def sq_build_taskdepdata(self, task): + def getsetscenedeps(tid): + deps = set() + (mc, fn, taskname, _) = split_tid_mcfn(tid) + realtid = tid + "_setscene" + idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends + for (depname, idependtask) in idepends: + if depname not in self.rqdata.taskData[mc].build_targets: + continue + + depfn = self.rqdata.taskData[mc].build_targets[depname][0] + if depfn is None: + continue + deptid = depfn + ":" + idependtask.replace("_setscene", "") + deps.add(deptid) + return deps + + taskdepdata = {} + next = getsetscenedeps(task) + next.add(task) + while next: + additional = [] + for revdep in next: + (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) + pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] + deps = getsetscenedeps(revdep) + provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] + taskhash = self.rqdata.runtaskentries[revdep].hash + unihash = self.rqdata.runtaskentries[revdep].unihash + taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] + for revdep2 in deps: + if revdep2 not in taskdepdata: + additional.append(revdep2) + next = additional + + #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) + return taskdepdata + +class SQData(object): + def __init__(self): + # SceneQueue dependencies + self.sq_deps = {} + # SceneQueue reverse dependencies + self.sq_revdeps = {} + # Copy of reverse dependencies used by sq processing code + self.sq_revdeps2 = {} + # Injected inter-setscene task dependencies + self.sq_harddeps = {} + # Cache of stamp files so duplicates can't run in parallel + self.stamps = {} + # Setscene tasks directly depended upon by the build + self.unskippable = [] + # List of setscene tasks which aren't present + self.outrightfail = [] + # A list of normal tasks a setscene task covers + self.sq_covered_tasks = {} + +def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): + + sq_revdeps = {} + sq_revdeps_new = {} + sq_revdeps_squash = {} + + # We need to construct a dependency graph for the setscene functions. Intermediate + # dependencies between the setscene tasks only complicate the code. This code + # therefore aims to collapse the huge runqueue dependency tree into a smaller one + # only containing the setscene functions. + + rqdata.init_progress_reporter.next_stage() + + # First process the chains up to the first setscene task. + endpoints = {} + for tid in rqdata.runtaskentries: + sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) + sq_revdeps_new[tid] = set() + if (len(sq_revdeps[tid]) == 0) and tid not in rqdata.runq_setscene_tids: + #bb.warn("Added endpoint %s" % (tid)) + endpoints[tid] = set() + + rqdata.init_progress_reporter.next_stage() + + # Secondly process the chains between setscene tasks. + for tid in rqdata.runq_setscene_tids: + #bb.warn("Added endpoint 2 %s" % (tid)) + for dep in rqdata.runtaskentries[tid].depends: + if tid in sq_revdeps[dep]: + sq_revdeps[dep].remove(tid) + if dep not in endpoints: + endpoints[dep] = set() + #bb.warn(" Added endpoint 3 %s" % (dep)) + endpoints[dep].add(tid) + + rqdata.init_progress_reporter.next_stage() + + def process_endpoints(endpoints): + newendpoints = {} + for point, task in endpoints.items(): + tasks = set() + if task: + tasks |= task + if sq_revdeps_new[point]: + tasks |= sq_revdeps_new[point] + sq_revdeps_new[point] = set() + if point in rqdata.runq_setscene_tids: + sq_revdeps_new[point] = tasks + tasks = set() + continue + for dep in rqdata.runtaskentries[point].depends: + if point in sq_revdeps[dep]: + sq_revdeps[dep].remove(point) + if tasks: + sq_revdeps_new[dep] |= tasks + if len(sq_revdeps[dep]) == 0 and dep not in rqdata.runq_setscene_tids: + newendpoints[dep] = task + if len(newendpoints) != 0: + process_endpoints(newendpoints) + + process_endpoints(endpoints) + + rqdata.init_progress_reporter.next_stage() + + # Build a list of setscene tasks which are "unskippable" + # These are direct endpoints referenced by the build + endpoints2 = {} + sq_revdeps2 = {} + sq_revdeps_new2 = {} + def process_endpoints2(endpoints): + newendpoints = {} + for point, task in endpoints.items(): + tasks = set([point]) + if task: + tasks |= task + if sq_revdeps_new2[point]: + tasks |= sq_revdeps_new2[point] + sq_revdeps_new2[point] = set() + if point in rqdata.runq_setscene_tids: + sq_revdeps_new2[point] = tasks + for dep in rqdata.runtaskentries[point].depends: + if point in sq_revdeps2[dep]: + sq_revdeps2[dep].remove(point) + if tasks: + sq_revdeps_new2[dep] |= tasks + if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in rqdata.runq_setscene_tids: + newendpoints[dep] = tasks + if len(newendpoints) != 0: + process_endpoints2(newendpoints) + for tid in rqdata.runtaskentries: + sq_revdeps2[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) + sq_revdeps_new2[tid] = set() + if (len(sq_revdeps2[tid]) == 0) and tid not in rqdata.runq_setscene_tids: + endpoints2[tid] = set() + process_endpoints2(endpoints2) + for tid in rqdata.runq_setscene_tids: + if sq_revdeps_new2[tid]: + sqdata.unskippable.append(tid) + + rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) for taskcounter, tid in enumerate(rqdata.runtaskentries): if tid in rqdata.runq_setscene_tids: @@ -2346,6 +2484,66 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): sqdata.outrightfail.append(tid) +class RunQueueExecuteDummy(RunQueueExecute): + def __init__(self, rq): + self.rq = rq + self.stats = RunQueueStats(0) + + def finish(self): + self.rq.state = runQueueComplete + return + +class RunQueueExecuteTasks(RunQueueExecute): + def __init__(self, rq): + RunQueueExecute.__init__(self, rq) + + self.stampcache = {} + + # Mark initial buildable tasks + for tid in self.rqdata.runtaskentries: + if len(self.rqdata.runtaskentries[tid].depends) == 0: + self.runq_buildable.add(tid) + if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): + self.rq.scenequeue_covered.add(tid) + + found = True + while found: + found = False + for tid in self.rqdata.runtaskentries: + if tid in self.rq.scenequeue_covered: + continue + logger.debug(1, 'Considering %s: %s' % (tid, str(self.rqdata.runtaskentries[tid].revdeps))) + + if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): + if tid in self.rq.scenequeue_notcovered: + continue + found = True + self.rq.scenequeue_covered.add(tid) + + logger.debug(1, 'Skip list %s', sorted(self.rq.scenequeue_covered)) + + for task in self.rq.scenequeue_notcovered: + logger.debug(1, 'Not skipping task %s', task) + + for mc in self.rqdata.dataCaches: + target_pairs = [] + for tid in self.rqdata.target_tids: + (tidmc, fn, taskname, _) = split_tid_mcfn(tid) + if tidmc == mc: + target_pairs.append((fn, taskname)) + + event.fire(bb.event.StampUpdate(target_pairs, self.rqdata.dataCaches[mc].stamp), self.cfgData) + + schedulers = self.get_schedulers() + for scheduler in schedulers: + if self.scheduler == scheduler.name: + self.sched = scheduler(self, self.rqdata) + logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) + break + else: + bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % + (self.scheduler, ", ".join(obj.name for obj in schedulers))) + class RunQueueExecuteScenequeue(RunQueueExecute): def __init__(self, rq): RunQueueExecute.__init__(self, rq) @@ -2368,204 +2566,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute): self.rq.state = runQueueSceneRun - def scenequeue_updatecounters(self, task, fail = False): - for dep in self.sqdata.sq_deps[task]: - if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: - logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) - self.scenequeue_updatecounters(dep, fail) - continue - if task not in self.sqdata.sq_revdeps2[dep]: - # May already have been removed by the fail case above - continue - self.sqdata.sq_revdeps2[dep].remove(task) - if len(self.sqdata.sq_revdeps2[dep]) == 0: - self.sq_buildable.add(dep) - - def sq_task_completeoutright(self, task): - """ - Mark a task as completed - Look at the reverse dependencies and mark any task with - completed dependencies as buildable - """ - - logger.debug(1, 'Found task %s which could be accelerated', task) - self.scenequeue_covered.add(task) - self.scenequeue_updatecounters(task) - - def sq_check_taskfail(self, task): - if self.rqdata.setscenewhitelist is not None: - realtask = task.split('_setscene')[0] - (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) - pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): - logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) - self.rq.state = runQueueCleanUp - - def sq_task_complete(self, task): - self.sq_stats.taskCompleted() - bb.event.fire(sceneQueueTaskCompleted(task, self.sq_stats, self.rq), self.cfgData) - self.sq_task_completeoutright(task) - - def sq_task_fail(self, task, result): - self.sq_stats.taskFailed() - bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) - self.scenequeue_notcovered.add(task) - self.scenequeue_updatecounters(task, True) - self.sq_check_taskfail(task) - - def sq_task_failoutright(self, task): - self.sq_running.add(task) - self.sq_buildable.add(task) - self.sq_stats.taskSkipped() - self.sq_stats.taskCompleted() - self.scenequeue_notcovered.add(task) - self.scenequeue_updatecounters(task, True) - - def sq_task_skip(self, task): - self.sq_running.add(task) - self.sq_buildable.add(task) - self.sq_task_completeoutright(task) - self.sq_stats.taskSkipped() - self.sq_stats.taskCompleted() - - def sq_execute(self): - """ - Run the tasks in a queue prepared by prepare_runqueue - """ - - self.rq.read_workers() - - task = None - if self.can_start_task(): - # Find the next setscene to run - for nexttask in self.rqdata.runq_setscene_tids: - if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): - if nexttask in self.sqdata.unskippable: - logger.debug(2, "Setscene task %s is unskippable" % nexttask) - if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): - fn = fn_from_tid(nexttask) - foundtarget = False - - if nexttask in self.rqdata.target_tids: - foundtarget = True - if not foundtarget: - logger.debug(2, "Skipping setscene for task %s" % nexttask) - self.sq_task_skip(nexttask) - self.scenequeue_notneeded.add(nexttask) - return True - if nexttask in self.sqdata.outrightfail: - self.sq_task_failoutright(nexttask) - return True - task = nexttask - break - if task is not None: - (mc, fn, taskname, taskfn) = split_tid_mcfn(task) - taskname = taskname + "_setscene" - if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): - logger.debug(2, 'Stamp for underlying task %s is current, so skipping setscene variant', task) - self.sq_task_failoutright(task) - return True - - if self.cooker.configuration.force: - if task in self.rqdata.target_tids: - self.sq_task_failoutright(task) - return True - - if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): - logger.debug(2, 'Setscene stamp current task %s, so skip it and its dependencies', task) - self.sq_task_skip(task) - return True - - if self.cooker.configuration.skipsetscene: - logger.debug(2, 'No setscene tasks should be executed. Skipping %s', task) - self.sq_task_failoutright(task) - return True - - startevent = sceneQueueTaskStarted(task, self.sq_stats, self.rq) - bb.event.fire(startevent, self.cfgData) - - taskdepdata = self.sq_build_taskdepdata(task) - - taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] - taskhash = self.rqdata.get_task_hash(task) - unihash = self.rqdata.get_task_unihash(task) - if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: - if not mc in self.rq.fakeworker: - self.rq.start_fakeworker(self, mc) - self.rq.fakeworker[mc].process.stdin.write(b"" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"") - self.rq.fakeworker[mc].process.stdin.flush() - else: - self.rq.worker[mc].process.stdin.write(b"" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"") - self.rq.worker[mc].process.stdin.flush() - - self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) - self.build_stamps2.append(self.build_stamps[task]) - self.sq_running.add(task) - self.sq_live.add(task) - self.sq_stats.taskActive() - if self.can_start_task(): - return True - - if self.sq_stats.active > 0: - self.rq.read_workers() - return self.rq.active_fds() - - #for tid in self.sqdata.sq_revdeps: - # if tid not in self.sq_running: - # buildable = tid in self.sq_buildable - # revdeps = self.sqdata.sq_revdeps[tid] - # bb.warn("Found we didn't run %s %s %s" % (tid, buildable, str(revdeps))) - - self.rq.scenequeue_covered = self.scenequeue_covered - self.rq.scenequeue_notcovered = self.scenequeue_notcovered - - logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.rq.scenequeue_covered))) - - self.rq.state = runQueueRunInit - - completeevent = sceneQueueComplete(self.sq_stats, self.rq) - bb.event.fire(completeevent, self.cfgData) - - return True - - def sq_build_taskdepdata(self, task): - def getsetscenedeps(tid): - deps = set() - (mc, fn, taskname, _) = split_tid_mcfn(tid) - realtid = tid + "_setscene" - idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends - for (depname, idependtask) in idepends: - if depname not in self.rqdata.taskData[mc].build_targets: - continue - - depfn = self.rqdata.taskData[mc].build_targets[depname][0] - if depfn is None: - continue - deptid = depfn + ":" + idependtask.replace("_setscene", "") - deps.add(deptid) - return deps - - taskdepdata = {} - next = getsetscenedeps(task) - next.add(task) - while next: - additional = [] - for revdep in next: - (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) - pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - deps = getsetscenedeps(revdep) - provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] - taskhash = self.rqdata.runtaskentries[revdep].hash - unihash = self.rqdata.runtaskentries[revdep].unihash - taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] - for revdep2 in deps: - if revdep2 not in taskdepdata: - additional.append(revdep2) - next = additional - - #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) - return taskdepdata - class TaskFailure(Exception): """ Exception raised when a task in a runqueue fails -- cgit v1.2.3-54-g00ecf