diff options
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/__init__.py | 2 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 76 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 80 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/tests.py | 82 |
4 files changed, 2 insertions, 238 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index 639e1607f8..a4371643d7 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py | |||
| @@ -5,7 +5,7 @@ | |||
| 5 | # | 5 | # |
| 6 | 6 | ||
| 7 | 7 | ||
| 8 | from .client import AsyncClient, Client, ClientPool | 8 | from .client import AsyncClient, Client |
| 9 | from .serv import AsyncServer, AsyncServerConnection | 9 | from .serv import AsyncServer, AsyncServerConnection |
| 10 | from .connection import DEFAULT_MAX_CHUNK | 10 | from .connection import DEFAULT_MAX_CHUNK |
| 11 | from .exceptions import ( | 11 | from .exceptions import ( |
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index f81ad92f48..11179b0fcb 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
| @@ -29,6 +29,7 @@ WEBSOCKETS_MIN_VERSION = (9, 1) | |||
| 29 | if sys.version_info >= (3, 10, 0): | 29 | if sys.version_info >= (3, 10, 0): |
| 30 | WEBSOCKETS_MIN_VERSION = (10, 0) | 30 | WEBSOCKETS_MIN_VERSION = (10, 0) |
| 31 | 31 | ||
| 32 | |||
| 32 | def parse_address(addr): | 33 | def parse_address(addr): |
| 33 | if addr.startswith(UNIX_PREFIX): | 34 | if addr.startswith(UNIX_PREFIX): |
| 34 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) | 35 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) |
| @@ -259,78 +260,3 @@ class Client(object): | |||
| 259 | def __exit__(self, exc_type, exc_value, traceback): | 260 | def __exit__(self, exc_type, exc_value, traceback): |
| 260 | self.close() | 261 | self.close() |
| 261 | return False | 262 | return False |
| 262 | |||
| 263 | |||
| 264 | class ClientPool(object): | ||
| 265 | def __init__(self, max_clients): | ||
| 266 | self.avail_clients = [] | ||
| 267 | self.num_clients = 0 | ||
| 268 | self.max_clients = max_clients | ||
| 269 | self.loop = None | ||
| 270 | self.client_condition = None | ||
| 271 | |||
| 272 | @abc.abstractmethod | ||
| 273 | async def _new_client(self): | ||
| 274 | raise NotImplementedError("Must be implemented in derived class") | ||
| 275 | |||
| 276 | def close(self): | ||
| 277 | if self.client_condition: | ||
| 278 | self.client_condition = None | ||
| 279 | |||
| 280 | if self.loop: | ||
| 281 | self.loop.run_until_complete(self.__close_clients()) | ||
| 282 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
| 283 | self.loop.close() | ||
| 284 | self.loop = None | ||
| 285 | |||
| 286 | def run_tasks(self, tasks): | ||
| 287 | if not self.loop: | ||
| 288 | self.loop = asyncio.new_event_loop() | ||
| 289 | |||
| 290 | thread = Thread(target=self.__thread_main, args=(tasks,)) | ||
| 291 | thread.start() | ||
| 292 | thread.join() | ||
| 293 | |||
| 294 | @contextlib.asynccontextmanager | ||
| 295 | async def get_client(self): | ||
| 296 | async with self.client_condition: | ||
| 297 | if self.avail_clients: | ||
| 298 | client = self.avail_clients.pop() | ||
| 299 | elif self.num_clients < self.max_clients: | ||
| 300 | self.num_clients += 1 | ||
| 301 | client = await self._new_client() | ||
| 302 | else: | ||
| 303 | while not self.avail_clients: | ||
| 304 | await self.client_condition.wait() | ||
| 305 | client = self.avail_clients.pop() | ||
| 306 | |||
| 307 | try: | ||
| 308 | yield client | ||
| 309 | finally: | ||
| 310 | async with self.client_condition: | ||
| 311 | self.avail_clients.append(client) | ||
| 312 | self.client_condition.notify() | ||
| 313 | |||
| 314 | def __thread_main(self, tasks): | ||
| 315 | async def process_task(task): | ||
| 316 | async with self.get_client() as client: | ||
| 317 | await task(client) | ||
| 318 | |||
| 319 | asyncio.set_event_loop(self.loop) | ||
| 320 | if not self.client_condition: | ||
| 321 | self.client_condition = asyncio.Condition() | ||
| 322 | tasks = [process_task(t) for t in tasks] | ||
| 323 | self.loop.run_until_complete(asyncio.gather(*tasks)) | ||
| 324 | |||
| 325 | async def __close_clients(self): | ||
| 326 | for c in self.avail_clients: | ||
| 327 | await c.close() | ||
| 328 | self.avail_clients = [] | ||
| 329 | self.num_clients = 0 | ||
| 330 | |||
| 331 | def __enter__(self): | ||
| 332 | return self | ||
| 333 | |||
| 334 | def __exit__(self, exc_type, exc_value, traceback): | ||
| 335 | self.close() | ||
| 336 | return False | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 775faf935a..d415617b20 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -352,83 +352,3 @@ class Client(bb.asyncrpc.Client): | |||
| 352 | 352 | ||
| 353 | def _get_async_client(self): | 353 | def _get_async_client(self): |
| 354 | return AsyncClient(self.username, self.password) | 354 | return AsyncClient(self.username, self.password) |
| 355 | |||
| 356 | |||
| 357 | class ClientPool(bb.asyncrpc.ClientPool): | ||
| 358 | def __init__( | ||
| 359 | self, | ||
| 360 | address, | ||
| 361 | max_clients, | ||
| 362 | *, | ||
| 363 | username=None, | ||
| 364 | password=None, | ||
| 365 | become=None, | ||
| 366 | ): | ||
| 367 | super().__init__(max_clients) | ||
| 368 | self.address = address | ||
| 369 | self.username = username | ||
| 370 | self.password = password | ||
| 371 | self.become = become | ||
| 372 | |||
| 373 | async def _new_client(self): | ||
| 374 | client = await create_async_client( | ||
| 375 | self.address, | ||
| 376 | username=self.username, | ||
| 377 | password=self.password, | ||
| 378 | ) | ||
| 379 | if self.become: | ||
| 380 | await client.become_user(self.become) | ||
| 381 | return client | ||
| 382 | |||
| 383 | def _run_key_tasks(self, queries, call): | ||
| 384 | results = {key: None for key in queries.keys()} | ||
| 385 | |||
| 386 | def make_task(key, args): | ||
| 387 | async def task(client): | ||
| 388 | nonlocal results | ||
| 389 | unihash = await call(client, args) | ||
| 390 | results[key] = unihash | ||
| 391 | |||
| 392 | return task | ||
| 393 | |||
| 394 | def gen_tasks(): | ||
| 395 | for key, args in queries.items(): | ||
| 396 | yield make_task(key, args) | ||
| 397 | |||
| 398 | self.run_tasks(gen_tasks()) | ||
| 399 | return results | ||
| 400 | |||
| 401 | def get_unihashes(self, queries): | ||
| 402 | """ | ||
| 403 | Query multiple unihashes in parallel. | ||
| 404 | |||
| 405 | The queries argument is a dictionary with arbitrary key. The values | ||
| 406 | must be a tuple of (method, taskhash). | ||
| 407 | |||
| 408 | Returns a dictionary with a corresponding key for each input key, and | ||
| 409 | the value is the queried unihash (which might be none if the query | ||
| 410 | failed) | ||
| 411 | """ | ||
| 412 | |||
| 413 | async def call(client, args): | ||
| 414 | method, taskhash = args | ||
| 415 | return await client.get_unihash(method, taskhash) | ||
| 416 | |||
| 417 | return self._run_key_tasks(queries, call) | ||
| 418 | |||
| 419 | def unihashes_exist(self, queries): | ||
| 420 | """ | ||
| 421 | Query multiple unihash existence checks in parallel. | ||
| 422 | |||
| 423 | The queries argument is a dictionary with arbitrary key. The values | ||
| 424 | must be a unihash. | ||
| 425 | |||
| 426 | Returns a dictionary with a corresponding key for each input key, and | ||
| 427 | the value is True or False if the unihash is known by the server (or | ||
| 428 | None if there was a failure) | ||
| 429 | """ | ||
| 430 | |||
| 431 | async def call(client, unihash): | ||
| 432 | return await client.unihash_exists(unihash) | ||
| 433 | |||
| 434 | return self._run_key_tasks(queries, call) | ||
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index cf74d9de7e..13ccb20ebf 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
| @@ -8,7 +8,6 @@ | |||
| 8 | from . import create_server, create_client | 8 | from . import create_server, create_client |
| 9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS | 9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS |
| 10 | from bb.asyncrpc import InvokeError | 10 | from bb.asyncrpc import InvokeError |
| 11 | from .client import ClientPool | ||
| 12 | import hashlib | 11 | import hashlib |
| 13 | import logging | 12 | import logging |
| 14 | import multiprocessing | 13 | import multiprocessing |
| @@ -552,45 +551,6 @@ class HashEquivalenceCommonTests(object): | |||
| 552 | # shares a taskhash with Task 2 | 551 | # shares a taskhash with Task 2 |
| 553 | self.assertClientGetHash(self.client, taskhash2, unihash2) | 552 | self.assertClientGetHash(self.client, taskhash2, unihash2) |
| 554 | 553 | ||
| 555 | |||
| 556 | def test_client_pool_get_unihashes(self): | ||
| 557 | TEST_INPUT = ( | ||
| 558 | # taskhash outhash unihash | ||
| 559 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
| 560 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
| 561 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
| 562 | # Equivalent hash | ||
| 563 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
| 564 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
| 565 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
| 566 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
| 567 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
| 568 | ) | ||
| 569 | EXTRA_QUERIES = ( | ||
| 570 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
| 571 | ) | ||
| 572 | |||
| 573 | with ClientPool(self.server_address, 10) as client_pool: | ||
| 574 | for taskhash, outhash, unihash in TEST_INPUT: | ||
| 575 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 576 | |||
| 577 | query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} | ||
| 578 | for idx, taskhash in enumerate(EXTRA_QUERIES): | ||
| 579 | query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) | ||
| 580 | |||
| 581 | result = client_pool.get_unihashes(query) | ||
| 582 | |||
| 583 | self.assertDictEqual(result, { | ||
| 584 | 0: "218e57509998197d570e2c98512d0105985dffc9", | ||
| 585 | 1: "218e57509998197d570e2c98512d0105985dffc9", | ||
| 586 | 2: "218e57509998197d570e2c98512d0105985dffc9", | ||
| 587 | 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", | ||
| 588 | 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
| 589 | 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
| 590 | 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", | ||
| 591 | 7: None, | ||
| 592 | }) | ||
| 593 | |||
| 594 | def test_get_unihash_batch(self): | 554 | def test_get_unihash_batch(self): |
| 595 | TEST_INPUT = ( | 555 | TEST_INPUT = ( |
| 596 | # taskhash outhash unihash | 556 | # taskhash outhash unihash |
| @@ -628,48 +588,6 @@ class HashEquivalenceCommonTests(object): | |||
| 628 | None, | 588 | None, |
| 629 | ]) | 589 | ]) |
| 630 | 590 | ||
| 631 | def test_client_pool_unihash_exists(self): | ||
| 632 | TEST_INPUT = ( | ||
| 633 | # taskhash outhash unihash | ||
| 634 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
| 635 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
| 636 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
| 637 | # Equivalent hash | ||
| 638 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
| 639 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
| 640 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
| 641 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
| 642 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
| 643 | ) | ||
| 644 | EXTRA_QUERIES = ( | ||
| 645 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
| 646 | ) | ||
| 647 | |||
| 648 | result_unihashes = set() | ||
| 649 | |||
| 650 | |||
| 651 | with ClientPool(self.server_address, 10) as client_pool: | ||
| 652 | for taskhash, outhash, unihash in TEST_INPUT: | ||
| 653 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 654 | result_unihashes.add(result["unihash"]) | ||
| 655 | |||
| 656 | query = {} | ||
| 657 | expected = {} | ||
| 658 | |||
| 659 | for _, _, unihash in TEST_INPUT: | ||
| 660 | idx = len(query) | ||
| 661 | query[idx] = unihash | ||
| 662 | expected[idx] = unihash in result_unihashes | ||
| 663 | |||
| 664 | |||
| 665 | for unihash in EXTRA_QUERIES: | ||
| 666 | idx = len(query) | ||
| 667 | query[idx] = unihash | ||
| 668 | expected[idx] = False | ||
| 669 | |||
| 670 | result = client_pool.unihashes_exist(query) | ||
| 671 | self.assertDictEqual(result, expected) | ||
| 672 | |||
| 673 | def test_unihash_exists_batch(self): | 591 | def test_unihash_exists_batch(self): |
| 674 | TEST_INPUT = ( | 592 | TEST_INPUT = ( |
| 675 | # taskhash outhash unihash | 593 | # taskhash outhash unihash |
