llmcompressor.core

CompressionLifecycle dataclass

A class for managing the lifecycle of compression events in the LLM Compressor.

Parameters:

    state (State, default: State()): The current state of the compression process
    recipe_container (RecipeContainer, default: RecipeContainer()): The container for the compression recipe
    modifiers (List[StageModifiers], default: list()): The list of stage modifiers
Source code in src/llmcompressor/core/lifecycle.py
@dataclass
class CompressionLifecycle:
    """
    A class for managing the lifecycle of compression events in the LLM Compressor.

    :param state: The current state of the compression process
    :type state: Optional[State]
    :param recipe_container: The container for the compression recipe
    :type recipe_container: RecipeContainer
    :param modifiers: The list of stage modifiers
    :type modifiers: List[StageModifiers]
    """

    state: State = field(default_factory=State)
    recipe_container: RecipeContainer = field(default_factory=RecipeContainer)
    modifiers: List[StageModifiers] = field(default_factory=list)

    initialized_: bool = False
    finalized: bool = False

    # event order validation
    _last_event_type: Optional[EventType] = EventType.BATCH_END
    _event_order: List[EventType] = field(
        default_factory=lambda: [
            EventType.BATCH_START,
            EventType.LOSS_CALCULATED,
            EventType.OPTIM_PRE_STEP,
            EventType.OPTIM_POST_STEP,
            EventType.BATCH_END,
        ]
    )

    # track global step in training (could be epoch/batch)
    global_step: int = 0

    def reset(self):
        """
        Reset the compression lifecycle, finalizing any active modifiers
        and resetting all attributes.
        """
        logger.debug("Resetting compression lifecycle")

        for mod in self.modifiers:
            if not mod.initialized or mod.finalized:
                continue
            try:
                mod.finalize(self.state)
                logger.debug("Finalized modifier: {}", mod)
            except Exception as e:
                logger.warning(f"Exception during finalizing modifier: {e}")

        self.__init__()
        logger.info("Compression lifecycle reset")

    def initialize(
        self,
        recipe: Optional[RecipeInput] = None,
        recipe_stage: Optional[RecipeStageInput] = None,
        recipe_args: Optional[RecipeArgsInput] = None,
        **kwargs,
    ) -> List[Any]:
        """
        Initialize the compression lifecycle.

        :param kwargs: Additional arguments to update the state with
        :return: List of data returned from initialization of modifiers
        :rtype: List[Any]
        """
        self.state.update(**kwargs)
        if self.initialized_:  # TODO: do not initialize twice
            return

        logger.debug("Initializing compression lifecycle")
        self.recipe_container.append(recipe, recipe_stage, recipe_args)
        self.modifiers = self.recipe_container.get_modifiers()

        mod_data = []
        for mod in self.modifiers:
            data = mod.initialize(state=self.state, **kwargs)
            logger.debug("Initialized modifier: {}", mod)
            if data is not None:
                mod_data.append(data)

        self.initialized_ = True
        logger.info(
            "Compression lifecycle initialized for {} modifiers", len(self.modifiers)
        )

        return mod_data

    def finalize(self, **kwargs) -> List[Any]:
        """
        Finalize the compression lifecycle.

        :param kwargs: Additional arguments to update the state with
        :return: List of data returned from finalizing modifiers
        :rtype: List[Any]
        :raises ValueError: If called before initialization or more than once
        """
        if not self.initialized_:
            logger.error("Cannot finalize before initializing")
            raise ValueError("Cannot finalize before initializing")

        if self.finalized:
            logger.error("Cannot finalize more than once")
            raise ValueError("Cannot finalize more than once")

        logger.debug("Finalizing compression lifecycle")
        mod_data = []
        for mod in self.modifiers:
            data = mod.finalize(state=self.state, **kwargs)
            logger.debug("Finalized modifier: {}", mod)
            if data is not None:
                mod_data.append(data)

        self.finalized = True
        applied_stage_names = [mod.unique_id for mod in self.modifiers if mod.applied]
        self.recipe_container.update_applied_stages(applied_stage_names)

        logger.info(
            "Compression lifecycle finalized for {} modifiers", len(self.modifiers)
        )

        return mod_data

    def event(
        self, event_type: EventType, global_step: Optional[int] = 0, **kwargs
    ) -> List[Any]:
        """
        Handle a compression event.

        :param event_type: The type of event to handle
        :type event_type: EventType
        :param kwargs: Additional arguments to pass to the event handlers
        :return: List of data returned from handling the event by modifiers
        :rtype: List[Any]
        :raises ValueError: If called before initialization, after finalization,
            or for an invalid event type
        """
        if not self.initialized_:
            logger.error("Cannot invoke event before initializing")
            raise ValueError("Cannot invoke event before initializing")

        if self.finalized:
            logger.error("Cannot invoke event after finalizing")
            raise ValueError("Cannot invoke event after finalizing")

        if event_type in [EventType.INITIALIZE, EventType.FINALIZE]:
            logger.error(
                "Cannot invoke {} event. Use the corresponding method instead.",
                event_type,
            )
            raise ValueError(
                f"Cannot invoke {event_type} event. "
                f"Use the corresponding method instead."
            )

        if not self._validate_event_order(event_type):
            raise ValueError(
                f"Lifecycle events must appear following order: {self._event_order}. "
                f"Instead, {self._last_event_type} was called before {event_type}"
            )

        if event_type == EventType.LOSS_CALCULATED and (
            "loss" not in kwargs or kwargs["loss"] is None
        ):
            logger.error("Loss must be provided for loss calculated event")
            raise ValueError("Loss must be provided for loss calculated event")

        logger.debug("Handling event: {}", event_type)

        # update global step
        if global_step is not None:
            self.global_step = global_step

        event = Event(type_=event_type)
        mod_data = []
        for mod in self.modifiers:
            data = mod.update_event(state=self.state, event=event, **kwargs)
            logger.debug("Updated event with modifier: {}", mod)
            if data is not None:
                mod_data.append(data)

        assert (
            event is not None
        ), f"Event lifecycle did not return an event for {event_type}"

        return mod_data

    def _validate_event_order(self, event_type: EventType) -> bool:
        if event_type not in self._event_order:
            # for unhandled events, do not save last event
            return True

        if event_type == EventType.BATCH_START:
            valid = self._last_event_type != EventType.BATCH_START

        else:
            last_event_index = self._event_order.index(self._last_event_type)
            curr_event_index = self._event_order.index(event_type)
            valid = last_event_index <= curr_event_index

        if valid:
            self._last_event_type = event_type
        return valid
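
Example (a hedged sketch, not from the library source): driving a CompressionLifecycle through a training loop. It assumes CompressionLifecycle and EventType are importable from llmcompressor.core; model, optimizer, dataloader, and compute_loss are placeholders you would supply.

from llmcompressor.core import CompressionLifecycle, EventType

lifecycle = CompressionLifecycle()

# extra kwargs (model, etc.) are forwarded to State.update and to each
# modifier's initialize; "my_recipe.yaml" is a hypothetical recipe path
lifecycle.initialize(recipe="my_recipe.yaml", model=model)

for step, batch in enumerate(dataloader):  # placeholder dataloader
    # events must follow the validated order within each batch
    lifecycle.event(EventType.BATCH_START, global_step=step)
    loss = compute_loss(model, batch)      # placeholder loss function
    lifecycle.event(EventType.LOSS_CALCULATED, loss=loss, global_step=step)
    lifecycle.event(EventType.OPTIM_PRE_STEP, global_step=step)
    optimizer.step()                       # placeholder optimizer
    lifecycle.event(EventType.OPTIM_POST_STEP, global_step=step)
    lifecycle.event(EventType.BATCH_END, global_step=step)

lifecycle.finalize()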

event(event_type, global_step=0, **kwargs)

Handle a compression event.

Parameters:

    event_type (EventType, required): The type of event to handle
    kwargs (default: {}): Additional arguments to pass to the event handlers

Returns:

    List[Any]: List of data returned from handling the event by modifiers

Raises:

    ValueError: If called before initialization, after finalization, or for an invalid event type

Source code in src/llmcompressor/core/lifecycle.py
def event(
    self, event_type: EventType, global_step: Optional[int] = 0, **kwargs
) -> List[Any]:
    """
    Handle a compression event.

    :param event_type: The type of event to handle
    :type event_type: EventType
    :param kwargs: Additional arguments to pass to the event handlers
    :return: List of data returned from handling the event by modifiers
    :rtype: List[Any]
    :raises ValueError: If called before initialization, after finalization,
        or for an invalid event type
    """
    if not self.initialized_:
        logger.error("Cannot invoke event before initializing")
        raise ValueError("Cannot invoke event before initializing")

    if self.finalized:
        logger.error("Cannot invoke event after finalizing")
        raise ValueError("Cannot invoke event after finalizing")

    if event_type in [EventType.INITIALIZE, EventType.FINALIZE]:
        logger.error(
            "Cannot invoke {} event. Use the corresponding method instead.",
            event_type,
        )
        raise ValueError(
            f"Cannot invoke {event_type} event. "
            f"Use the corresponding method instead."
        )

    if not self._validate_event_order(event_type):
        raise ValueError(
            f"Lifecycle events must appear following order: {self._event_order}. "
            f"Instead, {self._last_event_type} was called before {event_type}"
        )

    if event_type == EventType.LOSS_CALCULATED and (
        "loss" not in kwargs or kwargs["loss"] is None
    ):
        logger.error("Loss must be provided for loss calculated event")
        raise ValueError("Loss must be provided for loss calculated event")

    logger.debug("Handling event: {}", event_type)

    # update global step
    if global_step is not None:
        self.global_step = global_step

    event = Event(type_=event_type)
    mod_data = []
    for mod in self.modifiers:
        data = mod.update_event(state=self.state, event=event, **kwargs)
        logger.debug("Updated event with modifier: {}", mod)
        if data is not None:
            mod_data.append(data)

    assert (
        event is not None
    ), f"Event lifecycle did not return an event for {event_type}"

    return mod_data
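
For instance, a minimal sketch of the loss-validation path, assuming an initialize call without a recipe is permitted (it only skips modifier setup):

from llmcompressor.core import CompressionLifecycle, EventType

lifecycle = CompressionLifecycle()
lifecycle.initialize()  # no recipe: no modifiers run, but events are still validated

lifecycle.event(EventType.BATCH_START, global_step=0)
try:
    # LOSS_CALCULATED requires a non-None loss kwarg
    lifecycle.event(EventType.LOSS_CALCULATED)
except ValueError as err:
    print(err)  # "Loss must be provided for loss calculated event"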

finalize(**kwargs)

Finalize the compression lifecycle.

Parameters:

    kwargs (default: {}): Additional arguments to update the state with

Returns:

    List[Any]: List of data returned from finalizing modifiers

Raises:

    ValueError: If called before initialization or more than once

Source code in src/llmcompressor/core/lifecycle.py
def finalize(self, **kwargs) -> List[Any]:
    """
    Finalize the compression lifecycle.

    :param kwargs: Additional arguments to update the state with
    :return: List of data returned from finalizing modifiers
    :rtype: List[Any]
    :raises ValueError: If called before initialization or more than once
    """
    if not self.initialized_:
        logger.error("Cannot finalize before initializing")
        raise ValueError("Cannot finalize before initializing")

    if self.finalized:
        logger.error("Cannot finalize more than once")
        raise ValueError("Cannot finalize more than once")

    logger.debug("Finalizing compression lifecycle")
    mod_data = []
    for mod in self.modifiers:
        data = mod.finalize(state=self.state, **kwargs)
        logger.debug("Finalized modifier: {}", mod)
        if data is not None:
            mod_data.append(data)

    self.finalized = True
    applied_stage_names = [mod.unique_id for mod in self.modifiers if mod.applied]
    self.recipe_container.update_applied_stages(applied_stage_names)

    logger.info(
        "Compression lifecycle finalized for {} modifiers", len(self.modifiers)
    )

    return mod_data

initialize(recipe=None, recipe_stage=None, recipe_args=None, **kwargs)

Initialize the compression lifecycle.

Parameters:

    kwargs (default: {}): Additional arguments to update the state with

Returns:

    List[Any]: List of data returned from initialization of modifiers

Source code in src/llmcompressor/core/lifecycle.py
def initialize(
    self,
    recipe: Optional[RecipeInput] = None,
    recipe_stage: Optional[RecipeStageInput] = None,
    recipe_args: Optional[RecipeArgsInput] = None,
    **kwargs,
) -> List[Any]:
    """
    Initialize the compression lifecycle.

    :param kwargs: Additional arguments to update the state with
    :return: List of data returned from initialization of modifiers
    :rtype: List[Any]
    """
    self.state.update(**kwargs)
    if self.initialized_:  # TODO: do not initialize twice
        return

    logger.debug("Initializing compression lifecycle")
    self.recipe_container.append(recipe, recipe_stage, recipe_args)
    self.modifiers = self.recipe_container.get_modifiers()

    mod_data = []
    for mod in self.modifiers:
        data = mod.initialize(state=self.state, **kwargs)
        logger.debug("Initialized modifier: {}", mod)
        if data is not None:
            mod_data.append(data)

    self.initialized_ = True
    logger.info(
        "Compression lifecycle initialized for {} modifiers", len(self.modifiers)
    )

    return mod_data
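
A short sketch of the initialization guard, assuming placeholder recipe, model, and optimizer objects; a repeated call updates the state but returns early without re-running modifiers:

from llmcompressor.core import CompressionLifecycle

lifecycle = CompressionLifecycle()
mod_data = lifecycle.initialize(recipe=recipe, model=model)  # placeholders

# already initialized: the state is updated with the new kwargs, then the
# method returns early (returning None rather than a list)
lifecycle.initialize(optimizer=optimizer)
assert lifecycle.initialized_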

reset()

Reset the compression lifecycle, finalizing any active modifiers and resetting all attributes.

Source code in src/llmcompressor/core/lifecycle.py
def reset(self):
    """
    Reset the compression lifecycle, finalizing any active modifiers
    and resetting all attributes.
    """
    logger.debug("Resetting compression lifecycle")

    for mod in self.modifiers:
        if not mod.initialized or mod.finalized:
            continue
        try:
            mod.finalize(self.state)
            logger.debug("Finalized modifier: {}", mod)
        except Exception as e:
            logger.warning(f"Exception during finalizing modifier: {e}")

    self.__init__()
    logger.info("Compression lifecycle reset")

CompressionSession

A session for compression that holds the lifecycle and state for the current compression session

Source code in src/llmcompressor/core/session.py
class CompressionSession:
    """
    A session for compression that holds the lifecycle
    and state for the current compression session
    """

    def __init__(self):
        self._lifecycle = CompressionLifecycle()

    @property
    def lifecycle(self) -> CompressionLifecycle:
        """
        Lifecycle is used to keep track of where we are in the compression
        process and what modifiers are active. It also provides the ability
        to invoke events on the lifecycle.

        :return: the lifecycle for the session
        """
        return self._lifecycle

    @property
    def state(self) -> State:
        """
        State of the current compression session. State instance
        is used to store all information, such as the recipe, model,
        optimizer, data, etc., that is needed for compression.

        :return: the current state of the session
        """
        return self._lifecycle.state

    def initialize(
        self,
        recipe: Union[str, List[str], "Recipe", List["Recipe"], None] = None,
        recipe_stage: Union[str, List[str], None] = None,
        recipe_args: Union[Dict[str, Any], None] = None,
        model: Optional[Any] = None,
        teacher_model: Optional[Any] = None,
        optimizer: Optional[Any] = None,
        attach_optim_callbacks: bool = True,
        train_data: Optional[Any] = None,
        val_data: Optional[Any] = None,
        test_data: Optional[Any] = None,
        calib_data: Optional[Any] = None,
        copy_data: bool = True,
        start: Optional[float] = None,
        steps_per_epoch: Optional[int] = None,
        batches_per_step: Optional[int] = None,
        loggers: Union[None, LoggerManager, List[BaseLogger]] = None,
        **kwargs,
    ) -> ModifiedState:
        """
        Initialize the session for compression. This will run the initialize method
        for each modifier in the session's lifecycle. This will also set the session's
        state to the initialized state.

        :param recipe: the recipe to use for the compression, can be a path to a
            recipe file, a raw recipe string, a recipe object, or a list
            of recipe objects.
        :param recipe_stage: the stage to target for the compression
        :param recipe_args: the args to use for overriding the recipe defaults
        :param model: the model to compress
        :param teacher_model: the teacher model to use for knowledge distillation
        :param optimizer: the optimizer to use for the compression
        :param attach_optim_callbacks: True to attach the optimizer callbacks to the
            compression lifecycle, False otherwise
        :param train_data: the training data to use for the compression
        :param val_data: the validation data to use for the compression
        :param test_data: the testing data to use for the compression
        :param calib_data: the calibration data to use for the compression
        :param copy_data: True to copy the data, False otherwise
        :param start: the start epoch to use for the compression
        :param steps_per_epoch: the number of steps per epoch to use for the
            compression
        :param batches_per_step: the number of batches per step to use for
            compression
        :param loggers: the metrics manager to setup logging important info
            and milestones to, also accepts a list of BaseLogger(s)
        :param kwargs: additional kwargs to pass to the lifecycle's initialize method
        :return: the modified state of the session after initializing
        """
        mod_data = self._lifecycle.initialize(
            recipe=recipe,
            recipe_stage=recipe_stage,
            recipe_args=recipe_args,
            model=model,
            teacher_model=teacher_model,
            optimizer=optimizer,
            attach_optim_callbacks=attach_optim_callbacks,
            train_data=train_data,
            val_data=val_data,
            test_data=test_data,
            calib_data=calib_data,
            copy_data=copy_data,
            start=start,
            steps_per_epoch=steps_per_epoch,
            batches_per_step=batches_per_step,
            loggers=loggers,
            **kwargs,
        )

        return ModifiedState(
            model=self.state.model,
            optimizer=self.state.optimizer,
            loss=self.state.loss,
            modifier_data=mod_data,
        )

    def finalize(self, **kwargs) -> ModifiedState:
        """
        Finalize the session for compression. This will run the finalize method
        for each modifier in the session's lifecycle. This will also set the session's
        state to the finalized state.

        :param kwargs: additional kwargs to pass to the lifecycle's finalize method
        :return: the modified state of the session after finalizing
        """
        mod_data = self._lifecycle.finalize(**kwargs)

        return ModifiedState(
            model=self.state.model,
            optimizer=self.state.optimizer,
            loss=self.state.loss,
            modifier_data=mod_data,
        )

    def event(
        self,
        event_type: EventType,
        batch_data: Optional[Any] = None,
        loss: Optional[Any] = None,
        **kwargs,
    ) -> ModifiedState:
        """
        Invoke an event for current CompressionSession.

        :param event_type: the event type to invoke
        :param batch_data: the batch data to use for the event
        :param loss: the loss to use for the event if any
        :param kwargs: additional kwargs to pass to the lifecycle's event method
        :return: the modified state of the session after invoking the event
        """
        mod_data = self._lifecycle.event(
            event_type=event_type, batch_data=batch_data, loss=loss, **kwargs
        )
        return ModifiedState(
            model=self.state.model,
            optimizer=self.state.optimizer,
            loss=self.state.loss,  # TODO: is this supposed to be a different type?
            modifier_data=mod_data,
        )

    def log(self, event_type: EventType, loss: Optional[Any] = None):
        """
        Log model and loss information for the current event type

        :param event_type: the event type to log for
        :param loss: the loss to log if any
        """
        self._log_model_info()
        self._log_loss(event_type=event_type, loss=loss)

    def reset(self):
        """
        Reset the session to its initial state
        """
        self._lifecycle.reset()

    def reset_stage(self):
        """
        Reset the session for starting a new stage; recipe and model stay intact
        """
        self.lifecycle.initialized_ = False
        self.lifecycle.finalized = False

    def get_serialized_recipe(self) -> Optional[str]:
        """
        :return: serialized string of the current compiled recipe
        """
        recipe = self.lifecycle.recipe_container.compiled_recipe

        if recipe is not None and hasattr(recipe, "yaml"):
            return recipe.yaml()

        logger.warning("Recipe not found in session - it may have been reset")

    def get_modifiers(self):
        """
        Get all modifiers across all stages
        """
        stage_modifiers = self.lifecycle.modifiers
        return [
            modifier
            for stage_modifier in stage_modifiers
            for modifier in stage_modifier.modifiers
        ]  # noqa: E127

    def _log_model_info(self):
        # Log model level logs if cadence reached
        current_index = self._lifecycle.global_step

        if (
            should_log_model_info(
                model=self.state.model,
                loggers=self.state.loggers,
                current_log_step=current_index,
                last_log_step=self.state._last_log_step,
            )
            and self.state.loggers.frequency_manager.is_epoch_frequency_manager
        ):
            log_model_info(
                state=self.state,
                current_log_step=current_index,
            )
            # update last log epoch
            self.state.loggers.log_written(current_index)

    def _log_loss(self, event_type: EventType, loss: Any):
        if event_type != EventType.LOSS_CALCULATED:
            # only log loss when loss is calculated
            return

        current_index = self._lifecycle.global_step

        # always log loss if available
        if loss is not None:
            loss = loss if isinstance(loss, dict) else {"loss": loss}
            self.state.loggers.metric.log_scalars(
                tag="Loss", values=loss, step=current_index
            )
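
Example (a hedged sketch): a typical session flow around a training loop, assuming CompressionSession and EventType are importable from llmcompressor.core and that model, optimizer, train_loader, and compute_loss are placeholders:

from llmcompressor.core import CompressionSession, EventType

session = CompressionSession()
modified = session.initialize(
    recipe="recipe.yaml",     # hypothetical recipe path
    model=model,              # placeholder model
    optimizer=optimizer,      # placeholder optimizer
    train_data=train_loader,  # placeholder dataloader
)
model = modified.model  # modifiers may have wrapped or altered the model

for batch in train_loader:
    session.event(EventType.BATCH_START, batch_data=batch)
    loss = compute_loss(model, batch)  # placeholder
    session.event(EventType.LOSS_CALCULATED, loss=loss)
    session.event(EventType.OPTIM_PRE_STEP)
    optimizer.step()
    session.event(EventType.OPTIM_POST_STEP)
    session.event(EventType.BATCH_END)

session.finalize()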

lifecycle property

Lifecycle is used to keep track of where we are in the compression process and what modifiers are active. It also provides the ability to invoke events on the lifecycle.

Returns:

    CompressionLifecycle: the lifecycle for the session

state property

State of the current compression session. The State instance is used to store all information, such as the recipe, model, optimizer, data, etc., that is needed for compression.

Returns:

    State: the current state of the session

event(event_type, batch_data=None, loss=None, **kwargs)

Invoke an event for current CompressionSession.

Parameters:

    event_type (EventType, required): the event type to invoke
    batch_data (Optional[Any], default: None): the batch data to use for the event
    loss (Optional[Any], default: None): the loss to use for the event if any
    kwargs (default: {}): additional kwargs to pass to the lifecycle's event method

Returns:

    ModifiedState: the modified state of the session after invoking the event

Source code in src/llmcompressor/core/session.py
def event(
    self,
    event_type: EventType,
    batch_data: Optional[Any] = None,
    loss: Optional[Any] = None,
    **kwargs,
) -> ModifiedState:
    """
    Invoke an event for current CompressionSession.

    :param event_type: the event type to invoke
    :param batch_data: the batch data to use for the event
    :param loss: the loss to use for the event if any
    :param kwargs: additional kwargs to pass to the lifecycle's event method
    :return: the modified state of the session after invoking the event
    """
    mod_data = self._lifecycle.event(
        event_type=event_type, batch_data=batch_data, loss=loss, **kwargs
    )
    return ModifiedState(
        model=self.state.model,
        optimizer=self.state.optimizer,
        loss=self.state.loss,  # TODO: is this supposed to be a different type?
        modifier_data=mod_data,
    )

finalize(**kwargs)

Finalize the session for compression. This will run the finalize method for each modifier in the session's lifecycle. This will also set the session's state to the finalized state.

Parameters:

    kwargs (default: {}): additional kwargs to pass to the lifecycle's finalize method

Returns:

    ModifiedState: the modified state of the session after finalizing

Source code in src/llmcompressor/core/session.py
def finalize(self, **kwargs) -> ModifiedState:
    """
    Finalize the session for compression. This will run the finalize method
    for each modifier in the session's lifecycle. This will also set the session's
    state to the finalized state.

    :param kwargs: additional kwargs to pass to the lifecycle's finalize method
    :return: the modified state of the session after finalizing
    """
    mod_data = self._lifecycle.finalize(**kwargs)

    return ModifiedState(
        model=self.state.model,
        optimizer=self.state.optimizer,
        loss=self.state.loss,
        modifier_data=mod_data,
    )

get_modifiers()

Get all modifiers across all stages

Source code in src/llmcompressor/core/session.py
def get_modifiers(self):
    """
    Get all modifiers across all stages
    """
    stage_modifiers = self.lifecycle.modifiers
    return [
        modifier
        for stage_modifier in stage_modifiers
        for modifier in stage_modifier.modifiers
    ]  # noqa: E127

get_serialized_recipe()

Returns:

    Optional[str]: serialized string of the current compiled recipe

Source code in src/llmcompressor/core/session.py
def get_serialized_recipe(self) -> Optional[str]:
    """
    :return: serialized string of the current compiled recipe
    """
    recipe = self.lifecycle.recipe_container.compiled_recipe

    if recipe is not None and hasattr(recipe, "yaml"):
        return recipe.yaml()

    logger.warning("Recipe not found in session - it may have been reset")
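
For example, a small sketch that writes the compiled recipe to disk after finalizing (session comes from the earlier example; the output filename is arbitrary):

yaml_str = session.get_serialized_recipe()
if yaml_str is not None:
    with open("applied_recipe.yaml", "w") as fh:
        fh.write(yaml_str)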

initialize(recipe=None, recipe_stage=None, recipe_args=None, model=None, teacher_model=None, optimizer=None, attach_optim_callbacks=True, train_data=None, val_data=None, test_data=None, calib_data=None, copy_data=True, start=None, steps_per_epoch=None, batches_per_step=None, loggers=None, **kwargs)

Initialize the session for compression. This will run the initialize method for each modifier in the session's lifecycle. This will also set the session's state to the initialized state.

Parameters:

    recipe (Union[str, List[str], Recipe, List[Recipe], None], default: None): the recipe to use for the compression; can be a path to a recipe file, a raw recipe string, a recipe object, or a list of recipe objects
    recipe_stage (Union[str, List[str], None], default: None): the stage to target for the compression
    recipe_args (Union[Dict[str, Any], None], default: None): the args to use for overriding the recipe defaults
    model (Optional[Any], default: None): the model to compress
    teacher_model (Optional[Any], default: None): the teacher model to use for knowledge distillation
    optimizer (Optional[Any], default: None): the optimizer to use for the compression
    attach_optim_callbacks (bool, default: True): True to attach the optimizer callbacks to the compression lifecycle, False otherwise
    train_data (Optional[Any], default: None): the training data to use for the compression
    val_data (Optional[Any], default: None): the validation data to use for the compression
    test_data (Optional[Any], default: None): the testing data to use for the compression
    calib_data (Optional[Any], default: None): the calibration data to use for the compression
    copy_data (bool, default: True): True to copy the data, False otherwise
    start (Optional[float], default: None): the start epoch to use for the compression
    steps_per_epoch (Optional[int], default: None): the number of steps per epoch to use for the compression
    batches_per_step (Optional[int], default: None): the number of batches per step to use for compression
    loggers (Union[None, LoggerManager, List[BaseLogger]], default: None): the metrics manager to set up logging of important info and milestones; also accepts a list of BaseLogger(s)
    kwargs (default: {}): additional kwargs to pass to the lifecycle's initialize method

Returns:

    ModifiedState: the modified state of the session after initializing

Source code in src/llmcompressor/core/session.py
def initialize(
    self,
    recipe: Union[str, List[str], "Recipe", List["Recipe"], None] = None,
    recipe_stage: Union[str, List[str], None] = None,
    recipe_args: Union[Dict[str, Any], None] = None,
    model: Optional[Any] = None,
    teacher_model: Optional[Any] = None,
    optimizer: Optional[Any] = None,
    attach_optim_callbacks: bool = True,
    train_data: Optional[Any] = None,
    val_data: Optional[Any] = None,
    test_data: Optional[Any] = None,
    calib_data: Optional[Any] = None,
    copy_data: bool = True,
    start: Optional[float] = None,
    steps_per_epoch: Optional[int] = None,
    batches_per_step: Optional[int] = None,
    loggers: Union[None, LoggerManager, List[BaseLogger]] = None,
    **kwargs,
) -> ModifiedState:
    """
    Initialize the session for compression. This will run the initialize method
    for each modifier in the session's lifecycle. This will also set the session's
    state to the initialized state.

    :param recipe: the recipe to use for the compression, can be a path to a
        recipe file, a raw recipe string, a recipe object, or a list
        of recipe objects.
    :param recipe_stage: the stage to target for the compression
    :param recipe_args: the args to use for overriding the recipe defaults
    :param model: the model to compress
    :param teacher_model: the teacher model to use for knowledge distillation
    :param optimizer: the optimizer to use for the compression
    :param attach_optim_callbacks: True to attach the optimizer callbacks to the
        compression lifecycle, False otherwise
    :param train_data: the training data to use for the compression
    :param val_data: the validation data to use for the compression
    :param test_data: the testing data to use for the compression
    :param calib_data: the calibration data to use for the compression
    :param copy_data: True to copy the data, False otherwise
    :param start: the start epoch to use for the compression
    :param steps_per_epoch: the number of steps per epoch to use for the
        compression
    :param batches_per_step: the number of batches per step to use for
        compression
    :param loggers: the metrics manager to setup logging important info
        and milestones to, also accepts a list of BaseLogger(s)
    :param kwargs: additional kwargs to pass to the lifecycle's initialize method
    :return: the modified state of the session after initializing
    """
    mod_data = self._lifecycle.initialize(
        recipe=recipe,
        recipe_stage=recipe_stage,
        recipe_args=recipe_args,
        model=model,
        teacher_model=teacher_model,
        optimizer=optimizer,
        attach_optim_callbacks=attach_optim_callbacks,
        train_data=train_data,
        val_data=val_data,
        test_data=test_data,
        calib_data=calib_data,
        copy_data=copy_data,
        start=start,
        steps_per_epoch=steps_per_epoch,
        batches_per_step=batches_per_step,
        loggers=loggers,
        **kwargs,
    )

    return ModifiedState(
        model=self.state.model,
        optimizer=self.state.optimizer,
        loss=self.state.loss,
        modifier_data=mod_data,
    )

log(event_type, loss=None)

Log model and loss information for the current event type

Parameters:

    event_type (EventType, required): the event type to log for
    loss (Optional[Any], default: None): the loss to log if any
Source code in src/llmcompressor/core/session.py
def log(self, event_type: EventType, loss: Optional[Any] = None):
    """
    Log model and loss information for the current event type

    :param event_type: the event type to log for
    :param loss: the loss to log if any
    """
    self._log_model_info()
    self._log_loss(event_type=event_type, loss=loss)

reset()

Reset the session to its initial state

Source code in src/llmcompressor/core/session.py
def reset(self):
    """
    Reset the session to its initial state
    """
    self._lifecycle.reset()

reset_stage()

Reset the session for starting a new stage; recipe and model stay intact

Source code in src/llmcompressor/core/session.py
def reset_stage(self):
    """
    Reset the session for starting a new stage; recipe and model stay intact
    """
    self.lifecycle.initialized_ = False
    self.lifecycle.finalized = False
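
A hedged sketch of running multiple stages with one session; the stage names and the run_stage_calibration helper are hypothetical:

# hypothetical stage names; each stage must exist in the recipe
for stage in ("sparsity_stage", "quantization_stage"):
    session.initialize(recipe="recipe.yaml", recipe_stage=stage, model=model)
    run_stage_calibration(session, model)  # placeholder for the stage's work
    session.finalize()
    # clear only the initialized_/finalized flags; the recipe container
    # and the model in the session state stay intact
    session.reset_stage()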

Data dataclass

A dataclass to hold different data sets for training, validation, testing, and/or calibration. Each data set is a ModifiableData instance.

Parameters:

    train (Optional[Any], default: None): The training data set
    val (Optional[Any], default: None): The validation data set
    test (Optional[Any], default: None): The testing data set
    calib (Optional[Any], default: None): The calibration data set
Source code in src/llmcompressor/core/state.py
@dataclass
class Data:
    """
    A dataclass to hold different data sets for training, validation,
    testing, and/or calibration. Each data set is a ModifiableData instance.

    :param train: The training data set
    :type train: Optional[Any]
    :param val: The validation data set
    :type val: Optional[Any]
    :param test: The testing data set
    :type test: Optional[Any]
    :param calib: The calibration data set
    :type calib: Optional[Any]
    """

    train: Optional[Any] = None
    val: Optional[Any] = None
    test: Optional[Any] = None
    calib: Optional[Any] = None
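
A minimal sketch, assuming Data is exported from llmcompressor.core and the loaders are placeholders:

from llmcompressor.core import Data  # assumed export

data = Data(train=train_loader, calib=calib_loader)  # placeholder loaders
assert data.val is None and data.test is None  # unset splits default to None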

Event dataclass

A class for defining an event that can be triggered during sparsification.

Parameters:

    type_ (Optional[EventType], default: None): The type of event.
    steps_per_epoch (Optional[int], default: None): The number of steps per epoch.
    batches_per_step (Optional[int], default: None): The number of batches per step, where a step is an optimizer step invocation. For most pathways these are the same; see the invocations_per_step parameter for details on when they are not.
    invocations_per_step (int, default: 1): The number of invocations of the step wrapper before optimizer.step was called. Generally can be left as 1 (default). For older AMP pathways, this is the number of times the scaler wrapper was invoked before the wrapped optimizer step function was called, to handle accumulation in fp16.
    global_step (int, default: 0): The current global step.
    global_batch (int, default: 0): The current global batch.
Source code in src/llmcompressor/core/events/event.py
@dataclass
class Event:
    """
    A class for defining an event that can be triggered during sparsification.

    :param type_: The type of event.
    :type type_: Optional[EventType]
    :param steps_per_epoch: The number of steps per epoch.
    :type steps_per_epoch: Optional[int]
    :param batches_per_step: The number of batches per step where step is an
        optimizer step invocation. For most pathways, these are the same.
        See the invocations_per_step parameter for more details when they are not.
    :type batches_per_step: Optional[int]
    :param invocations_per_step: The number of invocations of the step wrapper
        before optimizer.step was called. Generally can be left as 1 (default).
        For older amp pathways, this is the number of times the scaler wrapper
        was invoked before the wrapped optimizer step function was called to
        handle accumulation in fp16.
    :type invocations_per_step: Optional[int]
    :param global_step: The current global step.
    :type global_step: int
    :param global_batch: The current global batch.
    :type global_batch: int
    """

    type_: Optional[EventType] = None
    steps_per_epoch: Optional[int] = None
    batches_per_step: Optional[int] = None
    invocations_per_step: int = 1
    global_step: int = 0
    global_batch: int = 0

    @property
    def epoch_based(self) -> bool:
        """
        Determines if the event is based on epochs.

        :return: True if the event is based on epochs, False otherwise.
        :rtype: bool
        """
        return self.steps_per_epoch is not None

    @property
    def epoch(self) -> int:
        """
        Calculates the current epoch.

        :raises ValueError: if the event is not epoch based.
        :return: The current epoch.
        :rtype: int
        """
        if not self.epoch_based:
            logger.error("Attempt to access epoch for a non-epoch based event")
            raise ValueError("Event is not epoch based")
        return self.global_step // self.steps_per_epoch

    @property
    def epoch_full(self) -> float:
        """
        Calculates the current epoch with the fraction of the current step.

        :raises ValueError: if the event is not epoch based.
        :return: The current epoch with the fraction of the current step.
        :rtype: float
        """
        if not self.epoch_based:
            logger.error("Attempt to access epoch_full for a non-epoch based event")
            raise ValueError("Event is not epoch based")
        return self.global_step / float(self.steps_per_epoch)

    @property
    def epoch_step(self) -> int:
        """
        Calculates the current step within the current epoch.

        :raises ValueError: if the event is not epoch based.
        :return: The current step within the current epoch.
        :rtype: int
        """
        if not self.epoch_based:
            logger.error("Attempt to access epoch_step for a non-epoch based event")
            raise ValueError("Event is not epoch based")
        return self.global_step % self.steps_per_epoch

    @property
    def epoch_batch(self) -> int:
        """
        Calculates the current batch within the current epoch.

        :raises ValueError: if the event is not epoch based.
        :return: The current batch within the current epoch.
        :rtype: int
        """
        if not self.epoch_based:
            logger.error("Attempt to access epoch_batch for a non-epoch based event")
            raise ValueError("Event is not epoch based")
        batches_per_epoch = (
            self.steps_per_epoch * self.batches_per_step
            if self.batches_per_step
            else self.steps_per_epoch
        )
        return self.global_batch % batches_per_epoch

    @property
    def current_index(self) -> float:
        """
        Calculates the current index of the event.

        :raises ValueError: if the event is not epoch based or
            if the steps per epoch are too many.
        :return: The current index of the event, which is either the global step
            or the epoch with the fraction of the current step.
        :rtype: float
        """
        if not self.epoch_based:
            return self.global_step
        epoch_full = self.epoch_full
        if epoch_full - self.epoch > 1.0:
            logger.error("Too many steps per epoch for epoch based event")
            raise ValueError("Too many steps per epoch for epoch based event")
        return epoch_full

    @current_index.setter
    def current_index(self, value: float):
        """
        Sets the current index of the event.

        :param value: The current index value.
        :type value: float
        """
        logger.debug("Setting current index: {}", value)
        if not self.epoch_based:
            self.global_step = int(value)
            self.global_batch = (
                self.global_step
                if self.batches_per_step is None or self.batches_per_step < 2
                else self.global_step * self.batches_per_step
            )
        else:
            self.global_step = int(value * self.steps_per_epoch)
            self.global_batch = (
                self.global_step
                if self.batches_per_step is None or self.batches_per_step < 2
                else self.global_step * self.batches_per_step
            )

    def should_update(
        self, start: Optional[float], end: Optional[float], update: Optional[float]
    ) -> bool:
        """
        Determines if the event should trigger an update.

        :param start: The start index to check against, set to None to ignore start.
        :type start: Optional[float]
        :param end: The end index to check against, set to None to ignore end.
        :type end: Optional[float]
        :param update: The update interval, set to None or 0.0 to always update,
            otherwise must be greater than 0.0, defaults to None.
        :type update: Optional[float]
        :return: True if the event should trigger an update, False otherwise.
        :rtype: bool
        """
        current = self.current_index
        logger.debug(
            "Checking if event should update: "
            "current_index={}, start={}, end={}, update={}",
            current,
            start,
            end,
            update,
        )
        if start is not None and current < start:
            return False
        if end is not None and current > end:
            return False
        return update is None or update <= 0.0 or current % update < 1e-10

    def new_instance(self, **kwargs) -> "Event":
        """
        Creates a new instance of the event with the provided keyword arguments.

        :param kwargs: Keyword arguments to set in the new instance.
        :return: A new instance of the event with the provided kwargs.
        :rtype: Event
        """
        logger.debug("Creating new instance of event with kwargs: {}", kwargs)
        instance = deepcopy(self)
        for key, value in kwargs.items():
            setattr(instance, key, value)
        return instance
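
Example of the epoch-based properties (a sketch, assuming Event and EventType are exported from llmcompressor.core):

from llmcompressor.core import Event, EventType  # assumed exports

event = Event(type_=EventType.BATCH_END, steps_per_epoch=100, global_step=250)
assert event.epoch_based      # steps_per_epoch is set
print(event.epoch)            # 2   -> 250 // 100
print(event.epoch_full)       # 2.5 -> 250 / 100
print(event.epoch_step)       # 50  -> 250 % 100
print(event.current_index)    # 2.5 (epoch_full, since the event is epoch based)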

current_index property writable

Calculates the current index of the event.

Returns:

    float: The current index of the event, which is either the global step or the epoch with the fraction of the current step.

Raises:

    ValueError: if the event is not epoch based or if the steps per epoch are too many.

epoch property

Calculates the current epoch.

Returns:

    int: The current epoch.

Raises:

    ValueError: if the event is not epoch based.

epoch_based property

Determines if the event is based on epochs.

Returns:

    bool: True if the event is based on epochs, False otherwise.

epoch_batch property

Calculates the current batch within the current epoch.

Returns:

    int: The current batch within the current epoch.

Raises:

    ValueError: if the event is not epoch based.

epoch_full property

Calculates the current epoch with the fraction of the current step.

Returns:

    float: The current epoch with the fraction of the current step.

Raises:

    ValueError: if the event is not epoch based.

epoch_step property

Calculates the current step within the current epoch.

Returns:

    int: The current step within the current epoch.

Raises:

    ValueError: if the event is not epoch based.

new_instance(**kwargs)

Creates a new instance of the event with the provided keyword arguments.

Parameters:

    kwargs (default: {}): Keyword arguments to set in the new instance.

Returns:

    Event: A new instance of the event with the provided kwargs.

Source code in src/llmcompressor/core/events/event.py
def new_instance(self, **kwargs) -> "Event":
    """
    Creates a new instance of the event with the provided keyword arguments.

    :param kwargs: Keyword arguments to set in the new instance.
    :return: A new instance of the event with the provided kwargs.
    :rtype: Event
    """
    logger.debug("Creating new instance of event with kwargs: {}", kwargs)
    instance = deepcopy(self)
    for key, value in kwargs.items():
        setattr(instance, key, value)
    return instance

should_update(start, end, update)

Determines if the event should trigger an update.

Parameters:

    start (Optional[float], required): The start index to check against; set to None to ignore start.
    end (Optional[float], required): The end index to check against; set to None to ignore end.
    update (Optional[float], required): The update interval; set to None or 0.0 to always update, otherwise must be greater than 0.0. Defaults to None.

Returns:

    bool: True if the event should trigger an update, False otherwise.

Source code in src/llmcompressor/core/events/event.py
def should_update(
    self, start: Optional[float], end: Optional[float], update: Optional[float]
) -> bool:
    """
    Determines if the event should trigger an update.

    :param start: The start index to check against, set to None to ignore start.
    :type start: Optional[float]
    :param end: The end index to check against, set to None to ignore end.
    :type end: Optional[float]
    :param update: The update interval, set to None or 0.0 to always update,
        otherwise must be greater than 0.0, defaults to None.
    :type update: Optional[float]
    :return: True if the event should trigger an update, False otherwise.
    :rtype: bool
    """
    current = self.current_index
    logger.debug(
        "Checking if event should update: "
        "current_index={}, start={}, end={}, update={}",
        current,
        start,
        end,
        update,
    )
    if start is not None and current < start:
        return False
    if end is not None and current > end:
        return False
    return update is None or update <= 0.0 or current % update < 1e-10
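
A small sketch of should_update, assuming Event is exported from llmcompressor.core; note the float-modulo tolerance of 1e-10 in the final check:

from llmcompressor.core import Event  # assumed export

event = Event(steps_per_epoch=100)
event.current_index = 2.5  # setter stores global_step = int(2.5 * 100) = 250

print(event.should_update(start=1.0, end=5.0, update=0.5))    # True: in range, 2.5 % 0.5 == 0
print(event.should_update(start=3.0, end=None, update=None))  # False: current_index < start
print(event.should_update(start=None, end=None, update=2.0))  # False: 2.5 % 2.0 == 0.5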

EventType

Bases: Enum

An Enum for defining the different types of events that can be triggered during model compression lifecycles. The purpose of each EventType is to trigger the corresponding modifier callback during training or post-training pipelines.

Members:

    INITIALIZE: Event type for initialization.
    FINALIZE: Event type for finalization.
    BATCH_START: Event type for the start of a batch.
    LOSS_CALCULATED: Event type for when loss is calculated.
    BATCH_END: Event type for the end of a batch.
    CALIBRATION_EPOCH_START: Event type for the start of a calibration epoch.
    SEQUENTIAL_EPOCH_END: Event type for the end of a layer calibration epoch, specifically used by src/llmcompressor/pipelines/sequential/pipeline.py
    CALIBRATION_EPOCH_END: Event type for the end of a calibration epoch.
    OPTIM_PRE_STEP: Event type for pre-optimization step.
    OPTIM_POST_STEP: Event type for post-optimization step.
Source code in src/llmcompressor/core/events/event.py
@unique
class EventType(Enum):
    """
    An Enum for defining the different types of events that can be triggered
    during model compression lifecycles.
    The purpose of each EventType is to trigger the corresponding
    modifier callback during training or post training pipelines.

    :param INITIALIZE: Event type for initialization.
    :param FINALIZE: Event type for finalization.
    :param BATCH_START: Event type for the start of a batch.
    :param LOSS_CALCULATED: Event type for when loss is calculated.
    :param BATCH_END: Event type for the end of a batch.
    :param CALIBRATION_EPOCH_START: Event type for the start of a calibration epoch.
    :param SEQUENTIAL_EPOCH_END: Event type for the end of a layer calibration epoch,
        specifically used by `src/llmcompressor/pipelines/sequential/pipeline.py`
    :param CALIBRATION_EPOCH_END: Event type for the end of a calibration epoch.
    :param OPTIM_PRE_STEP: Event type for pre-optimization step.
    :param OPTIM_POST_STEP: Event type for post-optimization step.
    """

    # training lifecycle
    INITIALIZE = "initialize"
    FINALIZE = "finalize"

    # batch lifecycle
    BATCH_START = "batch_start"
    LOSS_CALCULATED = "loss_calculated"
    BATCH_END = "batch_end"

    # calibration lifecycle
    CALIBRATION_EPOCH_START = "calibration_epoch_start"
    SEQUENTIAL_EPOCH_END = "sequential_epoch_end"
    CALIBRATION_EPOCH_END = "calibration_epoch_end"

    # step lifecycle
    OPTIM_PRE_STEP = "optim_pre_step"
    OPTIM_POST_STEP = "optim_post_step"
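
A minimal sketch, assuming EventType is exported from llmcompressor.core; the list mirrors the in-batch order that CompressionLifecycle validates:

from llmcompressor.core import EventType  # assumed export

print(EventType.BATCH_START.value)  # "batch_start"

# the in-batch ordering enforced by CompressionLifecycle._validate_event_order
batch_order = [
    EventType.BATCH_START,
    EventType.LOSS_CALCULATED,
    EventType.OPTIM_PRE_STEP,
    EventType.OPTIM_POST_STEP,
    EventType.BATCH_END,
]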

Hardware dataclass

A dataclass to hold information about the hardware being used.

Parameters:

    device (Optional[str], default: None): The current device being used for training
    devices (Optional[List[str]], default: None): List of all devices to be used for training
    rank (Optional[int], default: None): The rank of the current device
    world_size (Optional[int], default: None): The total number of devices being used
    local_rank (Optional[int], default: None): The local rank of the current device
    local_world_size (Optional[int], default: None): The total number of devices being used on the local machine
    distributed (Optional[bool], default: None): Whether or not distributed training is being used
    distributed_strategy (Optional[str], default: None): The distributed strategy being used
Source code in src/llmcompressor/core/state.py
@dataclass
class Hardware:
    """
    A dataclass to hold information about the hardware being used.

    :param device: The current device being used for training
    :type device: Optional[str]
    :param devices: List of all devices to be used for training
    :type devices: Optional[List[str]]
    :param rank: The rank of the current device
    :type rank: Optional[int]
    :param world_size: The total number of devices being used
    :type world_size: Optional[int]
    :param local_rank: The local rank of the current device
    :type local_rank: Optional[int]
    :param local_world_size: The total number of devices being used on the local machine
    :type local_world_size: Optional[int]
    :param distributed: Whether or not distributed training is being used
    :type distributed: Optional[bool]
    :param distributed_strategy: The distributed strategy being used
    :type distributed_strategy: Optional[str]
    """

    device: Optional[str] = None
    devices: Optional[List[str]] = None
    rank: Optional[int] = None
    world_size: Optional[int] = None
    local_rank: Optional[int] = None
    local_world_size: Optional[int] = None
    distributed: Optional[bool] = None
    distributed_strategy: Optional[str] = None
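
A hedged sketch of populating Hardware from torchrun-style environment variables, assuming Hardware is exported from llmcompressor.core; the device string is a placeholder:

import os

from llmcompressor.core import Hardware  # assumed export

# torchrun-style environment variables; defaults cover single-process runs
world_size = int(os.environ.get("WORLD_SIZE", "1"))
hardware = Hardware(
    device="cuda:0",  # placeholder device string
    rank=int(os.environ.get("RANK", "0")),
    local_rank=int(os.environ.get("LOCAL_RANK", "0")),
    world_size=world_size,
    distributed=world_size > 1,
)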

LifecycleCallbacks

A class for invoking lifecycle events for the active session

Source code in src/llmcompressor/core/session_functions.py
class LifecycleCallbacks:
    """
    A class for invoking lifecycle events for the active session
    """

    @classmethod
    def event(cls, event_type: EventType, **kwargs) -> ModifiedState:
        """
        Invoke an event for the active session

        :param event_type: the event type to invoke
        :param kwargs: additional kwargs to pass to the current session's event method
        :return: the modified state of the active session after invoking the event
        """
        if event_type in [EventType.INITIALIZE, EventType.FINALIZE]:
            raise ValueError(
                f"Cannot invoke {event_type} event. "
                f"Use the corresponding method instead."
            )

        # skip event callbacks if no recipe was provided
        if not active_session().lifecycle.recipe_container.check_any_recipe_exists():
            return

        return active_session().event(event_type, **kwargs)

    @classmethod
    def batch_start(cls, batch_data: Optional[Any] = None, **kwargs) -> ModifiedState:
        """
        Invoke a batch start event for the active session

        :param batch_data: the batch data to use for the event
        :param kwargs: additional kwargs to pass to the current session's event method
        :return: the modified state of the active session after invoking the event
        """
        return cls.event(EventType.BATCH_START, batch_data=batch_data, **kwargs)

    @classmethod
    def loss_calculated(cls, loss: Optional[Any] = None, **kwargs) -> ModifiedState:
        """
        Invoke a loss calculated event for the active session

        :param loss: the loss to use for the event
        :param kwargs: additional kwargs to pass to the current session's event method
        :return: the modified state of the active session after invoking the event
        """
        # log loss if loss calculated
        active_session()._log_loss(event_type=EventType.LOSS_CALCULATED, loss=loss)
        return cls.event(EventType.LOSS_CALCULATED, loss=loss, **kwargs)

    @classmethod
    def optim_pre_step(cls, **kwargs) -> ModifiedState:
        """
        Invoke an optimizer pre-step event for the active session

        :param kwargs: additional kwargs to pass to the current session's event method
        :return: the modified state of the active session after invoking the event
        """
        return cls.event(EventType.OPTIM_PRE_STEP, **kwargs)

    @classmethod
    def optim_post_step(cls, **kwargs) -> ModifiedState:
        """
        Invoke an optimizer post-step event for the active session

        :param kwargs: additional kwargs to pass to the current session's event method
        :return: the modified state of the active session after invoking the event
        """
        return cls.event(EventType.OPTIM_POST_STEP, **kwargs)

    @classmethod
    def batch_end(cls, **kwargs) -> ModifiedState:
        """
        Invoke a batch end event for the active session

        :param kwargs: additional kwargs to pass to the current session's event method
        :return: the modified state of the active session after invoking the event
        """
        active_session()._log_model_info()
        return cls.event(EventType.BATCH_END, **kwargs)

    @classmethod
    def calibration_epoch_start(cls, **kwargs) -> ModifiedState:
        """
        Invoke an epoch start event for the active session during calibration. This event
        should be called before calibration starts for one epoch

        see `src/llmcompressor/pipelines/basic/pipeline.py` for usage example
        """
        return cls.event(EventType.CALIBRATION_EPOCH_START, **kwargs)

    @classmethod
    def sequential_epoch_end(cls, **kwargs) -> ModifiedState:
        """
        Invoke a sequential epoch end event for the active session. This event should be
        called after one sequential layer has been calibrated/trained for one epoch

        This is called after a sequential layer has been calibrated with one batch, see
        `src/llmcompressor/pipelines/sequential/pipeline.py` for usage example
        """
        return cls.event(EventType.SEQUENTIAL_EPOCH_END, **kwargs)

    @classmethod
    def calibration_epoch_end(cls, **kwargs) -> ModifiedState:
        """
        Invoke an epoch end event for the active session during calibration. This event
        should be called after the model has been calibrated for one epoch

        see `src/llmcompressor/pipelines/basic/pipeline.py` for usage example
        """
        return cls.event(EventType.CALIBRATION_EPOCH_END, **kwargs)
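
Example (a hedged sketch): wiring the callbacks into a training loop, assuming LifecycleCallbacks is exported from llmcompressor.core and that model, optimizer, train_loader, and compute_loss are placeholders. Each callback returns early when no recipe is active in the session:

from llmcompressor.core import LifecycleCallbacks  # assumed export

for batch in train_loader:  # placeholder dataloader
    LifecycleCallbacks.batch_start(batch_data=batch)
    loss = compute_loss(model, batch)  # placeholder
    LifecycleCallbacks.loss_calculated(loss=loss)
    LifecycleCallbacks.optim_pre_step()
    optimizer.step()  # placeholder optimizer
    LifecycleCallbacks.optim_post_step()
    LifecycleCallbacks.batch_end()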

batch_end(**kwargs) classmethod

Invoke a batch end event for the active session

Parameters:

    kwargs (default: {}): additional kwargs to pass to the current session's event method

Returns:

    ModifiedState: the modified state of the active session after invoking the event

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def batch_end(cls, **kwargs) -> ModifiedState:
    """
    Invoke a batch end event for the active session

    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    active_session()._log_model_info()
    return cls.event(EventType.BATCH_END, **kwargs)

batch_start(batch_data=None, **kwargs) classmethod

Invoke a batch start event for the active session

Parameters:

Name Type Description Default
batch_data Optional[Any]

the batch data to use for the event

None
kwargs

additional kwargs to pass to the current session's event method

{}

Returns:

Type Description
ModifiedState

the modified state of the active session after invoking the event

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def batch_start(cls, batch_data: Optional[Any] = None, **kwargs) -> ModifiedState:
    """
    Invoke a batch start event for the active session

    :param batch_data: the batch data to use for the event
    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    return cls.event(EventType.BATCH_START, batch_data=batch_data, **kwargs)

calibration_epoch_end(**kwargs) classmethod

Invoke an epoch end event for the active session during calibration. This event should be called after the model has been calibrated for one epoch.

See src/llmcompressor/pipelines/basic/pipeline.py for a usage example.

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def calibration_epoch_end(cls, **kwargs) -> ModifiedState:
    """
    Invoke an epoch end event for the active session during calibration. This
    event should be called after the model has been calibrated for one epoch.

    See `src/llmcompressor/pipelines/basic/pipeline.py` for a usage example.
    """
    return cls.event(EventType.CALIBRATION_EPOCH_END, **kwargs)

calibration_epoch_start(**kwargs) classmethod

Invoke an epoch start event for the active session during calibration. This event should be called before calibration starts for one epoch.

See src/llmcompressor/pipelines/basic/pipeline.py for a usage example.

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def calibration_epoch_start(cls, **kwargs) -> ModifiedState:
    """
    Invoke an epoch start event for the active session during calibration. This
    event should be called before calibration starts for one epoch.

    See `src/llmcompressor/pipelines/basic/pipeline.py` for a usage example.
    """
    return cls.event(EventType.CALIBRATION_EPOCH_START, **kwargs)
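
For a one-shot calibration pass, as in the basic pipeline referenced above, the two calibration events simply bracket a single loop over the calibration data. A minimal sketch, reusing the assumed `callbacks` import from the earlier example:

import torch

from llmcompressor.core import callbacks  # import name assumed, as above

def run_calibration(model, dataloader):
    # One pass over the calibration data, bracketed by the epoch events
    # (mirroring the basic pipeline referenced in the docstrings).
    callbacks.calibration_epoch_start()
    with torch.no_grad():
        for batch in dataloader:
            model(**batch)
    callbacks.calibration_epoch_end()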

event(event_type, **kwargs) classmethod

Invoke an event for the active session

Parameters:

Name Type Description Default
event_type EventType

the event type to invoke

required
kwargs

additional kwargs to pass to the current session's event method

{}

Returns:

Type Description
ModifiedState

the modified state of the active session after invoking the event

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def event(cls, event_type: EventType, **kwargs) -> ModifiedState:
    """
    Invoke an event for the active session

    :param event_type: the event type to invoke
    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    if event_type in [EventType.INITIALIZE, EventType.FINALIZE]:
        raise ValueError(
            f"Cannot invoke {event_type} event. "
            f"Use the corresponding method instead."
        )

    # skip event callbacks if no recipe was provided
    if not active_session().lifecycle.recipe_container.check_any_recipe_exists():
        return

    return active_session().event(event_type, **kwargs)
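
Two guard clauses are worth noting here: lifecycle-level events must go through their dedicated methods, and without a recipe the method short-circuits and returns None rather than a ModifiedState. A small sketch of both branches, with the import names assumed:

from llmcompressor.core import EventType, callbacks  # import names assumed

# Lifecycle-level events are rejected; initialize and finalize have
# their own dedicated entry points.
try:
    callbacks.event(EventType.INITIALIZE)
except ValueError as err:
    print(err)

# With no recipe registered on the active session, event callbacks are
# skipped entirely and None is returned instead of a ModifiedState.
result = callbacks.event(EventType.BATCH_START, batch_data=None)
print(result)  # None when no recipe has been provided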

loss_calculated(loss=None, **kwargs) classmethod

Invoke a loss calculated event for the active session

Parameters:

Name Type Description Default
loss Optional[Any]

the loss to use for the event

None
kwargs

additional kwargs to pass to the current session's event method

{}

Returns:

Type Description
ModifiedState

the modified state of the active session after invoking the event

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def loss_calculated(cls, loss: Optional[Any] = None, **kwargs) -> ModifiedState:
    """
    Invoke a loss calculated event for the active session

    :param loss: the loss to use for the event
    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    # log loss if loss calculated
    active_session()._log_loss(event_type=EventType.LOSS_CALCULATED, loss=loss)
    return cls.event(EventType.LOSS_CALCULATED, loss=loss, **kwargs)

optim_post_step(**kwargs) classmethod

Invoke an optimizer post-step event for the active session

Parameters:

Name Type Description Default
kwargs

additional kwargs to pass to the current session's event method

{}

Returns:

Type Description
ModifiedState

the modified state of the active session after invoking the event

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def optim_post_step(cls, **kwargs) -> ModifiedState:
    """
    Invoke an optimizer post-step event for the active session

    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    return cls.event(EventType.OPTIM_POST_STEP, **kwargs)

optim_pre_step(**kwargs) classmethod

Invoke an optimizer pre-step event for the active session

Parameters:

Name Type Description Default
kwargs

additional kwargs to pass to the current session's event method

{}

Returns:

Type Description
ModifiedState

the modified state of the active session after invoking the event

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def optim_pre_step(cls, **kwargs) -> ModifiedState:
    """
    Invoke an optimizer pre-step event for the active session

    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    return cls.event(EventType.OPTIM_PRE_STEP, **kwargs)

sequential_epoch_end(**kwargs) classmethod

Invoke a sequential epoch end event for the active session. This event should be called after one sequential layer has been calibrated/trained for one epoch, i.e. after that layer has finished its pass over the calibration batches.

See src/llmcompressor/pipelines/sequential/pipeline.py for a usage example.

Source code in src/llmcompressor/core/session_functions.py
@classmethod
def sequential_epoch_end(cls, **kwargs) -> ModifiedState:
    """
    Invoke a sequential epoch end event for the active session. This event should be
    called after one sequential layer has been calibrated/trained for one epoch,
    i.e. after that layer has finished its pass over the calibration batches.

    See `src/llmcompressor/pipelines/sequential/pipeline.py` for a usage example.
    """
    return cls.event(EventType.SEQUENTIAL_EPOCH_END, **kwargs)
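
In the sequential pipeline referenced above, layers are calibrated one at a time, and this event gives modifiers a hook to act on each layer before the next one runs. A simplified, hypothetical outline (the real pipeline also handles caching and propagating intermediate activations between layers):

import torch

def calibrate_sequentially(layers, layer_batches, callbacks):
    # Hypothetical outline: each layer sees all of its calibration
    # batches, then the sequential epoch-end event fires so modifiers
    # can compress that layer before moving to the next.
    with torch.no_grad():
        for layer, batches in zip(layers, layer_batches):
            for batch in batches:
                layer(batch)
            callbacks.sequential_epoch_end()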

ModelParameterizedLayer dataclass

A dataclass for holding a parameter and its layer

Parameters:

Name Type Description Default
layer_name str

the name of the layer

required
layer Any

the layer object

required
param_name str

the name of the parameter

required
param Any

the parameter object

required
Source code in src/llmcompressor/core/model_layer.py
@dataclass
class ModelParameterizedLayer:
    """
    A dataclass for holding a parameter and its layer

    :param layer_name: the name of the layer
    :param layer: the layer object
    :param param_name: the name of the parameter
    :param param: the parameter object
    """

    layer_name: str
    layer: Any
    param_name: str
    param: Any
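
Since the fields are just names plus object references, instances map naturally onto torch.nn.Module.named_modules() and named_parameters(). A hypothetical helper that builds one entry per parameter:

from typing import List

import torch.nn as nn

from llmcompressor.core import ModelParameterizedLayer  # import path assumed

def collect_parameterized_layers(model: nn.Module) -> List[ModelParameterizedLayer]:
    # Hypothetical helper: pair every parameter with its owning submodule.
    entries = []
    for layer_name, layer in model.named_modules():
        for param_name, param in layer.named_parameters(recurse=False):
            entries.append(
                ModelParameterizedLayer(
                    layer_name=layer_name,
                    layer=layer,
                    param_name=param_name,
                    param=param,
                )
            )
    return entries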

ModifiedState dataclass

A dataclass to represent a modified model, optimizer, and loss.

Parameters:

Name Type Description Default
model Optional[Any]

The modified model

required
optimizer Optional[Any]

The modified optimizer

required
loss Optional[Any]

The modified loss

required
modifier_data Optional[List[Dict[str, Any]]]

The modifier data used to modify the model, optimizer, and loss

required
Source code in src/llmcompressor/core/state.py
@dataclass
class ModifiedState:
    """
    A dataclass to represent a modified model, optimizer, and loss.

    :param model: The modified model
    :type model: Optional[Any]
    :param optimizer: The modified optimizer
    :type optimizer: Optional[Any]
    :param loss: The modified loss
    :type loss: Optional[Any]
    :param modifier_data: The modifier data used to modify the
        model, optimizer, and loss
    :type modifier_data: Optional[List[Dict[str, Any]]]
    """

    model: Optional[Any] = None
    optimizer: Optional[Any] = None
    loss: Optional[Any] = None
    modifier_data: Optional[List[Dict[str, Any]]] = None

    def __init__(self, model, optimizer, loss, modifier_data):
        """
        Initialize the ModifiedState with the given parameters.

        :param model: The modified model
        :type model: Any
        :param optimizer: The modified optimizer
        :type optimizer: Any
        :param loss: The modified loss
        :type loss: Any
        :param modifier_data: The modifier data used to modify the model, optimizer,
            and loss
        :type modifier_data: List[Dict[str, Any]]
        """
        self.model = model
        self.optimizer = optimizer
        self.loss = loss
        self.modifier_data = modifier_data
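
Note that the hand-written __init__ replaces the one @dataclass would otherwise generate, so all four arguments are required even though the field declarations carry None defaults. Downstream, a caller typically adopts whatever replacements the modifiers produced; a hypothetical consumption pattern:

def adopt(modified, model, optimizer, loss):
    # Prefer replacements from the ModifiedState when present; callbacks
    # return None when no recipe is registered, so handle that too.
    if modified is None:
        return model, optimizer, loss
    return (
        modified.model if modified.model is not None else model,
        modified.optimizer if modified.optimizer is not None else optimizer,
        modified.loss if modified.loss is not None else loss,
    )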

__init__(model, optimizer, loss, modifier_data)

Initialize the ModifiedState with the given parameters.

Parameters:

Name Type Description Default
model Any

The modified model

required
optimizer Any

The modified optimizer

required
loss Any

The modified loss

required
modifier_data List[Dict[str, Any]]

The modifier data used to modify the model, optimizer, and loss

required
Source code in src/llmcompressor/core/state.py
def __init__(self, model, optimizer, loss, modifier_data):
    """
    Initialize the ModifiedState with the given parameters.

    :param model: The modified model
    :type model: Any
    :param optimizer: The modified optimizer
    :type optimizer: Any
    :param loss: The modified loss
    :type loss: Any
    :param modifier_data: The modifier data used to modify the model, optimizer,
        and loss
    :type modifier_data: List[Dict[str, Any]]
    """
    self.model = model
    self.optimizer = optimizer
    self.loss = loss
    self.modifier_data = modifier_data

State dataclass

State class holds information about the current compression state.

Parameters:

Name Type Description Default
model Any

The model being used for compression

None
teacher_model Any

The teacher model being used for compression

None
optimizer Any

The optimizer being used for training

None
optim_wrapped bool

Whether or not the optimizer has been wrapped

None
loss Any

The loss function being used for training

None
batch_data Any

The current batch of data being used for compression

None
data Data

The data sets being used for training, validation, testing, and/or calibration, wrapped in a Data instance

Data()
hardware Hardware

Hardware instance holding info about the target hardware being used

Hardware()
loggers Optional[LoggerManager]

LoggerManager instance holding all the loggers to log to

None
model_log_cadence Optional[float]

The cadence to log model information w.r.t epochs. If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.

None
Source code in src/llmcompressor/core/state.py
@dataclass
class State:
    """
    State class holds information about the current compression state.

    :param model: The model being used for compression
    :type model: Any
    :param teacher_model: The teacher model being used for compression
    :type teacher_model: Any
    :param optimizer: The optimizer being used for training
    :type optimizer: Any
    :param optim_wrapped: Whether or not the optimizer has been wrapped
    :type optim_wrapped: bool
    :param loss: The loss function being used for training
    :type loss: Any
    :param batch_data: The current batch of data being used for compression
    :type batch_data: Any
    :param data: The data sets being used for training, validation, testing,
        and/or calibration, wrapped in a Data instance
    :type data: Data
    :param hardware: Hardware instance holding info about the target hardware being used
    :type hardware: Hardware
    :param loggers: LoggerManager instance holding all the loggers to log to
    :type loggers: Optional[LoggerManager]
    :param model_log_cadence: The cadence to log model information w.r.t epochs.
        If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.
    :type model_log_cadence: Optional[float]
    """

    model: Any = None
    teacher_model: Any = None
    optimizer: Any = None
    optim_wrapped: bool = None
    loss: Any = None
    batch_data: Any = None
    data: Data = field(default_factory=Data)
    hardware: Hardware = field(default_factory=Hardware)
    loggers: Optional[LoggerManager] = None
    model_log_cadence: Optional[float] = None
    _last_log_step: Union[float, int, None] = None

    @property
    def compression_ready(self) -> bool:
        """
        Check if the model and optimizer are set for compression.

        :return: True if model and optimizer are set, False otherwise
        :rtype: bool
        """
        ready = self.model is not None and self.optimizer is not None
        logger.debug("Compression ready: {}", ready)
        return ready

    def update(
        self,
        model: Any = None,
        teacher_model: Any = None,
        optimizer: Any = None,
        attach_optim_callbacks: bool = True,
        train_data: Any = None,
        val_data: Any = None,
        test_data: Any = None,
        calib_data: Any = None,
        copy_data: bool = True,
        start: float = None,
        steps_per_epoch: int = None,
        batches_per_step: int = None,
        loggers: Union[None, LoggerManager, List[BaseLogger]] = None,
        model_log_cadence: Optional[float] = None,
        **kwargs,
    ) -> Dict:
        """
        Update the state with the given parameters.

        :param model: The model to update the state with
        :type model: Any
        :param teacher_model: The teacher model to update the state with
        :type teacher_model: Any
        :param optimizer: The optimizer to update the state with
        :type optimizer: Any
        :param attach_optim_callbacks: Whether or not to attach optimizer callbacks
        :type attach_optim_callbacks: bool
        :param train_data: The training data to update the state with
        :type train_data: Any
        :param val_data: The validation data to update the state with
        :type val_data: Any
        :param test_data: The testing data to update the state with
        :type test_data: Any
        :param calib_data: The calibration data to update the state with
        :type calib_data: Any
        :param copy_data: Whether or not to copy the data
        :type copy_data: bool
        :param start: The start index to update the state with
        :type start: float
        :param steps_per_epoch: The steps per epoch to update the state with
        :type steps_per_epoch: int
        :param batches_per_step: The batches per step to update the state with
        :type batches_per_step: int
        :param loggers: The metrics manager used to log important info and
            milestones; also accepts a list of BaseLogger instances
        :type loggers: Union[None, LoggerManager, List[BaseLogger]]
        :param model_log_cadence: The cadence to log model information w.r.t epochs.
            If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.
        :type model_log_cadence: Optional[float]
        :param kwargs: Additional keyword arguments to update the state with
        :return: The updated state as a dictionary
        :rtype: Dict
        """
        logger.debug(
            "Updating state with provided parameters: {}",
            {
                "model": model,
                "teacher_model": teacher_model,
                "optimizer": optimizer,
                "attach_optim_callbacks": attach_optim_callbacks,
                "train_data": train_data,
                "val_data": val_data,
                "test_data": test_data,
                "calib_data": calib_data,
                "copy_data": copy_data,
                "start": start,
                "steps_per_epoch": steps_per_epoch,
                "batches_per_step": batches_per_step,
                "loggers": loggers,
                "model_log_cadence": model_log_cadence,
                "kwargs": kwargs,
            },
        )

        if model is not None:
            self.model = model
        if teacher_model is not None:
            self.teacher_model = teacher_model
        if optimizer is not None:
            self.optim_wrapped = attach_optim_callbacks
            self.optimizer = optimizer
        if train_data is not None:
            self.data.train = train_data if not copy_data else deepcopy(train_data)
        if val_data is not None:
            self.data.val = val_data if not copy_data else deepcopy(val_data)
        if test_data is not None:
            self.data.test = test_data if not copy_data else deepcopy(test_data)
        if calib_data is not None:
            self.data.calib = calib_data if not copy_data else deepcopy(calib_data)

        if "device" in kwargs:
            self.hardware.device = kwargs["device"]

        loggers = loggers or []
        if isinstance(loggers, list):
            loggers = LoggerManager(loggers)
        self.loggers = loggers

        if model_log_cadence is not None:
            self.model_log_cadence = model_log_cadence

        return kwargs
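
As the final return shows, update consumes the arguments it recognizes and hands the remaining keyword arguments back to the caller; note that "device" is read into hardware but not popped, so it is echoed back as well. Once both model and optimizer are set, compression_ready flips to True. A sketch with stand-in objects (import path assumed):

from llmcompressor.core import State  # import path assumed

state = State()
assert not state.compression_ready

# object() stand-ins; in practice these are a torch model and optimizer.
leftover = state.update(
    model=object(),
    optimizer=object(),
    device="cuda:0",    # stored on state.hardware.device, and echoed back
    custom_flag=True,   # unrecognized keys are returned to the caller
)

assert state.compression_ready
assert leftover == {"device": "cuda:0", "custom_flag": True}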

compression_ready property

Check if the model and optimizer are set for compression.

Returns:

Type Description
bool

True if model and optimizer are set, False otherwise

update(model=None, teacher_model=None, optimizer=None, attach_optim_callbacks=True, train_data=None, val_data=None, test_data=None, calib_data=None, copy_data=True, start=None, steps_per_epoch=None, batches_per_step=None, loggers=None, model_log_cadence=None, **kwargs)

Update the state with the given parameters.

Parameters:

Name Type Description Default
model Any

The model to update the state with

None
teacher_model Any

The teacher model to update the state with

None
optimizer Any

The optimizer to update the state with

None
attach_optim_callbacks bool

Whether or not to attach optimizer callbacks

True
train_data Any

The training data to update the state with

None
val_data Any

The validation data to update the state with

None
test_data Any

The testing data to update the state with

None
calib_data Any

The calibration data to update the state with

None
copy_data bool

Whether or not to copy the data

True
start float

The start index to update the state with

None
steps_per_epoch int

The steps per epoch to update the state with

None
batches_per_step int

The batches per step to update the state with

None
loggers Union[None, LoggerManager, List[BaseLogger]]

The metrics manager used to log important info and milestones; also accepts a list of BaseLogger instances

None
model_log_cadence Optional[float]

The cadence to log model information w.r.t epochs. If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.

None
kwargs

Additional keyword arguments to update the state with

{}

Returns:

Type Description
Dict

The updated state as a dictionary

Source code in src/llmcompressor/core/state.py
def update(
    self,
    model: Any = None,
    teacher_model: Any = None,
    optimizer: Any = None,
    attach_optim_callbacks: bool = True,
    train_data: Any = None,
    val_data: Any = None,
    test_data: Any = None,
    calib_data: Any = None,
    copy_data: bool = True,
    start: float = None,
    steps_per_epoch: int = None,
    batches_per_step: int = None,
    loggers: Union[None, LoggerManager, List[BaseLogger]] = None,
    model_log_cadence: Optional[float] = None,
    **kwargs,
) -> Dict:
    """
    Update the state with the given parameters.

    :param model: The model to update the state with
    :type model: Any
    :param teacher_model: The teacher model to update the state with
    :type teacher_model: Any
    :param optimizer: The optimizer to update the state with
    :type optimizer: Any
    :param attach_optim_callbacks: Whether or not to attach optimizer callbacks
    :type attach_optim_callbacks: bool
    :param train_data: The training data to update the state with
    :type train_data: Any
    :param val_data: The validation data to update the state with
    :type val_data: Any
    :param test_data: The testing data to update the state with
    :type test_data: Any
    :param calib_data: The calibration data to update the state with
    :type calib_data: Any
    :param copy_data: Whether or not to copy the data
    :type copy_data: bool
    :param start: The start index to update the state with
    :type start: float
    :param steps_per_epoch: The steps per epoch to update the state with
    :type steps_per_epoch: int
    :param batches_per_step: The batches per step to update the state with
    :type batches_per_step: int
    :param loggers: The metrics manager used to log important info and
        milestones; also accepts a list of BaseLogger instances
    :type loggers: Union[None, LoggerManager, List[BaseLogger]]
    :param model_log_cadence: The cadence to log model information w.r.t epochs.
        If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.
    :type model_log_cadence: Optional[float]
    :param kwargs: Additional keyword arguments to update the state with
    :return: The updated state as a dictionary
    :rtype: Dict
    """
    logger.debug(
        "Updating state with provided parameters: {}",
        {
            "model": model,
            "teacher_model": teacher_model,
            "optimizer": optimizer,
            "attach_optim_callbacks": attach_optim_callbacks,
            "train_data": train_data,
            "val_data": val_data,
            "test_data": test_data,
            "calib_data": calib_data,
            "copy_data": copy_data,
            "start": start,
            "steps_per_epoch": steps_per_epoch,
            "batches_per_step": batches_per_step,
            "loggers": loggers,
            "model_log_cadence": model_log_cadence,
            "kwargs": kwargs,
        },
    )

    if model is not None:
        self.model = model
    if teacher_model is not None:
        self.teacher_model = teacher_model
    if optimizer is not None:
        self.optim_wrapped = attach_optim_callbacks
        self.optimizer = optimizer
    if train_data is not None:
        self.data.train = train_data if not copy_data else deepcopy(train_data)
    if val_data is not None:
        self.data.val = val_data if not copy_data else deepcopy(val_data)
    if test_data is not None:
        self.data.test = test_data if not copy_data else deepcopy(test_data)
    if calib_data is not None:
        self.data.calib = calib_data if not copy_data else deepcopy(calib_data)

    if "device" in kwargs:
        self.hardware.device = kwargs["device"]

    loggers = loggers or []
    if isinstance(loggers, list):
        loggers = LoggerManager(loggers)
    self.loggers = loggers

    if model_log_cadence is not None:
        self.model_log_cadence = model_log_cadence

    return kwargs
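
One behavior worth calling out: with the default copy_data=True, datasets are deep-copied into the state, so later mutation of the original object does not leak into state.data; pass copy_data=False to share the reference instead (often preferable for large calibration sets). A sketch, again assuming State imports from llmcompressor.core:

from llmcompressor.core import State  # import path assumed

calib = [{"input_ids": [1, 2, 3]}]

state = State()
state.update(calib_data=calib)                   # deep-copied by default
calib.append({"input_ids": [4]})
assert len(state.data.calib) == 1                # copy is unaffected

state.update(calib_data=calib, copy_data=False)  # share the reference
calib.append({"input_ids": [5]})
assert len(state.data.calib) == 3                # now tracks the original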

active_session()

Returns:

Type Description
CompressionSession

the active session for sparsification

Source code in src/llmcompressor/core/session_functions.py
def active_session() -> CompressionSession:
    """
    :return: the active session for sparsification
    """
    global _local_storage
    return getattr(_local_storage, "session", _global_session)

create_session()

Context manager to create and yield a new session for sparsification. This will set the active session to the new session for the duration of the context.

Returns:

Type Description
Generator[CompressionSession, None, None]

the new session

Source code in src/llmcompressor/core/session_functions.py
@contextmanager
def create_session() -> Generator[CompressionSession, None, None]:
    """
    Context manager to create and yield a new session for sparsification.
    This will set the active session to the new session for the duration
    of the context.

    :return: the new session
    """
    global _local_storage
    orig_session = getattr(_local_storage, "session", None)
    new_session = CompressionSession()
    _local_storage.session = new_session
    try:
        yield new_session
    finally:
        _local_storage.session = orig_session
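
Because active_session resolves through thread-local storage with a global fallback, create_session is the way to scope work to an isolated session, for example when compressing two models in the same process. A sketch, assuming both functions are importable from llmcompressor.core:

from llmcompressor.core import active_session, create_session  # paths assumed

with create_session() as session:
    # Inside the context, the thread-local active session is the new one.
    assert active_session() is session
    # ... initialize a recipe, fire events, finalize, etc. ...

# On exit, the previously active session (or the global fallback) is restored.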

reset_session()

Reset the currently active session to its initial state

Source code in src/llmcompressor/core/session_functions.py
def reset_session():
    """
    Reset the currently active session to its initial state
    """
    session = active_session()
    session._lifecycle.reset()
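
reset_session delegates to the lifecycle's reset on whatever session is currently active, so it is a convenient way to clear state between independent compression runs that share the global session. A minimal sketch (import path assumed):

from llmcompressor.core import reset_session  # import path assumed

# After one compression run completes, clear the active session's
# lifecycle so a fresh recipe can be applied from a clean state.
reset_session()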