Source code for openfactory.assets.utils.async_loop

""" AsyncLoopThread """

import asyncio
import concurrent.futures
import threading
from typing import Coroutine, Any


class AsyncLoopThread:
    """
    Runs an asyncio event loop in a dedicated background thread.

    This allows running asynchronous coroutines in a synchronous context,
    without blocking the main thread.

    Example usage:
        loop_thread = AsyncLoopThread()
        future = loop_thread.run_coro(some_async_func())
        result = future.result()  # blocks until coroutine completes
        loop_thread.stop()

    Attributes:
        loop: The event loop created for, and run by, the background thread.
        thread: The daemon thread that executes ``loop.run_forever()``.
    """

    def __init__(self) -> None:
        """ Initializes the asyncio loop and starts the background thread. """
        self.loop = asyncio.new_event_loop()
        # daemon=True: a still-running loop thread never keeps the
        # interpreter alive at exit.
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self) -> None:
        """ Sets the event loop in this thread and runs it forever. """
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def run_coro(self, coro: Coroutine[Any, Any, Any]) -> concurrent.futures.Future:
        """
        Schedules a coroutine to run in the background loop.

        Args:
            coro (Coroutine): The coroutine to run in the loop.

        Returns:
            :class:`concurrent.futures.Future`: A thread-safe Future representing
            the execution of the coroutine. Call ``result()`` on it to block the
            calling thread until the coroutine has completed.

        Raises:
            RuntimeError: If the loop has already been closed by :meth:`stop`.
        """
        # run_coroutine_threadsafe hands back a concurrent.futures.Future
        # (not an asyncio.Future), which is safe to wait on from other threads.
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self) -> None:
        """
        Stops the background event loop, joins the thread and closes the loop.

        This blocks until the thread has fully terminated. Calling it again
        after the loop has been closed is a no-op.

        Raises:
            RuntimeError: If called from the background loop thread itself,
                since a thread cannot join itself.
        """
        if self.loop.is_closed():
            # Already shut down; scheduling on a closed loop would raise.
            return
        # loop.stop() is not thread-safe, so it has to be scheduled on the
        # loop's own thread rather than called directly from here.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        # The loop is no longer running once the thread has exited; close it
        # to release its selector and internal wake-up sockets.
        self.loop.close()