diff options
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 76 |
1 files changed, 1 insertions, 75 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index f81ad92f48..11179b0fcb 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -29,6 +29,7 @@ WEBSOCKETS_MIN_VERSION = (9, 1) | |||
29 | if sys.version_info >= (3, 10, 0): | 29 | if sys.version_info >= (3, 10, 0): |
30 | WEBSOCKETS_MIN_VERSION = (10, 0) | 30 | WEBSOCKETS_MIN_VERSION = (10, 0) |
31 | 31 | ||
32 | |||
32 | def parse_address(addr): | 33 | def parse_address(addr): |
33 | if addr.startswith(UNIX_PREFIX): | 34 | if addr.startswith(UNIX_PREFIX): |
34 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) | 35 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) |
@@ -259,78 +260,3 @@ class Client(object): | |||
259 | def __exit__(self, exc_type, exc_value, traceback): | 260 | def __exit__(self, exc_type, exc_value, traceback): |
260 | self.close() | 261 | self.close() |
261 | return False | 262 | return False |
262 | |||
263 | |||
264 | class ClientPool(object): | ||
265 | def __init__(self, max_clients): | ||
266 | self.avail_clients = [] | ||
267 | self.num_clients = 0 | ||
268 | self.max_clients = max_clients | ||
269 | self.loop = None | ||
270 | self.client_condition = None | ||
271 | |||
272 | @abc.abstractmethod | ||
273 | async def _new_client(self): | ||
274 | raise NotImplementedError("Must be implemented in derived class") | ||
275 | |||
276 | def close(self): | ||
277 | if self.client_condition: | ||
278 | self.client_condition = None | ||
279 | |||
280 | if self.loop: | ||
281 | self.loop.run_until_complete(self.__close_clients()) | ||
282 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
283 | self.loop.close() | ||
284 | self.loop = None | ||
285 | |||
286 | def run_tasks(self, tasks): | ||
287 | if not self.loop: | ||
288 | self.loop = asyncio.new_event_loop() | ||
289 | |||
290 | thread = Thread(target=self.__thread_main, args=(tasks,)) | ||
291 | thread.start() | ||
292 | thread.join() | ||
293 | |||
294 | @contextlib.asynccontextmanager | ||
295 | async def get_client(self): | ||
296 | async with self.client_condition: | ||
297 | if self.avail_clients: | ||
298 | client = self.avail_clients.pop() | ||
299 | elif self.num_clients < self.max_clients: | ||
300 | self.num_clients += 1 | ||
301 | client = await self._new_client() | ||
302 | else: | ||
303 | while not self.avail_clients: | ||
304 | await self.client_condition.wait() | ||
305 | client = self.avail_clients.pop() | ||
306 | |||
307 | try: | ||
308 | yield client | ||
309 | finally: | ||
310 | async with self.client_condition: | ||
311 | self.avail_clients.append(client) | ||
312 | self.client_condition.notify() | ||
313 | |||
314 | def __thread_main(self, tasks): | ||
315 | async def process_task(task): | ||
316 | async with self.get_client() as client: | ||
317 | await task(client) | ||
318 | |||
319 | asyncio.set_event_loop(self.loop) | ||
320 | if not self.client_condition: | ||
321 | self.client_condition = asyncio.Condition() | ||
322 | tasks = [process_task(t) for t in tasks] | ||
323 | self.loop.run_until_complete(asyncio.gather(*tasks)) | ||
324 | |||
325 | async def __close_clients(self): | ||
326 | for c in self.avail_clients: | ||
327 | await c.close() | ||
328 | self.avail_clients = [] | ||
329 | self.num_clients = 0 | ||
330 | |||
331 | def __enter__(self): | ||
332 | return self | ||
333 | |||
334 | def __exit__(self, exc_type, exc_value, traceback): | ||
335 | self.close() | ||
336 | return False | ||