Anthropic ChatCompletionBatch

langbatch.anthropic.AnthropicChatCompletionBatch

Bases: AnthropicBatch, ChatCompletionBatch

AnthropicChatCompletionBatch is a class for Anthropic chat completion batches.

Usage:

batch = AnthropicChatCompletionBatch(
    "path/to/file.jsonl",
    client=Anthropic(api_key="your-api-key")
)
batch.start()
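
A typical end-to-end run (a minimal sketch; the polling interval is your choice, and the status strings follow the methods documented below):

import time
from anthropic import Anthropic
from langbatch.anthropic import AnthropicChatCompletionBatch

batch = AnthropicChatCompletionBatch(
    "path/to/file.jsonl",
    client=Anthropic(api_key="your-api-key")
)
batch.start()

# poll until the platform reports a terminal status
while batch.get_status() not in ("completed", "errored", "expired"):
    time.sleep(60)

if batch.get_status() == "completed":
    successful_results, unsuccessful_results = batch.get_results()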

Source code in langbatch\anthropic.py
class AnthropicChatCompletionBatch(AnthropicBatch, ChatCompletionBatch):
    """
    AnthropicChatCompletionBatch is a class for Anthropic chat completion batches.

    Usage:
    ```python
    batch = AnthropicChatCompletionBatch(
        "path/to/file.jsonl",
        client=Anthropic(api_key="your-api-key")
    )
    batch.start()
    ```
    """
    def _convert_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        converted_messages = []
        for message in messages:
            if message["role"] == "assistant" and message.get("tool_calls"):
                converted_tool_calls = []
                for tool_call in message["tool_calls"]:
                    converted_tool_call = {
                        "type": "tool_use",
                        "id": tool_call["id"],
                        "name": tool_call["function"]["name"],
                        # OpenAI encodes arguments as a JSON string; Anthropic expects a dict
                        "input": json.loads(tool_call["function"]["arguments"])
                    }
                    converted_tool_calls.append(converted_tool_call)
                converted_message = {"role": "assistant", "content": converted_tool_calls}
            elif message["role"] == "tool":
                converted_message = {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": message["tool_call_id"],
                            "content": message["content"]
                        }
                    ]
                }
            else:
                converted_message = {
                    "role": message["role"],
                    "content": self._convert_content(message["content"])
                }
            converted_messages.append(converted_message)
        return converted_messages

    def _convert_content(self, content: Any) -> List[Dict[str, Any]]:
        if isinstance(content, str):
            return [{"type": "text", "text": content}]
        elif isinstance(content, list):
            converted_content = []
            for item in content:
                if isinstance(item, str):
                    converted_content.append({"type": "text", "text": item})
                elif isinstance(item, dict):
                    if item["type"] == "text":
                        converted_content.append(item)
                    elif item["type"] == "image_url":
                        image_url = item["image_url"]["url"]
                        if image_url.startswith("data:"):
                            # parse a data URL: "data:<media_type>;base64,<data>"
                            image_media_type = image_url.split(";")[0].split(":")[-1]
                            image_data = image_url.split(",")[1]
                        else:
                            image_media_type, image_data = get_web_image(image_url)

                        converted_content.append({
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": image_media_type,
                                "data": image_data
                            }
                        })

            return converted_content
        return []

    def _convert_tools(self, tools: Optional[List[Dict[str, Any]]]):
        if not tools:
            return None

        converted_tools = []
        for tool in tools:
            if tool["type"] == "function":
                converted_tool = {
                    "name": tool["function"]["name"],
                    "input_schema": tool["function"]["parameters"]
                }
                if tool["function"]["description"]:
                    converted_tool["description"] = tool["function"]["description"]
                converted_tools.append(converted_tool)
        return converted_tools

    def _convert_tool_choice(self, tools_given: bool, tool_choice: Optional[Dict[str, Any]], parallel_tool_calls: Optional[bool]):
        tool_choice_obj = None
        if tool_choice is None and tools_given:
            tool_choice_obj = {"type": "auto"}

        if isinstance(tool_choice, str):
            match tool_choice:
                case "auto":
                    tool_choice_obj = {"type": "auto"}
                case "required":
                    tool_choice_obj = {"type": "any"}
                case "none":
                    tool_choice_obj = {"type": "auto"} if tools_given else None
        elif isinstance(tool_choice, dict):
            if tool_choice["type"] == "function":
                return {"type": "tool", "name": tool_choice["function"]["name"]}

        # OpenAI parallel_tool_calls=False maps to Anthropic disable_parallel_tool_use=True
        if parallel_tool_calls is not None and tool_choice_obj:
            tool_choice_obj["disable_parallel_tool_use"] = not parallel_tool_calls

        return tool_choice_obj

    def _convert_request(self, req: dict) -> Request:
        custom_id = req["custom_id"]
        request = AnthropicChatCompletionRequest(**req["body"])

        messages = []
        system = ""
        for message in request.messages:
            if message["role"] == "system":
                if isinstance(message["content"], str):
                    system = message["content"]
                elif isinstance(message["content"], dict):
                    try:
                        system = message["content"]["text"]
                    except KeyError:
                        pass
            else:
                messages.append(message)

        messages = self._convert_messages(messages)

        req = {
            "model": request.model,
            "messages": messages,
            "system": system
        }

        if request.max_tokens is not None:
            req["max_tokens"] = request.max_tokens
        if request.temperature is not None:
            req["temperature"] = request.temperature
        if request.top_p is not None:
            req["top_p"] = request.top_p
        if request.stop:
            req["stop_sequences"] = request.stop
        if request.tools:
            tools = self._convert_tools(request.tools)
            tool_choice = self._convert_tool_choice(tools is not None, request.tool_choice, request.parallel_tool_calls)
            req["tools"] = tools
            req["tool_choice"] = tool_choice

        anthropic_request = Request(
            custom_id=custom_id,
            params=MessageCreateParamsNonStreaming(**req)
        )
        return anthropic_request

    def _convert_response_message(self, message):
        if isinstance(message.content, str):
            return {
                "role": message.role,
                "content": message.content
            }
        elif isinstance(message.content, list):
            tool_calls = []
            content = []
            for item in message.content:
                if item.type == "tool_use":
                    tool_calls.append(
                        {
                            "type": "function",
                            "id": item.id,
                            "function": {
                                "name": item.name,
                                # Anthropic returns input as a dict; OpenAI format expects a JSON string
                                "arguments": json.dumps(item.input)
                            }
                        }
                    )
                else:
                    content.append(item.to_dict())

            return {
                "role": message.role,
                "content": content,
                "tool_calls": tool_calls
            }

    def _convert_response(self, response) -> dict:
        if response.result.type == "succeeded":
            message = response.result.message

            choice = {
                "index": 0,
                "logprobs": None,
                "finish_reason": message.stop_reason.lower(),
                "message": self._convert_response_message(message)
            }
            choices = [choice]
            usage = {
                "prompt_tokens": message.usage.input_tokens,
                "completion_tokens": message.usage.output_tokens,
                "total_tokens": message.usage.input_tokens + message.usage.output_tokens
            }
            body = {
                "id": f'{response.custom_id}',
                "object": "chat.completion",
                "created": int(time.time()),
                "model": message.model,
                "system_fingerprint": None,
                "choices": choices,
                "usage": usage
            }
            res = {
                "request_id": response.custom_id,
                "status_code": 200,
                "body": body,
            }

            error = None
        elif response.result.type == "errored":
            error = {
                "message": response.result.error.type,
                "code": response.result.error.type
            }
            res = None
        elif response.result.type == "expired":
            error = {
                "message": "Request expired",
                "code": "request_expired"
            }
            res = None

        # create output
        output = {
            "id": f'{response.custom_id}',
            "custom_id": response.custom_id,
            "response": res,
            "error": error
        }
        return output

    def _validate_request(self, request):
        AnthropicChatCompletionRequest(**request)
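
For illustration, here is how one OpenAI-format tool exchange maps through the converters above (names and values are hypothetical):

# OpenAI-format messages, as they appear in a batch request body
messages = [
    {"role": "user", "content": "What is the weather in Paris?"},
    {"role": "assistant", "content": None, "tool_calls": [
        {"id": "call_1", "type": "function",
         "function": {"name": "get_weather", "arguments": "{\"city\": \"Paris\"}"}}
    ]},
    {"role": "tool", "tool_call_id": "call_1", "content": "18C, clear"}
]

# _convert_messages produces the Anthropic-format equivalent:
# [
#     {"role": "user", "content": [{"type": "text", "text": "What is the weather in Paris?"}]},
#     {"role": "assistant", "content": [{"type": "tool_use", "id": "call_1",
#                                        "name": "get_weather", "input": {"city": "Paris"}}]},
#     {"role": "user", "content": [{"type": "tool_result", "tool_use_id": "call_1",
#                                   "content": "18C, clear"}]}
# ]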

platform_batch_id class-attribute instance-attribute

platform_batch_id: str | None = None

id instance-attribute

id = str(uuid4())

client instance-attribute

client = client

__init__

__init__(file: str, client: Anthropic = Anthropic()) -> None

Initialize the AnthropicBatch class.

Parameters:

  • file (str) –

    The path to the jsonl file in OpenAI batch format.

  • client (Anthropic, default: Anthropic() ) –

    The Anthropic client.

Usage:

batch = AnthropicChatCompletionBatch(
    "path/to/file.jsonl"
)
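
To supply an API key explicitly, pass a pre-configured client from the standard anthropic SDK:

from anthropic import Anthropic

batch = AnthropicChatCompletionBatch(
    "path/to/file.jsonl",
    client=Anthropic(api_key="your-api-key")
)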

Source code in langbatch\anthropic.py
def __init__(self, file: str, client: Anthropic = Anthropic()) -> None:
    """
    Initialize the AnthropicBatch class.

    Args:
        file (str): The path to the jsonl file in OpenAI batch format.
        client (Anthropic): The Anthropic client.

    Usage:
    ```python
    batch = AnthropicChatCompletionBatch(
        "path/to/file.jsonl"
    )
    ```
    """
    super().__init__(file)
    self.client = client

create_from_requests classmethod

create_from_requests(requests, batch_kwargs: Dict = {})

Creates a batch from a list of requests. These requests must be in the correct Batch API request format for the batch type. For example, for OpenAIChatCompletionBatch, each request should be a Chat Completion request with a custom_id.

Parameters:

  • requests –

    A list of requests.

  • batch_kwargs (Dict, default: {} ) –

    Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

Returns:

  • –

    An instance of the Batch class.

Raises:

  • ValueError –

    If the input data is invalid.

Usage:

batch = OpenAIChatCompletionBatch.create_from_requests([
    {   "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "Biryani Receipe, pls."}],
            "max_tokens": 1000
        }
    },
    {
        "custom_id": "request-2",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "Write a short story about AI"}],
            "max_tokens": 1000
        }
    }
])
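
The same structure works for AnthropicChatCompletionBatch; the body carries an Anthropic model (model name and prompt are illustrative):

batch = AnthropicChatCompletionBatch.create_from_requests([
    {
        "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "claude-3-sonnet-20240229",
            "messages": [{"role": "user", "content": "Write a haiku about batching"}],
            "max_tokens": 1000
        }
    }
])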

Source code in langbatch\Batch.py
@classmethod
def create_from_requests(cls, requests, batch_kwargs: Dict = {}):
    """
    Creates a batch when given a list of requests. 
    These requests should be in correct Batch API request format as per the Batch type.
    Ex. for OpenAIChatCompletionBatch, requests should be a Chat Completion request with custom_id.

    Args:
        requests: A list of requests.
        batch_kwargs (Dict, optional): Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

    Returns:
        An instance of the Batch class.

    Raises:
        ValueError: If the input data is invalid.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch.create_from_requests([
        {   "custom_id": "request-1",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": "Biryani Receipe, pls."}],
                "max_tokens": 1000
            }
        },
        {
            "custom_id": "request-2",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": "Write a short story about AI"}],
                "max_tokens": 1000
            }
        }
    ])
    ``` 
    """

    file_path = cls._create_batch_file_from_requests(requests)

    if file_path is None:
        raise ValueError("Failed to create batch. Check the input data.")

    return cls(file_path, **batch_kwargs)

load classmethod

load(id: str, storage: BatchStorage = FileBatchStorage())

Load a batch from the storage and return a Batch object.

Parameters:

  • id (str) –

    The id of the batch.

  • storage (BatchStorage, default: FileBatchStorage() ) –

    The storage to load the batch from. Defaults to FileBatchStorage().

Returns:

  • Batch –

    The batch object.

Usage:

batch = OpenAIChatCompletionBatch.load("123", storage=FileBatchStorage("./data"))

Source code in langbatch\Batch.py
@classmethod
def load(cls, id: str, storage: BatchStorage = FileBatchStorage()):
    """
    Load a batch from the storage and return a Batch object.

    Args:
        id (str): The id of the batch.
        storage (BatchStorage, optional): The storage to load the batch from. Defaults to FileBatchStorage().

    Returns:
        Batch: The batch object.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch.load("123", storage=FileBatchStorage("./data"))
    ```
    """
    data_file, json_file = storage.load(id)

    with open(json_file, 'r') as f:
        meta_data = json.load(f)

    init_args = cls._get_init_args(meta_data)

    batch = cls(str(data_file), **init_args)
    batch.platform_batch_id = meta_data['platform_batch_id']
    batch.id = id

    return batch

save

save(storage: BatchStorage = FileBatchStorage())

Save the batch to the storage.

Parameters:

  • storage (BatchStorage, default: FileBatchStorage() ) –

    The storage to save the batch to. Defaults to FileBatchStorage().

Usage:

batch = OpenAIChatCompletionBatch(file)
batch.save()

# save the batch to file storage
batch.save(storage=FileBatchStorage("./data"))

Source code in langbatch\Batch.py
def save(self, storage: BatchStorage = FileBatchStorage()):
    """
    Save the batch to the storage.

    Args:
        storage (BatchStorage, optional): The storage to save the batch to. Defaults to FileBatchStorage().

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch(file)
    batch.save()

    # save the batch to file storage
    batch.save(storage=FileBatchStorage("./data"))
    ```
    """
    meta_data = self._create_meta_data()
    meta_data["platform_batch_id"] = self.platform_batch_id

    storage.save(self.id, Path(self._file), meta_data)

start

start()
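
Usage (a minimal sketch; raises ValueError if the batch was already started):

batch = AnthropicChatCompletionBatch("path/to/file.jsonl")
batch.start()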
Source code in langbatch\anthropic.py
def start(self):
    if self.platform_batch_id is not None:
        raise ValueError("Batch already started")

    self._create_batch()

get_status

get_status()
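
Usage (a sketch; raises ValueError if the batch has not been started):

status = batch.get_status()

if status == "completed":
    results_file = batch.get_results_file()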
Source code in langbatch\anthropic.py
def get_status(self):
    if self.platform_batch_id is None:
        raise ValueError("Batch not started")

    response = self.client.beta.messages.batches.retrieve(
        self.platform_batch_id
    )
    return anthropic_state_map[response.processing_status]

get_results_file

get_results_file()

Usage:

import jsonlines

# create a batch and start batch process
batch = OpenAIChatCompletionBatch(file)
batch.start()

if batch.get_status() == "completed":
    # get the results file
    results_file = batch.get_results_file()

    with jsonlines.open(results_file) as reader:
        for obj in reader:
            print(obj)

Source code in langbatch\Batch.py
def get_results_file(self):
    """
    Usage:
    ```python
    import jsonlines

    # create a batch and start batch process
    batch = OpenAIChatCompletionBatch(file)
    batch.start()

    if batch.get_status() == "completed":
        # get the results file
        results_file = batch.get_results_file()

        with jsonlines.open(results_file) as reader:
            for obj in reader:
                print(obj)
    ```
    """
    file_path = self._download_results_file()
    return file_path

get_results

get_results() -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]] | Tuple[None, None]

Retrieve the results of the chat completion batch.

Returns:

  • Tuple[List[Dict[str, Any]], List[Dict[str, Any]]] | Tuple[None, None] –

    A tuple containing successful and unsuccessful results. Successful results: A list of dictionaries with "choices" and "custom_id" keys. Unsuccessful results: A list of dictionaries with "error" and "custom_id" keys.

Usage:

successful_results, unsuccessful_results = batch.get_results()
for result in successful_results:
    print(result["choices"])

Source code in langbatch\ChatCompletionBatch.py
def get_results(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]] | Tuple[None, None]:
    """
    Retrieve the results of the chat completion batch.

    Returns:
        A tuple containing successful and unsuccessful results. Successful results: A list of dictionaries with "choices" and "custom_id" keys. Unsuccessful results: A list of dictionaries with "error" and "custom_id" keys.

    Usage:
    ```python
    successful_results, unsuccessful_results = batch.get_results()
    for result in successful_results:
        print(result["choices"])
    ```
    """
    process_func = lambda result: {"choices": result['response']['body']['choices']}
    return self._prepare_results(process_func)

is_retryable_failure

is_retryable_failure() -> bool
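
Usage (a sketch; True when the platform reports an errored or expired batch):

can_retry = batch.is_retryable_failure()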
Source code in langbatch\anthropic.py
def is_retryable_failure(self) -> bool:
    status = self.get_status()
    return status in ("errored", "expired")

retry

retry()
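
Usage (a sketch; the polling interval is illustrative, and the batch must already be started):

import time

while batch.get_status() not in ("completed", "errored", "expired"):
    time.sleep(60)

if batch.is_retryable_failure():
    batch.retry()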
Source code in langbatch\anthropic.py
def retry(self):
    if self.platform_batch_id is None:
        raise ValueError("Batch not started")

    self._create_batch()

get_unsuccessful_requests

get_unsuccessful_requests() -> List[Dict[str, Any]]

Retrieve the unsuccessful requests from the batch.

Returns:

  • List[Dict[str, Any]] –

    A list of requests that failed.

Usage:

batch = OpenAIChatCompletionBatch(file)
batch.start()

if batch.get_status() == "completed":
    # get the unsuccessful requests
    unsuccessful_requests = batch.get_unsuccessful_requests()

    for request in unsuccessful_requests:
        print(request["custom_id"])

Source code in langbatch\Batch.py
def get_unsuccessful_requests(self) -> List[Dict[str, Any]]:
    """
    Retrieve the unsuccessful requests from the batch.

    Returns:
        A list of requests that failed.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch(file)
    batch.start()

    if batch.get_status() == "completed":
        # get the unsuccessful requests
        unsuccessful_requests = batch.get_unsuccessful_requests()

        for request in unsuccessful_requests:
            print(request["custom_id"])
    ```
    """
    custom_ids = []
    _, unsuccessful_results = self.get_results()
    if unsuccessful_results is None:
        return []
    for result in unsuccessful_results:
        custom_ids.append(result["custom_id"])

    return self.get_requests_by_custom_ids(custom_ids)

get_requests_by_custom_ids

get_requests_by_custom_ids(custom_ids: List[str]) -> List[Dict[str, Any]]

Retrieve the requests from the batch file by custom ids.

Parameters:

  • custom_ids (List[str]) –

    A list of custom ids.

Returns:

  • List[Dict[str, Any]] –

    A list of requests.

Usage:

batch = OpenAIChatCompletionBatch(file)
batch.start()

if batch.get_status() == "completed":
    # get the requests by custom ids
    requests = batch.get_requests_by_custom_ids(["custom_id1", "custom_id2"])

    for request in requests:
        print(request["custom_id"])

Source code in langbatch\Batch.py
def get_requests_by_custom_ids(self, custom_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Retrieve the requests from the batch file by custom ids.

    Args:
        custom_ids (List[str]): A list of custom ids.

    Returns:
        A list of requests.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch(file)
    batch.start()

    if batch.get_status() == "completed":
        # get the requests by custom ids
        requests = batch.get_requests_by_custom_ids(["custom_id1", "custom_id2"])

        for request in requests:
            print(request["custom_id"])
    ```
    """
    requests = []
    with jsonlines.open(self._file) as reader:
        for request in reader:
            if request["custom_id"] in custom_ids:
                requests.append(request)
    return requests

create classmethod

create(data: List[Iterable[ChatCompletionMessageParam]], request_kwargs: Dict = {}, batch_kwargs: Dict = {}) -> ChatCompletionBatch

Create a chat completion batch when given a list of messages.

Parameters:

  • data (List[Iterable[ChatCompletionMessageParam]]) –

    A list of messages to be sent to the API.

  • request_kwargs (Dict, default: {} ) –

    Additional keyword arguments for the API call. Ex. model, messages, etc.

  • batch_kwargs (Dict, default: {} ) –

    Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

Returns:

Raises:

  • ValueError –

    If the input data is invalid.

Usage:

batch = OpenAIChatCompletionBatch.create([
        [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of France?"}],
        [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of Germany?"}]
    ],
    request_kwargs={"model": "gpt-4o"})

# For Vertex AI
batch = VertexAIChatCompletionBatch.create([
        [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of France?"}],
        [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of Germany?"}]
    ],
    request_kwargs={"model": "gemini-1.5-flash-002"},
    batch_kwargs={
        "gcp_project": "your-gcp-project", 
        "bigquery_input_dataset": "your-bigquery-input-dataset", 
        "bigquery_output_dataset": "your-bigquery-output-dataset"
    })

Source code in langbatch\ChatCompletionBatch.py
@classmethod
def create(cls, data: List[Iterable[ChatCompletionMessageParam]], request_kwargs: Dict = {}, batch_kwargs: Dict = {}) -> "ChatCompletionBatch":
    """
    Create a chat completion batch when given a list of messages.

    Args:
        data (List[Iterable[ChatCompletionMessageParam]]): A list of messages to be sent to the API.
        request_kwargs (Dict): Additional keyword arguments for the API call. Ex. model, messages, etc.
        batch_kwargs (Dict): Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

    Returns:
        An instance of the ChatCompletionBatch class.

    Raises:
        ValueError: If the input data is invalid.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch.create([
            [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of France?"}],
            [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of Germany?"}]
        ],
        request_kwargs={"model": "gpt-4o"})

    # For Vertex AI
    batch = VertexAIChatCompletionBatch.create([
            [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of France?"}],
            [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of Germany?"}]
        ],
        request_kwargs={"model": "gemini-1.5-flash-002"},
        batch_kwargs={
            "gcp_project": "your-gcp-project", 
            "bigquery_input_dataset": "your-bigquery-input-dataset", 
            "bigquery_output_dataset": "your-bigquery-output-dataset"
        })
    ```
    """
    return cls._create_batch_file("messages", data, request_kwargs, batch_kwargs)