guidellm.backend

Backend

Bases: ABC

Abstract base class for generative AI backends.

This class provides a common interface for creating and interacting with different generative AI backends. Subclasses should implement the abstract methods to define specific backend behavior.

Parameters:

Name Type Description Default
type_ BackendType

The type of the backend.

required

Attributes:

Name Type Description
_registry dict[BackendType, type[Backend]]

A registration dictionary that maps BackendType to backend classes.

Source code in src/guidellm/backend/backend.py
class Backend(ABC):
    """
    Abstract base class for generative AI backends.

    This class provides a common interface for creating and interacting with different
    generative AI backends. Subclasses should implement the abstract methods to
    define specific backend behavior.

    :cvar _registry: A registration dictionary that maps BackendType to backend classes.
    :param type_: The type of the backend.
    """

    _registry: dict[BackendType, "type[Backend]"] = {}

    @classmethod
    def register(cls, backend_type: BackendType):
        """
        A decorator to register a backend class in the backend registry.

        :param backend_type: The type of backend to register.
        :type backend_type: BackendType
        :return: The decorated backend class.
        :rtype: Type[Backend]
        """
        if backend_type in cls._registry:
            raise ValueError(f"Backend type already registered: {backend_type}")

        if not issubclass(cls, Backend):
            raise TypeError("Only subclasses of Backend can be registered")

        def inner_wrapper(wrapped_class: type["Backend"]):
            cls._registry[backend_type] = wrapped_class
            logger.info("Registered backend type: {}", backend_type)
            return wrapped_class

        return inner_wrapper

    @classmethod
    def create(cls, type_: BackendType, **kwargs) -> "Backend":
        """
        Factory method to create a backend instance based on the backend type.

        :param type_: The type of backend to create.
        :type type_: BackendType
        :param kwargs: Additional arguments for backend initialization.
        :return: An instance of a subclass of Backend.
        :rtype: Backend
        :raises ValueError: If the backend type is not registered.
        """

        logger.info("Creating backend of type {}", type_)

        if type_ not in cls._registry:
            err = ValueError(f"Unsupported backend type: {type_}")
            logger.error("{}", err)
            raise err

        return Backend._registry[type_](**kwargs)

    def __init__(self, type_: BackendType):
        self._type = type_

    @property
    def type_(self) -> BackendType:
        """
        :return: The type of the backend.
        """
        return self._type

    @property
    @abstractmethod
    def target(self) -> str:
        """
        :return: The target location for the backend.
        """
        ...

    @property
    @abstractmethod
    def model(self) -> Optional[str]:
        """
        :return: The model used for the backend requests.
        """
        ...

    @property
    @abstractmethod
    def info(self) -> dict[str, Any]:
        """
        :return: The information about the backend.
        """
        ...

    async def validate(self):
        """
        Handle final setup and validate the backend is ready for use.
        If not successful, raises the appropriate exception.
        """
        logger.info("{} validating backend {}", self.__class__.__name__, self.type_)
        await self.check_setup()
        models = await self.available_models()
        if not models:
            raise ValueError("No models available for the backend")

        async for _ in self.text_completions(
            prompt="Test connection", output_token_count=1
        ):  # type: ignore[attr-defined]
            pass

    @abstractmethod
    async def check_setup(self):
        """
        Check the setup for the backend.
        If unsuccessful, raises the appropriate exception.

        :raises ValueError: If the setup check fails.
        """
        ...

    @abstractmethod
    async def prepare_multiprocessing(self):
        """
        Prepare the backend for use in a multiprocessing environment.
        This is useful for backends that have instance state that can not
        be shared across processes and should be cleared out and re-initialized
        for each new process.
        """
        ...

    @abstractmethod
    async def available_models(self) -> list[str]:
        """
        Get the list of available models for the backend.

        :return: The list of available models.
        :rtype: List[str]
        """
        ...

    @abstractmethod
    async def text_completions(
        self,
        prompt: Union[str, list[str]],
        request_id: Optional[str] = None,
        prompt_token_count: Optional[int] = None,
        output_token_count: Optional[int] = None,
        **kwargs,
    ) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
        """
        Generate text only completions for the given prompt.
        Does not support multiple modalities, complicated chat interfaces,
        or chat templates. Specifically, it requests with only the prompt.

        :param prompt: The prompt (or list of prompts) to generate a completion for.
            If a list is supplied, these are concatenated and run through the model
            for a single prompt.
        :param request_id: The unique identifier for the request, if any.
            Added to logging statements and the response for tracking purposes.
        :param prompt_token_count: The number of tokens measured in the prompt, if any.
            Returned in the response stats for later analysis, if applicable.
        :param output_token_count: If supplied, the number of tokens to enforce
            generation of for the output for this request.
        :param kwargs: Additional keyword arguments to pass with the request.
        :return: An async generator that yields a StreamingTextResponse for start,
            a StreamingTextResponse for each received iteration,
            and a ResponseSummary for the final response.
        """
        ...

    @abstractmethod
    async def chat_completions(
        self,
        content: Union[
            str,
            list[Union[str, dict[str, Union[str, dict[str, str]]], Path, Image.Image]],
            Any,
        ],
        request_id: Optional[str] = None,
        prompt_token_count: Optional[int] = None,
        output_token_count: Optional[int] = None,
        raw_content: bool = False,
        **kwargs,
    ) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
        """
        Generate chat completions for the given content.
        Supports multiple modalities, complicated chat interfaces, and chat templates.
        Specifically, it requests with the content, which can be any combination of
        text, images, and audio provided the target model supports it,
        and returns the output text. Additionally, any chat templates
        for the model are applied within the backend.

        :param content: The content (or list of content) to generate a completion for.
            This supports any combination of text, images, and audio (model dependent).
            Supported text only request examples:
                content="Sample prompt", content=["Sample prompt", "Second prompt"],
                content=[{"type": "text", "value": "Sample prompt"}.
            Supported text and image request examples:
                content=["Describe the image", PIL.Image.open("image.jpg")],
                content=["Describe the image", Path("image.jpg")],
                content=["Describe the image", {"type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}].
            Supported text and audio request examples:
                content=["Transcribe the audio", Path("audio.wav")],
                content=["Transcribe the audio", {"type": "input_audio",
                "input_audio": {"data": f"{base64_bytes}", "format": "wav}].
            Additionally, if raw_content=True then the content is passed directly to the
            backend without any processing.
        :param request_id: The unique identifier for the request, if any.
            Added to logging statements and the response for tracking purposes.
        :param prompt_token_count: The number of tokens measured in the prompt, if any.
            Returned in the response stats for later analysis, if applicable.
        :param output_token_count: If supplied, the number of tokens to enforce
            generation of for the output for this request.
        :param kwargs: Additional keyword arguments to pass with the request.
        :return: An async generator that yields a StreamingTextResponse for start,
            a StreamingTextResponse for each received iteration,
            and a ResponseSummary for the final response.
        """
        ...
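A minimal end-to-end sketch (assuming the guidellm package is installed, that Backend is importable from guidellm.backend, and that an OpenAI-compatible server is reachable at the target): create a registered backend through the factory and validate it before sending load.

import asyncio

from guidellm.backend import Backend


async def main():
    # "openai_http" is registered by OpenAIHTTPBackend (documented below);
    # remaining kwargs are forwarded to that backend's constructor.
    backend = Backend.create("openai_http", target="http://localhost:8000")

    # Runs check_setup, lists models, and issues a one-token test completion;
    # any failure surfaces as the corresponding exception.
    await backend.validate()
    print(backend.type_, backend.model)


asyncio.run(main())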

info abstractmethod property

Returns:

Type Description
dict[str, Any]

The information about the backend.

model abstractmethod property

Returns:

Type Description
Optional[str]

The model used for the backend requests.

target abstractmethod property

Returns:

Type Description
str

The target location for the backend.

type_ property

Returns:

Type Description
BackendType

The type of the backend.

available_models() abstractmethod async

Get the list of available models for the backend.

Returns:

Type Description
List[str]

The list of available models.

Source code in src/guidellm/backend/backend.py
@abstractmethod
async def available_models(self) -> list[str]:
    """
    Get the list of available models for the backend.

    :return: The list of available models.
    :rtype: List[str]
    """
    ...

chat_completions(content, request_id=None, prompt_token_count=None, output_token_count=None, raw_content=False, **kwargs) abstractmethod async

Generate chat completions for the given content. Supports multiple modalities, complicated chat interfaces, and chat templates. Specifically, it requests with the content, which can be any combination of text, images, and audio provided the target model supports it, and returns the output text. Additionally, any chat templates for the model are applied within the backend.

Parameters:

Name Type Description Default
content Union[str, list[Union[str, dict[str, Union[str, dict[str, str]]], Path, Image]], Any]

The content (or list of content) to generate a completion for. This supports any combination of text, images, and audio (model dependent). Supported text only request examples: content="Sample prompt", content=["Sample prompt", "Second prompt"], content=[{"type": "text", "value": "Sample prompt"}]. Supported text and image request examples: content=["Describe the image", PIL.Image.open("image.jpg")], content=["Describe the image", Path("image.jpg")], content=["Describe the image", {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}]. Supported text and audio request examples: content=["Transcribe the audio", Path("audio.wav")], content=["Transcribe the audio", {"type": "input_audio", "input_audio": {"data": f"{base64_bytes}", "format": "wav"}}]. Additionally, if raw_content=True then the content is passed directly to the backend without any processing.

required
request_id Optional[str]

The unique identifier for the request, if any. Added to logging statements and the response for tracking purposes.

None
prompt_token_count Optional[int]

The number of tokens measured in the prompt, if any. Returned in the response stats for later analysis, if applicable.

None
output_token_count Optional[int]

If supplied, the number of tokens to enforce generation of for the output for this request.

None
kwargs

Additional keyword arguments to pass with the request.

{}

Returns:

Type Description
AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]

An async generator that yields a StreamingTextResponse for start, a StreamingTextResponse for each received iteration, and a ResponseSummary for the final response.

Source code in src/guidellm/backend/backend.py
@abstractmethod
async def chat_completions(
    self,
    content: Union[
        str,
        list[Union[str, dict[str, Union[str, dict[str, str]]], Path, Image.Image]],
        Any,
    ],
    request_id: Optional[str] = None,
    prompt_token_count: Optional[int] = None,
    output_token_count: Optional[int] = None,
    raw_content: bool = False,
    **kwargs,
) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
    """
    Generate chat completions for the given content.
    Supports multiple modalities, complicated chat interfaces, and chat templates.
    Specifically, it requests with the content, which can be any combination of
    text, images, and audio provided the target model supports it,
    and returns the output text. Additionally, any chat templates
    for the model are applied within the backend.

    :param content: The content (or list of content) to generate a completion for.
        This supports any combination of text, images, and audio (model dependent).
        Supported text only request examples:
            content="Sample prompt", content=["Sample prompt", "Second prompt"],
            content=[{"type": "text", "value": "Sample prompt"}.
        Supported text and image request examples:
            content=["Describe the image", PIL.Image.open("image.jpg")],
            content=["Describe the image", Path("image.jpg")],
            content=["Describe the image", {"type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}].
        Supported text and audio request examples:
            content=["Transcribe the audio", Path("audio.wav")],
            content=["Transcribe the audio", {"type": "input_audio",
            "input_audio": {"data": f"{base64_bytes}", "format": "wav}].
        Additionally, if raw_content=True then the content is passed directly to the
        backend without any processing.
    :param request_id: The unique identifier for the request, if any.
        Added to logging statements and the response for tracking purposes.
    :param prompt_token_count: The number of tokens measured in the prompt, if any.
        Returned in the response stats for later analysis, if applicable.
    :param output_token_count: If supplied, the number of tokens to enforce
        generation of for the output for this request.
    :param kwargs: Additional keyword arguments to pass with the request.
    :return: An async generator that yields a StreamingTextResponse for start,
        a StreamingTextResponse for each received iteration,
        and a ResponseSummary for the final response.
    """
    ...
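A consumption sketch for the generator above, using a text-plus-image request; the backend instance, the file path, and the import location of the response types are assumptions:

from pathlib import Path

from guidellm.backend import ResponseSummary, StreamingTextResponse


async def describe_image(backend) -> str:
    summary = None
    async for response in backend.chat_completions(
        content=["Describe the image", Path("image.jpg")],
        request_id="example-chat-1",
        output_token_count=128,
    ):
        if isinstance(response, StreamingTextResponse):
            # one "start" response, then one "iter" response per received delta
            print(response.type_, repr(response.delta))
        elif isinstance(response, ResponseSummary):
            summary = response

    return summary.value if summary else ""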

check_setup() abstractmethod async

Check the setup for the backend. If unsuccessful, raises the appropriate exception.

Raises:

Type Description
ValueError

If the setup check fails.

Source code in src/guidellm/backend/backend.py
@abstractmethod
async def check_setup(self):
    """
    Check the setup for the backend.
    If unsuccessful, raises the appropriate exception.

    :raises ValueError: If the setup check fails.
    """
    ...

create(type_, **kwargs) classmethod

Factory method to create a backend instance based on the backend type.

Parameters:

Name Type Description Default
type_ BackendType

The type of backend to create.

required
kwargs

Additional arguments for backend initialization.

{}

Returns:

Type Description
Backend

An instance of a subclass of Backend.

Raises:

Type Description
ValueError

If the backend type is not registered.

Source code in src/guidellm/backend/backend.py
@classmethod
def create(cls, type_: BackendType, **kwargs) -> "Backend":
    """
    Factory method to create a backend instance based on the backend type.

    :param type_: The type of backend to create.
    :type type_: BackendType
    :param kwargs: Additional arguments for backend initialization.
    :return: An instance of a subclass of Backend.
    :rtype: Backend
    :raises ValueError: If the backend type is not registered.
    """

    logger.info("Creating backend of type {}", type_)

    if type_ not in cls._registry:
        err = ValueError(f"Unsupported backend type: {type_}")
        logger.error("{}", err)
        raise err

    return Backend._registry[type_](**kwargs)

prepare_multiprocessing() abstractmethod async

Prepare the backend for use in a multiprocessing environment. This is useful for backends that have instance state that can not be shared across processes and should be cleared out and re-initialized for each new process.

Source code in src/guidellm/backend/backend.py
@abstractmethod
async def prepare_multiprocessing(self):
    """
    Prepare the backend for use in a multiprocessing environment.
    This is useful for backends that have instance state that can not
    be shared across processes and should be cleared out and re-initialized
    for each new process.
    """
    ...

register(backend_type) classmethod

A decorator to register a backend class in the backend registry.

Parameters:

Name Type Description Default
backend_type BackendType

The type of backend to register.

required

Returns:

Type Description
Type[Backend]

The decorated backend class.

Source code in src/guidellm/backend/backend.py
@classmethod
def register(cls, backend_type: BackendType):
    """
    A decorator to register a backend class in the backend registry.

    :param backend_type: The type of backend to register.
    :type backend_type: BackendType
    :return: The decorated backend class.
    :rtype: Type[Backend]
    """
    if backend_type in cls._registry:
        raise ValueError(f"Backend type already registered: {backend_type}")

    if not issubclass(cls, Backend):
        raise TypeError("Only subclasses of Backend can be registered")

    def inner_wrapper(wrapped_class: type["Backend"]):
        cls._registry[backend_type] = wrapped_class
        logger.info("Registered backend type: {}", backend_type)
        return wrapped_class

    return inner_wrapper
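A sketch of the decorator on a hypothetical subclass; the abstract method implementations are omitted, so instances cannot be created until they are provided, and the "custom" type value is assumed to be an allowed BackendType:

from guidellm.backend import Backend


@Backend.register("custom")
class CustomBackend(Backend):
    def __init__(self, **kwargs):
        super().__init__(type_="custom")

    # target, model, info, check_setup, prepare_multiprocessing,
    # available_models, text_completions, and chat_completions must be
    # implemented before Backend.create("custom") can return an instance.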

text_completions(prompt, request_id=None, prompt_token_count=None, output_token_count=None, **kwargs) abstractmethod async

Generate text only completions for the given prompt. Does not support multiple modalities, complicated chat interfaces, or chat templates. Specifically, it requests with only the prompt.

Parameters:

Name Type Description Default
prompt Union[str, list[str]]

The prompt (or list of prompts) to generate a completion for. If a list is supplied, these are concatenated and run through the model for a single prompt.

required
request_id Optional[str]

The unique identifier for the request, if any. Added to logging statements and the response for tracking purposes.

None
prompt_token_count Optional[int]

The number of tokens measured in the prompt, if any. Returned in the response stats for later analysis, if applicable.

None
output_token_count Optional[int]

If supplied, the number of tokens to enforce generation of for the output for this request.

None
kwargs

Additional keyword arguments to pass with the request.

{}

Returns:

Type Description
AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]

An async generator that yields a StreamingTextResponse for start, a StreamingTextResponse for each received iteration, and a ResponseSummary for the final response.

Source code in src/guidellm/backend/backend.py
@abstractmethod
async def text_completions(
    self,
    prompt: Union[str, list[str]],
    request_id: Optional[str] = None,
    prompt_token_count: Optional[int] = None,
    output_token_count: Optional[int] = None,
    **kwargs,
) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
    """
    Generate text only completions for the given prompt.
    Does not support multiple modalities, complicated chat interfaces,
    or chat templates. Specifically, it requests with only the prompt.

    :param prompt: The prompt (or list of prompts) to generate a completion for.
        If a list is supplied, these are concatenated and run through the model
        for a single prompt.
    :param request_id: The unique identifier for the request, if any.
        Added to logging statements and the response for tracking purposes.
    :param prompt_token_count: The number of tokens measured in the prompt, if any.
        Returned in the response stats for later analysis, if applicable.
    :param output_token_count: If supplied, the number of tokens to enforce
        generation of for the output for this request.
    :param kwargs: Additional keyword arguments to pass with the request.
    :return: An async generator that yields a StreamingTextResponse for start,
        a StreamingTextResponse for each received iteration,
        and a ResponseSummary for the final response.
    """
    ...
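A hedged sketch of draining the stream from an already-created and validated backend instance, keeping the final summary for later analysis:

from guidellm.backend import ResponseSummary, StreamingTextResponse


async def run_prompt(backend, prompt: str):
    summary = None
    async for response in backend.text_completions(prompt=prompt, output_token_count=64):
        if isinstance(response, StreamingTextResponse) and response.type_ == "iter":
            print(response.delta, end="", flush=True)
        elif isinstance(response, ResponseSummary):
            summary = response

    return summary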

validate() async

Handle final setup and validate the backend is ready for use. If not successful, raises the appropriate exception.

Source code in src/guidellm/backend/backend.py
async def validate(self):
    """
    Handle final setup and validate the backend is ready for use.
    If not successful, raises the appropriate exception.
    """
    logger.info("{} validating backend {}", self.__class__.__name__, self.type_)
    await self.check_setup()
    models = await self.available_models()
    if not models:
        raise ValueError("No models available for the backend")

    async for _ in self.text_completions(
        prompt="Test connection", output_token_count=1
    ):  # type: ignore[attr-defined]
        pass

OpenAIHTTPBackend

Bases: Backend

An HTTP-based backend implementation for requests to an OpenAI-compatible server. For example, a vLLM server instance or requests to OpenAI's API.

Parameters:

Name Type Description Default
target Optional[str]

The target URL string for the OpenAI server. ex: http://0.0.0.0:8000

None
model Optional[str]

The model to use for all requests on the target server. If none is provided, the first available model will be used.

None
api_key Optional[str]

The API key to use for requests to the OpenAI server. If provided, adds an Authorization header with the value "Authorization: Bearer {api_key}". If not provided, no Authorization header is added.

None
organization Optional[str]

The organization to use for requests to the OpenAI server. For example, if set to "org_123", adds an OpenAI-Organization header with the value "OpenAI-Organization: org_123". If not provided, no OpenAI-Organization header is added.

None
project Optional[str]

The project to use for requests to the OpenAI server. For example, if set to "project_123", adds an OpenAI-Project header with the value "OpenAI-Project: project_123". If not provided, no OpenAI-Project header is added.

None
timeout Optional[float]

The timeout to use for requests to the OpenAI server. If not provided, the default timeout provided from settings is used.

None
http2 Optional[bool]

If True, uses HTTP/2 for requests to the OpenAI server. Defaults to True.

True
follow_redirects Optional[bool]

If True, the HTTP client will follow redirect responses. If not provided, the default value from settings is used.

None
max_output_tokens Optional[int]

The maximum number of tokens to request for completions. If not provided, the default maximum tokens provided from settings is used.

None
extra_query Optional[dict]

Query parameters to include in requests to the OpenAI server. If "chat_completions", "models", or "text_completions" are included as keys, the values of these keys will be used as the parameters for the respective endpoint. If not provided, no extra query parameters are added.

None
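A construction sketch, assuming a locally running OpenAI-compatible server and that the class is importable from guidellm.backend; any argument left out falls back to the corresponding settings value:

from guidellm.backend import OpenAIHTTPBackend

backend = OpenAIHTTPBackend(
    target="http://localhost:8000",  # a trailing "/v1" is stripped for compatibility
    model="my-model",                # optional; resolved to the first available model in check_setup
    api_key="sk-...",                # adds an "Authorization: Bearer ..." header when provided
    timeout=120.0,
    http2=True,
)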
Source code in src/guidellm/backend/openai.py
@Backend.register("openai_http")
class OpenAIHTTPBackend(Backend):
    """
    An HTTP-based backend implementation for requests to an OpenAI-compatible server.
    For example, a vLLM server instance or requests to OpenAI's API.

    :param target: The target URL string for the OpenAI server. ex: http://0.0.0.0:8000
    :param model: The model to use for all requests on the target server.
        If none is provided, the first available model will be used.
    :param api_key: The API key to use for requests to the OpenAI server.
        If provided, adds an Authorization header with the value
        "Authorization: Bearer {api_key}".
        If not provided, no Authorization header is added.
    :param organization: The organization to use for requests to the OpenAI server.
        For example, if set to "org_123", adds an OpenAI-Organization header with the
        value "OpenAI-Organization: org_123".
        If not provided, no OpenAI-Organization header is added.
    :param project: The project to use for requests to the OpenAI server.
        For example, if set to "project_123", adds an OpenAI-Project header with the
        value "OpenAI-Project: project_123".
        If not provided, no OpenAI-Project header is added.
    :param timeout: The timeout to use for requests to the OpenAI server.
        If not provided, the default timeout provided from settings is used.
    :param http2: If True, uses HTTP/2 for requests to the OpenAI server.
        Defaults to True.
    :param follow_redirects: If True, the HTTP client will follow redirect responses.
        If not provided, the default value from settings is used.
    :param max_output_tokens: The maximum number of tokens to request for completions.
        If not provided, the default maximum tokens provided from settings is used.
    :param extra_query: Query parameters to include in requests to the OpenAI server.
        If "chat_completions", "models", or "text_completions" are included as keys,
        the values of these keys will be used as the parameters for the respective
        endpoint.
        If not provided, no extra query parameters are added.
    """

    def __init__(
        self,
        target: Optional[str] = None,
        model: Optional[str] = None,
        api_key: Optional[str] = None,
        organization: Optional[str] = None,
        project: Optional[str] = None,
        timeout: Optional[float] = None,
        http2: Optional[bool] = True,
        follow_redirects: Optional[bool] = None,
        max_output_tokens: Optional[int] = None,
        extra_query: Optional[dict] = None,
        extra_body: Optional[dict] = None,
    ):
        super().__init__(type_="openai_http")
        self._target = target or settings.openai.base_url

        if not self._target:
            raise ValueError("Target URL must be provided for OpenAI HTTP backend.")

        if self._target.endswith("/v1") or self._target.endswith("/v1/"):
            # backwards compatibility, strip v1 off
            self._target = self._target[:-3]

        if self._target.endswith("/"):
            self._target = self._target[:-1]

        self._model = model

        api_key = api_key or settings.openai.api_key
        self.authorization = (
            f"Bearer {api_key}" if api_key else settings.openai.bearer_token
        )

        self.organization = organization or settings.openai.organization
        self.project = project or settings.openai.project
        self.timeout = timeout if timeout is not None else settings.request_timeout
        self.http2 = http2 if http2 is not None else settings.request_http2
        self.follow_redirects = (
            follow_redirects
            if follow_redirects is not None
            else settings.request_follow_redirects
        )
        self.max_output_tokens = (
            max_output_tokens
            if max_output_tokens is not None
            else settings.openai.max_output_tokens
        )
        self.extra_query = extra_query
        self.extra_body = extra_body
        self._async_client: Optional[httpx.AsyncClient] = None

    @property
    def target(self) -> str:
        """
        :return: The target URL string for the OpenAI server.
        """
        return self._target

    @property
    def model(self) -> Optional[str]:
        """
        :return: The model to use for all requests on the target server.
            If validate hasn't been called yet and no model was passed in,
            this will be None until validate is called to set the default.
        """
        return self._model

    @property
    def info(self) -> dict[str, Any]:
        """
        :return: The information about the backend.
        """
        return {
            "max_output_tokens": self.max_output_tokens,
            "timeout": self.timeout,
            "http2": self.http2,
            "follow_redirects": self.follow_redirects,
            "authorization": bool(self.authorization),
            "organization": self.organization,
            "project": self.project,
            "text_completions_path": TEXT_COMPLETIONS_PATH,
            "chat_completions_path": CHAT_COMPLETIONS_PATH,
        }

    async def check_setup(self):
        """
        Check if the backend is setup correctly and can be used for requests.
        Specifically, if a model is not provided, it grabs the first available model.
        If no models are available, raises a ValueError.
        If a model is provided and not available, raises a ValueError.

        :raises ValueError: If no models or the provided model is not available.
        """
        models = await self.available_models()
        if not models:
            raise ValueError(f"No models available for target: {self.target}")

        if not self.model:
            self._model = models[0]
        elif self.model not in models:
            raise ValueError(
                f"Model {self.model} not found in available models:"
                f"{models} for target: {self.target}"
            )

    async def prepare_multiprocessing(self):
        """
        Prepare the backend for use in a multiprocessing environment.
        Clears out the sync and async clients to ensure they are re-initialized
        for each process.
        """
        if self._async_client is not None:
            await self._async_client.aclose()
            self._async_client = None

    async def available_models(self) -> list[str]:
        """
        Get the available models for the target server using the OpenAI models endpoint:
        /v1/models
        """
        target = f"{self.target}/v1/models"
        headers = self._headers()
        params = self._params(MODELS)
        response = await self._get_async_client().get(
            target, headers=headers, params=params
        )
        response.raise_for_status()

        models = []

        for item in response.json()["data"]:
            models.append(item["id"])

        return models

    async def text_completions(  # type: ignore[override]
        self,
        prompt: Union[str, list[str]],
        request_id: Optional[str] = None,
        prompt_token_count: Optional[int] = None,
        output_token_count: Optional[int] = None,
        **kwargs,
    ) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
        """
        Generate text completions for the given prompt using the OpenAI
        completions endpoint: /v1/completions.

        :param prompt: The prompt (or list of prompts) to generate a completion for.
            If a list is supplied, these are concatenated and run through the model
            for a single prompt.
        :param request_id: The unique identifier for the request, if any.
            Added to logging statements and the response for tracking purposes.
        :param prompt_token_count: The number of tokens measured in the prompt, if any.
            Returned in the response stats for later analysis, if applicable.
        :param output_token_count: If supplied, the number of tokens to enforce
            generation of for the output for this request.
        :param kwargs: Additional keyword arguments to pass with the request.
        :return: An async generator that yields a StreamingTextResponse for start,
            a StreamingTextResponse for each received iteration,
            and a ResponseSummary for the final response.
        """
        logger.debug("{} invocation with args: {}", self.__class__.__name__, locals())

        if isinstance(prompt, list):
            raise ValueError(
                "List prompts (batching) is currently not supported for "
                f"text_completions OpenAI pathways. Received: {prompt}"
            )

        headers = self._headers()
        params = self._params(TEXT_COMPLETIONS)
        body = self._body(TEXT_COMPLETIONS)
        payload = self._completions_payload(
            body=body,
            orig_kwargs=kwargs,
            max_output_tokens=output_token_count,
            prompt=prompt,
        )

        try:
            async for resp in self._iterative_completions_request(
                type_="text_completions",
                request_id=request_id,
                request_prompt_tokens=prompt_token_count,
                request_output_tokens=output_token_count,
                headers=headers,
                params=params,
                payload=payload,
            ):
                yield resp
        except Exception as ex:
            logger.error(
                "{} request with headers: {} and params: {} and payload: {} failed: {}",
                self.__class__.__name__,
                headers,
                params,
                payload,
                ex,
            )
            raise ex

    async def chat_completions(  # type: ignore[override]
        self,
        content: Union[
            str,
            list[Union[str, dict[str, Union[str, dict[str, str]]], Path, Image.Image]],
            Any,
        ],
        request_id: Optional[str] = None,
        prompt_token_count: Optional[int] = None,
        output_token_count: Optional[int] = None,
        raw_content: bool = False,
        **kwargs,
    ) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
        """
        Generate chat completions for the given content using the OpenAI
        chat completions endpoint: /v1/chat/completions.

        :param content: The content (or list of content) to generate a completion for.
            This supports any combination of text, images, and audio (model dependent).
            Supported text only request examples:
                content="Sample prompt", content=["Sample prompt", "Second prompt"],
                content=[{"type": "text", "value": "Sample prompt"}.
            Supported text and image request examples:
                content=["Describe the image", PIL.Image.open("image.jpg")],
                content=["Describe the image", Path("image.jpg")],
                content=["Describe the image", {"type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}].
            Supported text and audio request examples:
                content=["Transcribe the audio", Path("audio.wav")],
                content=["Transcribe the audio", {"type": "input_audio",
                "input_audio": {"data": f"{base64_bytes}", "format": "wav}].
            Additionally, if raw_content=True then the content is passed directly to the
            backend without any processing.
        :param request_id: The unique identifier for the request, if any.
            Added to logging statements and the response for tracking purposes.
        :param prompt_token_count: The number of tokens measured in the prompt, if any.
            Returned in the response stats for later analysis, if applicable.
        :param output_token_count: If supplied, the number of tokens to enforce
            generation of for the output for this request.
        :param kwargs: Additional keyword arguments to pass with the request.
        :return: An async generator that yields a StreamingTextResponse for start,
            a StreamingTextResponse for each received iteration,
            and a ResponseSummary for the final response.
        """
        logger.debug("{} invocation with args: {}", self.__class__.__name__, locals())
        headers = self._headers()
        params = self._params(CHAT_COMPLETIONS)
        body = self._body(CHAT_COMPLETIONS)
        messages = (
            content if raw_content else self._create_chat_messages(content=content)
        )
        payload = self._completions_payload(
            body=body,
            orig_kwargs=kwargs,
            max_output_tokens=output_token_count,
            messages=messages,
        )

        try:
            async for resp in self._iterative_completions_request(
                type_="chat_completions",
                request_id=request_id,
                request_prompt_tokens=prompt_token_count,
                request_output_tokens=output_token_count,
                headers=headers,
                params=params,
                payload=payload,
            ):
                yield resp
        except Exception as ex:
            logger.error(
                "{} request with headers: {} and params: {} and payload: {} failed: {}",
                self.__class__.__name__,
                headers,
                params,
                payload,
                ex,
            )
            raise ex

    def _get_async_client(self) -> httpx.AsyncClient:
        """
        Get the async HTTP client for making requests.
        If the client has not been created yet, it will create one.

        :return: The async HTTP client.
        """
        if self._async_client is None:
            client = httpx.AsyncClient(
                http2=self.http2,
                timeout=self.timeout,
                follow_redirects=self.follow_redirects,
            )
            self._async_client = client
        else:
            client = self._async_client

        return client

    def _headers(self) -> dict[str, str]:
        headers = {
            "Content-Type": "application/json",
        }

        if self.authorization:
            headers["Authorization"] = self.authorization

        if self.organization:
            headers["OpenAI-Organization"] = self.organization

        if self.project:
            headers["OpenAI-Project"] = self.project

        return headers

    def _params(self, endpoint_type: EndpointType) -> dict[str, str]:
        if self.extra_query is None:
            return {}

        if (
            CHAT_COMPLETIONS in self.extra_query
            or MODELS in self.extra_query
            or TEXT_COMPLETIONS in self.extra_query
        ):
            return self.extra_query.get(endpoint_type, {})

        return self.extra_query

    def _body(self, endpoint_type: EndpointType) -> dict[str, str]:
        if self.extra_body is None:
            return {}

        if (
            CHAT_COMPLETIONS in self.extra_body
            or MODELS in self.extra_body
            or TEXT_COMPLETIONS in self.extra_body
        ):
            return self.extra_body.get(endpoint_type, {})

        return self.extra_body

    def _completions_payload(
        self,
        body: Optional[dict],
        orig_kwargs: Optional[dict],
        max_output_tokens: Optional[int],
        **kwargs,
    ) -> dict:
        payload = body or {}
        payload.update(orig_kwargs or {})
        payload.update(kwargs)
        payload["model"] = self.model
        payload["stream"] = True
        payload["stream_options"] = {
            "include_usage": True,
        }

        if max_output_tokens or self.max_output_tokens:
            logger.debug(
                "{} adding payload args for setting output_token_count: {}",
                self.__class__.__name__,
                max_output_tokens or self.max_output_tokens,
            )
            payload["max_tokens"] = max_output_tokens or self.max_output_tokens
            payload["max_completion_tokens"] = payload["max_tokens"]

            if max_output_tokens:
                # only set stop and ignore_eos if max_output_tokens set at request level
                # otherwise the instance value is just the max to enforce we stay below
                payload["stop"] = None
                payload["ignore_eos"] = True

        return payload

    @staticmethod
    def _create_chat_messages(
        content: Union[
            str,
            list[Union[str, dict[str, Union[str, dict[str, str]]], Path, Image.Image]],
            Any,
        ],
    ) -> list[dict]:
        if isinstance(content, str):
            return [
                {
                    "role": "user",
                    "content": content,
                }
            ]

        if isinstance(content, list):
            resolved_content = []

            for item in content:
                if isinstance(item, dict):
                    resolved_content.append(item)
                elif isinstance(item, str):
                    resolved_content.append({"type": "text", "text": item})
                elif isinstance(item, Image.Image) or (
                    isinstance(item, Path) and item.suffix.lower() in [".jpg", ".jpeg"]
                ):
                    image = item if isinstance(item, Image.Image) else Image.open(item)
                    encoded = base64.b64encode(image.tobytes()).decode("utf-8")
                    resolved_content.append(
                        {
                            "type": "image",
                            "image": {
                                "url": f"data:image/jpeg;base64,{encoded}",
                            },
                        }
                    )
                elif isinstance(item, Path) and item.suffix.lower() in [".wav"]:
                    encoded = base64.b64encode(item.read_bytes()).decode("utf-8")
                    resolved_content.append(
                        {
                            "type": "input_audio",
                            "input_audio": {
                                "data": f"{encoded}",
                                "format": "wav",
                            },
                        }
                    )
                else:
                    raise ValueError(
                        f"Unsupported content item type: {item} in list: {content}"
                    )

            return [
                {
                    "role": "user",
                    "content": resolved_content,
                }
            ]

        raise ValueError(f"Unsupported content type: {content}")

    async def _iterative_completions_request(
        self,
        type_: Literal["text_completions", "chat_completions"],
        request_id: Optional[str],
        request_prompt_tokens: Optional[int],
        request_output_tokens: Optional[int],
        headers: dict[str, str],
        params: dict[str, str],
        payload: dict[str, Any],
    ) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
        if type_ == "text_completions":
            target = f"{self.target}{TEXT_COMPLETIONS_PATH}"
        elif type_ == "chat_completions":
            target = f"{self.target}{CHAT_COMPLETIONS_PATH}"
        else:
            raise ValueError(f"Unsupported type: {type_}")

        logger.info(
            "{} making request: {} to target: {} using http2: {} following "
            "redirects: {} for timeout: {} with headers: {} and params: {} and ",
            "payload: {}",
            self.__class__.__name__,
            request_id,
            target,
            self.http2,
            self.follow_redirects,
            self.timeout,
            headers,
            params,
            payload,
        )

        response_value = ""
        response_prompt_count: Optional[int] = None
        response_output_count: Optional[int] = None
        iter_count = 0
        start_time = time.time()
        iter_time = start_time
        first_iter_time: Optional[float] = None
        last_iter_time: Optional[float] = None

        yield StreamingTextResponse(
            type_="start",
            value="",
            start_time=start_time,
            first_iter_time=None,
            iter_count=iter_count,
            delta="",
            time=start_time,
            request_id=request_id,
        )

        # reset start time after yielding start response to ensure accurate timing
        start_time = time.time()

        async with self._get_async_client().stream(
            "POST", target, headers=headers, params=params, json=payload
        ) as stream:
            stream.raise_for_status()

            async for line in stream.aiter_lines():
                iter_time = time.time()
                logger.debug(
                    "{} request: {} recieved iter response line: {}",
                    self.__class__.__name__,
                    request_id,
                    line,
                )

                if not line or not line.strip().startswith("data:"):
                    continue

                if line.strip() == "data: [DONE]":
                    break

                data = json.loads(line.strip()[len("data: ") :])
                if delta := self._extract_completions_delta_content(type_, data):
                    if first_iter_time is None:
                        first_iter_time = iter_time
                    last_iter_time = iter_time

                    iter_count += 1
                    response_value += delta

                    yield StreamingTextResponse(
                        type_="iter",
                        value=response_value,
                        iter_count=iter_count,
                        start_time=start_time,
                        first_iter_time=first_iter_time,
                        delta=delta,
                        time=iter_time,
                        request_id=request_id,
                    )

                if usage := self._extract_completions_usage(data):
                    response_prompt_count = usage["prompt"]
                    response_output_count = usage["output"]

        logger.info(
            "{} request: {} with headers: {} and params: {} and payload: {} completed"
            "with: {}",
            self.__class__.__name__,
            request_id,
            headers,
            params,
            payload,
            response_value,
        )

        yield ResponseSummary(
            value=response_value,
            request_args=RequestArgs(
                target=target,
                headers=headers,
                params=params,
                payload=payload,
                timeout=self.timeout,
                http2=self.http2,
                follow_redirects=self.follow_redirects,
            ),
            start_time=start_time,
            end_time=iter_time,
            first_iter_time=first_iter_time,
            last_iter_time=last_iter_time,
            iterations=iter_count,
            request_prompt_tokens=request_prompt_tokens,
            request_output_tokens=request_output_tokens,
            response_prompt_tokens=response_prompt_count,
            response_output_tokens=response_output_count,
            request_id=request_id,
        )

    @staticmethod
    def _extract_completions_delta_content(
        type_: Literal["text_completions", "chat_completions"], data: dict
    ) -> Optional[str]:
        if "choices" not in data or not data["choices"]:
            return None

        if type_ == "text_completions":
            return data["choices"][0]["text"]

        if type_ == "chat_completions":
            return data["choices"][0]["delta"]["content"]

        raise ValueError(f"Unsupported type: {type_}")

    @staticmethod
    def _extract_completions_usage(
        data: dict,
    ) -> Optional[dict[Literal["prompt", "output"], int]]:
        if "usage" not in data or not data["usage"]:
            return None

        return {
            "prompt": data["usage"]["prompt_tokens"],
            "output": data["usage"]["completion_tokens"],
        }
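Based on the _params and _body handling above, extra_query and extra_body accept either a flat mapping applied to every request or a mapping keyed per endpoint; a sketch of both shapes with illustrative values:

from guidellm.backend import OpenAIHTTPBackend

# Flat: merged into every request (models, text_completions, chat_completions).
flat_query = {"api-version": "2024-06-01"}

# Per endpoint: only the entry matching the endpoint being called is used,
# and endpoints without an entry get no extra parameters.
per_endpoint_query = {
    "models": {"api-version": "2024-06-01"},
    "chat_completions": {"api-version": "2024-06-01"},
}

backend = OpenAIHTTPBackend(
    target="http://localhost:8000",
    extra_query=per_endpoint_query,
    extra_body={"chat_completions": {"temperature": 0.0}},
)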

info property

Returns:

Type Description
dict[str, Any]

The information about the backend.

model property

Returns:

Type Description
Optional[str]

The model to use for all requests on the target server. If validate hasn't been called yet and no model was passed in, this will be None until validate is called to set the default.

target property

Returns:

Type Description
str

The target URL string for the OpenAI server.

available_models() async

Get the available models for the target server using the OpenAI models endpoint: /v1/models

Source code in src/guidellm/backend/openai.py
async def available_models(self) -> list[str]:
    """
    Get the available models for the target server using the OpenAI models endpoint:
    /v1/models
    """
    target = f"{self.target}/v1/models"
    headers = self._headers()
    params = self._params(MODELS)
    response = await self._get_async_client().get(
        target, headers=headers, params=params
    )
    response.raise_for_status()

    models = []

    for item in response.json()["data"]:
        models.append(item["id"])

    return models
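The parsing above assumes the standard OpenAI list-models response shape; a sketch with illustrative values, where only the "id" fields are consumed:

models_response = {
    "object": "list",
    "data": [
        {"id": "meta-llama/Llama-3.1-8B-Instruct", "object": "model"},
        {"id": "my-lora-adapter", "object": "model"},
    ],
}

# available_models() effectively returns [item["id"] for item in models_response["data"]]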

chat_completions(content, request_id=None, prompt_token_count=None, output_token_count=None, raw_content=False, **kwargs) async

Generate chat completions for the given content using the OpenAI chat completions endpoint: /v1/chat/completions.

Parameters:

Name Type Description Default
content Union[str, list[Union[str, dict[str, Union[str, dict[str, str]]], Path, Image]], Any]

The content (or list of content) to generate a completion for. This supports any combination of text, images, and audio (model dependent). Supported text only request examples: content="Sample prompt", content=["Sample prompt", "Second prompt"], content=[{"type": "text", "value": "Sample prompt"}]. Supported text and image request examples: content=["Describe the image", PIL.Image.open("image.jpg")], content=["Describe the image", Path("image.jpg")], content=["Describe the image", {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}]. Supported text and audio request examples: content=["Transcribe the audio", Path("audio.wav")], content=["Transcribe the audio", {"type": "input_audio", "input_audio": {"data": f"{base64_bytes}", "format": "wav"}}]. Additionally, if raw_content=True then the content is passed directly to the backend without any processing.

required
request_id Optional[str]

The unique identifier for the request, if any. Added to logging statements and the response for tracking purposes.

None
prompt_token_count Optional[int]

The number of tokens measured in the prompt, if any. Returned in the response stats for later analysis, if applicable.

None
output_token_count Optional[int]

If supplied, the number of tokens to enforce generation of for the output for this request.

None
kwargs

Additional keyword arguments to pass with the request.

{}

Returns:

Type Description
AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]

An async generator that yields a StreamingTextResponse for start, a StreamingTextResponse for each received iteration, and a ResponseSummary for the final response.

Source code in src/guidellm/backend/openai.py
async def chat_completions(  # type: ignore[override]
    self,
    content: Union[
        str,
        list[Union[str, dict[str, Union[str, dict[str, str]]], Path, Image.Image]],
        Any,
    ],
    request_id: Optional[str] = None,
    prompt_token_count: Optional[int] = None,
    output_token_count: Optional[int] = None,
    raw_content: bool = False,
    **kwargs,
) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
    """
    Generate chat completions for the given content using the OpenAI
    chat completions endpoint: /v1/chat/completions.

    :param content: The content (or list of content) to generate a completion for.
        This supports any combination of text, images, and audio (model dependent).
        Supported text only request examples:
            content="Sample prompt", content=["Sample prompt", "Second prompt"],
            content=[{"type": "text", "value": "Sample prompt"}.
        Supported text and image request examples:
            content=["Describe the image", PIL.Image.open("image.jpg")],
            content=["Describe the image", Path("image.jpg")],
            content=["Describe the image", {"type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}].
        Supported text and audio request examples:
            content=["Transcribe the audio", Path("audio.wav")],
            content=["Transcribe the audio", {"type": "input_audio",
            "input_audio": {"data": f"{base64_bytes}", "format": "wav}].
        Additionally, if raw_content=True then the content is passed directly to the
        backend without any processing.
    :param request_id: The unique identifier for the request, if any.
        Added to logging statements and the response for tracking purposes.
    :param prompt_token_count: The number of tokens measured in the prompt, if any.
        Returned in the response stats for later analysis, if applicable.
    :param output_token_count: If supplied, the number of tokens to enforce
        generation of for the output for this request.
    :param kwargs: Additional keyword arguments to pass with the request.
    :return: An async generator that yields a StreamingTextResponse for start,
        a StreamingTextResponse for each received iteration,
        and a ResponseSummary for the final response.
    """
    logger.debug("{} invocation with args: {}", self.__class__.__name__, locals())
    headers = self._headers()
    params = self._params(CHAT_COMPLETIONS)
    body = self._body(CHAT_COMPLETIONS)
    messages = (
        content if raw_content else self._create_chat_messages(content=content)
    )
    payload = self._completions_payload(
        body=body,
        orig_kwargs=kwargs,
        max_output_tokens=output_token_count,
        messages=messages,
    )

    try:
        async for resp in self._iterative_completions_request(
            type_="chat_completions",
            request_id=request_id,
            request_prompt_tokens=prompt_token_count,
            request_output_tokens=output_token_count,
            headers=headers,
            params=params,
            payload=payload,
        ):
            yield resp
    except Exception as ex:
        logger.error(
            "{} request with headers: {} and params: {} and payload: {} failed: {}",
            self.__class__.__name__,
            headers,
            params,
            payload,
            ex,
        )
        raise ex
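
Example usage (not part of the library source; a minimal sketch that assumes the "openai_http" backend type registered by this module, an OpenAI-compatible server at http://localhost:8000, and that Backend, StreamingTextResponse, and ResponseSummary are importable directly from guidellm.backend — adjust all of these for your environment):

import asyncio

from guidellm.backend import Backend, ResponseSummary, StreamingTextResponse


async def run_chat() -> None:
    # "openai_http" and the target URL are assumptions for illustration.
    backend = Backend.create("openai_http", target="http://localhost:8000")
    await backend.validate()

    async for response in backend.chat_completions(
        content="Describe the benefits of streaming responses.",
        request_id="example-chat-1",
        output_token_count=64,
    ):
        if isinstance(response, StreamingTextResponse):
            # A 'start' event arrives first with an empty delta,
            # then one 'iter' event per received chunk.
            print(response.type_, repr(response.delta))
        elif isinstance(response, ResponseSummary):
            # The final yield summarizes timing and token counts.
            print("final value:", response.value)
            print("output tokens:", response.output_tokens)


asyncio.run(run_chat())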

check_setup() async

Check that the backend is set up correctly and can be used for requests. Specifically, if a model is not provided, the first available model is selected. If no models are available, or if the provided model is not among them, a ValueError is raised.

Raises:

Type Description
ValueError

If no models or the provided model is not available.

Source code in src/guidellm/backend/openai.py
async def check_setup(self):
    """
    Check if the backend is set up correctly and can be used for requests.
    Specifically, if a model is not provided, it grabs the first available model.
    If no models are available, raises a ValueError.
    If a model is provided and not available, raises a ValueError.

    :raises ValueError: If no models or the provided model is not available.
    """
    models = await self.available_models()
    if not models:
        raise ValueError(f"No models available for target: {self.target}")

    if not self.model:
        self._model = models[0]
    elif self.model not in models:
        raise ValueError(
            f"Model {self.model} not found in available models:"
            f"{models} for target: {self.target}"
        )
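
Example of the model-resolution behavior described above (a sketch, not library source; the backend type string and target URL are assumptions):

import asyncio

from guidellm.backend import Backend


async def resolve_model() -> None:
    # No model supplied: check_setup() queries available_models() and keeps
    # the first entry; an unavailable explicit model would raise ValueError.
    backend = Backend.create("openai_http", target="http://localhost:8000")
    await backend.check_setup()
    print("resolved model:", backend.model)


asyncio.run(resolve_model())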

prepare_multiprocessing() async

Prepare the backend for use in a multiprocessing environment. Clears out the sync and async clients to ensure they are re-initialized for each process.

Source code in src/guidellm/backend/openai.py
async def prepare_multiprocessing(self):
    """
    Prepare the backend for use in a multiprocessing environment.
    Clears out the sync and async clients to ensure they are re-initialized
    for each process.
    """
    if self._async_client is not None:
        await self._async_client.aclose()
        self._async_client = None
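
A hedged sketch of when this is typically called: before handing the backend to worker processes, so each process re-creates its own HTTP client rather than inheriting one bound to the parent's event loop. The backend type and target below are illustrative assumptions:

import asyncio

from guidellm.backend import Backend


async def hand_off() -> None:
    backend = Backend.create("openai_http", target="http://localhost:8000")
    await backend.validate()

    # Close and drop the cached async client before the backend crosses a
    # process boundary (e.g. multiprocessing workers created after this point).
    await backend.prepare_multiprocessing()


asyncio.run(hand_off())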

text_completions(prompt, request_id=None, prompt_token_count=None, output_token_count=None, **kwargs) async

Generate text completions for the given prompt using the OpenAI completions endpoint: /v1/completions.

Parameters:

Name Type Description Default
prompt Union[str, list[str]]

The prompt (or list of prompts) to generate a completion for. Note that list prompts (batching) are not currently supported by this backend and raise a ValueError, as shown in the source below.

required
request_id Optional[str]

The unique identifier for the request, if any. Added to logging statements and the response for tracking purposes.

None
prompt_token_count Optional[int]

The number of tokens measured in the prompt, if any. Returned in the response stats for later analysis, if applicable.

None
output_token_count Optional[int]

If supplied, the number of tokens to enforce generation of for the output for this request.

None
kwargs

Additional keyword arguments to pass with the request.

{}

Returns:

Type Description
AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]

An async generator that yields a StreamingTextResponse for start, a StreamingTextResponse for each received iteration, and a ResponseSummary for the final response.

Source code in src/guidellm/backend/openai.py
async def text_completions(  # type: ignore[override]
    self,
    prompt: Union[str, list[str]],
    request_id: Optional[str] = None,
    prompt_token_count: Optional[int] = None,
    output_token_count: Optional[int] = None,
    **kwargs,
) -> AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None]:
    """
    Generate text completions for the given prompt using the OpenAI
    completions endpoint: /v1/completions.

    :param prompt: The prompt (or list of prompts) to generate a completion for.
        If a list is supplied, these are concatenated and run through the model
        for a single prompt.
    :param request_id: The unique identifier for the request, if any.
        Added to logging statements and the response for tracking purposes.
    :param prompt_token_count: The number of tokens measured in the prompt, if any.
        Returned in the response stats for later analysis, if applicable.
    :param output_token_count: If supplied, the number of tokens to enforce
        generation of for the output for this request.
    :param kwargs: Additional keyword arguments to pass with the request.
    :return: An async generator that yields a StreamingTextResponse for start,
        a StreamingTextResponse for each received iteration,
        and a ResponseSummary for the final response.
    """
    logger.debug("{} invocation with args: {}", self.__class__.__name__, locals())

    if isinstance(prompt, list):
        raise ValueError(
            "List prompts (batching) is currently not supported for "
            f"text_completions OpenAI pathways. Received: {prompt}"
        )

    headers = self._headers()
    params = self._params(TEXT_COMPLETIONS)
    body = self._body(TEXT_COMPLETIONS)
    payload = self._completions_payload(
        body=body,
        orig_kwargs=kwargs,
        max_output_tokens=output_token_count,
        prompt=prompt,
    )

    try:
        async for resp in self._iterative_completions_request(
            type_="text_completions",
            request_id=request_id,
            request_prompt_tokens=prompt_token_count,
            request_output_tokens=output_token_count,
            headers=headers,
            params=params,
            payload=payload,
        ):
            yield resp
    except Exception as ex:
        logger.error(
            "{} request with headers: {} and params: {} and payload: {} failed: {}",
            self.__class__.__name__,
            headers,
            params,
            payload,
            ex,
        )
        raise ex
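
Example usage mirroring the chat example above, but against the /v1/completions pathway (a sketch, not library source; the backend type string and target are assumptions, and a list prompt would raise ValueError as shown in the source):

import asyncio

from guidellm.backend import Backend, ResponseSummary


async def run_text() -> None:
    backend = Backend.create("openai_http", target="http://localhost:8000")
    await backend.validate()

    final = None
    async for response in backend.text_completions(
        prompt="Once upon a time",
        request_id="example-text-1",
        output_token_count=32,
    ):
        final = response  # the last yield is the ResponseSummary

    assert isinstance(final, ResponseSummary)
    print(final.value)


asyncio.run(run_text())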

RequestArgs

Bases: StandardBaseModel

A model representing the arguments for a request to a backend. Biases towards an HTTP request, but can be used for other types of backends.

Parameters:

Name Type Description Default
target

The target URL or function for the request.

required
headers

The headers, if any, included in the request such as authorization.

required
params

The query parameters, if any, included in the request.

required
payload

The payload / arguments for the request including the prompt / content and other configurations.

required
timeout

The timeout for the request in seconds, if any.

required
http2

Whether HTTP/2 was used for the request, if applicable.

required
follow_redirects

Whether the request should follow redirect responses.

required
Source code in src/guidellm/backend/response.py
class RequestArgs(StandardBaseModel):
    """
    A model representing the arguments for a request to a backend.
    Biases towards an HTTP request, but can be used for other types of backends.

    :param target: The target URL or function for the request.
    :param headers: The headers, if any, included in the request such as authorization.
    :param params: The query parameters, if any, included in the request.
    :param payload: The payload / arguments for the request including the prompt /
        content and other configurations.
    :param timeout: The timeout for the request in seconds, if any.
    :param http2: Whether HTTP/2 was used for the request, if applicable.
    :param follow_redirects: Whether the request should follow redirect responses.
    """

    target: str
    headers: dict[str, str]
    params: dict[str, str]
    payload: dict[str, Any]
    timeout: Optional[float] = None
    http2: Optional[bool] = None
    follow_redirects: Optional[bool] = None
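
Example construction (illustrative values only; assumes RequestArgs is importable directly from guidellm.backend):

from guidellm.backend import RequestArgs

args = RequestArgs(
    target="http://localhost:8000/v1/completions",
    headers={"Authorization": "Bearer <token>"},
    params={},
    payload={"prompt": "Sample prompt", "max_tokens": 32},
    timeout=60.0,
    http2=True,
    follow_redirects=True,
)
print(args)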

ResponseSummary

Bases: StandardBaseModel

A model representing a summary of a backend request. Always returned as the final iteration of a streaming request.

Parameters:

Name Type Description Default
value

The final value returned from the request.

required
request_args

The arguments used to make the request.

required
iterations

The number of iterations in the request.

required
start_time

The time the request started.

required
end_time

The time the request ended.

required
first_iter_time

The time the first iteration was received.

required
last_iter_time

The time the last iteration was received.

required
request_prompt_tokens

The number of tokens measured in the prompt for the request, if any.

required
request_output_tokens

The number of tokens enforced for the output for the request, if any.

required
response_prompt_tokens

The number of tokens measured in the prompt for the response, if any.

required
response_output_tokens

The number of tokens measured in the output for the response, if any.

required
request_id

The unique identifier for the request, if any.

required
error

The error message, if any, returned from making the request.

required
Source code in src/guidellm/backend/response.py
class ResponseSummary(StandardBaseModel):
    """
    A model representing a summary of a backend request.
    Always returned as the final iteration of a streaming request.

    :param value: The final value returned from the request.
    :param request_args: The arguments used to make the request.
    :param iterations: The number of iterations in the request.
    :param start_time: The time the request started.
    :param end_time: The time the request ended.
    :param first_iter_time: The time the first iteration was received.
    :param last_iter_time: The time the last iteration was received.
    :param request_prompt_tokens: The number of tokens measured in the prompt
        for the request, if any.
    :param request_output_tokens: The number of tokens enforced for the output
        for the request, if any.
    :param response_prompt_tokens: The number of tokens measured in the prompt
        for the response, if any.
    :param response_output_tokens: The number of tokens measured in the output
        for the response, if any.
    :param request_id: The unique identifier for the request, if any.
    :param error: The error message, if any, returned from making the request.
    """

    value: str
    request_args: RequestArgs
    iterations: int = 0
    start_time: float
    end_time: float
    first_iter_time: Optional[float]
    last_iter_time: Optional[float]
    request_prompt_tokens: Optional[int] = None
    request_output_tokens: Optional[int] = None
    response_prompt_tokens: Optional[int] = None
    response_output_tokens: Optional[int] = None
    request_id: Optional[str] = None
    error: Optional[str] = None

    @computed_field  # type: ignore[misc]
    @property
    def prompt_tokens(self) -> Optional[int]:
        """
        The number of tokens measured in the prompt based on preferences
        for trusting the input or response.

        :return: The number of tokens in the prompt, if any.
        """
        if settings.preferred_prompt_tokens_source == "request":
            return self.request_prompt_tokens or self.response_prompt_tokens

        return self.response_prompt_tokens or self.request_prompt_tokens

    @computed_field  # type: ignore[misc]
    @property
    def output_tokens(self) -> Optional[int]:
        """
        The number of tokens measured in the output based on preferences
        for trusting the input or response.

        :return: The number of tokens in the output, if any.
        """
        if self.error is not None:
            # error occurred, can't trust request tokens were all generated
            return self.response_prompt_tokens

        if settings.preferred_output_tokens_source == "request":
            return self.request_output_tokens or self.response_output_tokens

        return self.response_output_tokens or self.request_output_tokens

output_tokens property

The number of tokens measured in the output based on preferences for trusting the input or response.

Returns:

Type Description
Optional[int]

The number of tokens in the output, if any.

prompt_tokens property

The number of tokens measured in the prompt based on preferences for trusting the input or response.

Returns:

Type Description
Optional[int]

The number of tokens in the prompt, if any.
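
A small sketch of how the computed token properties resolve; all values below are placeholders, and it is assumed that RequestArgs and ResponseSummary are importable directly from guidellm.backend:

from guidellm.backend import RequestArgs, ResponseSummary

summary = ResponseSummary(
    value="generated text",
    request_args=RequestArgs(
        target="http://localhost:8000/v1/completions",
        headers={},
        params={},
        payload={},
    ),
    iterations=8,
    start_time=0.0,
    end_time=1.2,
    first_iter_time=0.1,
    last_iter_time=1.1,
    request_prompt_tokens=12,
    response_prompt_tokens=11,
    request_output_tokens=32,
    response_output_tokens=30,
)

# If settings prefer the response-reported counts, this prints 11 and 30;
# with the "request" preference it would return 12 and 32 instead.
print(summary.prompt_tokens, summary.output_tokens)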

StreamingTextResponse

Bases: StandardBaseModel

A model representing the response content for a streaming text request.

Parameters:

Name Type Description Default
type_

The type of the response; either 'start' or 'iter'.

required
value

The value of the response up to this iteration.

required
start_time

The time.time() the request started.

required
first_iter_time

The time.time() the first iteration was received, if any.

required
iter_count

The iteration count for the response. For 'start' this is 0 and for the first 'iter' it is 1.

required
delta

The text delta added to the response for this stream iteration.

required
time

If 'start', the time.time() the request started. If 'iter', the time.time() the iteration was received.

required
request_id

The unique identifier for the request, if any.

required
Source code in src/guidellm/backend/response.py
class StreamingTextResponse(StandardBaseModel):
    """
    A model representing the response content for a streaming text request.

    :param type_: The type of the response; either 'start' or 'iter'.
    :param value: The value of the response up to this iteration.
    :param start_time: The time.time() the request started.
    :param iter_count: The iteration count for the response. For 'start' this is 0
        and for the first 'iter' it is 1.
    :param delta: The text delta added to the response for this stream iteration.
    :param time: If 'start', the time.time() the request started.
        If 'iter', the time.time() the iteration was received.
    :param request_id: The unique identifier for the request, if any.
    """

    type_: StreamingResponseType
    value: str
    start_time: float
    first_iter_time: Optional[float]
    iter_count: int
    delta: str
    time: float
    request_id: Optional[str] = None
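
Illustrative construction of the two event shapes yielded during streaming (timestamps and the request id are arbitrary; the 'start' and 'iter' literals follow the documentation above, and StreamingTextResponse is assumed importable directly from guidellm.backend):

import time

from guidellm.backend import StreamingTextResponse

started = time.time()

start_event = StreamingTextResponse(
    type_="start",
    value="",
    start_time=started,
    first_iter_time=None,
    iter_count=0,
    delta="",
    time=started,
    request_id="example-stream-1",
)

first_iteration = StreamingTextResponse(
    type_="iter",
    value="Hello",
    start_time=started,
    first_iter_time=started + 0.05,
    iter_count=1,
    delta="Hello",
    time=started + 0.05,
    request_id="example-stream-1",
)

print(start_event.type_, first_iteration.delta)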