guidellm.scheduler

AsyncConstantStrategy

Bases: ThroughputStrategy

A class representing an asynchronous constant scheduling strategy. This strategy schedules requests asynchronously at a constant request rate in requests per second. If initial_burst is set, it will send an initial burst of math.floor(rate) requests to reach the target rate. This is useful to ensure that the target rate is reached quickly and then maintained. It inherits from the SchedulingStrategy base class and implements the request_times method to provide the specific behavior for asynchronous constant scheduling.

Parameters:

- type_: The constant StrategyType used to schedule requests asynchronously. Default: "constant".
- rate: The rate at which to schedule requests asynchronously, in requests per second. Must be a positive float. Required.
- initial_burst: True to send an initial burst of math.floor(rate) requests to reach the target rate quickly; False to skip the burst. Default: True.
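
A minimal usage sketch (this assumes guidellm is installed and that AsyncConstantStrategy is importable from guidellm.scheduler, as this page's module path suggests):

import itertools

from guidellm.scheduler import AsyncConstantStrategy

strategy = AsyncConstantStrategy(rate=5.0)  # 5 requests per second

# request_times() is an infinite generator, so bound it with islice.
# With initial_burst=True (the default), the first math.floor(rate) = 5
# timestamps are identical (the burst); after that, timestamps advance
# by 1 / rate = 0.2 seconds per request.
for timestamp in itertools.islice(strategy.request_times(), 8):
    print(timestamp)
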
Source code in src/guidellm/scheduler/strategy.py
class AsyncConstantStrategy(ThroughputStrategy):
    """
    A class representing an asynchronous constant scheduling strategy.
    This strategy schedules requests asynchronously at a constant request rate
    in requests per second.
    If initial_burst is set, it will send an initial burst of math.floor(rate)
    requests to reach the target rate.
    This is useful to ensure that the target rate is reached quickly
    and then maintained.
    It inherits from the `SchedulingStrategy` base class and
    implements the `request_times` method to provide the specific
    behavior for asynchronous constant scheduling.

    :param type_: The constant StrategyType to schedule requests asynchronously.
    :param rate: The rate at which to schedule requests asynchronously in
        requests per second. This must be a positive float.
    :param initial_burst: True to send an initial burst of requests
        (math.floor(self.rate)) to reach target rate.
        False to not send an initial burst.
    """

    type_: Literal["constant"] = "constant"  # type: ignore[assignment]
    rate: float = Field(
        description=(
            "The rate at which to schedule requests asynchronously in "
            "requests per second. This must be a positive float."
        ),
        gt=0,
    )
    initial_burst: bool = Field(
        default=True,
        description=(
            "True to send an initial burst of requests (math.floor(self.rate)) "
            "to reach target rate. False to not send an initial burst."
        ),
    )

    def request_times(self) -> Generator[float, None, None]:
        """
        A generator that yields timestamps for when requests should be sent.
        This method schedules requests asynchronously at a constant rate
        in requests per second.
        If initial_burst is set, it will send an initial burst of requests
        to reach the target rate.
        This is useful to ensure that the target rate is reached quickly
        and then maintained.

        :return: A generator that yields timestamps for request scheduling.
        """
        start_time = time.time()
        constant_increment = 1.0 / self.rate

        # handle bursts first to get to the desired rate
        if self.initial_burst:
            # send an initial burst equal to the rate
            # to reach the target rate
            burst_count = math.floor(self.rate)
            for _ in range(burst_count):
                yield start_time

            start_time += constant_increment

        counter = 0

        # continue with constant rate after bursting
        while True:
            yield start_time + constant_increment * counter
            counter += 1

request_times()

A generator that yields timestamps for when requests should be sent. This method schedules requests asynchronously at a constant rate in requests per second. If initial_burst is set, it will send an initial burst of requests to reach the target rate. This is useful to ensure that the target rate is reached quickly and then maintained.

Returns:

- Generator[float, None, None]: A generator that yields timestamps for request scheduling.

Source code in src/guidellm/scheduler/strategy.py
def request_times(self) -> Generator[float, None, None]:
    """
    A generator that yields timestamps for when requests should be sent.
    This method schedules requests asynchronously at a constant rate
    in requests per second.
    If initial_burst is set, it will send an initial burst of requests
    to reach the target rate.
    This is useful to ensure that the target rate is reached quickly
    and then maintained.

    :return: A generator that yields timestamps for request scheduling.
    """
    start_time = time.time()
    constant_increment = 1.0 / self.rate

    # handle bursts first to get to the desired rate
    if self.initial_burst:
        # send an initial burst equal to the rate
        # to reach the target rate
        burst_count = math.floor(self.rate)
        for _ in range(burst_count):
            yield start_time

        start_time += constant_increment

    counter = 0

    # continue with constant rate after bursting
    while True:
        yield start_time + constant_increment * counter
        counter += 1

AsyncPoissonStrategy

Bases: ThroughputStrategy

A class representing an asynchronous Poisson scheduling strategy. This strategy schedules requests asynchronously at a Poisson request rate in requests per second. If initial_burst is set, it will send an initial burst of math.floor(rate) requests to reach the target rate. It inherits from the SchedulingStrategy base class and implements the request_times method to provide the specific behavior for asynchronous Poisson scheduling.

Parameters:

- type_: The Poisson StrategyType used to schedule requests asynchronously. Default: "poisson".
- rate: The rate at which to schedule requests asynchronously, in requests per second. Must be a positive float. Required.
- initial_burst: True to send an initial burst of math.floor(rate) requests to reach the target rate quickly; False to skip the burst. Default: True.
- random_seed: The random seed to use for the Poisson distribution, making schedules reproducible. Default: 42.
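
A standalone sketch of the arrival-time logic shown below (illustrative only, not guidellm's API): inter-arrival gaps are drawn from an exponential distribution with mean 1 / rate, which is what makes the arrival process Poisson, and a fixed seed makes the schedule reproducible.

import random
import time

def poisson_request_times(rate: float, seed: int = 42):
    """Yield timestamps whose inter-arrival gaps are exponentially distributed."""
    rand = random.Random(seed)  # fixed seed for a reproducible schedule
    current = time.time()
    while True:
        current += rand.expovariate(rate)  # mean gap of 1 / rate seconds
        yield current

With rate=10.0, the gaps average 0.1 seconds, giving ten requests per second in expectation.
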
Source code in src/guidellm/scheduler/strategy.py
class AsyncPoissonStrategy(ThroughputStrategy):
    """
    A class representing an asynchronous Poisson scheduling strategy.
    This strategy schedules requests asynchronously at a Poisson request rate
    in requests per second.
    If initial_burst is set, it will send an initial burst of math.floor(rate)
    requests to reach the target rate.
    It inherits from the `SchedulingStrategy` base class and
    implements the `request_times` method to provide the specific
    behavior for asynchronous Poisson scheduling.

    :param type_: The Poisson StrategyType to schedule requests asynchronously.
    :param rate: The rate at which to schedule requests asynchronously in
        requests per second. This must be a positive float.
    :param initial_burst: True to send an initial burst of requests
        (math.floor(self.rate)) to reach target rate.
        False to not send an initial burst.
    """

    type_: Literal["poisson"] = "poisson"  # type: ignore[assignment]
    rate: float = Field(
        description=(
            "The rate at which to schedule requests asynchronously in "
            "requests per second. This must be a positive float."
        ),
        gt=0,
    )
    initial_burst: bool = Field(
        default=True,
        description=(
            "True to send an initial burst of requests (math.floor(self.rate)) "
            "to reach target rate. False to not send an initial burst."
        ),
    )
    random_seed: int = Field(
        default=42,
        description=("The random seed to use for the Poisson distribution. "),
    )

    def request_times(self) -> Generator[float, None, None]:
        """
        A generator that yields timestamps for when requests should be sent.
        This method schedules requests asynchronously at a Poisson rate
        in requests per second.
        The inter-arrival time between requests is exponentially distributed
        based on the rate.

        :return: A generator that yields timestamps for request scheduling.
        """
        start_time = time.time()

        if self.initial_burst:
            # send an initial burst equal to the rate
            # to reach the target rate
            burst_count = math.floor(self.rate)
            for _ in range(burst_count):
                yield start_time
        else:
            yield start_time

        # set the random seed for reproducibility
        rand = random.Random(self.random_seed)  # noqa: S311

        while True:
            inter_arrival_time = rand.expovariate(self.rate)
            start_time += inter_arrival_time
            yield start_time

request_times()

A generator that yields timestamps for when requests should be sent. This method schedules requests asynchronously at a Poisson rate in requests per second. The inter-arrival time between requests is exponentially distributed based on the rate.

Returns:

- Generator[float, None, None]: A generator that yields timestamps for request scheduling.

Source code in src/guidellm/scheduler/strategy.py
def request_times(self) -> Generator[float, None, None]:
    """
    A generator that yields timestamps for when requests should be sent.
    This method schedules requests asynchronously at a Poisson rate
    in requests per second.
    The inter-arrival time between requests is exponentially distributed
    based on the rate.

    :return: A generator that yields timestamps for request scheduling.
    """
    start_time = time.time()

    if self.initial_burst:
        # send an initial burst equal to the rate
        # to reach the target rate
        burst_count = math.floor(self.rate)
        for _ in range(burst_count):
            yield start_time
    else:
        yield start_time

    # set the random seed for reproducibility
    rand = random.Random(self.random_seed)  # noqa: S311

    while True:
        inter_arrival_time = rand.expovariate(self.rate)
        start_time += inter_arrival_time
        yield start_time

ConcurrentStrategy

Bases: SchedulingStrategy

A class representing a concurrent scheduling strategy. This strategy schedules requests concurrently with the specified number of streams. It inherits from the SchedulingStrategy base class and implements the request_times method to provide the specific behavior for concurrent scheduling.

Parameters:

- type_: The concurrent StrategyType used to schedule requests concurrently. Default: "concurrent".
- streams: The number of concurrent streams to use for scheduling requests. Each stream runs synchronously at the maximum rate possible. Must be a positive integer. Required.
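
A minimal usage sketch (this assumes ConcurrentStrategy is importable from guidellm.scheduler); the printed values follow directly from the properties shown below:

from guidellm.scheduler import ConcurrentStrategy

strategy = ConcurrentStrategy(streams=4)

# Four synchronous streams: the scheduler runs four worker processes,
# each handling one request at a time.
print(strategy.processing_mode)            # "sync"
print(strategy.processes_limit)            # 4
print(strategy.queued_requests_limit)      # 4
print(strategy.processing_requests_limit)  # 4
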
Source code in src/guidellm/scheduler/strategy.py
class ConcurrentStrategy(SchedulingStrategy):
    """
    A class representing a concurrent scheduling strategy.
    This strategy schedules requests concurrently with the specified
    number of streams.
    It inherits from the `SchedulingStrategy` base class and
    implements the `request_times` method to provide the specific
    behavior for concurrent scheduling.

    :param type_: The concurrent StrategyType to schedule requests concurrently.
    :param streams: The number of concurrent streams to use for scheduling requests.
        Each stream runs synchronously with the maximum rate possible.
        This must be a positive integer.
    """

    type_: Literal["concurrent"] = "concurrent"  # type: ignore[assignment]
    streams: int = Field(
        description=(
            "The number of concurrent streams to use for scheduling requests. "
            "Each stream runs sychronously with the maximum rate possible. "
            "This must be a positive integer."
        ),
        gt=0,
    )

    @property
    def processing_mode(self) -> Literal["sync"]:
        """
        The processing mode for the scheduling strategy, either 'sync' or 'async'.
        This property determines how the worker processes are setup:
        either to run synchronously with one request at a time or asynchronously.

        :return: 'sync' for the concurrent scheduling strategy, which runs a
            number of synchronous worker processes equal to streams.
        """
        return "sync"

    @property
    def processes_limit(self) -> int:
        """
        The limit on the number of worker processes for the scheduling strategy.
        It determines how many worker processes are created
        for the scheduling strategy and must be implemented by subclasses.

        :return: {self.streams} for the concurrent scheduling strategy to limit
            the worker processes to the number of streams.
        """
        return self.streams

    @property
    def queued_requests_limit(self) -> int:
        """
        The maximum number of queued requests for the scheduling strategy.
        It determines how many requests can be queued at one time
        for the scheduling strategy and must be implemented by subclasses.

        :return: {self.streams} for the concurrent scheduling strategy to limit
            the queued requests to the number of streams that are ready to be processed.
        """
        return self.streams

    @property
    def processing_requests_limit(self) -> int:
        """
        The maximum number of processing requests for the scheduling strategy.
        It determines how many requests can be processed at one time
        for the scheduling strategy and must be implemented by subclasses.

        :return: {self.streams} for the concurrent scheduling strategy to limit
            the processing requests to the number of streams that are ready to be processed.
        """
        return self.streams

    def request_times(self) -> Generator[float, None, None]:
        """
        A generator that yields time.time() so requests are sent
        immediately, while scheduling them concurrently with the specified
        number of streams.

        :return: A generator that yields time.time() for immediate request scheduling.
        """
        while True:
            yield time.time()

processes_limit property

The limit on the number of worker processes for the scheduling strategy. It determines how many worker processes are created for the scheduling strategy and must be implemented by subclasses.

Returns:

- int: self.streams for the concurrent scheduling strategy, to limit the worker processes to the number of streams.

processing_mode property

The processing mode for the scheduling strategy, either 'sync' or 'async'. This property determines how the worker processes are setup: either to run synchronously with one request at a time or asynchronously.

Returns:

- Literal['sync']: 'sync' for the concurrent scheduling strategy, which runs a number of synchronous worker processes equal to streams.

processing_requests_limit property

The maximum number of processing requests for the scheduling strategy. It determines how many requests can be processed at one time for the scheduling strategy and must be implemented by subclasses.

Returns:

- int: self.streams for the concurrent scheduling strategy, to limit the processing requests to the number of streams that are ready to be processed.

queued_requests_limit property

The maximum number of queued requests for the scheduling strategy. It determines how many requests can be queued at one time for the scheduling strategy and must be implemented by subclasses.

Returns:

- int: self.streams for the concurrent scheduling strategy, to limit the queued requests to the number of streams that are ready to be processed.

request_times()

A generator that yields time.time() so requests are sent immediately, while scheduling them concurrently with the specified number of streams.

Returns:

- Generator[float, None, None]: A generator that yields time.time() for immediate request scheduling.

Source code in src/guidellm/scheduler/strategy.py
def request_times(self) -> Generator[float, None, None]:
    """
    A generator that yields time.time() so requests are sent
    immediately, while scheduling them concurrently with the specified
    number of streams.

    :return: A generator that yields time.time() for immediate request scheduling.
    """
    while True:
        yield time.time()

GenerativeRequestsWorker

Bases: RequestsWorker[GenerationRequest, ResponseSummary]

A class that handles the execution of requests using a backend. This class is responsible for sending requests to the backend, handling responses, and managing errors.

Parameters:

- backend (Backend): The backend to use for handling requests. This should be an instance of Backend, such as an OpenAIHTTPBackend. Required.
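
A minimal construction sketch. The import paths and the target argument to OpenAIHTTPBackend are assumptions made for illustration; only the backend parameter is confirmed by the signature below.

from guidellm.backend import OpenAIHTTPBackend  # assumed import path
from guidellm.scheduler import GenerativeRequestsWorker  # assumed import path

# target is an assumed constructor argument pointing at a local server
backend = OpenAIHTTPBackend(target="http://localhost:8000")
worker = GenerativeRequestsWorker(backend=backend)

# description reports the backend type, target, model, and info
print(worker.description)
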
Source code in src/guidellm/scheduler/worker.py
class GenerativeRequestsWorker(RequestsWorker[GenerationRequest, ResponseSummary]):
    """
    A class that handles the execution of requests using a backend.
    This class is responsible for sending requests to the backend,
    handling responses, and managing errors.

    :param backend: The backend to use for handling requests.
        This should be an instance of Backend such as an OpenAIHTTPBackend.
    """

    def __init__(self, backend: Backend):
        self.backend = backend

    @property
    def description(self) -> GenerativeRequestsWorkerDescription:
        """
        Get the description of the worker.
        :return: The description of the worker.
        """
        return GenerativeRequestsWorkerDescription(
            backend_type=self.backend.type_,
            backend_target=self.backend.target,
            backend_model=self.backend.model or "None",
            backend_info=self.backend.info,
        )

    async def prepare_multiprocessing(self):
        """
        Prepare the worker for multiprocessing.
        This is useful for workers that have instance state that can not
        be shared across processes and should be cleared out and re-initialized
        for each new process.
        """
        await self.backend.prepare_multiprocessing()

    def process_loop_synchronous(
        self,
        requests_queue: multiprocessing.Queue,
        results_queue: multiprocessing.Queue,
        process_id: int,
    ):
        asyncio.run(self.backend.validate())
        super().process_loop_synchronous(
            requests_queue=requests_queue,
            results_queue=results_queue,
            process_id=process_id,
        )

    def process_loop_asynchronous(
        self,
        requests_queue: multiprocessing.Queue,
        results_queue: multiprocessing.Queue,
        max_concurrency: int,
        process_id: int,
    ):
        asyncio.run(self.backend.validate())
        super().process_loop_asynchronous(
            requests_queue=requests_queue,
            results_queue=results_queue,
            max_concurrency=max_concurrency,
            process_id=process_id,
        )

    async def resolve(
        self,
        request: GenerationRequest,
        timeout_time: float,
    ) -> tuple[ResolveStatus, ResponseSummary]:
        """
        Resolve a request by sending it to the backend and handling the response.
        This method sends the request to the backend, waits for a response,
        and handles any errors that may occur during the process.

        :param request: The request to resolve.
        :param timeout_time: The time to wait for a response before timing out.
            If timeout_time is math.inf, the request will not time out.
        :return: A tuple of the ResolveStatus and a ResponseSummary containing the
            response from the backend. If an error occurs, the ResponseSummary will
            contain the error message.
        """
        resolve_start_time = time.time()
        response = None
        error: Optional[str] = None
        status = ResolveStatus(
            requested=False,
            completed=False,
            errored=False,
            canceled=False,
            request_start=-1,
            request_end=-1,
        )

        try:
            if timeout_time < time.time():
                raise asyncio.TimeoutError(
                    "The timeout time has already passed."
                )  # exit early

            status.requested = True
            request_func, request_kwargs = self._create_request_func_kwargs(request)

            async def _runner():
                # wrap function so we can enforce timeout and
                # still return the latest state from the backend
                async for resp in request_func(**request_kwargs):  # type: ignore[operator]
                    nonlocal response
                    response = resp

            await asyncio.wait_for(
                _runner(),
                timeout=timeout_time - time.time() if timeout_time < math.inf else None,
            )

            if not response:
                raise ValueError(
                    f"No response received for request: {request} "
                    f"and backend: {self.backend}"
                )
            if not isinstance(response, ResponseSummary):
                raise ValueError(
                    f"Received no ResponseSummary for request: {request} "
                    f"and backend: {self.backend}, received: {response}"
                )

            status.completed = True
        except asyncio.TimeoutError:
            error = "TimeoutError: The request timed out before completing."
            status.errored = True
            status.canceled = True
        except Exception as exc:  # noqa: BLE001
            error = str(exc)
            status.errored = True

        return self._handle_response(
            status=status,
            request=request,
            response=response,
            error=error,
            resolve_start_time=resolve_start_time,
        )

    def _create_request_func_kwargs(
        self,
        request: GenerationRequest,
    ) -> tuple[
        AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None],
        dict[str, Any],
    ]:
        request_func: AsyncGenerator[
            Union[StreamingTextResponse, ResponseSummary], None
        ]
        request_kwargs: dict[str, Any]

        if request.request_type == "text_completions":
            request_func = self.backend.text_completions  # type: ignore[assignment]
            request_kwargs = {
                "prompt": request.content,
                "request_id": request.request_id,
                "prompt_token_count": request.stats.get("prompt_tokens", None),
                "output_token_count": request.constraints.get("output_tokens", None),
                **request.params,
            }
        elif request.request_type == "chat_completions":
            request_func = self.backend.chat_completions  # type: ignore[assignment]
            request_kwargs = {
                "content": request.content,
                "request_id": request.request_id,
                "prompt_token_count": request.stats.get("prompt_tokens", None),
                "output_token_count": request.constraints.get("output_tokens", None),
                **request.params,
            }
        else:
            raise ValueError(
                f"Invalid request type: {request.request_type} for {request}"
            )

        return request_func, request_kwargs

    def _handle_response(
        self,
        status: ResolveStatus,
        request: GenerationRequest,
        response: Any,
        error: Optional[str],
        resolve_start_time: float,
    ) -> tuple[ResolveStatus, ResponseSummary]:
        if response is None or not isinstance(
            response, (ResponseSummary, StreamingTextResponse)
        ):
            # nothing received or invalid response, fill in defaults for error
            if response:
                error = str(
                    ValueError(
                        f"Invalid response: {type(response)} for request: {request}; "
                    )
                ) + (error or "")

            response = ResponseSummary(
                value="",
                request_args=RequestArgs(
                    target=self.backend.target,
                    headers={},
                    params={},
                    payload={},
                ),
                start_time=resolve_start_time,
                end_time=status.request_end,
                first_iter_time=None,
                last_iter_time=None,
                request_id=request.request_id,
                error=error or "Unknown error",
            )
        elif isinstance(response, StreamingTextResponse):
            response = ResponseSummary(
                value=response.value,
                request_args=RequestArgs(
                    target=self.backend.target,
                    headers={},
                    params={},
                    payload={},
                ),
                start_time=response.start_time,
                end_time=time.time(),
                first_iter_time=response.first_iter_time,
                last_iter_time=response.time if response.iter_count > 0 else None,
                request_prompt_tokens=request.stats.get("prompt_tokens", None),
                request_output_tokens=request.constraints.get("output_tokens", None),
                response_prompt_tokens=None,
                response_output_tokens=response.iter_count,
                request_id=request.request_id,
                error=error or "Unknown error",
            )

        response.error = error
        status.request_start = response.start_time
        status.request_end = response.end_time

        return status, response

description property

Get the description of the worker.

Returns:

- GenerativeRequestsWorkerDescription: The description of the worker.

prepare_multiprocessing() async

Prepare the worker for multiprocessing. This is useful for workers that have instance state that can not be shared across processes and should be cleared out and re-initialized for each new process.

Source code in src/guidellm/scheduler/worker.py
async def prepare_multiprocessing(self):
    """
    Prepare the worker for multiprocessing.
    This is useful for workers that have instance state that can not
    be shared across processes and should be cleared out and re-initialized
    for each new process.
    """
    await self.backend.prepare_multiprocessing()

resolve(request, timeout_time) async

Resolve a request by sending it to the backend and handling the response. This method sends the request to the backend, waits for a response, and handles any errors that may occur during the process.

Parameters:

- request (GenerationRequest): The request to resolve. Required.
- timeout_time (float): The time to wait for a response before timing out. If timeout_time is math.inf, the request will not time out. Required.

Returns:

- tuple[ResolveStatus, ResponseSummary]: A tuple of the ResolveStatus and a ResponseSummary containing the response from the backend. If an error occurs, the ResponseSummary will contain the error message.
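
The implementation below converts the absolute timeout_time deadline into the relative timeout that asyncio.wait_for expects. A standalone sketch of that pattern (illustrative only, not guidellm's API):

import asyncio
import math
import time

async def run_with_deadline(coro, timeout_time: float):
    # asyncio.wait_for takes a relative timeout (or None for no timeout),
    # so subtract the current time from the absolute deadline.
    remaining = timeout_time - time.time() if timeout_time < math.inf else None
    return await asyncio.wait_for(coro, timeout=remaining)
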

Source code in src/guidellm/scheduler/worker.py
async def resolve(
    self,
    request: GenerationRequest,
    timeout_time: float,
) -> tuple[ResolveStatus, ResponseSummary]:
    """
    Resolve a request by sending it to the backend and handling the response.
    This method sends the request to the backend, waits for a response,
    and handles any errors that may occur during the process.

    :param request: The request to resolve.
    :param timeout_time: The time to wait for a response before timing out.
        If timeout_time is math.inf, the request will not time out.
    :return: A tuple of the ResolveStatus and a ResponseSummary containing the
        response from the backend. If an error occurs, the ResponseSummary will
        contain the error message.
    """
    resolve_start_time = time.time()
    response = None
    error: Optional[str] = None
    status = ResolveStatus(
        requested=False,
        completed=False,
        errored=False,
        canceled=False,
        request_start=-1,
        request_end=-1,
    )

    try:
        if timeout_time < time.time():
            raise asyncio.TimeoutError(
                "The timeout time has already passed."
            )  # exit early

        status.requested = True
        request_func, request_kwargs = self._create_request_func_kwargs(request)

        async def _runner():
            # wrap function so we can enforce timeout and
            # still return the latest state from the backend
            async for resp in request_func(**request_kwargs):  # type: ignore[operator]
                nonlocal response
                response = resp

        await asyncio.wait_for(
            _runner(),
            timeout=timeout_time - time.time() if timeout_time < math.inf else None,
        )

        if not response:
            raise ValueError(
                f"No response received for request: {request} "
                f"and backend: {self.backend}"
            )
        if not isinstance(response, ResponseSummary):
            raise ValueError(
                f"Received no ResponseSummary for request: {request} "
                f"and backend: {self.backend}, received: {response}"
            )

        status.completed = True
    except asyncio.TimeoutError:
        error = "TimeoutError: The request timed out before completing."
        status.errored = True
        status.canceled = True
    except Exception as exc:  # noqa: BLE001
        error = str(exc)
        status.errored = True

    return self._handle_response(
        status=status,
        request=request,
        response=response,
        error=error,
        resolve_start_time=resolve_start_time,
    )

RequestsWorker

Bases: ABC, Generic[RequestT, ResponseT]

An abstract base class for a worker that processes requests. This class defines the interface for a worker that can resolve requests asynchronously or synchronously within the Scheduler class. Subclasses must implement the resolve method, which takes a request given directly from the load generator along with a timeout_time, and returns the response from the backend.
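
A sketch of a minimal concrete subclass, here a hypothetical EchoWorker. The import of RequestsWorker and ResolveStatus from guidellm.scheduler is an assumption; the ResolveStatus fields mirror the source shown below.

import time

from guidellm.scheduler import RequestsWorker, ResolveStatus  # assumed exports

class EchoWorker(RequestsWorker[str, str]):
    @property
    def description(self):
        ...  # return a WorkerDescription-style object describing this worker

    async def prepare_multiprocessing(self):
        pass  # no per-process state to reset in this toy worker

    async def resolve(self, request: str, timeout_time: float):
        start = time.time()
        status = ResolveStatus(
            requested=True,
            completed=True,
            errored=False,
            canceled=False,
            request_start=start,
            request_end=time.time(),
        )
        return status, request  # echo the request back as the "response"
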

Source code in src/guidellm/scheduler/worker.py
class RequestsWorker(ABC, Generic[RequestT, ResponseT]):
    """
    An abstract base class for a worker that processes requests.
    This class defines the interface for a worker that can resolve requests
    asynchronously or synchronously within the Scheduler class.
    Subclasses must implement the `resolve` method,
    which takes a request given directly from the load generator
    along with a timeout_time, and returns the response from the backend.
    """

    @property
    @abstractmethod
    def description(self) -> WorkerDescription:
        """
        An abstract property that must be implemented by subclasses.
        This property should return a Serializable class representing the information
        about the worker instance.
        """
        ...

    @abstractmethod
    async def prepare_multiprocessing(self):
        """
        An abstract method that must be implemented by subclasses.
        This is useful for workers that have instance state that can not
        be shared across processes and should be cleared out and re-initialized
        for each new process.
        """
        ...

    @abstractmethod
    async def resolve(
        self,
        request: RequestT,
        timeout_time: float,
    ) -> tuple[ResolveStatus, ResponseT]:
        """
        An abstract method that must be implemented by subclasses.
        This method should handle the resolution of a request through asyncio,
        including any necessary backend processing and response handling.

        :param request: The request to be resolved, as generated by the load generator.
        :param timeout_time: The timeout time for the request; if no timeout is
            given, this will be math.inf.
        :return: The response from the worker.
        """
        ...

    async def get_request(
        self, requests_queue: multiprocessing.Queue
    ) -> Optional[WorkerProcessRequest[RequestT]]:
        return await asyncio.to_thread(requests_queue.get)  # type: ignore[attr-defined]

    async def send_result(
        self,
        results_queue: multiprocessing.Queue,
        result: WorkerProcessResult[RequestT, ResponseT],
    ):
        await asyncio.to_thread(results_queue.put, result)  # type: ignore[attr-defined]

    async def resolve_scheduler_request(
        self,
        request: Any,
        queued_time: float,
        dequeued_time: float,
        start_time: float,
        timeout_time: float,
        results_queue: multiprocessing.Queue,
        process_id: int,
    ):
        info = SchedulerRequestInfo(
            targeted_start_time=start_time,
            queued_time=queued_time,
            dequeued_time=dequeued_time,
            scheduled_time=time.time(),
            process_id=process_id,
        )
        result: WorkerProcessResult[RequestT, ResponseT] = WorkerProcessResult(
            type_="request_scheduled",
            request=request,
            response=None,
            info=info,
        )
        asyncio.create_task(self.send_result(results_queue, result))

        if (wait_time := start_time - time.time()) > 0:
            await asyncio.sleep(wait_time)

        info.worker_start = time.time()
        result = WorkerProcessResult(
            type_="request_start",
            request=request,
            response=None,
            info=info,
        )
        asyncio.create_task(self.send_result(results_queue, result))

        status, response = await self.resolve(request, timeout_time)
        info.worker_end = time.time()
        info.requested = status.requested
        info.completed = status.completed
        info.errored = status.errored
        info.canceled = status.canceled
        info.request_start = status.request_start
        info.request_end = status.request_end
        result = WorkerProcessResult(
            type_="request_complete",
            request=request,
            response=response,
            info=info,
        )
        asyncio.create_task(self.send_result(results_queue, result))

    def process_loop_synchronous(
        self,
        requests_queue: multiprocessing.Queue,
        results_queue: multiprocessing.Queue,
        process_id: int,
    ):
        async def _process_runner():
            while (
                process_request := await self.get_request(requests_queue)
            ) is not None:
                dequeued_time = time.time()

                await self.resolve_scheduler_request(
                    request=process_request.request,
                    queued_time=process_request.queued_time,
                    dequeued_time=dequeued_time,
                    start_time=process_request.start_time,
                    timeout_time=process_request.timeout_time,
                    results_queue=results_queue,
                    process_id=process_id,
                )

        try:
            asyncio.run(_process_runner())
        except Exception as exc:  # noqa: BLE001
            logger.error(
                f"Error in worker process {process_id}: {exc}",
                exc_info=True,
                stack_info=True,
            )

    def process_loop_asynchronous(
        self,
        requests_queue: multiprocessing.Queue,
        results_queue: multiprocessing.Queue,
        max_concurrency: int,
        process_id: int,
    ):
        async def _process_runner():
            pending = asyncio.Semaphore(max_concurrency)

            if pending.locked():
                raise ValueError("Async worker called with max_concurrency < 1")

            while (
                process_request := await self.get_request(requests_queue)
            ) is not None:
                dequeued_time = time.time()

                await pending.acquire()

                def _task_done(_: asyncio.Task):
                    nonlocal pending
                    pending.release()

                task = asyncio.create_task(
                    self.resolve_scheduler_request(
                        request=process_request.request,
                        queued_time=process_request.queued_time,
                        dequeued_time=dequeued_time,
                        start_time=process_request.start_time,
                        timeout_time=process_request.timeout_time,
                        results_queue=results_queue,
                        process_id=process_id,
                    )
                )
                task.add_done_callback(_task_done)
                await asyncio.sleep(0)  # yield control so the new task can start immediately

        try:
            asyncio.run(_process_runner())
        except Exception as exc:  # noqa: BLE001
            logger.error(
                f"Error in worker process {process_id}: {exc}",
                exc_info=True,
                stack_info=True,
            )
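
process_loop_asynchronous above caps in-flight requests with a semaphore: it acquires a slot before creating each task and releases it in the task's done callback. A standalone sketch of the same pattern (illustrative only, not guidellm's API):

import asyncio

async def bounded_fan_out(jobs, max_concurrency: int):
    semaphore = asyncio.Semaphore(max_concurrency)
    tasks = []

    for job in jobs:
        await semaphore.acquire()  # block until a concurrency slot frees up
        task = asyncio.create_task(job())
        # release the slot when the task finishes, success or failure
        task.add_done_callback(lambda _: semaphore.release())
        tasks.append(task)

    await asyncio.gather(*tasks)
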

description abstractmethod property

An abstract property that must be implemented by subclasses. This property should return a Serializable class representing the information about the worker instance.

prepare_multiprocessing() abstractmethod async

An abstract method that must be implemented by subclasses. This is useful for workers that have instance state that can not be shared across processes and should be cleared out and re-initialized for each new process.

Source code in src/guidellm/scheduler/worker.py
@abstractmethod
async def prepare_multiprocessing(self):
    """
    An abstract method that must be implemented by subclasses.
    This is useful for workers that have instance state that can not
    be shared across processes and should be cleared out and re-initialized
    for each new process.
    """
    ...

resolve(request, timeout_time) abstractmethod async

An abstract method that must be implemented by subclasses. This method should handle the resolution of a request through asyncio, including any necessary backend processing and response handling.

Parameters:

- request (RequestT): The request to be resolved, as generated by the load generator. Required.
- timeout_time (float): The timeout time for the request; if no timeout is given, this will be math.inf. Required.

Returns:

- tuple[ResolveStatus, ResponseT]: The resolution status and the response from the worker.

Source code in src/guidellm/scheduler/worker.py
@abstractmethod
async def resolve(
    self,
    request: RequestT,
    timeout_time: float,
) -> tuple[ResolveStatus, ResponseT]:
    """
    An abstract method that must be implemented by subclasses.
    This method should handle the resolution of a request through asyncio,
    including any necessary backend processing and response handling.

    :param request: The request to be resolved, as generated by the load generator.
    :param timeout_time: The timeout time for the request; if no timeout is
        given, this will be math.inf.
    :return: The response from the worker.
    """
    ...

Scheduler

Bases: Generic[RequestT, ResponseT]

A class that handles the scheduling of requests to a worker. This class is responsible for managing the lifecycle of the requests, including their creation, queuing, and processing. It uses a multiprocessing approach to handle requests concurrently and efficiently, based on the specified scheduling strategy. The Scheduler class is designed to work with a RequestsWorker, which is an abstract base class that defines the interface for a worker that can resolve requests asynchronously or synchronously. The Scheduler class also supports different scheduling strategies, including synchronous, throughput, and concurrent strategies.

Parameters:

- worker (RequestsWorker[RequestT, ResponseT]): The worker that will process the requests. This should be an instance of RequestsWorker. Required.
- request_loader (Iterable[RequestT]): An iterable that generates requests; this can be a list, generator, or any other iterable. The requests will be processed by the worker. Required.
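
An end-to-end usage sketch (this assumes Scheduler and AsyncConstantStrategy are importable from guidellm.scheduler; EchoWorker is the hypothetical worker sketched in the RequestsWorker section of this page):

import asyncio

from guidellm.scheduler import AsyncConstantStrategy, Scheduler

async def main():
    # EchoWorker: see the hypothetical RequestsWorker subclass sketched above
    scheduler = Scheduler(worker=EchoWorker(), request_loader=["a", "b", "c"])
    async for result in scheduler.run(
        scheduling_strategy=AsyncConstantStrategy(rate=2.0),
        max_number=3,
    ):
        # yields run_start, per-request lifecycle results, then run_complete
        print(result.type_)

asyncio.run(main())
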
Source code in src/guidellm/scheduler/scheduler.py
class Scheduler(Generic[RequestT, ResponseT]):
    """
    A class that handles the scheduling of requests to a worker.
    This class is responsible for managing the lifecycle of the requests,
    including their creation, queuing, and processing.
    It uses a multiprocessing approach to handle requests concurrently
    and efficiently, based on the specified scheduling strategy.
    The Scheduler class is designed to work with a RequestsWorker,
    which is an abstract base class that defines the interface for a worker
    that can resolve requests asynchronously or synchronously.
    The Scheduler class also supports different scheduling strategies,
    including synchronous, throughput, and concurrent strategies.

    :param worker: The worker that will process the requests.
        This should be an instance of RequestsWorker.
    :param request_loader: An iterable that generates requests.
        This can be a list, generator, or any other iterable.
        The requests will be processed by the worker.
    """

    def __init__(
        self,
        worker: RequestsWorker[RequestT, ResponseT],
        request_loader: Iterable[RequestT],
    ):
        if not isinstance(worker, RequestsWorker):
            raise ValueError(f"Invalid worker: {worker}")

        if not isinstance(request_loader, Iterable):
            raise ValueError(f"Invalid request_loader: {request_loader}")

        self.worker = worker
        self.request_loader = request_loader

    async def run(
        self,
        scheduling_strategy: SchedulingStrategy,
        max_number: Optional[int] = None,
        max_duration: Optional[float] = None,
    ) -> AsyncGenerator[
        Union[SchedulerResult, SchedulerRequestResult[RequestT, ResponseT]], None
    ]:
        """
        The main method that runs the scheduler.
        This method is a generator that yields SchedulerResult objects
        at the start and end of the run, as well as at the start and end
        of each request.
        It uses multiprocessing to handle requests concurrently
        and efficiently, based on the specified scheduling strategy.
        The method also handles the lifecycle of the requests,
        including their creation, queuing, and processing.
        The method is designed to be used as an asynchronous generator,
        allowing it to be used with asyncio and other asynchronous frameworks.

        :param scheduling_strategy: The scheduling strategy to use.
            Specifies the times at which requests will be sent, as well as how
            many worker processes are used and whether requests are scheduled
            sync or async. This must be an instance of SchedulingStrategy, such
            as a synchronous, throughput, concurrent, constant-rate, or Poisson
            strategy.
        :param max_number: The maximum number of requests to process.
            If None, then no limit is set and either the iterator must be exhaustible
            or the max_duration must be set.
        :param max_duration: The maximum duration for the scheduling run.
            If None, then no limit is set and either the iterator must be exhaustible
            or the max_number must be set.
        :return: An asynchronous generator that yields SchedulerResult objects.
            Each SchedulerResult object contains information about the request,
            the response, and the run information.
        """
        if scheduling_strategy is None or not isinstance(
            scheduling_strategy, SchedulingStrategy
        ):
            raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}")

        if max_number is not None and max_number < 1:
            raise ValueError(f"Invalid max_number: {max_number}")

        if max_duration is not None and max_duration < 0:
            raise ValueError(f"Invalid max_duration: {max_duration}")

        with (
            multiprocessing.Manager() as manager,
            ProcessPoolExecutor(
                max_workers=scheduling_strategy.processes_limit
            ) as executor,
        ):
            requests_iter: Optional[Iterator[Any]] = None
            futures, requests_queue, responses_queue = await self._start_processes(
                manager, executor, scheduling_strategy
            )
            run_info, requests_iter, times_iter = self._run_setup(
                futures, scheduling_strategy, max_number, max_duration
            )
            yield SchedulerResult(
                type_="run_start",
                run_info=run_info,
            )

            try:
                while True:
                    # check errors and raise them
                    for future in futures:
                        if future.done() and (err := future.exception()) is not None:
                            raise err

                    if (
                        requests_iter is None
                        and run_info.completed_requests >= run_info.created_requests
                    ):
                        # we've exhausted all requests we've wanted to run
                        # and yielded all responses
                        break

                    requests_iter = self._add_requests(
                        requests_iter,
                        times_iter,
                        requests_queue,
                        run_info,
                    )
                    await asyncio.sleep(0)  # enable requests to start

                    iter_result = self._check_result_ready(
                        responses_queue,
                        run_info,
                    )
                    if iter_result is not None:
                        yield iter_result

                    # yield control to the event loop
                    await asyncio.sleep(settings.default_async_loop_sleep)
            except Exception as err:
                raise RuntimeError(f"Scheduler run failed: {err}") from err

            yield SchedulerResult(
                type_="run_complete",
                run_info=run_info,
            )

            await self._stop_processes(futures, requests_queue)

    async def _start_processes(
        self,
        manager,
        executor: ProcessPoolExecutor,
        scheduling_strategy: SchedulingStrategy,
    ) -> tuple[
        list[asyncio.Future],
        multiprocessing.Queue,
        multiprocessing.Queue,
    ]:
        await self.worker.prepare_multiprocessing()
        requests_queue = manager.Queue(
            maxsize=scheduling_strategy.queued_requests_limit
        )
        responses_queue = manager.Queue()

        num_processes = min(
            scheduling_strategy.processes_limit,
            scheduling_strategy.processing_requests_limit,
        )
        requests_limit_split = (
            scheduling_strategy.processing_requests_limit
            // scheduling_strategy.processes_limit
        )
        requests_limit_remain = (
            scheduling_strategy.processing_requests_limit
            % scheduling_strategy.processes_limit
        )
        process_ids = (id_ for id_ in range(num_processes))
        process_requests_limits = (
            requests_limit_split + 1
            if i < requests_limit_remain
            else requests_limit_split
            for i in range(num_processes)
        )

        futures = []
        loop = asyncio.get_event_loop()
        for id_, requests_limit in zip(process_ids, process_requests_limits):
            if scheduling_strategy.processing_mode == "sync":
                futures.append(
                    loop.run_in_executor(
                        executor,
                        self.worker.process_loop_synchronous,
                        requests_queue,
                        responses_queue,
                        id_,
                    )
                )
            elif scheduling_strategy.processing_mode == "async":
                futures.append(
                    loop.run_in_executor(
                        executor,
                        self.worker.process_loop_asynchronous,
                        requests_queue,
                        responses_queue,
                        requests_limit,
                        id_,
                    )
                )
            else:
                raise ValueError(
                    f"Invalid processing mode: {scheduling_strategy.processing_mode} "
                    f"for strategy: {scheduling_strategy}"
                )

        await asyncio.sleep(0.1)  # give time for processes to start

        return futures, requests_queue, responses_queue

    def _run_setup(
        self,
        processes: list[asyncio.Future],
        scheduling_strategy: SchedulingStrategy,
        max_number: Optional[int],
        max_duration: Optional[float],
    ) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]:
        requests_iter = iter(self.request_loader)
        start_time = time.time()
        times_iter = iter(scheduling_strategy.request_times())
        end_time = time.time() + (max_duration or math.inf)
        end_number = max_number or math.inf

        try:
            # update end number if the request loader is finite and less than max
            iter_length = len(self.request_loader)  # type: ignore[arg-type]
            if 0 < iter_length < end_number:
                end_number = iter_length
        except Exception:  # noqa: BLE001, S110
            pass

        if end_number == math.inf and end_time == math.inf:
            logger.warning(
                "No end number or end time set, "
                "scheduler will run indefinitely until the request loader is exhausted."
            )

        info = SchedulerRunInfo(
            start_time=start_time,
            end_time=end_time,
            end_number=end_number,
            processes=len(processes),
            strategy=scheduling_strategy,
        )

        return info, requests_iter, times_iter

    def _add_requests(
        self,
        requests_iter: Optional[Iterator[Any]],
        times_iter: Iterator[float],
        requests_queue: multiprocessing.Queue,
        run_info: SchedulerRunInfo,
    ) -> Optional[Iterator[Any]]:
        if requests_iter is not None:
            try:
                added_count = 0

                while (
                    not requests_queue.full()
                    and added_count < settings.max_add_requests_per_loop
                ):
                    if run_info.created_requests >= run_info.end_number:
                        raise StopIteration

                    if (
                        request_time := next(times_iter)
                    ) >= run_info.end_time or time.time() >= run_info.end_time:
                        raise StopIteration

                    request = next(requests_iter)
                    work_req: WorkerProcessRequest[RequestT] = WorkerProcessRequest(
                        request=request,
                        start_time=request_time,
                        timeout_time=run_info.end_time,
                        queued_time=time.time(),
                    )
                    requests_queue.put(work_req)

                    run_info.created_requests += 1
                    run_info.queued_requests += 1
                    added_count += 1
            except StopIteration:
                # we've reached the limit number, limit time, or exhausted the requests
                # set to None to stop adding more and tell the loop no more requests
                requests_iter = None

        return requests_iter

    def _check_result_ready(
        self,
        responses_queue: multiprocessing.Queue,
        run_info: SchedulerRunInfo,
    ) -> Optional[SchedulerRequestResult[RequestT, ResponseT]]:
        try:
            process_response: WorkerProcessResult[RequestT, ResponseT] = (
                responses_queue.get_nowait()
            )
        except multiprocessing.queues.Empty:  # type: ignore[attr-defined]
            return None

        if process_response.type_ == "request_scheduled":
            run_info.queued_requests -= 1
            run_info.scheduled_requests += 1

            return SchedulerRequestResult(
                type_="request_scheduled",
                run_info=run_info,
                request=process_response.request,
                request_info=process_response.info,
                response=None,
            )

        if process_response.type_ == "request_start":
            run_info.scheduled_requests -= 1
            run_info.processing_requests += 1

            return SchedulerRequestResult(
                type_="request_start",
                run_info=run_info,
                request=process_response.request,
                request_info=process_response.info,
                response=None,
            )

        if process_response.type_ == "request_complete":
            run_info.processing_requests -= 1
            run_info.completed_requests += 1

            return SchedulerRequestResult(
                type_="request_complete",
                run_info=run_info,
                request=process_response.request,
                request_info=process_response.info,
                response=process_response.response,
            )
        raise ValueError(f"Invalid process response type: {process_response}")

    async def _stop_processes(
        self,
        futures: list[asyncio.Future],
        requests_queue: multiprocessing.Queue,
    ):
        for _ in futures:
            requests_queue.put(None)

        await asyncio.gather(*futures)

run(scheduling_strategy, max_number=None, max_duration=None) async

The main method that runs the scheduler. This method is a generator that yields SchedulerResult objects at the start and end of the run, as well as at the start and end of each request. It uses multiprocessing to handle requests concurrently and efficiently, based on the specified scheduling strategy. The method also handles the lifecycle of the requests, including their creation, queuing, and processing. The method is designed to be used as an asynchronous generator, allowing it to be used with asyncio and other asynchronous frameworks.

Parameters:

- scheduling_strategy (SchedulingStrategy): The scheduling strategy to use. Specifies the times at which requests will be sent, as well as how many worker processes are used and whether requests are scheduled sync or async. This must be an instance of SchedulingStrategy, such as a synchronous, throughput, concurrent, constant-rate, or Poisson strategy. Required.
- max_number (Optional[int]): The maximum number of requests to process. If None, no limit is set and either the iterator must be exhaustible or max_duration must be set. Default: None.
- max_duration (Optional[float]): The maximum duration for the scheduling run, in seconds. If None, no limit is set and either the iterator must be exhaustible or max_number must be set. Default: None.

Returns:

- AsyncGenerator[Union[SchedulerResult, SchedulerRequestResult[RequestT, ResponseT]], None]: An asynchronous generator that yields SchedulerResult objects. Each SchedulerResult contains information about the request, the response, and the run.

Source code in src/guidellm/scheduler/scheduler.py
async def run(
    self,
    scheduling_strategy: SchedulingStrategy,
    max_number: Optional[int] = None,
    max_duration: Optional[float] = None,
) -> AsyncGenerator[
    Union[SchedulerResult, SchedulerRequestResult[RequestT, ResponseT]], None
]:
    """
    The main method that runs the scheduler.
    This method is a generator that yields SchedulerResult objects
    at the start and end of the run, as well as at the start and end
    of each request.
    It uses multiprocessing to handle requests concurrently
    and efficiently, based on the specified scheduling strategy.
    The method also handles the lifecycle of the requests,
    including their creation, queuing, and processing.
    The method is designed to be used as an asynchronous generator,
    allowing it to be used with asyncio and other asynchronous frameworks.

    :param scheduling_strategy: The scheduling strategy to use.
        Specifies the times at which requests will be sent as well as how many
        worker processes are used and whether requests are scheduled sync or async.
        This can be one of the following:
        - "synchronous": Requests are sent synchronously.
        - "throughput": Requests are sent at the maximum rate possible.
        - An instance of SchedulingStrategy.
    :param max_number: The maximum number of requests to process.
        If None, then no limit is set and either the iterator must be exhaustible
        or the max_duration must be set.
    :param max_duration: The maximum duration for the scheduling run.
        If None, then no limit is set and either the iterator must be exhaustible
        or the max_number must be set.
    :return: An asynchronous generator that yields SchedulerResult objects.
        Each SchedulerResult object contains information about the request,
        the response, and the run information.
    """
    if scheduling_strategy is None or not isinstance(
        scheduling_strategy, SchedulingStrategy
    ):
        raise ValueError(f"Invalid scheduling strategy: {scheduling_strategy}")

    if max_number is not None and max_number < 1:
        raise ValueError(f"Invalid max_number: {max_number}")

    if max_duration is not None and max_duration < 0:
        raise ValueError(f"Invalid max_duration: {max_duration}")

    with (
        multiprocessing.Manager() as manager,
        ProcessPoolExecutor(
            max_workers=scheduling_strategy.processes_limit
        ) as executor,
    ):
        requests_iter: Optional[Iterator[Any]] = None
        futures, requests_queue, responses_queue = await self._start_processes(
            manager, executor, scheduling_strategy
        )
        run_info, requests_iter, times_iter = self._run_setup(
            futures, scheduling_strategy, max_number, max_duration
        )
        yield SchedulerResult(
            type_="run_start",
            run_info=run_info,
        )

        try:
            while True:
                # check errors and raise them
                for future in futures:
                    if future.done() and (err := future.exception()) is not None:
                        raise err

                if (
                    requests_iter is None
                    and run_info.completed_requests >= run_info.created_requests
                ):
                    # we've exhausted all requests we've wanted to run
                    # and yielded all responses
                    break

                requests_iter = self._add_requests(
                    requests_iter,
                    times_iter,
                    requests_queue,
                    run_info,
                )
                await asyncio.sleep(0)  # enable requests to start

                iter_result = self._check_result_ready(
                    responses_queue,
                    run_info,
                )
                if iter_result is not None:
                    yield iter_result

                # yield control to the event loop
                await asyncio.sleep(settings.default_async_loop_sleep)
        except Exception as err:
            raise RuntimeError(f"Scheduler run failed: {err}") from err

        yield SchedulerResult(
            type_="run_complete",
            run_info=run_info,
        )

        await self._stop_processes(futures, requests_queue)

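For orientation, here is a minimal sketch of consuming run() as an asynchronous generator. It assumes an already-constructed Scheduler instance (wiring up the request loader and worker backend is outside this section) and that SynchronousStrategy is importable from guidellm.scheduler as documented below; only the iteration pattern follows directly from the signature above:

import asyncio

from guidellm.scheduler import SynchronousStrategy

async def consume(scheduler):
    # scheduler: an already-constructed Scheduler instance (assumed)
    async for result in scheduler.run(
        scheduling_strategy=SynchronousStrategy(),
        max_number=100,     # stop after 100 requests...
        max_duration=60.0,  # ...or after 60 seconds, whichever comes first
    ):
        if result.type_ == "run_start":
            print(f"run started at {result.run_info.start_time:.3f}")
        elif result.type_ == "request_complete":
            done = result.run_info.completed_requests
            print(f"completed {done} request(s)")
        elif result.type_ == "run_complete":
            print("run finished")

# asyncio.run(consume(scheduler))  # given a Scheduler instance
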
SchedulerRequestInfo

Bases: StandardBaseModel

Information about a specific request run through the scheduler. This class holds metadata about the request, including the targeted start time, queued time, start time, end time, and the process ID that handled the request.

Parameters:

Name Type Description Default
targeted_start_time

The targeted start time for the request (time.time()).

required
queued_time

The time the request was queued (time.time()).

required
scheduled_time

The time the request was scheduled (time.time()); includes any sleep time before the request was sent to the worker.

required
worker_start

The time the worker started processing the request (time.time()).

required
worker_end

The time the worker finished processing the request (time.time()).

required
process_id

The ID of the underlying process that handled the request.

required
Source code in src/guidellm/scheduler/result.py
class SchedulerRequestInfo(StandardBaseModel):
    """
    Information about a specific request run through the scheduler.
    This class holds metadata about the request, including
    the targeted start time, queued time, start time, end time,
    and the process ID that handled the request.

    :param targeted_start_time: The targeted start time for the request (time.time()).
    :param queued_time: The time the request was queued (time.time()).
    :param scheduled_time: The time the request was scheduled (time.time());
        includes any sleep time before the request was sent to the worker.
    :param worker_start: The time the worker started processing the request
        (time.time()).
    :param worker_end: The time the worker finished processing the request
        (time.time()).
    :param process_id: The ID of the underlying process that handled the request.
    """

    requested: bool = False
    completed: bool = False
    errored: bool = False
    canceled: bool = False

    targeted_start_time: float = -1
    queued_time: float = -1
    dequeued_time: float = -1
    scheduled_time: float = -1
    worker_start: float = -1
    request_start: float = -1
    request_end: float = -1
    worker_end: float = -1
    process_id: int = -1
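
The timestamps above are plain time.time() values (with -1 meaning "not recorded"), which makes derived metrics straightforward. A small sketch; the derived names (queue_delay and friends) are this example's own, not library API:

from guidellm.scheduler import SchedulerRequestInfo

info = SchedulerRequestInfo(
    requested=True,
    completed=True,
    targeted_start_time=100.0,
    queued_time=100.1,
    dequeued_time=100.2,
    scheduled_time=100.3,
    worker_start=100.4,
    request_start=100.5,
    request_end=101.5,
    worker_end=101.6,
    process_id=3,
)

queue_delay = info.dequeued_time - info.queued_time            # ~0.1s in queue
start_lateness = info.worker_start - info.targeted_start_time  # ~0.4s behind target
request_latency = info.request_end - info.request_start        # ~1.0s in flight
worker_overhead = (info.worker_end - info.worker_start) - request_latency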

SchedulerResult

Bases: StandardBaseModel

The yielded, iterative result for a scheduler run. These are triggered on the start and end of the run, as well as on the start and end of each request. Depending on the type, it will hold the request and response along with information and statistics about the request and general run.

Parameters:

Name Type Description Default
type_

The type of the result, which can be one of: - "run_start": Indicates the start of the run. - "run_complete": Indicates the completion of the run (teardown happens after). - "request_scheduled": Indicates a request has been scheduled (received by a worker process, pending send). - "request_start": Indicates the start of a request. - "request_complete": Indicates the completion of a request.

required
request

The request that was processed.

required
response

The response from the worker for the request.

required
request_info

Information about the request, including the targeted start time, queued time, start time, end time, and the process ID that handled the request.

required
run_info

Information about the current run of the scheduler, including the start and end times, the number of processes, and the scheduling strategy used. It also tracks the number of requests created, queued, pending, and completed during the run.

required
Source code in src/guidellm/scheduler/result.py
class SchedulerResult(StandardBaseModel):
    """
    The yielded, iterative result for a scheduler run.
    These are triggered on the start and end of the run,
    as well as on the start and end of each request.
    Depending on the type, it will hold the request and response
    along with information and statistics about the request and general run.

    :param type_: The type of the result, which can be one of:
        - "run_start": Indicates the start of the run.
        - "run_complete": Indicates the completion of the run (teardown happens after).
        - "request_scheduled": Indicates a request has been scheduled
          (received by a worker process, pending send).
        - "request_start": Indicates the start of a request.
        - "request_complete": Indicates the completion of a request.
    :param request: The request that was processed.
    :param response: The response from the worker for the request.
    :param request_info: Information about the request, including
        the targeted start time, queued time, start time, end time,
        and the process ID that handled the request.
    :param run_info: Information about the current run of the scheduler,
        including the start and end times, the number of processes,
        and the scheduling strategy used.
        It also tracks the number of requests created, queued, pending,
        and completed during the run.
    """

    pydantic_type: Literal["scheduler_result"] = "scheduler_result"
    type_: Literal[
        "run_start",
        "run_complete",
        "request_scheduled",
        "request_start",
        "request_complete",
    ]
    run_info: SchedulerRunInfo
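
Note that SchedulerResult itself carries only type_ and run_info; the per-request payload (request, response, request_info described above) arrives on SchedulerRequestResult, the subclass yielded for request-level events (see its construction in the Scheduler source earlier on this page). A hedged sketch of telling the two apart while iterating, assuming both classes are importable from guidellm.scheduler:

from guidellm.scheduler import SchedulerRequestResult, SchedulerResult

def handle(result: SchedulerResult) -> None:
    if isinstance(result, SchedulerRequestResult):
        # request-level event: request, response, request_info are available
        print(result.type_, result.request_info.process_id)
    else:
        # run-level event: only run_info is available
        print(result.type_, result.run_info.completed_requests)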

SchedulerRunInfo

Bases: StandardBaseModel

Information about the current run of the scheduler. This class holds metadata about the scheduling run, including the start and end times, the number of processes, and the scheduling strategy used. It also tracks the number of requests created, queued, pending, and completed during the run.

Parameters:

Name Type Description Default
start_time

The start time of the scheduling run.

required
end_time

The end time of the scheduling run; if None, then this will be math.inf.

required
end_number

The maximum number of requests to be processed; if None, then this will be math.inf.

required
processes

The number of processes used in the scheduling run.

required
strategy

The scheduling strategy used in the run. This should be an instance of SchedulingStrategy.

required
created_requests

The number of requests created during the run.

required
queued_requests

The number of requests queued during the run.

required
scheduled_requests

The number of requests scheduled during the run (requests received by a worker process but pending being sent).

required
processing_requests

The number of requests actively being run.

required
completed_requests

The number of requests completed during the run.

required
Source code in src/guidellm/scheduler/result.py
class SchedulerRunInfo(StandardBaseModel):
    """
    Information about the current run of the scheduler.
    This class holds metadata about the scheduling run,
    including the start and end times, the number of processes,
    and the scheduling strategy used.
    It also tracks the number of requests created, queued, pending,
    and completed during the run.

    :param start_time: The start time of the scheduling run.
    :param end_time: The end time of the scheduling run;
        if None, then this will be math.inf.
    :param end_number: The maximum number of requests to be processed;
        if None, then this will be math.inf.
    :param processes: The number of processes used in the scheduling run.
    :param strategy: The scheduling strategy used in the run.
        This should be an instance of SchedulingStrategy.
    :param created_requests: The number of requests created during the run.
    :param queued_requests: The number of requests queued during the run.
    :param scheduled_requests: The number of requests scheduled during the run.
        (requests received by a worker process but pending being sent)
    :param processing_requests: The number of requests actively being run.
    :param completed_requests: The number of requests completed during the run.
    """

    start_time: float
    end_time: float
    end_number: float
    processes: int
    strategy: SchedulingStrategy

    created_requests: int = 0
    queued_requests: int = 0
    scheduled_requests: int = 0
    processing_requests: int = 0
    completed_requests: int = 0
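
Because end_time and end_number are stored as math.inf when unbounded, limit checks stay uniform. A small helper sketch (not part of the library) that derives a progress fraction from a SchedulerRunInfo snapshot:

import math
import time

from guidellm.scheduler import SchedulerRunInfo

def run_progress(run_info: SchedulerRunInfo) -> float:
    # Prefer count-based progress when max_number was set
    if not math.isinf(run_info.end_number):
        return min(1.0, run_info.completed_requests / run_info.end_number)
    # Fall back to time-based progress when max_duration was set
    if not math.isinf(run_info.end_time):
        elapsed = time.time() - run_info.start_time
        return min(1.0, elapsed / (run_info.end_time - run_info.start_time))
    return 0.0  # fully unbounded run: no meaningful fraction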

SchedulingStrategy

Bases: StandardBaseModel

An abstract base class for scheduling strategies. This class defines the interface for scheduling requests and provides a common structure for all scheduling strategies. Subclasses should implement the request_times method to provide specific scheduling behavior.

Parameters:

Name Type Description Default
type_

The type of scheduling strategy to use. This should be one of the predefined strategy types.

required
Source code in src/guidellm/scheduler/strategy.py
class SchedulingStrategy(StandardBaseModel):
    """
    An abstract base class for scheduling strategies.
    This class defines the interface for scheduling requests and provides
    a common structure for all scheduling strategies.
    Subclasses should implement the `request_times` method to provide
    specific scheduling behavior.

    :param type_: The type of scheduling strategy to use.
        This should be one of the predefined strategy types.
    """

    type_: Literal["strategy"] = Field(
        description="The type of scheduling strategy schedule requests with.",
    )

    @property
    def processing_mode(self) -> Literal["sync", "async"]:
        """
        The processing mode for the scheduling strategy, either 'sync' or 'async'.
        This property determines how the worker processes are set up:
        either to run synchronously with one request at a time or asynchronously.
        This property may be overridden by subclasses to return
        the appropriate processing mode.

        :return: The processing mode for the scheduling strategy,
            either 'sync' or 'async'.
        """
        return "async"

    @property
    def processes_limit(self) -> int:
        """
        The limit on the number of worker processes for the scheduling strategy.
        It determines how many worker processes are created
        for the scheduling strategy and may be overridden by subclasses.

        :return: The number of processes for the scheduling strategy.
        """
        cpu_cores = os.cpu_count() or 1

        return min(max(1, cpu_cores - 1), settings.max_worker_processes)

    @property
    def queued_requests_limit(self) -> Optional[int]:
        """
        The maximum number of queued requests for the scheduling strategy.
        It determines how many requests can be queued at one time
        for the scheduling strategy and may be overridden by subclasses.

        :return: The maximum number of queued requests for the scheduling strategy.
        """
        return settings.max_concurrency

    @property
    def processing_requests_limit(self) -> int:
        """
        The maximum number of processing requests for the scheduling strategy.
        It determines how many requests can be processed at one time
        for the scheduling strategy and may be overridden by subclasses.

        :return: The maximum number of processing requests for the scheduling strategy.
        """
        return settings.max_concurrency

    def request_times(self) -> Generator[float, None, None]:
        """
        A generator that yields timestamps for when requests should be sent.
        This method should be implemented by subclasses to provide specific
        scheduling behavior.

        :return: A generator that yields timestamps for request scheduling
            or -1 for requests that should be sent immediately.
        """
        raise NotImplementedError("Subclasses must implement request_times() method.")

processes_limit property

The limit on the number of worker processes for the scheduling strategy. It determines how many worker processes are created for the scheduling strategy and may be overridden by subclasses.

Returns:

Type Description
int

The number of processes for the scheduling strategy.

processing_mode property

The processing mode for the scheduling strategy, either 'sync' or 'async'. This property determines how the worker processes are set up: either to run synchronously with one request at a time or asynchronously. This property may be overridden by subclasses to return the appropriate processing mode.

Returns:

Type Description
Literal['sync', 'async']

The processing mode for the scheduling strategy, either 'sync' or 'async'.

processing_requests_limit property

The maximum number of processing requests for the scheduling strategy. It determines how many requests can be processed at one time for the scheduling strategy and may be overridden by subclasses.

Returns:

Type Description
int

The maximum number of processing requests for the scheduling strategy.

queued_requests_limit property

The maximum number of queued requests for the scheduling strategy. It determines how many requests can be queued at one time for the scheduling strategy and may be overridden by subclasses.

Returns:

Type Description
Optional[int]

The maximum number of queued requests for the scheduling strategy.

request_times()

A generator that yields timestamps for when requests should be sent. This method should be implemented by subclasses to provide specific scheduling behavior.

Returns:

Type Description
Generator[float, None, None]

A generator that yields timestamps for request scheduling or -1 for requests that should be sent immediately.

Source code in src/guidellm/scheduler/strategy.py
def request_times(self) -> Generator[float, None, None]:
    """
    A generator that yields timestamps for when requests should be sent.
    This method should be implemented by subclasses to provide specific
    scheduling behavior.

    :return: A generator that yields timestamps for request scheduling
        or -1 for requests that should be sent immediately.
    """
    raise NotImplementedError("Subclasses must implement request_times() method.")
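
To illustrate the contract, here is a hedged sketch of a custom subclass: a fixed-interval strategy that spaces requests evenly. The field declaration and the type_ override mirror the built-in strategies on this page; whether a custom type_ value integrates with the rest of guidellm (e.g., any strategy registry or CLI mapping) is an assumption, so treat this purely as a demonstration of the interface:

import time
from typing import Generator, Literal

from pydantic import Field

from guidellm.scheduler import SchedulingStrategy

class FixedIntervalStrategy(SchedulingStrategy):
    # Hypothetical strategy type, not part of guidellm's StrategyType set
    type_: Literal["fixed_interval"] = "fixed_interval"  # type: ignore[assignment]
    interval: float = Field(
        gt=0,
        description="Seconds between consecutive request start times.",
    )

    def request_times(self) -> Generator[float, None, None]:
        # Yield evenly spaced timestamps starting from "now"
        next_time = time.time()
        while True:
            yield next_time
            next_time += self.interval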

SynchronousStrategy

Bases: SchedulingStrategy

A class representing a synchronous scheduling strategy. This strategy schedules requests synchronously, one at a time, with the maximum rate possible. It inherits from the SchedulingStrategy base class and implements the request_times method to provide the specific behavior for synchronous scheduling.

Parameters:

Name Type Description Default
type_

The synchronous StrategyType to schedule requests synchronously.

required
Source code in src/guidellm/scheduler/strategy.py
class SynchronousStrategy(SchedulingStrategy):
    """
    A class representing a synchronous scheduling strategy.
    This strategy schedules requests synchronously, one at a time,
    with the maximum rate possible.
    It inherits from the `SchedulingStrategy` base class and
    implements the `request_times` method to provide the specific
    behavior for synchronous scheduling.

    :param type_: The synchronous StrategyType to schedule requests synchronously.
    """

    type_: Literal["synchronous"] = "synchronous"  # type: ignore[assignment]

    @property
    def processing_mode(self) -> Literal["sync"]:
        """
        The processing mode for the scheduling strategy, either 'sync' or 'async'.
        This property determines how the worker processes are set up:
        either to run synchronously with one request at a time or asynchronously.

        :return: 'sync' for synchronous scheduling strategy
            for the single worker process.
        """
        return "sync"

    @property
    def processes_limit(self) -> int:
        """
        The limit on the number of worker processes for the scheduling strategy.
        It determines how many worker processes are created
        for the scheduling strategy.

        :return: 1 for the synchronous scheduling strategy to limit
            the worker processes to one.
        """
        return 1

    @property
    def queued_requests_limit(self) -> int:
        """
        The maximum number of queued requests for the scheduling strategy.
        It determines how many requests can be queued at one time
        for the scheduling strategy.

        :return: 1 for the synchronous scheduling strategy to limit
            the queued requests to one that is ready to be processed.
        """
        return 1

    @property
    def processing_requests_limit(self) -> int:
        """
        The maximum number of processing requests for the scheduling strategy.
        It determines how many requests can be processed at one time
        for the scheduling strategy.

        :return: 1 for the synchronous scheduling strategy to limit
            the processing requests to one that is ready to be processed.
        """
        return 1

    def request_times(self) -> Generator[float, None, None]:
        """
        A generator that yields time.time() so requests are sent immediately,
            while scheduling them synchronously.

        :return: A generator that yields time.time() for immediate request scheduling.
        """
        while True:
            yield time.time()

processes_limit property

The limit on the number of worker processes for the scheduling strategy. It determines how many worker processes are created for the scheduling strategy.

Returns:

Type Description
int

1 for the synchronous scheduling strategy to limit the worker processes to one.

processing_mode property

The processing mode for the scheduling strategy, either 'sync' or 'async'. This property determines how the worker processes are set up: either to run synchronously with one request at a time or asynchronously.

Returns:

Type Description
Literal['sync']

'sync' for synchronous scheduling strategy for the single worker process.

processing_requests_limit property

The maximum number of processing requests for the scheduling strategy. It determines how many requests can be processed at one time for the scheduling strategy.

Returns:

Type Description
int

1 for the synchronous scheduling strategy to limit the processing requests to one that is ready to be processed.

queued_requests_limit property

The maximum number of queued requests for the scheduling strategy. It determines how many requests can be queued at one time for the scheduling strategy.

Returns:

Type Description
int

1 for the synchronous scheduling strategy to limit the queued requests to one that is ready to be processed.

request_times()

A generator that yields time.time() so requests are sent immediately, while scheduling them synchronously.

Returns:

Type Description
Generator[float, None, None]

A generator that yields time.time() for immediate request scheduling.

Source code in src/guidellm/scheduler/strategy.py
def request_times(self) -> Generator[float, None, None]:
    """
    A generator that yields time.time() so requests are sent immediately,
        while scheduling them synchronously.

    :return: A generator that yields time.time() for immediate request scheduling.
    """
    while True:
        yield time.time()
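
As a quick illustration of the behavior: each pull on the generator returns the current wall-clock time, so the next request becomes eligible only when the scheduler asks for another timestamp, i.e., after the previous request finishes:

import itertools
import time

from guidellm.scheduler import SynchronousStrategy

strategy = SynchronousStrategy()
for target in itertools.islice(strategy.request_times(), 3):
    # each yielded value is "now" at the moment it was requested
    print(f"target={target:.6f}")
    time.sleep(0.01)  # stand-in for a request taking ~10ms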

ThroughputStrategy

Bases: SchedulingStrategy

A class representing a throughput scheduling strategy. This strategy schedules as many requests asynchronously as possible, with the maximum rate possible. It inherits from the SchedulingStrategy base class and implements the request_times method to provide the specific behavior for throughput scheduling.

Parameters:

Name Type Description Default
type_

The throughput StrategyType to schedule requests asynchronously.

required
Source code in src/guidellm/scheduler/strategy.py
class ThroughputStrategy(SchedulingStrategy):
    """
    A class representing a throughput scheduling strategy.
    This strategy schedules as many requests asynchronously as possible,
    with the maximum rate possible.
    It inherits from the `SchedulingStrategy` base class and
    implements the `request_times` method to provide the specific
    behavior for throughput scheduling.

    :param type_: The throughput StrategyType to schedule requests asynchronously.
    """

    type_: Literal["throughput"] = "throughput"  # type: ignore[assignment]
    max_concurrency: Optional[int] = Field(
        default=None,
        description=(
            "The maximum number of concurrent requests to schedule. "
            "If set to None, the concurrency value from settings will be used. "
            "This must be a positive integer greater than 0."
        ),
        gt=0,
    )

    @property
    def processing_mode(self) -> Literal["async"]:
        """
        The processing mode for the scheduling strategy, either 'sync' or 'async'.
        This property determines how the worker processes are set up:
        either to run synchronously with one request at a time or asynchronously.

        :return: 'async' for asynchronous scheduling strategy
            for the multiple worker processes handling requests.
        """
        return "async"

    @property
    def queued_requests_limit(self) -> int:
        """
        The maximum number of queued requests for the scheduling strategy.
        It determines how many requests can be queued at one time
        for the scheduling strategy.

        :return: The processing requests limit to ensure that there are enough
            requests even for the worst case scenario where the max concurrent
            requests are pulled at once for processing.
        """
        return self.processing_requests_limit

    @property
    def processing_requests_limit(self) -> int:
        """
        The maximum number of processing requests for the scheduling strategy.
        It determines how many requests can be processed at one time
        for the scheduling strategy.

        :return: max_concurrency for the throughput scheduling strategy to limit
            the processing requests to the maximum concurrency.
            If max_concurrency is None, then the default processing requests limit
            will be used.
        """
        return self.max_concurrency or super().processing_requests_limit

    def request_times(self) -> Generator[float, None, None]:
        """
        A generator that yields the start time.time() so requests are sent
        immediately, while scheduling as many asynchronously as possible.

        :return: A generator that yields the start time.time()
            for immediate request scheduling.
        """
        start_time = time.time()

        while True:
            yield start_time

processing_mode property

The processing mode for the scheduling strategy, either 'sync' or 'async'. This property determines how the worker processes are set up: either to run synchronously with one request at a time or asynchronously.

Returns:

Type Description
Literal['async']

'async' for asynchronous scheduling strategy for the multiple worker processes handling requests.

processing_requests_limit property

The maximum number of processing requests for the scheduling strategy. It determines how many requests can be processed at one time for the scheduling strategy.

Returns:

Type Description
int

max_concurrency for the throughput scheduling strategy to limit the processing requests to the maximum concurrency. If max_concurrency is None, then the default processing requests limit will be used.

queued_requests_limit property

The maximum number of queued requests for the scheduling strategy. It determines how many requests can be queued at one time for the scheduling strategy.

Returns:

Type Description
int

The processing requests limit to ensure that there are enough requests even for the worst case scenario where the max concurrent requests are pulled at once for processing.

request_times()

A generator that yields the start time.time() so requests are sent immediately, while scheduling as many asynchronously as possible.

Returns:

Type Description
Generator[float, None, None]

A generator that yields the start time.time() for immediate request scheduling.

Source code in src/guidellm/scheduler/strategy.py
def request_times(self) -> Generator[float, None, None]:
    """
    A generator that yields the start time.time() so requests are sent
    immediately, while scheduling as many asynchronously as possible.

    :return: A generator that yields the start time.time()
        for immediate request scheduling.
    """
    start_time = time.time()

    while True:
        yield start_time
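
In contrast to SynchronousStrategy, every timestamp yielded here is the single start time captured when iteration begins, so all requests are eligible immediately and the effective concurrency is bounded by processing_requests_limit. A brief sketch:

import itertools

from guidellm.scheduler import ThroughputStrategy

strategy = ThroughputStrategy(max_concurrency=32)

timestamps = list(itertools.islice(strategy.request_times(), 5))
assert len(set(timestamps)) == 1  # all equal to the generator's start time

# max_concurrency caps in-flight requests; when None, the value from
# settings.max_concurrency is used instead
print(strategy.processing_requests_limit)  # -> 32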