Skip to content

Batch

langbatch.Batch.Batch

Bases: ABC

Batch class is the base class for all batch classes.

Implementations of this class will be platform specific (OpenAI, Vertex AI, etc.)

Source code in langbatch\Batch.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
class Batch(ABC):
    """
    Batch class is the base class for all batch classes.

    Implementations of this class will be platform specific (OpenAI, Vertex AI, etc.)
    """
    # URL path of the platform batch endpoint; set by platform-specific subclasses.
    _url: str = ""
    # Identifier assigned by the platform once the batch process is started.
    platform_batch_id: str | None = None

    def __init__(self, file: str):
        """
        Initialize the Batch class.

        Args:
            file (str): The path to the batch file. File should be in OpenAI compatible batch file in jsonl format.

        Raises:
            BatchValidationError: If the batch file is empty or contains invalid requests.
        """
        self._file = file
        self.id = str(uuid.uuid4())

        self._validate_requests() # Validate the requests in the batch file

    @classmethod
    def _create_batch_file_from_requests(cls, requests) -> Path | None:
        """
        Write the given requests to a new jsonl file and return its path.

        Returns:
            Path to the created jsonl file, or None if the file could not be created.
        """
        try:
            batches_dir = Path(DATA_PATH) / "created_batches"
            batches_dir.mkdir(exist_ok=True, parents=True)

            file_id = str(uuid.uuid4())  # renamed from `id` to avoid shadowing the builtin
            file_path = batches_dir / f"{file_id}.jsonl"
            with jsonlines.open(file_path, mode='w') as writer:
                writer.write_all(requests)
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still propagate.
            logging.error("Error creating batch file", exc_info=True)
            return None

        return file_path

    @classmethod
    def _create_batch_file(cls, key: str, data: List[Any], request_kwargs: Dict | None = None, batch_kwargs: Dict | None = None):
        """
        Create a batch when given a list of items.
        For Chat Completions, this would be a list of messages.
        For Embeddings, this would be a list of texts.

        Args:
            key (str): The request-body field each item is assigned to (e.g. "messages", "input").
            data (List[Any]): The items to turn into batch requests.
            request_kwargs (Dict, optional): Extra fields merged into every request body.
            batch_kwargs (Dict, optional): Additional keyword arguments for the batch class.

        Returns:
            An instance of the Batch class, or None if the requests could not be built.

        Raises:
            BatchInitializationError: If the batch file could not be created.
        """
        # None sentinels instead of mutable `{}` defaults shared across calls.
        request_kwargs = {} if request_kwargs is None else request_kwargs
        batch_kwargs = {} if batch_kwargs is None else batch_kwargs

        requests = []
        try:
            for item in data:
                try:
                    body = request_kwargs.copy()  # Copy kwargs to avoid mutation
                    custom_id = str(uuid.uuid4())

                    body[key] = item

                    request = {
                        "custom_id": custom_id,
                        "method": "POST",
                        "url": cls._url,
                        "body": body
                    }
                    requests.append(request)
                except Exception:
                    # Skip a malformed item but keep processing the rest.
                    logging.warning(f"Error processing item {item}", exc_info=True)
                    continue
        except Exception:
            logging.error("Error creating requests from data to create batch file", exc_info=True)
            return None

        file_path = cls._create_batch_file_from_requests(requests)

        if file_path is None:
            raise BatchInitializationError("Failed to create batch. Check the input data.")

        return cls(file_path, **batch_kwargs)

    @classmethod
    def create_from_requests(cls, requests, batch_kwargs: Dict | None = None):
        """
        Creates a batch when given a list of requests. 
        These requests should be in correct Batch API request format as per the Batch type.
        Ex. for OpenAIChatCompletionBatch, requests should be a Chat Completion request with custom_id.

        Args:
            requests: A list of requests.
            batch_kwargs (Dict, optional): Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

        Returns:
            An instance of the Batch class.

        Raises:
            BatchInitializationError: If the input data is invalid.

        Usage:
        ```python
        batch = OpenAIChatCompletionBatch.create_from_requests([
            {   "custom_id": "request-1",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": [{"role": "user", "content": "Biryani Receipe, pls."}],
                    "max_tokens": 1000
                }
            },
            {
                "custom_id": "request-2",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": [{"role": "user", "content": "Write a short story about AI"}],
                    "max_tokens": 1000
                }
            }
        ])
        ```
        """
        batch_kwargs = {} if batch_kwargs is None else batch_kwargs

        file_path = cls._create_batch_file_from_requests(requests)

        if file_path is None:
            raise BatchInitializationError("Failed to create batch. Check the input data.")

        return cls(file_path, **batch_kwargs)

    @classmethod
    @abstractmethod
    def _get_init_args(cls, meta_data) -> Dict[str, Any]:
        """
        Get the init arguments from meta data json file when loading a batch from storage.
        """
        pass

    @classmethod
    def load(cls, id: str, storage: "BatchStorage | None" = None, batch_kwargs: Dict | None = None):
        """
        Load a batch from the storage and return a Batch object.

        Args:
            id (str): The id of the batch.
            storage (BatchStorage, optional): The storage to load the batch from. Defaults to FileBatchStorage().
            batch_kwargs (Dict, optional): Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

        Returns:
            Batch: The batch object.

        Usage:
        ```python
        batch = OpenAIChatCompletionBatch.load("123", storage=FileBatchStorage("./data"))
        ```
        """
        # Instantiate the default lazily instead of at function-definition time.
        storage = FileBatchStorage() if storage is None else storage
        batch_kwargs = {} if batch_kwargs is None else batch_kwargs

        data_file, meta_file = storage.load(id)

        # Load metadata based on file extension
        if meta_file.suffix == '.json':
            with open(meta_file, 'r') as f:
                meta_data = json.load(f)
        else:  # .pkl
            # NOTE(review): pickle.load executes arbitrary code; only load metadata
            # files from trusted storage.
            with open(meta_file, 'rb') as f:
                meta_data = pickle.load(f)

        init_args = cls._get_init_args(meta_data)

        # batch_kwargs only fills gaps; metadata-derived args take precedence.
        for key, value in batch_kwargs.items():
            if key not in init_args:
                init_args[key] = value

        batch = cls(str(data_file), **init_args)
        batch.platform_batch_id = meta_data['platform_batch_id']
        batch.id = id

        return batch

    @abstractmethod
    def _create_meta_data(self) -> Dict[str, Any]:
        """
        Create the meta data for the batch to be saved in the storage.
        """
        pass

    def save(self, storage: "BatchStorage | None" = None):
        """
        Save the batch to the storage.

        Args:
            storage (BatchStorage, optional): The storage to save the batch to. Defaults to FileBatchStorage().

        Usage:
        ```python
        batch = OpenAIChatCompletionBatch(file)
        batch.save()

        # save the batch to file storage
        batch.save(storage=FileBatchStorage("./data"))
        ```
        """
        storage = FileBatchStorage() if storage is None else storage

        meta_data = self._create_meta_data()
        meta_data["platform_batch_id"] = self.platform_batch_id

        storage.save(self.id, Path(self._file), meta_data)

    @abstractmethod
    def _upload_batch_file(self):
        """Upload the batch file to the platform. Platform specific."""
        pass

    @abstractmethod
    def start(self):
        """
        Start the batch process on the platform.

        Usage:
        ```python
        # create a batch
        batch = OpenAIChatCompletionBatch(file)

        # start the batch process
        batch.start()
        ```
        """
        pass

    @abstractmethod
    def get_status(self):
        """
        Get the current status of the batch process.

        Usage:
        ```python
        # create a batch and start batch process
        batch = OpenAIChatCompletionBatch(file)
        batch.start()

        # get the status of the batch process
        status = batch.get_status()
        print(status)
        ```
        """
        pass

    def _get_requests(self) -> List[Dict[str, Any]]:
        """
        Get all the requests from the jsonl batch file.

        Raises:
            BatchError: If the batch file cannot be read.
        """
        try:
            with jsonlines.open(self._file) as reader:
                return list(reader)
        except Exception:
            logging.error("Error reading requests from batch file", exc_info=True)
            raise BatchError("Error reading requests from batch file")

    @abstractmethod
    def _validate_request(self, request):
        """Validate a single request body. Platform specific; raises on invalid input."""
        pass

    def _validate_requests(self) -> None:
        """
        Validate all the requests in the batch file before starting the batch process.

        Depends on the implementation of the _validate_request method in the subclass.

        Raises:
            BatchValidationError: If the file is empty or any request is invalid.
        """
        requests = self._get_requests()  # read the batch file once, not twice

        if len(requests) == 0:
            raise BatchValidationError("No requests found in the batch file")

        invalid_requests = []
        for request in requests:
            try:
                self._validate_request(request['body'])
            except Exception:
                logging.info(f"Invalid request: {request}", exc_info=True)
                invalid_requests.append(request['custom_id'])

        if len(invalid_requests) > 0:
            raise BatchValidationError(f"Invalid requests: {invalid_requests}")

    def _create_results_file_path(self) -> Path:
        """Return the path where the results jsonl file for this batch is stored."""
        results_dir = Path(DATA_PATH) / "results"
        # parents=True for consistency with _create_batch_file_from_requests,
        # so a missing DATA_PATH directory does not raise FileNotFoundError.
        results_dir.mkdir(exist_ok=True, parents=True)

        return results_dir / f"{self.id}.jsonl"

    @abstractmethod
    def _download_results_file(self):
        """Download the results file from the platform. Platform specific."""
        pass

    # return results file in OpenAI compatible format
    def get_results_file(self):
        """
        Download and return the path of the results file.

        Usage:
        ```python
        import jsonlines

        # create a batch and start batch process
        batch = OpenAIChatCompletionBatch(file)
        batch.start()

        if batch.get_status() == "completed":
            # get the results file
            results_file = batch.get_results_file()

            with jsonlines.open(results_file) as reader:
                for obj in reader:
                    print(obj)
        ```
        """
        file_path = self._download_results_file()
        return file_path

    def _prepare_results(
        self, process_func
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]] | Tuple[None, None]:
        """
        Prepare the results file by processing the results,
        and separating them into successful and unsuccessful results
        based on the status code of the response.

        Depends on the implementation of the process_func method in the subclass.

        Returns:
            (successful_results, unsuccessful_results), or (None, None) when the
            results file is unavailable or cannot be processed.
        """

        file_id = self._download_results_file()

        if file_id is None:
            return None, None

        try:
            with jsonlines.open(file_id) as reader:
                results = list(reader)

            successful_results = []
            unsuccessful_results = []
            for result in results:
                if result['response'] is None:
                    # No response at all: report the platform error if present.
                    if result['error'] is not None:
                        error = {
                            "custom_id": result['custom_id'],
                            "error": result['error']
                        }
                    else:
                        error = {
                            "custom_id": result['custom_id'],
                            "error": "No response from the API"
                        }
                    unsuccessful_results.append(error)
                    continue

                if result['response']['status_code'] == 200:
                    choices = {
                        "custom_id": result['custom_id'],
                        **process_func(result)
                    }
                    successful_results.append(choices)
                else:
                    error = {
                        "custom_id": result['custom_id'],
                        "error": result['error']
                    }
                    unsuccessful_results.append(error)

            return successful_results, unsuccessful_results
        except Exception:
            logging.error("Error preparing results file", exc_info=True)
            return None, None

    # return results list
    @abstractmethod
    def get_results(self):
        """Return (successful_results, unsuccessful_results). Platform specific."""
        pass

    @abstractmethod
    def is_retryable_failure(self) -> bool:
        """Return True when the batch failed for a retryable reason (e.g. rate limit)."""
        pass

    # Retry on rate limit fail cases
    @abstractmethod
    def retry(self):
        """Retry the batch process. Platform specific."""
        pass

    def get_unsuccessful_requests(self) -> List[Dict[str, Any]]:
        """
        Retrieve the unsuccessful requests from the batch.

        Returns:
            A list of requests that failed.

        Usage:
        ```python
        batch = OpenAIChatCompletionBatch(file)
        batch.start()

        if batch.get_status() == "completed":
            # get the unsuccessful requests
            unsuccessful_requests = batch.get_unsuccessful_requests()

            for request in unsuccessful_requests:
                print(request["custom_id"])
        ```
        """
        _, unsuccessful_results = self.get_results()
        custom_ids = [result["custom_id"] for result in unsuccessful_results]

        return self.get_requests_by_custom_ids(custom_ids)

    def get_requests_by_custom_ids(self, custom_ids: List[str]) -> List[Dict[str, Any]]:
        """
        Retrieve the requests from the batch file by custom ids.

        Args:
            custom_ids (List[str]): A list of custom ids.

        Returns:
            A list of requests.

        Usage:
        ```python
        batch = OpenAIChatCompletionBatch(file)
        batch.start()

        if batch.get_status() == "completed":
            # get the requests by custom ids
            requests = batch.get_requests_by_custom_ids(["custom_id1", "custom_id2"])

            for request in requests:
                print(request["custom_id"])
        ```
        """
        wanted = set(custom_ids)  # O(1) membership instead of O(n) list scans
        requests = []
        with jsonlines.open(self._file) as reader:
            for request in reader:
                if request["custom_id"] in wanted:
                    requests.append(request)
        return requests

__init__

__init__(file: str)

Initialize the Batch class.

Parameters:

  • file (str) –

    The path to the batch file. File should be in OpenAI compatible batch file in jsonl format.

Source code in langbatch\Batch.py
def __init__(self, file: str):
    """
    Initialize the Batch class.

    Args:
        file (str): The path to the batch file. File should be in OpenAI compatible batch file in jsonl format.
    """
    self._file = file
    self.id = str(uuid.uuid4())

    self._validate_requests() # Validate the requests in the batch file

create_from_requests classmethod

create_from_requests(requests, batch_kwargs: Dict = {})

Creates a batch when given a list of requests. These requests should be in correct Batch API request format as per the Batch type. Ex. for OpenAIChatCompletionBatch, requests should be a Chat Completion request with custom_id.

Parameters:

  • requests –

    A list of requests.

  • batch_kwargs (Dict, default: {} ) –

    Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

Returns:

  • –

    An instance of the Batch class.

Raises:

  • BatchInitializationError –

    If the input data is invalid.

Usage:

batch = OpenAIChatCompletionBatch.create_from_requests([
    {   "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "Biryani Receipe, pls."}],
            "max_tokens": 1000
        }
    },
    {
        "custom_id": "request-2",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "Write a short story about AI"}],
            "max_tokens": 1000
        }
    }
])

Source code in langbatch\Batch.py
@classmethod
def create_from_requests(cls, requests, batch_kwargs: Dict = {}):
    """
    Creates a batch when given a list of requests. 
    These requests should be in correct Batch API request format as per the Batch type.
    Ex. for OpenAIChatCompletionBatch, requests should be a Chat Completion request with custom_id.

    Args:
        requests: A list of requests.
        batch_kwargs (Dict, optional): Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

    Returns:
        An instance of the Batch class.

    Raises:
        BatchInitializationError: If the input data is invalid.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch.create_from_requests([
        {   "custom_id": "request-1",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": "Biryani Receipe, pls."}],
                "max_tokens": 1000
            }
        },
        {
            "custom_id": "request-2",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": "Write a short story about AI"}],
                "max_tokens": 1000
            }
        }
    ])
    ``` 
    """

    file_path = cls._create_batch_file_from_requests(requests)

    if file_path is None:
        raise BatchInitializationError("Failed to create batch. Check the input data.")

    return cls(file_path, **batch_kwargs)

load classmethod

load(id: str, storage: BatchStorage = FileBatchStorage(), batch_kwargs: Dict = {})

Load a batch from the storage and return a Batch object.

Parameters:

  • id (str) –

    The id of the batch.

  • storage (BatchStorage, default: FileBatchStorage() ) –

    The storage to load the batch from. Defaults to FileBatchStorage().

  • batch_kwargs (Dict, default: {} ) –

    Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

Returns:

  • Batch –

    The batch object.

Usage:

batch = OpenAIChatCompletionBatch.load("123", storage=FileBatchStorage("./data"))

Source code in langbatch\Batch.py
@classmethod
def load(cls, id: str, storage: BatchStorage = FileBatchStorage(), batch_kwargs: Dict = {}):
    """
    Load a batch from the storage and return a Batch object.

    Args:
        id (str): The id of the batch.
        storage (BatchStorage, optional): The storage to load the batch from. Defaults to FileBatchStorage().
        batch_kwargs (Dict, optional): Additional keyword arguments for the batch class. Ex. gcp_project, etc. for VertexAIChatCompletionBatch.

    Returns:
        Batch: The batch object.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch.load("123", storage=FileBatchStorage("./data"))
    ```
    """
    data_file, meta_file = storage.load(id)

    # Load metadata based on file extension
    if meta_file.suffix == '.json':
        with open(meta_file, 'r') as f:
            meta_data = json.load(f)
    else:  # .pkl
        with open(meta_file, 'rb') as f:
            meta_data = pickle.load(f)

    init_args = cls._get_init_args(meta_data)

    for key, value in batch_kwargs.items():
        if key not in init_args:
            init_args[key] = value

    batch = cls(str(data_file), **init_args)
    batch.platform_batch_id = meta_data['platform_batch_id']
    batch.id = id

    return batch

save

save(storage: BatchStorage = FileBatchStorage())

Save the batch to the storage.

Parameters:

Usage:

batch = OpenAIChatCompletionBatch(file)
batch.save()

# save the batch to file storage
batch.save(storage=FileBatchStorage("./data"))

Source code in langbatch\Batch.py
def save(self, storage: BatchStorage = FileBatchStorage()):
    """
    Save the batch to the storage.

    Args:
        storage (BatchStorage, optional): The storage to save the batch to. Defaults to FileBatchStorage().

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch(file)
    batch.save()

    # save the batch to file storage
    batch.save(storage=FileBatchStorage("./data"))
    ```
    """
    meta_data = self._create_meta_data()
    meta_data["platform_batch_id"] = self.platform_batch_id

    storage.save(self.id, Path(self._file), meta_data)

start abstractmethod

start()

Usage:

# create a batch
batch = OpenAIChatCompletionBatch(file)

# start the batch process
batch.start()

Source code in langbatch\Batch.py
@abstractmethod
def start(self):
    """
    Usage:
    ```python
    # create a batch
    batch = OpenAIChatCompletionBatch(file)

    # start the batch process
    batch.start()
    ```
    """
    pass

get_status abstractmethod

get_status()

Usage:

# create a batch and start batch process
batch = OpenAIChatCompletionBatch(file)
batch.start()

# get the status of the batch process
status = batch.get_status()
print(status)

Source code in langbatch\Batch.py
@abstractmethod
def get_status(self):
    """
    Usage:
    ```python
    # create a batch and start batch process
    batch = OpenAIChatCompletionBatch(file)
    batch.start()

    # get the status of the batch process
    status = batch.get_status()
    print(status)
    ```
    """
    pass

get_results_file

get_results_file()

Usage:

import jsonlines

# create a batch and start batch process
batch = OpenAIChatCompletionBatch(file)
batch.start()

if batch.get_status() == "completed":
    # get the results file
    results_file = batch.get_results_file()

    with jsonlines.open(results_file) as reader:
        for obj in reader:
            print(obj)

Source code in langbatch\Batch.py
def get_results_file(self):
    """
    Usage:
    ```python
    import jsonlines

    # create a batch and start batch process
    batch = OpenAIChatCompletionBatch(file)
    batch.start()

    if batch.get_status() == "completed":
        # get the results file
        results_file = batch.get_results_file()

        with jsonlines.open(results_file) as reader:
            for obj in reader:
                print(obj)
    ```
    """
    file_path = self._download_results_file()
    return file_path

get_unsuccessful_requests

get_unsuccessful_requests() -> List[Dict[str, Any]]

Retrieve the unsuccessful requests from the batch.

Returns:

  • List[Dict[str, Any]] –

    A list of requests that failed.

Usage:

batch = OpenAIChatCompletionBatch(file)
batch.start()

if batch.get_status() == "completed":
    # get the unsuccessful requests
    unsuccessful_requests = batch.get_unsuccessful_requests()

    for request in unsuccessful_requests:
        print(request["custom_id"])

Source code in langbatch\Batch.py
def get_unsuccessful_requests(self) -> List[Dict[str, Any]]:
    """
    Retrieve the unsuccessful requests from the batch.

    Returns:
        A list of requests that failed.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch(file)
    batch.start()

    if batch.get_status() == "completed":
        # get the unsuccessful requests
        unsuccessful_requests = batch.get_unsuccessful_requests()

        for request in unsuccessful_requests:
            print(request["custom_id"])
    ```
    """
    custom_ids = []
    _, unsuccessful_results = self.get_results()
    for result in unsuccessful_results:
        custom_ids.append(result["custom_id"])

    return self.get_requests_by_custom_ids(custom_ids)

get_requests_by_custom_ids

get_requests_by_custom_ids(custom_ids: List[str]) -> List[Dict[str, Any]]

Retrieve the requests from the batch file by custom ids.

Parameters:

  • custom_ids (List[str]) –

    A list of custom ids.

Returns:

  • List[Dict[str, Any]] –

    A list of requests.

Usage:

batch = OpenAIChatCompletionBatch(file)
batch.start()

if batch.get_status() == "completed":
    # get the requests by custom ids
    requests = batch.get_requests_by_custom_ids(["custom_id1", "custom_id2"])

    for request in requests:
        print(request["custom_id"])

Source code in langbatch\Batch.py
def get_requests_by_custom_ids(self, custom_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Retrieve the requests from the batch file by custom ids.

    Args:
        custom_ids (List[str]): A list of custom ids.

    Returns:
        A list of requests.

    Usage:
    ```python
    batch = OpenAIChatCompletionBatch(file)
    batch.start()

    if batch.get_status() == "completed":
        # get the requests by custom ids
        requests = batch.get_requests_by_custom_ids(["custom_id1", "custom_id2"])

        for request in requests:
            print(request["custom_id"])
    ```
    """
    requests = []
    with jsonlines.open(self._file) as reader:
        for request in reader:
            if request["custom_id"] in custom_ids:
                requests.append(request)
    return requests