From 5adeefd63fd499fee13c77fb4c1da8ef2f128377 Mon Sep 17 00:00:00 2001 From: Joshua Watt Date: Tue, 8 Jul 2025 09:42:22 -0600 Subject: bitbake: cooker: Use shared counter for processing parser jobs Instead of pre-partitioning which jobs will go to which parser processes, pass the list of all jobs to all the parser processes (efficiently via fork()), then use a shared counter of the next index in the list that needs to be processed. This allows the parser processes to run independently of needing to be fed by the parent process, and load balances them much better. (Bitbake rev: 373c4ddaf0e8128cc4f7d47aefa9860bd477a00f) Signed-off-by: Joshua Watt Signed-off-by: Richard Purdie --- bitbake/lib/bb/cooker.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 2bb80e330d..dc131939ed 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py @@ -26,6 +26,7 @@ import json import pickle import codecs import hashserv +import ctypes logger = logging.getLogger("BitBake") collectlog = logging.getLogger("BitBake.Collection") @@ -1998,8 +1999,9 @@ class ParsingFailure(Exception): Exception.__init__(self, realexception, recipe) class Parser(multiprocessing.Process): - def __init__(self, jobs, results, quit, profile): + def __init__(self, jobs, next_job_id, results, quit, profile): self.jobs = jobs + self.next_job_id = next_job_id self.results = results self.quit = quit multiprocessing.Process.__init__(self) @@ -2065,10 +2067,14 @@ class Parser(multiprocessing.Process): break job = None - try: - job = self.jobs.pop() - except IndexError: - havejobs = False + if havejobs: + with self.next_job_id.get_lock(): + if self.next_job_id.value < len(self.jobs): + job = self.jobs[self.next_job_id.value] + self.next_job_id.value += 1 + else: + havejobs = False + if job: result = self.parse(*job) # Clear the siggen cache after parsing to control memory usage, its huge @@ -2134,13 
+2140,13 @@ class CookerParser(object): self.bb_caches = bb.cache.MulticonfigCache(self.cfgbuilder, self.cfghash, cooker.caches_array) self.fromcache = set() - self.willparse = set() + self.willparse = [] for mc in self.cooker.multiconfigs: for filename in self.mcfilelist[mc]: appends = self.cooker.collections[mc].get_file_appends(filename) layername = self.cooker.collections[mc].calc_bbfile_priority(filename)[2] if not self.bb_caches[mc].cacheValid(filename, appends): - self.willparse.add((mc, self.bb_caches[mc], filename, appends, layername)) + self.willparse.append((mc, self.bb_caches[mc], filename, appends, layername)) else: self.fromcache.add((mc, self.bb_caches[mc], filename, appends, layername)) @@ -2159,18 +2165,18 @@ class CookerParser(object): def start(self): self.results = self.load_cached() self.processes = [] + if self.toparse: bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) + next_job_id = multiprocessing.Value(ctypes.c_int, 0) self.parser_quit = multiprocessing.Event() self.result_queue = multiprocessing.Queue() - def chunkify(lst,n): - return [lst[i::n] for i in range(n)] - self.jobs = chunkify(list(self.willparse), self.num_processes) - + # Have to pass in willparse at fork time so all parsing processes have the unpickleable data + # then access it by index from the parse queue. for i in range(0, self.num_processes): - parser = Parser(self.jobs[i], self.result_queue, self.parser_quit, self.cooker.configuration.profile) + parser = Parser(self.willparse, next_job_id, self.result_queue, self.parser_quit, self.cooker.configuration.profile) parser.start() self.process_names.append(parser.name) self.processes.append(parser) -- cgit v1.2.3-54-g00ecf