summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/asyncrpc/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py76
1 files changed, 1 insertions, 75 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index f81ad92f48..11179b0fcb 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -29,6 +29,7 @@ WEBSOCKETS_MIN_VERSION = (9, 1)
29if sys.version_info >= (3, 10, 0): 29if sys.version_info >= (3, 10, 0):
30 WEBSOCKETS_MIN_VERSION = (10, 0) 30 WEBSOCKETS_MIN_VERSION = (10, 0)
31 31
32
32def parse_address(addr): 33def parse_address(addr):
33 if addr.startswith(UNIX_PREFIX): 34 if addr.startswith(UNIX_PREFIX):
34 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) 35 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],))
@@ -259,78 +260,3 @@ class Client(object):
259 def __exit__(self, exc_type, exc_value, traceback): 260 def __exit__(self, exc_type, exc_value, traceback):
260 self.close() 261 self.close()
261 return False 262 return False
262
263
264class ClientPool(object):
265 def __init__(self, max_clients):
266 self.avail_clients = []
267 self.num_clients = 0
268 self.max_clients = max_clients
269 self.loop = None
270 self.client_condition = None
271
272 @abc.abstractmethod
273 async def _new_client(self):
274 raise NotImplementedError("Must be implemented in derived class")
275
276 def close(self):
277 if self.client_condition:
278 self.client_condition = None
279
280 if self.loop:
281 self.loop.run_until_complete(self.__close_clients())
282 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
283 self.loop.close()
284 self.loop = None
285
286 def run_tasks(self, tasks):
287 if not self.loop:
288 self.loop = asyncio.new_event_loop()
289
290 thread = Thread(target=self.__thread_main, args=(tasks,))
291 thread.start()
292 thread.join()
293
294 @contextlib.asynccontextmanager
295 async def get_client(self):
296 async with self.client_condition:
297 if self.avail_clients:
298 client = self.avail_clients.pop()
299 elif self.num_clients < self.max_clients:
300 self.num_clients += 1
301 client = await self._new_client()
302 else:
303 while not self.avail_clients:
304 await self.client_condition.wait()
305 client = self.avail_clients.pop()
306
307 try:
308 yield client
309 finally:
310 async with self.client_condition:
311 self.avail_clients.append(client)
312 self.client_condition.notify()
313
314 def __thread_main(self, tasks):
315 async def process_task(task):
316 async with self.get_client() as client:
317 await task(client)
318
319 asyncio.set_event_loop(self.loop)
320 if not self.client_condition:
321 self.client_condition = asyncio.Condition()
322 tasks = [process_task(t) for t in tasks]
323 self.loop.run_until_complete(asyncio.gather(*tasks))
324
325 async def __close_clients(self):
326 for c in self.avail_clients:
327 await c.close()
328 self.avail_clients = []
329 self.num_clients = 0
330
331 def __enter__(self):
332 return self
333
334 def __exit__(self, exc_type, exc_value, traceback):
335 self.close()
336 return False