diff options
Diffstat (limited to 'bitbake/lib')
| -rw-r--r-- | bitbake/lib/bb/cooker.py | 161 |
1 files changed, 132 insertions, 29 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index bb09dff82f..8188aaef34 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py | |||
| @@ -36,6 +36,7 @@ from functools import wraps | |||
| 36 | from collections import defaultdict | 36 | from collections import defaultdict |
| 37 | import bb, bb.exceptions, bb.command | 37 | import bb, bb.exceptions, bb.command |
| 38 | from bb import utils, data, parse, event, cache, providers, taskdata, runqueue | 38 | from bb import utils, data, parse, event, cache, providers, taskdata, runqueue |
| 39 | import Queue | ||
| 39 | import prserv.serv | 40 | import prserv.serv |
| 40 | 41 | ||
| 41 | logger = logging.getLogger("BitBake") | 42 | logger = logging.getLogger("BitBake") |
| @@ -1402,20 +1403,87 @@ class ParsingFailure(Exception): | |||
| 1402 | self.recipe = recipe | 1403 | self.recipe = recipe |
| 1403 | Exception.__init__(self, realexception, recipe) | 1404 | Exception.__init__(self, realexception, recipe) |
| 1404 | 1405 | ||
| 1405 | def parse_file(task): | 1406 | class Feeder(multiprocessing.Process): |
| 1406 | filename, appends, caches_array = task | 1407 | def __init__(self, jobs, to_parsers, quit): |
| 1407 | try: | 1408 | self.quit = quit |
| 1408 | return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array) | 1409 | self.jobs = jobs |
| 1409 | except Exception as exc: | 1410 | self.to_parsers = to_parsers |
| 1410 | tb = sys.exc_info()[2] | 1411 | multiprocessing.Process.__init__(self) |
| 1411 | exc.recipe = filename | 1412 | |
| 1412 | exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) | 1413 | def run(self): |
| 1413 | raise exc | 1414 | while True: |
| 1414 | # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown | 1415 | try: |
| 1415 | # and for example a worker thread doesn't just exit on its own in response to | 1416 | quit = self.quit.get_nowait() |
| 1416 | # a SystemExit event for example. | 1417 | except Queue.Empty: |
| 1417 | except BaseException as exc: | 1418 | pass |
| 1418 | raise ParsingFailure(exc, filename) | 1419 | else: |
| 1420 | if quit == 'cancel': | ||
| 1421 | self.to_parsers.cancel_join_thread() | ||
| 1422 | break | ||
| 1423 | |||
| 1424 | try: | ||
| 1425 | job = self.jobs.pop() | ||
| 1426 | except IndexError: | ||
| 1427 | break | ||
| 1428 | |||
| 1429 | try: | ||
| 1430 | self.to_parsers.put(job, timeout=0.5) | ||
| 1431 | except Queue.Full: | ||
| 1432 | self.jobs.insert(0, job) | ||
| 1433 | continue | ||
| 1434 | |||
| 1435 | class Parser(multiprocessing.Process): | ||
| 1436 | def __init__(self, jobs, results, quit, init): | ||
| 1437 | self.jobs = jobs | ||
| 1438 | self.results = results | ||
| 1439 | self.quit = quit | ||
| 1440 | self.init = init | ||
| 1441 | multiprocessing.Process.__init__(self) | ||
| 1442 | |||
| 1443 | def run(self): | ||
| 1444 | if self.init: | ||
| 1445 | self.init() | ||
| 1446 | |||
| 1447 | pending = [] | ||
| 1448 | while True: | ||
| 1449 | try: | ||
| 1450 | self.quit.get_nowait() | ||
| 1451 | except Queue.Empty: | ||
| 1452 | pass | ||
| 1453 | else: | ||
| 1454 | self.results.cancel_join_thread() | ||
| 1455 | break | ||
| 1456 | |||
| 1457 | if pending: | ||
| 1458 | result = pending.pop() | ||
| 1459 | else: | ||
| 1460 | try: | ||
| 1461 | job = self.jobs.get(timeout=0.25) | ||
| 1462 | except Queue.Empty: | ||
| 1463 | continue | ||
| 1464 | |||
| 1465 | if job is None: | ||
| 1466 | break | ||
| 1467 | result = self.parse(*job) | ||
| 1468 | |||
| 1469 | try: | ||
| 1470 | self.results.put(result, timeout=0.25) | ||
| 1471 | except Queue.Full: | ||
| 1472 | pending.append(result) | ||
| 1473 | |||
| 1474 | def parse(self, filename, appends, caches_array): | ||
| 1475 | try: | ||
| 1476 | return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array) | ||
| 1477 | except Exception as exc: | ||
| 1478 | tb = sys.exc_info()[2] | ||
| 1479 | exc.recipe = filename | ||
| 1480 | exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) | ||
| 1481 | return True, exc | ||
| 1482 | # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown | ||
| 1483 | # and for example a worker thread doesn't just exit on its own in response to | ||
| 1484 | # a SystemExit event for example. | ||
| 1485 | except BaseException as exc: | ||
| 1486 | return True, ParsingFailure(exc, filename) | ||
| 1419 | 1487 | ||
| 1420 | class CookerParser(object): | 1488 | class CookerParser(object): |
| 1421 | def __init__(self, cooker, filelist, masked): | 1489 | def __init__(self, cooker, filelist, masked): |
| @@ -1452,22 +1520,28 @@ class CookerParser(object): | |||
| 1452 | self.start() | 1520 | self.start() |
| 1453 | 1521 | ||
| 1454 | def start(self): | 1522 | def start(self): |
| 1455 | def init(cfg): | ||
| 1456 | parse_file.cfg = cfg | ||
| 1457 | multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cooker.configuration.data, ), exitpriority=1) | ||
| 1458 | |||
| 1459 | self.results = self.load_cached() | 1523 | self.results = self.load_cached() |
| 1460 | 1524 | self.processes = [] | |
| 1461 | if self.toparse: | 1525 | if self.toparse: |
| 1462 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) | 1526 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) |
| 1463 | 1527 | def init(): | |
| 1464 | self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata]) | 1528 | Parser.cfg = self.cfgdata |
| 1465 | parsed = self.pool.imap(parse_file, self.willparse) | 1529 | multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1) |
| 1466 | self.pool.close() | 1530 | |
| 1467 | 1531 | self.feeder_quit = multiprocessing.Queue(maxsize=1) | |
| 1468 | self.results = itertools.chain(self.results, parsed) | 1532 | self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes) |
| 1469 | 1533 | self.jobs = multiprocessing.Queue(maxsize=self.num_processes) | |
| 1470 | def shutdown(self, clean=True): | 1534 | self.result_queue = multiprocessing.Queue() |
| 1535 | self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit) | ||
| 1536 | self.feeder.start() | ||
| 1537 | for i in range(1, self.num_processes): | ||
| 1538 | parser = Parser(self.jobs, self.result_queue, self.parser_quit, init) | ||
| 1539 | parser.start() | ||
| 1540 | self.processes.append(parser) | ||
| 1541 | |||
| 1542 | self.results = itertools.chain(self.results, self.parse_generator()) | ||
| 1543 | |||
| 1544 | def shutdown(self, clean=True, force=False): | ||
| 1471 | if not self.toparse: | 1545 | if not self.toparse: |
| 1472 | return | 1546 | return |
| 1473 | 1547 | ||
| @@ -1477,9 +1551,22 @@ class CookerParser(object): | |||
| 1477 | self.virtuals, self.error, | 1551 | self.virtuals, self.error, |
| 1478 | self.total) | 1552 | self.total) |
| 1479 | bb.event.fire(event, self.cfgdata) | 1553 | bb.event.fire(event, self.cfgdata) |
| 1554 | self.feeder_quit.put(None) | ||
| 1555 | for process in self.processes: | ||
| 1556 | self.jobs.put(None) | ||
| 1480 | else: | 1557 | else: |
| 1481 | self.pool.terminate() | 1558 | self.feeder_quit.put('cancel') |
| 1482 | self.pool.join() | 1559 | |
| 1560 | self.parser_quit.cancel_join_thread() | ||
| 1561 | for process in self.processes: | ||
| 1562 | self.parser_quit.put(None) | ||
| 1563 | |||
| 1564 | self.jobs.cancel_join_thread() | ||
| 1565 | sys.exit(1) | ||
| 1566 | |||
| 1567 | for process in self.processes: | ||
| 1568 | process.join() | ||
| 1569 | self.feeder.join() | ||
| 1483 | 1570 | ||
| 1484 | sync = threading.Thread(target=self.bb_cache.sync) | 1571 | sync = threading.Thread(target=self.bb_cache.sync) |
| 1485 | sync.start() | 1572 | sync.start() |
| @@ -1491,6 +1578,22 @@ class CookerParser(object): | |||
| 1491 | cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) | 1578 | cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) |
| 1492 | yield not cached, infos | 1579 | yield not cached, infos |
| 1493 | 1580 | ||
| 1581 | def parse_generator(self): | ||
| 1582 | while True: | ||
| 1583 | if self.parsed >= self.toparse: | ||
| 1584 | break | ||
| 1585 | |||
| 1586 | try: | ||
| 1587 | result = self.result_queue.get(timeout=0.25) | ||
| 1588 | except Queue.Empty: | ||
| 1589 | pass | ||
| 1590 | else: | ||
| 1591 | value = result[1] | ||
| 1592 | if isinstance(value, BaseException): | ||
| 1593 | raise value | ||
| 1594 | else: | ||
| 1595 | yield result | ||
| 1596 | |||
| 1494 | def parse_next(self): | 1597 | def parse_next(self): |
| 1495 | try: | 1598 | try: |
| 1496 | parsed, result = self.results.next() | 1599 | parsed, result = self.results.next() |
