Python module

interfaces

Interfaces for MAX pipelines.

AlwaysSignalBuffersMixin

class max.pipelines.lib.interfaces.AlwaysSignalBuffersMixin

Bases: object

Mixin for models that always require signal buffers.

Use this for models that use VocabParallelEmbedding or other distributed components that always perform allreduce, even on single-device setups.

Models using this mixin build graphs that always include signal buffer inputs, regardless of device count. This is typically because they use distributed embedding layers or other components that call allreduce operations unconditionally.
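
As a hedged sketch (the model class name is hypothetical), a model built on distributed components that always perform allreduce can opt in by listing the mixin ahead of its base model class, so the mixin's signal_buffers property takes precedence:

class MyDistributedModel(AlwaysSignalBuffersMixin, PipelineModel):
    # Hypothetical model: with the mixin applied, the graph is always built
    # with signal buffer inputs, even when running on a single device.
    ...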

devices

devices: list[Device]

Device list that must be provided by the model class.

signal_buffers

property signal_buffers: list[Tensor]

Override to always create signal buffers.

Models using this mixin have distributed components that always perform allreduce, even for single-device setups. Therefore, signal buffers are always required to match the graph inputs.

Returns:

List of signal buffer tensors, one per device.

GenerateMixin

class max.pipelines.lib.interfaces.GenerateMixin(*args, **kwargs)

Bases: Protocol[TextGenerationContextType, RequestType]

execute()

execute(inputs)

Parameters:

inputs (TextGenerationInputs[TextGenerationContextType])

Return type:

dict[RequestID, TextGenerationOutput]

generate()

generate(prompts)

Generates outputs for the given prompts.

Parameters:

prompts (RequestType | list[RequestType])

Return type:

list[TextGenerationOutput]
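
For illustration, a pipeline implementing this protocol could be driven as follows; the pipeline object is hypothetical and this assumes a text pipeline whose RequestType is a plain prompt string:

# Hypothetical `pipeline` object implementing GenerateMixin with string requests.
outputs = pipeline.generate(["Write a haiku about GPUs.", "Summarize MAX in one line."])
for output in outputs:
    print(output)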

generate_async()

async generate_async(prompts)

Parameters:

prompts (RequestType | list[RequestType])

Return type:

Any

kv_managers

property kv_managers: list[PagedKVCacheManager]

pipeline_config

property pipeline_config: PipelineConfig

release()

release(request_id)

Parameters:

request_id (RequestID)

Return type:

None

tokenizer

property tokenizer: PipelineTokenizer[TextGenerationContextType, ndarray[tuple[int, ...], dtype[integer[Any]]], RequestType]

KVCacheMixin

class max.pipelines.lib.interfaces.KVCacheMixin(*args, **kwargs)

Bases: Protocol

estimate_kv_cache_size()

abstract classmethod estimate_kv_cache_size(pipeline_config, available_cache_memory, devices, huggingface_config, kv_cache_config, cache_dtype)

Estimates the size of the KV cache in bytes.

Parameters:

  • pipeline_config (PipelineConfig)
  • available_cache_memory (int)
  • devices (list[Device])
  • huggingface_config (AutoConfig)
  • kv_cache_config (KVCacheConfig)
  • cache_dtype (DType)

Return type:

int

get_kv_params()

abstract classmethod get_kv_params(huggingface_config, n_devices, kv_cache_config, cache_dtype)

Returns the KV cache params for the pipeline model.

Parameters:

  • huggingface_config (AutoConfig)
  • n_devices (int)
  • kv_cache_config (KVCacheConfig)
  • cache_dtype (DType)

Return type:

KVCacheParams

get_num_layers()

abstract classmethod get_num_layers(huggingface_config)

Returns the number of layers for the pipeline model.

Parameters:

huggingface_config (AutoConfig)

Return type:

int

load_kv_manager()

load_kv_manager(session, available_cache_memory)

Given a PipelineConfig and an InferenceSession, loads the KV manager.

Parameters:

  • session (InferenceSession) – Inference session to compile and init the KV cache.
  • available_cache_memory (int | None) – Amount of memory available to the KV cache, in bytes.

Returns:

Either a single KV cache manager or a tuple of KV cache managers, one per input modality.
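
A hedged usage sketch, assuming `model` is a PipelineModel that also implements KVCacheMixin and `session` is the InferenceSession it was constructed with:

# Reserve roughly 8 GiB for the KV cache; the exact budget is hypothetical.
kv_manager = model.load_kv_manager(session, available_cache_memory=8 * 1024**3)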

ModelInputs

class max.pipelines.lib.interfaces.ModelInputs

Bases: object

Base class for model inputs. Use this class to encapsulate inputs for your model. You may store any number of dataclass fields.

The following example demonstrates how to create a custom inputs class for a model:

from max.driver import Tensor
from max.dtype import DType
from max.pipelines.lib.interfaces import ModelInputs


class ReplitInputs(ModelInputs):
    tokens: Tensor
    input_row_offsets: Tensor

    def __init__(self, tokens: Tensor, input_row_offsets: Tensor) -> None:
        self.tokens = tokens
        self.input_row_offsets = input_row_offsets


tokens = Tensor.zeros((1, 2, 3), DType.int64)
input_row_offsets = Tensor.zeros((1, 1, 1), DType.int64)

# Initialize inputs
inputs = ReplitInputs(tokens=tokens, input_row_offsets=input_row_offsets)

# Access tensors: iterating a ModelInputs yields its tensors.
list(inputs) == [tokens, input_row_offsets]  # True

kv_cache_inputs

kv_cache_inputs: KVCacheInputs | None = None

lora_ids

lora_ids: Tensor | None = None

Tensor containing the LoRA ids.

lora_ranks

lora_ranks: Tensor | None = None

Tensor containing the LoRA ranks.

update()

update(**kwargs)

Return type:

None
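
Based on the signature, update() appears to overwrite fields from keyword arguments; a hypothetical use between decode steps might look like this (next_kv_cache_inputs is a placeholder value):

# Hypothetical: refresh only the KV cache inputs for the next step, leaving
# the other fields of `inputs` untouched.
inputs.update(kv_cache_inputs=next_kv_cache_inputs)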

ModelOutputs

class max.pipelines.lib.interfaces.ModelOutputs(logits: Tensor, next_token_logits: Tensor | None = None, logit_offsets: Tensor | None = None)

Bases: object

Parameters:

  • logits (Tensor) – Logits for a variable number of tokens per sequence.
  • next_token_logits (Tensor | None) – Logits for just the next token.
  • logit_offsets (Tensor | None) – Offsets to access variable length logits for each sequence.

logit_offsets

logit_offsets: Tensor | None = None

Offsets to access variable length logits for each sequence.

logits

logits: Tensor

Logits for a variable number of tokens per sequence.

next_token_logits

next_token_logits: Tensor | None = None

Logits for just the next token.
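
A minimal construction sketch, where logits_tensor and last_token_logits are hypothetical stand-ins for tensors produced by the model graph:

# Wrap raw graph outputs for the pipeline; logit_offsets is optional and
# defaults to None.
outputs = ModelOutputs(
    logits=logits_tensor,
    next_token_logits=last_token_logits,
)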

PipelineModel

class max.pipelines.lib.interfaces.PipelineModel(pipeline_config, session, huggingface_config, encoding, devices, kv_cache_config, weights, adapter, return_logits)

Bases: ABC, Generic[BaseContextType]

A pipeline model with setup, input preparation and execution methods.

Parameters:

calculate_max_seq_len()

abstract classmethod calculate_max_seq_len(pipeline_config, huggingface_config)

Calculate the optimal max sequence length for the model. Models are expected to implement this method.

The following example shows how to implement this method for a Mistral model:

class MistralModel(PipelineModel):
    @classmethod
    def calculate_max_seq_len(cls, pipeline_config, huggingface_config) -> int:
        try:
            return upper_bounded_default(
                upper_bound=huggingface_config.max_seq_len,
                default=pipeline_config.max_length,
            )
        except ValueError as e:
            raise ValueError(
                "Unable to infer max_length for Mistral, the provided "
                f"max_length ({pipeline_config.max_length}) exceeds the "
                f"model's max_seq_len ({huggingface_config.max_seq_len})."
            ) from e

Parameters:

  • pipeline_config (PipelineConfig) – Configuration for the pipeline.
  • huggingface_config (AutoConfig) – Hugging Face model configuration.

Returns:

The maximum sequence length to use.

Return type:

int

compute_log_probabilities()

compute_log_probabilities(session, model_inputs, model_outputs, next_tokens, batch_top_n, batch_echo)

Optional method that can be overridden to compute log probabilities.

Parameters:

  • session (InferenceSession) – Inference session to compute log probabilities within.
  • model_inputs (ModelInputs) – Inputs to the model returned by prepare_*_token_inputs().
  • model_outputs (ModelOutputs) – Outputs returned by execute().
  • next_tokens (Tensor) – Sampled tokens. Should have shape [batch_size].
  • batch_top_n (list[int]) – Number of top log probabilities to return per input in the batch. For any element where top_n == 0, the LogProbabilities output is skipped.
  • batch_echo (list[bool]) – Whether to include input tokens in the returned log probabilities.

Returns:

List of log probabilities.

Return type:

list[LogProbabilities | None]

dtype

property dtype: DType

estimate_activation_memory()

classmethod estimate_activation_memory(pipeline_config, huggingface_config)

Estimates the activation memory required for model execution.

This accounts for temporary memory buffers used during model execution, such as intermediate activations and working buffers.

The default implementation returns 0 for backward compatibility. Models with significant activation memory requirements should override this method to provide accurate estimates.

Parameters:

  • pipeline_config (PipelineConfig) – Pipeline configuration
  • huggingface_config (AutoConfig) – HuggingFace model configuration

Returns:

Estimated activation memory in bytes

Return type:

int
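
A hedged override sketch; the attribute names hidden_size and max_batch_size are assumptions about the config objects, not guaranteed by this API:

class MyPipelineModel(PipelineModel):
    @classmethod
    def estimate_activation_memory(cls, pipeline_config, huggingface_config) -> int:
        # Illustrative estimate only: one float32 hidden-state buffer per token
        # in the largest possible batch.
        hidden_size = huggingface_config.hidden_size  # assumed attribute
        max_batch = pipeline_config.max_batch_size or 1  # assumed attribute
        max_seq_len = cls.calculate_max_seq_len(pipeline_config, huggingface_config)
        return max_batch * max_seq_len * hidden_size * 4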

estimate_weights_size()

classmethod estimate_weights_size(pipeline_config)

Estimates the memory consumption of the model's weights.

Parameters:

pipeline_config (PipelineConfig)

Return type:

int

execute()

abstract execute(model_inputs)

Executes the graph with the given inputs.

Parameters:

model_inputs (ModelInputs) – The model inputs to execute, containing tensors and any other required data for model execution.

Returns:

ModelOutputs containing the pipeline’s output tensors.

Return type:

ModelOutputs

This is an abstract method that must be implemented by concrete PipelineModels to define their specific execution logic.
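
As a hedged sketch of a concrete implementation (self.model is a hypothetical compiled model loaded during setup, and the output ordering is assumed):

class MyPipelineModel(PipelineModel):
    def execute(self, model_inputs: ModelInputs) -> ModelOutputs:
        # ModelInputs is iterable, so its tensors can be passed positionally.
        graph_outputs = self.model.execute(*model_inputs)
        return ModelOutputs(
            logits=graph_outputs[0],
            next_token_logits=graph_outputs[1],
        )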

infer_optimal_batch_size()

classmethod infer_optimal_batch_size(pipeline_config, available_cache_memory, huggingface_config, devices, kv_cache_config, cache_dtype)

Returns the estimated optimal batch size to run the model given current memory constraints.

Parameters:

  • pipeline_config (PipelineConfig)
  • available_cache_memory (int)
  • huggingface_config (AutoConfig)
  • devices (list[Device])
  • kv_cache_config (KVCacheConfig)
  • cache_dtype (DType)

Return type:

int

lora_manager

property lora_manager: LoRAManager | None

prepare_initial_token_inputs()

abstract prepare_initial_token_inputs(context_batch, kv_cache_inputs=None, return_n_logits=1)

Prepares the initial inputs to be passed to .execute().

The inputs and functionality of this method can vary per model. For example, the model inputs could include:

  • Encoded tensors
  • A unique ID for each tensor, if the model uses a KV cache manager.
  • kv_cache_inputs: the KV cache inputs required for the model, or None if the model does not use a KV cache.

This method would then batch the encoded tensors, claim a slot in the KV cache if an ID has not been seen before, and return the inputs and caches as a list of tensors.

Parameters:

Return type:

ModelInputs

prepare_next_token_inputs()

abstract prepare_next_token_inputs(next_tokens, prev_model_inputs)

Prepares the secondary inputs to be passed to .execute().

While prepare_initial_token_inputs is responsible for managing the initial inputs, this function is responsible for updating the inputs for each step in a multi-step execution pattern.

Parameters:

Return type:

ModelInputs

signal_buffers

property signal_buffers: list[Tensor]

Lazily initialize signal buffers for multi-GPU communication collectives.

Signal buffers are only needed during model execution, not during compilation. By deferring their allocation, we avoid memory allocation in compile-only mode.

Returns:

List of signal buffer tensors, one per device for multi-device setups, or an empty list for single-device setups.

get_paged_manager()

max.pipelines.lib.interfaces.get_paged_manager(pipeline)

Get the paged KV cache manager from a pipeline, if available.

Parameters:

pipeline (Pipeline[Any, Any]) – The pipeline to extract the KV cache manager from.

Returns:

The paged KV cache manager if available, None otherwise.

Return type:

PagedKVCacheManager | None
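
A short usage sketch with a hypothetical pipeline object; the None case should be handled explicitly:

paged_manager = get_paged_manager(pipeline)
if paged_manager is not None:
    # The paged KV cache manager is available, e.g. for inspecting cache state.
    ...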
