Skip to content

BatchDispatcher

langbatch.BatchDispatcher

BatchDispatcher

Batch dispatcher creates batches from requests in the queue and dispatches them to the batch handler. It periodically checks the queue size and time threshold, and creates a batch and dispatches it to the batch handler.

Usage:

# Create a batch dispatcher
batch_dispatcher = BatchDispatcher(
    batch_handler=batch_handler,
    queue=request_queue,
    queue_threshold=50000,
    time_threshold=3600 * 2,
    requests_type="partial",
    request_kwargs=request_kwargs
)

asyncio.create_task(batch_dispatcher.run())

Source code in langbatch\BatchDispatcher.py
class BatchDispatcher:
    """
    Batch dispatcher creates batches from requests in the queue and dispatches them to the batch handler.
    It periodically checks the queue size and time threshold, and creates a batch and dispatches it to the batch handler.

    Usage:
    ```python
    # Create a batch dispatcher
    batch_dispatcher = BatchDispatcher(
        batch_handler=batch_handler,
        queue=request_queue,
        queue_threshold=50000,
        time_threshold=3600 * 2,
        requests_type="partial",
        request_kwargs=request_kwargs
    )

    asyncio.create_task(batch_dispatcher.run())
    ```
    """

    def __init__(
            self, 
            batch_handler: BatchHandler, 
            queue: RequestQueue, 
            queue_threshold: int = 50000, 
            time_threshold: int = 3600 * 2, 
            time_interval: int = 600, 
            requests_type: Literal["partial", "full"] = "partial", 
            request_kwargs: Dict = {}
        ):
        self.batch_handler = batch_handler
        self.queue = queue
        self.queue_threshold = queue_threshold
        self.time_threshold = time_threshold
        self.time_interval = time_interval
        self.last_batch_time = time.time()
        self.requests_type = requests_type
        self.request_kwargs = request_kwargs

    async def run(self):
        """
        Start the batch dispatcher as a asynchronous background task.
        Periodically checks the queue size and time threshold, and creates a batch and dispatches it to the batch handler.

        Examples:
            ```python
            asyncio.create_task(batch_dispatcher.run())
            ```
        """
        while True:
            logger.info("Running batch dispatcher")
            await self._check_batch_conditions()
            await asyncio.sleep(self.time_interval)

    async def _check_batch_conditions(self):
        logger.info("Checking queue for batch creation")
        while True:
            current_time = time.time()
            queue_size = len(self.queue)
            has_threshold_requests = queue_size >= self.queue_threshold
            reached_time_threshold = (current_time - self.last_batch_time) >= self.time_threshold
            if has_threshold_requests or (reached_time_threshold and queue_size > 0):
                logger.info("Creating and dispatching batch")
                await self._create_and_dispatch_batch()
            else:
                logger.info("No batch conditions met, waiting for next check")
                break

    async def _create_and_dispatch_batch(self):
        try:
            logger.info("Creating batch")
            requests = await asyncio.to_thread(self.queue.get_requests, self.queue_threshold)
            batch_class = self.batch_handler.batch_type
            batch_kwargs = self.batch_handler.batch_kwargs
            if self.requests_type == "partial":
                batch = await asyncio.to_thread(batch_class.create, requests, self.request_kwargs, batch_kwargs)
            else:
                batch = await asyncio.to_thread(batch_class.create_from_requests, requests, batch_kwargs)
            self.last_batch_time = time.time()
            await self._dispatch_batch(batch)
        except BatchInitializationError as e:
            logger.warning(f"Failed to create batch: {str(e)}")

    async def _dispatch_batch(self, batch: Batch):
        logger.info(f"Dispatching batch {batch.id}")
        await asyncio.to_thread(batch.save, self.batch_handler.batch_storage)

        await self.batch_handler.add_batch(batch.id)
        logger.info(f"Batch {batch.id} dispatched successfully")

run async

run()

Start the batch dispatcher as a asynchronous background task. Periodically checks the queue size and time threshold, and creates a batch and dispatches it to the batch handler.

Examples:

asyncio.create_task(batch_dispatcher.run())
Source code in langbatch\BatchDispatcher.py
async def run(self):
    """
    Start the batch dispatcher as a asynchronous background task.
    Periodically checks the queue size and time threshold, and creates a batch and dispatches it to the batch handler.

    Examples:
        ```python
        asyncio.create_task(batch_dispatcher.run())
        ```
    """
    while True:
        logger.info("Running batch dispatcher")
        await self._check_batch_conditions()
        await asyncio.sleep(self.time_interval)