Skip to content

RequestQueue Classes

langbatch.request_queues.RequestQueue

Bases: ABC

RequestQueue is an abstract class for request queues. Implementations should provide a way to add and retrieve requests.

Used in BatchDispatcher to get requests.

Usage:

request_queue = InMemoryRequestQueue()
request_queue.add_requests([
    [
        {"role": "user", "content": "How can I learn Python?"}
    ],
    [
        {"role": "user", "content": "Who is the first president of the United States?"},
        {"role": "assistant", "content": "George Washington"},
        {"role": "user", "content": "Second?"}
    ]
])

batch_dispatcher = BatchDispatcher(
    batch_handler=batch_handler,
    queue=request_queue
)

asyncio.create_task(batch_dispatcher.run())

Source code in langbatch\request_queues.py
class RequestQueue(ABC):
    """
    RequestQueue is an abstract class for request queues.
    Implementations should provide a way to add and retrieve requests.

    Used in `BatchDispatcher` to get requests.

    Usage:
    ```python
    request_queue = InMemoryRequestQueue()
    request_queue.add_requests([
        [
            {"role": "user", "content": "How can I learn Python?"}
        ],
        [
            {"role": "user", "content": "Who is the first president of the United States?"},
            {"role": "assistant", "content": "George Washington"},
            {"role": "user", "content": "Second?"}
        ]
    ])

    batch_dispatcher = BatchDispatcher(
        batch_handler=batch_handler,
        queue=request_queue
    )

    asyncio.create_task(batch_dispatcher.run())
    ```
    """
    @abstractmethod
    def add_requests(self, requests: List[Any]):
        """
        Add requests to the queue
        """
        pass

    @abstractmethod
    def get_requests(self, count: int) -> List[Any]:
        """
        Get requests from the queue
        """
        pass

    @abstractmethod
    def __len__(self):
        pass

add_requests abstractmethod

add_requests(requests: List[Any])

Add requests to the queue

Source code in langbatch\request_queues.py
@abstractmethod
def add_requests(self, requests: List[Any]):
    """
    Add requests to the queue
    """
    pass

get_requests abstractmethod

get_requests(count: int) -> List[Any]

Get requests from the queue

Source code in langbatch\request_queues.py
@abstractmethod
def get_requests(self, count: int) -> List[Any]:
    """
    Get requests from the queue
    """
    pass

Info

Please make sure to pass the the correct type of requests to the queue as per the type of Batch you are going to use. For Example, if you are using ChatCompletionBatch as your batch type, then you should pass the requests in the same format as the ChatCompletionBatch.create() expects. For EmbeddingBatch, refer EmbeddingBatch.create()

langbatch.request_queues.InMemoryRequestQueue

Bases: RequestQueue

Source code in langbatch\request_queues.py
class InMemoryRequestQueue(RequestQueue):
    def __init__(self):
        self.queue = deque()

    def add_requests(self, requests: List[Any]):
        self.queue.extend(requests)
        logging.info(f"Added {len(requests)} requests to queue")

    def get_requests(self, count: int) -> List[Any]:
        if count > len(self.queue):
            count = len(self.queue)
        return [self.queue.popleft() for _ in range(count)]

    def __len__(self):
        return len(self.queue)

add_requests

add_requests(requests: List[Any])
Source code in langbatch\request_queues.py
def add_requests(self, requests: List[Any]):
    self.queue.extend(requests)
    logging.info(f"Added {len(requests)} requests to queue")

get_requests

get_requests(count: int) -> List[Any]
Source code in langbatch\request_queues.py
def get_requests(self, count: int) -> List[Any]:
    if count > len(self.queue):
        count = len(self.queue)
    return [self.queue.popleft() for _ in range(count)]

langbatch.request_queues.RedisRequestQueue

Bases: RequestQueue

RedisRequestQueue is a request queue that uses a Redis list to store requests.

Usage:

import os
import redis

REDIS_URL = os.environ.get('REDIS_URL')
redis_client = redis.from_url(REDIS_URL)

request_queue = RedisRequestQueue(redis_client, queue_name='turbo_requests')
request_queue.add_requests([
    [
        {"role": "user", "content": "How can I learn Python?"}
    ],
    [
        {"role": "user", "content": "Who is the first president of the United States?"},
        {"role": "assistant", "content": "George Washington"},
        {"role": "user", "content": "Second?"}
    ]
])

Source code in langbatch\request_queues.py
class RedisRequestQueue(RequestQueue):
    """
    RedisRequestQueue is a request queue that uses a Redis list to store requests.

    Usage:
    ```python
    import os
    import redis

    REDIS_URL = os.environ.get('REDIS_URL')
    redis_client = redis.from_url(REDIS_URL)

    request_queue = RedisRequestQueue(redis_client, queue_name='turbo_requests')
    request_queue.add_requests([
        [
            {"role": "user", "content": "How can I learn Python?"}
        ],
        [
            {"role": "user", "content": "Who is the first president of the United States?"},
            {"role": "assistant", "content": "George Washington"},
            {"role": "user", "content": "Second?"}
        ]
    ])
    ```
    """
    def __init__(self, redis_client: Any, queue_name: str = 'request_queue'):
        try:
            import redis
            if not isinstance(redis_client, redis.Redis):
                raise TypeError("redis_client must be an instance of redis.Redis")
        except ImportError:
            raise ImportError("redis package is required for RedisRequestQueue. Run: pip install langbatch[redis]")

        self.redis_client = redis_client
        self.queue_name = queue_name

    def add_requests(self, requests: List[Any]):
        count = len(requests)
        for request in requests:
            self.redis_client.rpush(self.queue_name, str(json.dumps(request)))
        logging.debug(f"Added {count} requests to queue.")

    def get_requests(self, count: int) -> List[Any]:
        size = len(self)
        if count > size:
            count = size

        if count == 0:
            return []

        items = self.redis_client.lpop(self.queue_name, count=count)
        if items is None:
            return []

        results = [json.loads(item.decode('utf-8')) for item in items]

        logging.debug(f"Retrieved {len(results)} requests from queue.")
        return results

    def __len__(self):
        length = self.redis_client.llen(self.queue_name)
        return length

__init__

__init__(redis_client: Any, queue_name: str = 'request_queue')
Source code in langbatch\request_queues.py
def __init__(self, redis_client: Any, queue_name: str = 'request_queue'):
    try:
        import redis
        if not isinstance(redis_client, redis.Redis):
            raise TypeError("redis_client must be an instance of redis.Redis")
    except ImportError:
        raise ImportError("redis package is required for RedisRequestQueue. Run: pip install langbatch[redis]")

    self.redis_client = redis_client
    self.queue_name = queue_name

add_requests

add_requests(requests: List[Any])
Source code in langbatch\request_queues.py
def add_requests(self, requests: List[Any]):
    count = len(requests)
    for request in requests:
        self.redis_client.rpush(self.queue_name, str(json.dumps(request)))
    logging.debug(f"Added {count} requests to queue.")

get_requests

get_requests(count: int) -> List[Any]
Source code in langbatch\request_queues.py
def get_requests(self, count: int) -> List[Any]:
    size = len(self)
    if count > size:
        count = size

    if count == 0:
        return []

    items = self.redis_client.lpop(self.queue_name, count=count)
    if items is None:
        return []

    results = [json.loads(item.decode('utf-8')) for item in items]

    logging.debug(f"Retrieved {len(results)} requests from queue.")
    return results