Skip to content

generations

ImageGenerationInitKwargs

Bases: TypedDict

Optional keyword arguments accepted by ImageSingleGeneration from template helpers.

Source code in horde_sdk/worker/generations.py
class ImageGenerationInitKwargs(TypedDict, total=False):
    """Optional keyword arguments accepted by `ImageSingleGeneration` from template helpers."""

    # `total=False` makes every key optional. `ImageSingleGeneration.from_template` unpacks the
    # supplied keys into the constructor, so any key left out falls back to the `__init__` default.

    # Unique identifier for the generation.
    generation_id: ID_TYPES | None
    # Result identifiers assigned by dispatch, if available.
    dispatch_result_ids: Sequence[ID_TYPES] | None
    # Identifiers for the individual results of the generation.
    result_ids: list[ID_TYPES] | None
    # Whether the generation requires submission.
    requires_submit: bool
    # Safety rules to apply to the generation.
    safety_rules: SafetyRules
    # Whether the generation runs in black box mode.
    black_box_mode: bool
    # Per-state error limits.
    state_error_limits: Mapping[GENERATION_PROGRESS, int] | None
    # Whether strict state-transition checking is enforced.
    strict_transition_mode: bool
    # Whether extra debug-level logging is enabled.
    extra_logging: bool

generation_id instance-attribute

generation_id: ID_TYPES | None

dispatch_result_ids instance-attribute

dispatch_result_ids: Sequence[ID_TYPES] | None

result_ids instance-attribute

result_ids: list[ID_TYPES] | None

requires_submit instance-attribute

requires_submit: bool

safety_rules instance-attribute

safety_rules: SafetyRules

black_box_mode instance-attribute

black_box_mode: bool

state_error_limits instance-attribute

state_error_limits: Mapping[GENERATION_PROGRESS, int] | None

strict_transition_mode instance-attribute

strict_transition_mode: bool

extra_logging instance-attribute

extra_logging: bool

AlchemyGenerationInitKwargs

Bases: TypedDict

Optional keyword arguments accepted by AlchemySingleGeneration factory flows.

Source code in horde_sdk/worker/generations.py
class AlchemyGenerationInitKwargs(TypedDict, total=False):
    """Optional keyword arguments accepted by `AlchemySingleGeneration` factory flows."""

    # `total=False` makes every key optional.
    # NOTE(review): the sibling `ImageGenerationInitKwargs` and `TextGenerationInitKwargs` type
    # `generation_id` as `ID_TYPES | None`; here it is `str | None`. Confirm against
    # `AlchemySingleGeneration.__init__` whether the narrower type is intentional.
    generation_id: str | None
    dispatch_result_ids: Sequence[ID_TYPES] | None
    result_ids: list[ID_TYPES] | None
    # Per-instance stage toggles; presumably forwarded to the constructor - verify against caller.
    requires_generation: bool
    requires_post_processing: bool
    requires_safety_check: bool
    requires_submit: bool
    safety_rules: SafetyRules
    black_box_mode: bool
    state_error_limits: Mapping[GENERATION_PROGRESS, int] | None
    strict_transition_mode: bool
    extra_logging: bool

generation_id instance-attribute

generation_id: str | None

dispatch_result_ids instance-attribute

dispatch_result_ids: Sequence[ID_TYPES] | None

result_ids instance-attribute

result_ids: list[ID_TYPES] | None

requires_generation instance-attribute

requires_generation: bool

requires_post_processing instance-attribute

requires_post_processing: bool

requires_safety_check instance-attribute

requires_safety_check: bool

requires_submit instance-attribute

requires_submit: bool

safety_rules instance-attribute

safety_rules: SafetyRules

black_box_mode instance-attribute

black_box_mode: bool

state_error_limits instance-attribute

state_error_limits: Mapping[GENERATION_PROGRESS, int] | None

strict_transition_mode instance-attribute

strict_transition_mode: bool

extra_logging instance-attribute

extra_logging: bool

TextGenerationInitKwargs

Bases: TypedDict

Optional keyword arguments accepted by TextSingleGeneration helpers.

Source code in horde_sdk/worker/generations.py
class TextGenerationInitKwargs(TypedDict, total=False):
    """Optional keyword arguments accepted by `TextSingleGeneration` helpers."""

    # `total=False` makes every key optional.
    # Identifier keys share their types with `ImageGenerationInitKwargs`.
    generation_id: ID_TYPES | None
    dispatch_result_ids: Sequence[ID_TYPES] | None
    result_ids: list[ID_TYPES] | None
    # Per-instance stage toggles; presumably forwarded to the constructor - verify against caller.
    requires_post_processing: bool
    requires_safety_check: bool
    requires_submit: bool
    safety_rules: SafetyRules
    black_box_mode: bool
    state_error_limits: Mapping[GENERATION_PROGRESS, int] | None
    strict_transition_mode: bool
    extra_logging: bool

generation_id instance-attribute

generation_id: ID_TYPES | None

dispatch_result_ids instance-attribute

dispatch_result_ids: Sequence[ID_TYPES] | None

result_ids instance-attribute

result_ids: list[ID_TYPES] | None

requires_post_processing instance-attribute

requires_post_processing: bool

requires_safety_check instance-attribute

requires_safety_check: bool

requires_submit instance-attribute

requires_submit: bool

safety_rules instance-attribute

safety_rules: SafetyRules

black_box_mode instance-attribute

black_box_mode: bool

state_error_limits instance-attribute

state_error_limits: Mapping[GENERATION_PROGRESS, int] | None

strict_transition_mode instance-attribute

strict_transition_mode: bool

extra_logging instance-attribute

extra_logging: bool

ImageSingleGeneration

Bases: HordeSingleGeneration[bytes]

A single image generation.

Not to be confused with a job, which can contain multiple generations.

Source code in horde_sdk/worker/generations.py
class ImageSingleGeneration(HordeSingleGeneration[bytes]):
    """A single image generation.

    **Not to be confused with a job**, which can contain multiple generations.
    """

    @override
    @classmethod
    def default_generate_progress_transitions(cls) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
        """Return the state transition map used when the generation requires submission."""
        return default_image_generate_progress_transitions

    @override
    @classmethod
    def default_generate_progress_transitions_no_submit(
        cls,
    ) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
        """Return the state transition map used when the generation does not require submission."""
        return default_image_generate_progress_no_submit_transitions

    @override
    @classmethod
    def does_class_require_generation(cls) -> bool:
        """Report that image generations always require a generation step."""
        return True

    @override
    @classmethod
    def does_class_require_safety_check(cls) -> bool:
        """Report that image generations always require a safety check."""
        return True

    generation_parameters: ImageGenerationParameters

    def __init__(
        self,
        *,
        generation_parameters: ImageGenerationParameters,
        generation_id: ID_TYPES | None = None,
        dispatch_result_ids: Sequence[ID_TYPES] | None = None,
        result_ids: list[ID_TYPES] | None = None,
        requires_submit: bool = True,
        safety_rules: SafetyRules = default_image_safety_rules,
        black_box_mode: bool = False,
        state_error_limits: (
            Mapping[GENERATION_PROGRESS, int] | None
        ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
        strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
        extra_logging: bool = False,
    ) -> None:
        """Initialize the generation.

        Args:
            generation_parameters (ImageGenerationParameters): The parameters for the generation.
            generation_id (ID_TYPES | None): The unique identifier for the generation. \
                If None, a random UUID will be generated.
            dispatch_result_ids (Sequence[ID_TYPES] | None): Result identifiers assigned by dispatch, if \
                available.
            result_ids (list[ID_TYPES] | None): The unique identifiers for the results of the generation. \
                If None, the result IDs from `generation_parameters` are used when present; otherwise \
                a random UUID will be generated for each result.
            requires_submit (bool, optional): Whether the generation requires submission. \
                Defaults to True.
            safety_rules (SafetyRules, optional): The safety rules to apply to the generation. \
                Defaults to default_image_safety_rules from `horde_sdk.safety`.
            black_box_mode (bool, optional): Whether the generation is in black box mode. \
                This removes all of the intermediate states between starting and finished states. \
                This should only be used when the backend has no observability into the generation process. \
                Defaults to False.
            state_error_limits (Mapping[GENERATION_PROGRESS, int], optional): The maximum number of times a \
                generation can be in an error state before it is considered failed. \
                Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.
            strict_transition_mode (bool, optional): Whether or not to enforce strict transition checking. \
                This prevents setting the same state multiple times in a row. \
                Defaults to HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE.
            extra_logging (bool, optional): Whether or not to enable extra debug-level logging, \
                especially for state transitions. \
                Defaults to False.
        """
        # Pick the transition map that matches whether a submit step is part of the lifecycle.
        if requires_submit:
            generate_progress_transitions = self.default_generate_progress_transitions()
        else:
            generate_progress_transitions = self.default_generate_progress_transitions_no_submit()

        parameter_result_ids = generation_parameters.result_ids
        if parameter_result_ids is not None:
            if result_ids is None:
                result_ids = parameter_result_ids
                logger.trace(
                    f"Result IDs were not provided, using result IDs from generation parameters: {result_ids}",
                    extra={"generation_id": generation_id},
                )
            elif result_ids != parameter_result_ids:
                # Only warn on a genuine conflict. `from_template` always forwards the parameters' own
                # result IDs, which would otherwise trigger this warning on every call.
                logger.warning(
                    "Both result IDs and generation parameters result IDs were provided. "
                    "Using the provided result IDs.",
                    extra={"generation_id": generation_id},
                )

        super().__init__(
            result_type=bytes,
            generation_parameters=generation_parameters,
            generation_id=generation_id,
            dispatch_result_ids=dispatch_result_ids,
            result_ids=result_ids,
            requires_generation=ImageSingleGeneration.does_class_require_generation(),
            # Post-processing is only needed when alchemy parameters accompany the image request.
            requires_post_processing=generation_parameters.alchemy_params is not None,
            requires_safety_check=ImageSingleGeneration.does_class_require_safety_check(),
            requires_submit=requires_submit,
            safety_rules=safety_rules,
            state_error_limits=state_error_limits,
            generate_progress_transitions=generate_progress_transitions,
            black_box_mode=black_box_mode,
            strict_transition_mode=strict_transition_mode,
            extra_logging=extra_logging,
        )

    @classmethod
    def from_template(
        cls,
        template: ImageGenerationParametersTemplate,
        *,
        generation_id: ID_TYPES | None = None,
        dispatch_result_ids: Sequence[ID_TYPES] | None = None,
        base_param_updates: BasicImageGenerationParametersTemplate | None = None,
        additional_param_updates: ImageGenerationComponentContainer | None = None,
        result_ids: Sequence[ID_TYPES] | None = None,
        allocator: ResultIdAllocator | None = None,
        seed: str = "image",
        init_kwargs: ImageGenerationInitKwargs | None = None,
    ) -> ImageSingleGeneration:
        """Instantiate an image generation from a template.

        Keys present in `init_kwargs` take precedence over the explicit `generation_id` and
        `dispatch_result_ids` arguments; those arguments are only applied when the corresponding
        key is absent from `init_kwargs`.

        Args:
            template (ImageGenerationParametersTemplate): The template converted into generation \
                parameters via `to_parameters`.
            generation_id (ID_TYPES | None): Identifier for the generation, used unless \
                `init_kwargs` already supplies one.
            dispatch_result_ids (Sequence[ID_TYPES] | None): Dispatch-assigned result identifiers, \
                copied into a list, used unless `init_kwargs` already supplies them.
            base_param_updates (BasicImageGenerationParametersTemplate | None): Forwarded to \
                `template.to_parameters`.
            additional_param_updates (ImageGenerationComponentContainer | None): Currently not \
                forwarded to the template; a warning is logged when a value is supplied.
            result_ids (Sequence[ID_TYPES] | None): Forwarded to `template.to_parameters`.
            allocator (ResultIdAllocator | None): Forwarded to `template.to_parameters`.
            seed (str): Forwarded to `template.to_parameters`. Defaults to "image".
            init_kwargs (ImageGenerationInitKwargs | None): Extra constructor keyword arguments.

        Returns:
            ImageSingleGeneration: The generation built from the resolved parameters.
        """
        if additional_param_updates is not None:
            # Surface the dropped argument instead of silently ignoring the caller's updates.
            logger.warning(
                "additional_param_updates was provided to from_template but is not applied to the template; "
                "it will be ignored.",
                extra={"generation_id": generation_id},
            )

        generation_parameters = template.to_parameters(
            base_param_updates=base_param_updates,
            result_ids=result_ids,
            allocator=allocator,
            seed=seed,
        )
        resolved_kwargs: ImageGenerationInitKwargs = {}
        if init_kwargs:
            resolved_kwargs.update(init_kwargs)
        if generation_id is not None:
            resolved_kwargs.setdefault("generation_id", generation_id)
        if dispatch_result_ids is not None:
            resolved_kwargs.setdefault("dispatch_result_ids", list(dispatch_result_ids))
        resolved_kwargs.setdefault("result_ids", generation_parameters.result_ids)
        return cls(
            generation_parameters=generation_parameters,
            **resolved_kwargs,
        )

generation_parameters instance-attribute

generation_parameters: ImageGenerationParameters

result_type property

result_type: type[GenerationResultTypeVar]

The type of the result of the generation.

batch_size property

batch_size: int

The batch size of the generation.

result_ids property

result_ids: list[ID_TYPES]

The unique identifiers for the generations in the batch.

black_box_mode property

black_box_mode: bool

Whether or not the generation is in black box mode.

generation_id instance-attribute

generation_id: ID_TYPES

Unique identifier for the generation.

dispatch_result_ids property

dispatch_result_ids: list[ID_TYPES] | None

Identifiers supplied by the dispatch source, if any.

short_id property

short_id: str

Get a short identifier for the generation.

generation_failure_count property

generation_failure_count: int

The number of times the generation has failed during any step of the generation process.

errored_states property

errored_states: list[tuple[GENERATION_PROGRESS, float]]

Return a list of (state, time) tuples for the states which occurred just before an error state, with the time each was set.

error_counts property

error_counts: dict[GENERATION_PROGRESS, int]

Return a dictionary of states and the number of times they occurred before an error state.

requires_post_processing property

requires_post_processing: bool

Whether or not the generation requires post-processing.

requires_generation property

requires_generation: bool

Whether or not the generation requires generation.

requires_safety_check property

requires_safety_check: bool

Whether or not the generation requires a safety check.

requires_submit property

requires_submit: bool

Whether or not the generation requires submission.

generation_results property

generation_results: OrderedDict[
    ID_TYPES, GenerationResultTypeVar | None
]

Get the result of the generation.

default_generate_progress_transitions classmethod

default_generate_progress_transitions() -> (
    dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]
)
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def default_generate_progress_transitions(cls) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
    """Return the module-level default transition map for image generations."""
    return default_image_generate_progress_transitions

default_generate_progress_transitions_no_submit classmethod

default_generate_progress_transitions_no_submit() -> (
    dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]
)
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def default_generate_progress_transitions_no_submit(
    cls,
) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
    """Return the module-level default transition map for image generations without a submit step."""
    return default_image_generate_progress_no_submit_transitions

does_class_require_generation classmethod

does_class_require_generation() -> bool
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def does_class_require_generation(cls) -> bool:
    """Report that image generations always require a generation step."""
    return True

does_class_require_safety_check classmethod

does_class_require_safety_check() -> bool
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def does_class_require_safety_check(cls) -> bool:
    """Report that image generations always require a safety check."""
    return True

__init__

__init__(
    *,
    generation_parameters: ImageGenerationParameters,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_ids: list[ID_TYPES] | None = None,
    requires_submit: bool = True,
    safety_rules: SafetyRules = default_image_safety_rules,
    black_box_mode: bool = False,
    state_error_limits: (
        Mapping[GENERATION_PROGRESS, int] | None
    ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
    strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
    extra_logging: bool = False
) -> None

Initialize the generation.

Parameters:

  • generation_parameters (ImageGenerationParameters) –

    The parameters for the generation.

  • generation_id (ID_TYPES | None, default: None ) –

    The unique identifier for the generation. If None, a random UUID will be generated.

  • dispatch_result_ids (Sequence[str | UUID] | None, default: None ) –

    Result identifiers assigned by dispatch, if available.

  • result_ids (list[ID_TYPES] | None, default: None ) –

    The unique identifiers for the generation. If None, a random UUID will be generated for each result.

  • requires_submit (bool, default: True ) –

    Whether the generation requires submission. Defaults to True.

  • safety_rules (SafetyRules, default: default_image_safety_rules ) –

    The safety rules to apply to the generation. Defaults to default_image_safety_rules from horde_sdk.safety.

  • black_box_mode (bool, default: False ) –

    Whether the generation is in black box mode. This removes all of the intermediate states between starting and finished states. This should only be used when the backend has no observability into the generation process. Defaults to False.

  • state_error_limits (Mapping[GENERATION_PROGRESS, int], default: DEFAULT_STATE_ERROR_LIMITS ) –

    The maximum number of times a generation can be in an error state before it is considered failed. Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.

  • strict_transition_mode (bool, default: DEFAULT_GENERATION_STRICT_TRANSITION_MODE ) –

    Whether or not to enforce strict transition checking. This prevents setting the same state multiple times in a row. Defaults to HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE.

  • extra_logging (bool, default: False ) –

    Whether or not to enable extra debug-level logging, especially for state transitions. Defaults to False.

Source code in horde_sdk/worker/generations.py
def __init__(
    self,
    *,
    generation_parameters: ImageGenerationParameters,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_ids: list[ID_TYPES] | None = None,
    requires_submit: bool = True,
    safety_rules: SafetyRules = default_image_safety_rules,
    black_box_mode: bool = False,
    state_error_limits: (
        Mapping[GENERATION_PROGRESS, int] | None
    ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
    strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
    extra_logging: bool = False,
) -> None:
    """Initialize the generation.

    Args:
        generation_parameters (ImageGenerationParameters): The parameters for the generation.
        generation_id (ID_TYPES | None): The unique identifier for the generation. \
            If None, a random UUID will be generated.
        dispatch_result_ids (Sequence[ID_TYPES] | None): Result identifiers assigned by dispatch, if \
            available.
        result_ids (list[ID_TYPES] | None): The unique identifiers for the results of the generation. \
            If None, the result IDs from `generation_parameters` are used when present; otherwise \
            a random UUID will be generated for each result.
        requires_submit (bool, optional): Whether the generation requires submission. \
            Defaults to True.
        safety_rules (SafetyRules, optional): The safety rules to apply to the generation. \
            Defaults to default_image_safety_rules from `horde_sdk.safety`.
        black_box_mode (bool, optional): Whether the generation is in black box mode. \
            This removes all of the intermediate states between starting and finished states. \
            This should only be used when the backend has no observability into the generation process. \
            Defaults to False.
        state_error_limits (Mapping[GENERATION_PROGRESS, int], optional): The maximum number of times a \
            generation can be in an error state before it is considered failed. \
            Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.
        strict_transition_mode (bool, optional): Whether or not to enforce strict transition checking. \
            This prevents setting the same state multiple times in a row. \
            Defaults to HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE.
        extra_logging (bool, optional): Whether or not to enable extra debug-level logging, \
            especially for state transitions. \
            Defaults to False.
    """
    # Pick the transition map that matches whether a submit step is part of the lifecycle.
    if requires_submit:
        generate_progress_transitions = self.default_generate_progress_transitions()
    else:
        generate_progress_transitions = self.default_generate_progress_transitions_no_submit()

    parameter_result_ids = generation_parameters.result_ids
    if parameter_result_ids is not None:
        if result_ids is None:
            result_ids = parameter_result_ids
            logger.trace(
                f"Result IDs were not provided, using result IDs from generation parameters: {result_ids}",
                extra={"generation_id": generation_id},
            )
        elif result_ids != parameter_result_ids:
            # Only warn on a genuine conflict; identical IDs supplied through both routes are harmless
            # and previously produced a misleading warning.
            logger.warning(
                "Both result IDs and generation parameters result IDs were provided. "
                "Using the provided result IDs.",
                extra={"generation_id": generation_id},
            )

    super().__init__(
        result_type=bytes,
        generation_parameters=generation_parameters,
        generation_id=generation_id,
        dispatch_result_ids=dispatch_result_ids,
        result_ids=result_ids,
        requires_generation=ImageSingleGeneration.does_class_require_generation(),
        # Post-processing is only needed when alchemy parameters accompany the image request.
        requires_post_processing=generation_parameters.alchemy_params is not None,
        requires_safety_check=ImageSingleGeneration.does_class_require_safety_check(),
        requires_submit=requires_submit,
        safety_rules=safety_rules,
        state_error_limits=state_error_limits,
        generate_progress_transitions=generate_progress_transitions,
        black_box_mode=black_box_mode,
        strict_transition_mode=strict_transition_mode,
        extra_logging=extra_logging,
    )

from_template classmethod

from_template(
    template: ImageGenerationParametersTemplate,
    *,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    base_param_updates: (
        BasicImageGenerationParametersTemplate | None
    ) = None,
    additional_param_updates: (
        ImageGenerationComponentContainer | None
    ) = None,
    result_ids: Sequence[ID_TYPES] | None = None,
    allocator: ResultIdAllocator | None = None,
    seed: str = "image",
    init_kwargs: ImageGenerationInitKwargs | None = None
) -> ImageSingleGeneration

Instantiate an image generation from a template.

Source code in horde_sdk/worker/generations.py
@classmethod
def from_template(
    cls,
    template: ImageGenerationParametersTemplate,
    *,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    base_param_updates: BasicImageGenerationParametersTemplate | None = None,
    additional_param_updates: ImageGenerationComponentContainer | None = None,
    result_ids: Sequence[ID_TYPES] | None = None,
    allocator: ResultIdAllocator | None = None,
    seed: str = "image",
    init_kwargs: ImageGenerationInitKwargs | None = None,
) -> ImageSingleGeneration:
    """Instantiate an image generation from a template.

    Keys present in `init_kwargs` take precedence over the explicit `generation_id` and
    `dispatch_result_ids` arguments; those arguments are only applied when the corresponding
    key is absent from `init_kwargs`.

    Args:
        template (ImageGenerationParametersTemplate): The template converted into generation \
            parameters via `to_parameters`.
        generation_id (ID_TYPES | None): Identifier for the generation, used unless \
            `init_kwargs` already supplies one.
        dispatch_result_ids (Sequence[ID_TYPES] | None): Dispatch-assigned result identifiers, \
            copied into a list, used unless `init_kwargs` already supplies them.
        base_param_updates (BasicImageGenerationParametersTemplate | None): Forwarded to \
            `template.to_parameters`.
        additional_param_updates (ImageGenerationComponentContainer | None): Currently not \
            forwarded to the template; a warning is logged when a value is supplied.
        result_ids (Sequence[ID_TYPES] | None): Forwarded to `template.to_parameters`.
        allocator (ResultIdAllocator | None): Forwarded to `template.to_parameters`.
        seed (str): Forwarded to `template.to_parameters`. Defaults to "image".
        init_kwargs (ImageGenerationInitKwargs | None): Extra constructor keyword arguments.

    Returns:
        ImageSingleGeneration: The generation built from the resolved parameters.
    """
    if additional_param_updates is not None:
        # Surface the dropped argument instead of silently ignoring the caller's updates.
        logger.warning(
            "additional_param_updates was provided to from_template but is not applied to the template; "
            "it will be ignored.",
            extra={"generation_id": generation_id},
        )

    generation_parameters = template.to_parameters(
        base_param_updates=base_param_updates,
        result_ids=result_ids,
        allocator=allocator,
        seed=seed,
    )
    resolved_kwargs: ImageGenerationInitKwargs = {}
    if init_kwargs:
        resolved_kwargs.update(init_kwargs)
    if generation_id is not None:
        resolved_kwargs.setdefault("generation_id", generation_id)
    if dispatch_result_ids is not None:
        resolved_kwargs.setdefault("dispatch_result_ids", list(dispatch_result_ids))
    resolved_kwargs.setdefault("result_ids", generation_parameters.result_ids)
    return cls(
        generation_parameters=generation_parameters,
        **resolved_kwargs,
    )

default_interrupt_states classmethod

default_interrupt_states() -> set[GENERATION_PROGRESS]

Get the default interrupt states.

Source code in horde_sdk/worker/generations_base.py
@classmethod
def default_interrupt_states(cls) -> set[GENERATION_PROGRESS]:
    """Return the set of progress states treated as interrupts by default.

    Returns:
        set[GENERATION_PROGRESS]: `ABORTED`, `USER_REQUESTED_ABORT` and `ABANDONED`.
    """
    interrupt_states = {
        GENERATION_PROGRESS.ABANDONED,
        GENERATION_PROGRESS.USER_REQUESTED_ABORT,
        GENERATION_PROGRESS.ABORTED,
    }
    return interrupt_states

register_callback

register_callback(
    state: GENERATION_PROGRESS, callback: Callable[[], None]
) -> None

Register a callback to be called when the generation state is updated.

Parameters:

  • state (GENERATION_PROGRESS) –

    The state to register the callback for.

  • callback (Callable[[], None]) –

    The callback to call when the state is updated.

Source code in horde_sdk/worker/generations_base.py
def register_callback(
    self,
    state: GENERATION_PROGRESS,
    callback: Callable[[], None],
) -> None:
    """Attach a zero-argument callback to a generation state.

    The callback is appended to the list already registered for `state`, so
    several callbacks may be registered for the same state.

    Args:
        state (GENERATION_PROGRESS): The state to register the callback for.
        callback (Callable[[], None]): The callback to call when the state is updated.
    """
    with self._lock:
        callbacks_for_state = self._registered_callbacks[state]
        callbacks_for_state.append(callback)

set_dispatch_result_ids

set_dispatch_result_ids(
    dispatch_result_ids: Sequence[ID_TYPES] | None,
) -> None

Bind the generation to the result identifiers supplied by dispatch.

Source code in horde_sdk/worker/generations_base.py
def set_dispatch_result_ids(self, dispatch_result_ids: Sequence[ID_TYPES] | None) -> None:
    """Bind the generation to the result identifiers supplied by dispatch.

    Args:
        dispatch_result_ids (Sequence[ID_TYPES] | None): The identifiers to store. A non-None
            sequence is copied into a new list; None clears the stored value.
    """
    with self._lock:
        if dispatch_result_ids is None:
            self._dispatch_result_ids = None
        else:
            self._dispatch_result_ids = list(dispatch_result_ids)

get_generation_progress

get_generation_progress() -> GENERATION_PROGRESS

Return the state of the generation.

Source code in horde_sdk/worker/generations_base.py
def get_generation_progress(self) -> GENERATION_PROGRESS:
    """Return the current progress state, read while holding the instance lock."""
    with self._lock:
        current_progress = self._generation_progress
    return current_progress

get_progress_history

get_progress_history() -> (
    list[tuple[GENERATION_PROGRESS, float]]
)

Get the generation progress history.

Source code in horde_sdk/worker/generations_base.py
def get_progress_history(self) -> list[tuple[GENERATION_PROGRESS, float]]:
    """Return a shallow copy of the progress history.

    The copy is taken while holding the instance lock, so callers may mutate the
    returned list without affecting the stored history.
    """
    with self._lock:
        history_snapshot = self._progress_history.copy()
    return history_snapshot

get_last_non_error_state

get_last_non_error_state() -> GENERATION_PROGRESS

Get the last non-error state.

Source code in horde_sdk/worker/generations_base.py
def get_last_non_error_state(self) -> GENERATION_PROGRESS:
    """Return the most recent state in the progress history that is not `ERROR`.

    Raises:
        RuntimeError: If the history holds no state other than `ERROR`.
    """
    with self._lock:
        # Walk the history newest-first and stop at the first entry that is not an error.
        for recorded_state, _timestamp in reversed(self._progress_history):
            if recorded_state == GENERATION_PROGRESS.ERROR:
                continue
            return recorded_state
        raise RuntimeError("No non-error state found in progress history")

get_last_non_error_state_and_time

get_last_non_error_state_and_time() -> (
    tuple[GENERATION_PROGRESS, float]
)

Get the last non-error state and the time it was set.

Source code in horde_sdk/worker/generations_base.py
def get_last_non_error_state_and_time(self) -> tuple[GENERATION_PROGRESS, float]:
    """Return the most recent non-`ERROR` state together with the time it was set.

    Raises:
        RuntimeError: If the history holds no state other than `ERROR`.
    """
    with self._lock:
        # Walk the history newest-first and stop at the first entry that is not an error.
        for recorded_state, recorded_at in reversed(self._progress_history):
            if recorded_state == GENERATION_PROGRESS.ERROR:
                continue
            return recorded_state, recorded_at
        raise RuntimeError("No non-error state found in progress history")

is_next_state_valid

is_next_state_valid(
    next_state: GENERATION_PROGRESS,
) -> bool

Check if the next state is valid.

Source code in horde_sdk/worker/generations_base.py
def is_next_state_valid(
    self,
    next_state: GENERATION_PROGRESS,
) -> bool:
    """Check if the next state is valid.

    The checks run in order while holding the instance lock; the first one that applies decides
    the result.

    Args:
        next_state (GENERATION_PROGRESS): The state the caller wants to move to.

    Returns:
        bool: True if the move to `next_state` is allowed, False otherwise.
    """
    with self._lock:
        # Interrupt states are accepted from any state, before any other check.
        if next_state in self.default_interrupt_states():
            return True

        current_state = self._generation_progress

        if self._strict_transition_mode:
            # Strict mode rejects setting the same state twice in a row.
            if current_state == next_state:
                return False

            # Strict mode also rejects every move once any error limit has been exceeded.
            if self._any_error_count_exceeded:
                return False

            # NOTE(review): this looks unreachable - ERROR -> ERROR is already rejected by the
            # `current_state == next_state` check above. Confirm whether it was meant to apply
            # outside strict mode.
            if current_state == GENERATION_PROGRESS.ERROR and next_state == GENERATION_PROGRESS.ERROR:
                return False

        # Rejected when in ERROR and the history has fewer than two entries.
        if current_state == GENERATION_PROGRESS.ERROR and len(self._progress_history) < 2:
            return False

        # Compare the recorded error count for the target state with its limit; a state without
        # a configured limit (or no limits at all) is treated as unlimited.
        state_error_count = self._error_counts.get(next_state, 0)
        state_error_limit = (
            self._state_error_limits.get(next_state, float("inf")) if self._state_error_limits else float("inf")
        )
        error_count_exceeded = state_error_count >= state_error_limit

        if error_count_exceeded:
            return False

    return True

on_error

on_error(
    *,
    failed_message: str,
    failure_exception: Exception | None = None
) -> GENERATION_PROGRESS

Call when an error occurs at any point in the generation process, safety checks, or submission.

This should be reserved for errors which make the current step impossible to complete. For example, if a sub-process is OOM killed.

Contrast with the add_metadata_entry method, which is used for non-critical errors. If there is no METADATA_TYPE for the error you encountered, you can use failed_message and failure_exception instead.

Parameters:

  • failed_message (str) –

    The reason the generation failed.

  • failure_exception (Exception, default: None ) –

    The exception that caused the generation to fail. Defaults to None.

Returns:

  • GENERATION_PROGRESS ( GENERATION_PROGRESS ) –

    The new state of the generation, which will be set to GENERATION_PROGRESS.ERROR.

Raises:

  • RuntimeError

    If the generation has exceeded the maximum number of errors for the current state.

  • RuntimeError

    If the generation is in an error state and has no previous state to transition from.

Source code in horde_sdk/worker/generations_base.py
def on_error(self, *, failed_message: str, failure_exception: Exception | None = None) -> GENERATION_PROGRESS:
    """Call when an error occurs at any point in the generation process, safety checks, or submission.

    This should be reserved for errors which make the current step **impossible** to complete. For example, if
    a sub-process is OOM killed.

    Contrast with the `add_metadata_entry` method, which is used for non-critical errors. If there is no
    METADATA_TYPE for the error you encountered, you can use `failed_message` and `failure_exception` instead.

    Args:
        failed_message (str): The reason the generation failed.
        failure_exception (Exception, optional): The exception that caused the generation to fail. \
            Defaults to None.

    Returns:
        GENERATION_PROGRESS: The new state of the generation, which will be set to `GENERATION_PROGRESS.ERROR`.

    Raises:
        RuntimeError: If the generation has exceeded the maximum number of errors for the current state.
        RuntimeError: If the generation is in an error state and has no previous state to transition from.
    """
    with self._lock:
        # The failure counter is bumped before the transition is attempted, under the same lock.
        self._generation_failure_count += 1
        return self._set_generation_progress(
            GENERATION_PROGRESS.ERROR,
            failed_message=failed_message,
            failure_exception=failure_exception,
        )

on_abort

on_abort(
    *,
    failed_message: str,
    failure_exception: Exception | None = None
) -> GENERATION_PROGRESS

Call when the generation is aborted.

Parameters:

  • failed_message (str) –

    The reason the generation was aborted.

  • failure_exception (Exception, default: None ) –

    The exception that caused the generation to be aborted. Defaults to None.

Returns:

  • GENERATION_PROGRESS ( GENERATION_PROGRESS ) –

    The new state of the generation, which will be set to GENERATION_PROGRESS.ABORTED.

Raises:

  • RuntimeError

    If the generation has exceeded the maximum number of errors for the current state.

  • RuntimeError

    If the generation is in an error state and has no previous state to transition from.

Source code in horde_sdk/worker/generations_base.py
def on_abort(self, *, failed_message: str, failure_exception: Exception | None = None) -> GENERATION_PROGRESS:
    """Move the generation into the `ABORTED` state.

    Args:
        failed_message (str): The reason the generation was aborted.
        failure_exception (Exception, optional): The exception that caused the generation to be aborted. \
            Defaults to None.

    Returns:
        GENERATION_PROGRESS: The new state of the generation, which will be set to `GENERATION_PROGRESS.ABORTED`.

    Raises:
        RuntimeError: If the generation has exceeded the maximum number of errors for the current state.
        RuntimeError: If the generation is in an error state and has no previous state to transition from.
    """
    new_state = self._set_generation_progress(
        GENERATION_PROGRESS.ABORTED,
        failed_message=failed_message,
        failure_exception=failure_exception,
    )
    return new_state

on_preloading

on_preloading() -> GENERATION_PROGRESS

Call when preloading is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_preloading(self) -> GENERATION_PROGRESS:
    """Call when preloading is about to begin.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.PRELOADING`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.PRELOADING)

on_preloading_complete

on_preloading_complete() -> GENERATION_PROGRESS

Call after preloading is complete.

Source code in horde_sdk/worker/generations_base.py
def on_preloading_complete(self) -> GENERATION_PROGRESS:
    """Call after preloading is complete.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.PRELOADING_COMPLETE`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.PRELOADING_COMPLETE)

on_generation_work_complete

on_generation_work_complete(
    result: (
        GenerationResultTypeVar
        | Collection[GenerationResultTypeVar]
        | None
    ) = None,
) -> GENERATION_PROGRESS

Call when the generation work is complete, such as when inference is done.

This does not mean the generation is finalized, as calling this function means that safety checks and submission may still be pending. Examples of when this function would be called are when comfyui has just returned an image, interrogating an image has just completed or when a text backend has just generated text.

Source code in horde_sdk/worker/generations_base.py
def on_generation_work_complete(
    self,
    result: GenerationResultTypeVar | Collection[GenerationResultTypeVar] | None = None,
) -> GENERATION_PROGRESS:
    """Signal that the core generation work (such as inference) has finished.

    The generation is not finalized at this point: safety checks and submission may still be pending.
    Typical call sites are right after comfyui has returned an image, after interrogating an image has
    completed, or after a text backend has generated its text.

    Args:
        result (GenerationResultTypeVar | Collection[GenerationResultTypeVar] | None, optional): The output
            of the work. It is only recorded when the generation does not move to pending post-processing.
            Defaults to None.

    Returns:
        GENERATION_PROGRESS: The result of transitioning to `GENERATION_PROGRESS.PENDING_POST_PROCESSING`
            when post-processing is required and black box mode is off, otherwise the state returned by
            `_work_complete()`.
    """
    awaiting_post_processing = self.requires_post_processing and not self._black_box_mode
    if awaiting_post_processing:
        # NOTE: `result` is not stored on this path.
        return self._set_generation_progress(GENERATION_PROGRESS.PENDING_POST_PROCESSING)

    next_progress = self._work_complete()
    if result is None:
        return next_progress

    self._set_generation_work_result(result)
    return next_progress

on_generating

on_generating() -> GENERATION_PROGRESS

Call when the generation is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_generating(self) -> GENERATION_PROGRESS:
    """Call when the generation is about to begin.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.GENERATING`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.GENERATING)

on_post_processing

on_post_processing() -> GENERATION_PROGRESS

Call when post-processing is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_post_processing(self) -> GENERATION_PROGRESS:
    """Call when post-processing is about to begin.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.POST_PROCESSING`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.POST_PROCESSING)

on_post_processing_complete

on_post_processing_complete() -> GENERATION_PROGRESS

Call when post-processing is complete.

Source code in horde_sdk/worker/generations_base.py
def on_post_processing_complete(self) -> GENERATION_PROGRESS:
    """Call when post-processing is complete.

    Returns:
        GENERATION_PROGRESS: The state returned by `_work_complete()`, which decides the follow-up state.
    """
    return self._work_complete()

on_pending_safety_check

on_pending_safety_check() -> GENERATION_PROGRESS

Call when the generation is pending safety check.

Source code in horde_sdk/worker/generations_base.py
def on_pending_safety_check(self) -> GENERATION_PROGRESS:
    """Call when the generation is pending safety check.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.PENDING_SAFETY_CHECK`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.PENDING_SAFETY_CHECK)

set_work_result

set_work_result(
    result: (
        GenerationResultTypeVar
        | Collection[GenerationResultTypeVar]
    ),
) -> None

Set the result of the generation work.

Parameters:

  • result (Any) –

    The result of the generation work.

Source code in horde_sdk/worker/generations_base.py
def set_work_result(self, result: GenerationResultTypeVar | Collection[GenerationResultTypeVar]) -> None:
    """Set the result of the generation work.

    This does not request a state transition itself; it only hands `result` to
    `_set_generation_work_result`.

    Args:
        result (GenerationResultTypeVar | Collection[GenerationResultTypeVar]): The result of the generation
            work, either a single result or a collection of results.
    """
    self._set_generation_work_result(result)

on_complete

on_complete() -> GENERATION_PROGRESS

Call when the generation is complete.

Source code in horde_sdk/worker/generations_base.py
def on_complete(self) -> GENERATION_PROGRESS:
    """Call when the generation is complete.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.COMPLETE`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.COMPLETE)

on_state

on_state(state: GENERATION_PROGRESS) -> GENERATION_PROGRESS

Call when the generation state is updated.

Parameters:

Source code in horde_sdk/worker/generations_base.py
def on_state(
    self,
    state: GENERATION_PROGRESS,
) -> GENERATION_PROGRESS:
    """Call when the generation state is updated.

    Args:
        state (GENERATION_PROGRESS): The new state of the generation.
    """
    match state:
        case GENERATION_PROGRESS.PRELOADING:
            return self.on_preloading()
        case GENERATION_PROGRESS.PRELOADING_COMPLETE:
            return self.on_preloading_complete()
        case GENERATION_PROGRESS.GENERATING:
            return self.on_generating()
        case GENERATION_PROGRESS.POST_PROCESSING:
            return self.on_post_processing()
        case GENERATION_PROGRESS.PENDING_POST_PROCESSING:
            return self.on_post_processing_complete()
        case GENERATION_PROGRESS.PENDING_SAFETY_CHECK:
            return self.on_pending_safety_check()
        case GENERATION_PROGRESS.SAFETY_CHECKING:
            return self.on_safety_checking()
        case GENERATION_PROGRESS.PENDING_SUBMIT:
            return self.on_pending_submit()
        case GENERATION_PROGRESS.SUBMITTING:
            return self.on_submitting()
        case GENERATION_PROGRESS.SUBMIT_COMPLETE:
            return self.on_submit_complete()
        case GENERATION_PROGRESS.COMPLETE:
            return self.on_complete()
        case _:
            return self._set_generation_progress(state)

step

step(state: GENERATION_PROGRESS) -> GENERATION_PROGRESS

Call when the generation state is updated.

Parameters:

Source code in horde_sdk/worker/generations_base.py
def step(self, state: GENERATION_PROGRESS) -> GENERATION_PROGRESS:
    """Call when the generation state is updated.

    This is an alias for `on_state`.

    Args:
        state (GENERATION_PROGRESS): The new state of the generation.

    Returns:
        GENERATION_PROGRESS: The state returned by `on_state(state)`.
    """
    return self.on_state(state)

is_safety_checking_done_on_all_batch

is_safety_checking_done_on_all_batch() -> bool

Check whether a safety check result has been recorded for every entry in the batch.

Returns:

  • bool ( bool ) –

    True if every entry in the batch has a recorded safety check result, False otherwise (including when no safety results are stored).

Source code in horde_sdk/worker/generations_base.py
def is_safety_checking_done_on_all_batch(self) -> bool:
    """Check whether a safety check result has been recorded for every entry in the batch.

    The stored results are inspected while holding the instance lock.

    Returns:
        bool: True if `_safety_results` is not None and none of its entries is None, False otherwise.
    """
    with self._lock:
        recorded_results = self._safety_results
        if recorded_results is None:
            return False
        # A `None` entry means that batch position has not been checked yet.
        return not any(entry is None for entry in recorded_results)

get_safety_check_results

get_safety_check_results() -> list[SafetyResult | None]

Get the results of the safety checks.

Returns:

  • list[SafetyResult | None]

    list[SafetyResult | None] | None: A copy of the results of the safety checks for each batch, or None if no safety results are stored.

Source code in horde_sdk/worker/generations_base.py
def get_safety_check_results(self) -> list[SafetyResult | None] | None:
    """Get the results of the safety checks.

    Returns:
        list[SafetyResult | None] | None: A copy (taken while holding the instance lock) of the results of
            the safety checks for each batch, or None when `_safety_results` is None.
    """
    with self._lock:
        return self._safety_results.copy() if self._safety_results is not None else None

on_safety_checking

on_safety_checking() -> GENERATION_PROGRESS

Call when the safety check is about to start.

Source code in horde_sdk/worker/generations_base.py
def on_safety_checking(self) -> GENERATION_PROGRESS:
    """Call when the safety check is about to start.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.SAFETY_CHECKING`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.SAFETY_CHECKING)

on_safety_check_complete

on_safety_check_complete(
    batch_index: int, safety_result: SafetyResult
) -> GENERATION_PROGRESS

Call when the safety check is complete.

Parameters:

  • batch_index (int) –

    The index of the batch to set the safety check result for. This is 0-indexed and must match the position of the result_ids provided during initialization.

  • safety_result (SafetyResult) –

    The result of the safety check.

Source code in horde_sdk/worker/generations_base.py
def on_safety_check_complete(self, batch_index: int, safety_result: SafetyResult) -> GENERATION_PROGRESS:
    """Record one safety check result and advance the generation once every batch entry is checked.

    Args:
        batch_index (int): The index of the batch to set the safety check result for.
            This is 0-indexed and must match the position of the result_ids provided during initialization.
        safety_result (SafetyResult): The result of the safety check.

    Returns:
        GENERATION_PROGRESS: `GENERATION_PROGRESS.SAFETY_CHECKING` (returned without requesting a
            transition) while some batch entries are still unchecked; otherwise the result of transitioning
            to `PENDING_SUBMIT` when submission is required, or to `COMPLETE` when it is not.
    """
    self._set_safety_check_result(batch_index=batch_index, safety_result=safety_result)

    every_entry_checked = self.is_safety_checking_done_on_all_batch()
    if not every_entry_checked:
        return GENERATION_PROGRESS.SAFETY_CHECKING

    if self._requires_submit:
        follow_up_state = GENERATION_PROGRESS.PENDING_SUBMIT
    else:
        follow_up_state = GENERATION_PROGRESS.COMPLETE
    return self._set_generation_progress(follow_up_state)

on_pending_submit

on_pending_submit() -> GENERATION_PROGRESS

Call when the generation is pending submission.

Source code in horde_sdk/worker/generations_base.py
def on_pending_submit(self) -> GENERATION_PROGRESS:
    """Call when the generation is pending submission.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.PENDING_SUBMIT`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.PENDING_SUBMIT)

on_submitting

on_submitting() -> GENERATION_PROGRESS

Call when an attempt is going to be made to submit the generation.

Source code in horde_sdk/worker/generations_base.py
def on_submitting(self) -> GENERATION_PROGRESS:
    """Call when an attempt is going to be made to submit the generation.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.SUBMITTING`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.SUBMITTING)

on_submit_complete

on_submit_complete() -> GENERATION_PROGRESS

Call when the generation has been successfully submitted.

Source code in horde_sdk/worker/generations_base.py
def on_submit_complete(self) -> GENERATION_PROGRESS:
    """Call when the generation has been successfully submitted.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.SUBMIT_COMPLETE`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.SUBMIT_COMPLETE)

on_user_requested_abort

on_user_requested_abort() -> GENERATION_PROGRESS

Call when the user requests to abort the generation.

Source code in horde_sdk/worker/generations_base.py
def on_user_requested_abort(self) -> GENERATION_PROGRESS:
    """Call when the user requests to abort the generation.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.USER_REQUESTED_ABORT`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.USER_REQUESTED_ABORT)

on_user_abort_complete

on_user_abort_complete() -> GENERATION_PROGRESS

Call when the user requested abort is complete.

Source code in horde_sdk/worker/generations_base.py
def on_user_abort_complete(self) -> GENERATION_PROGRESS:
    """Call when the user requested abort is complete.

    Returns:
        GENERATION_PROGRESS: The state returned by `_set_generation_progress` for
            `GENERATION_PROGRESS.USER_ABORT_COMPLETE`.
    """
    return self._set_generation_progress(GENERATION_PROGRESS.USER_ABORT_COMPLETE)

AlchemySingleGeneration

Bases: HordeSingleGeneration[bytes]

A single alchemy generation. Alchemy is generally transformative or analytical in nature.

'Generation' is used more broadly here than in the context of AI generation for the sake of naming consistency.

Generally an input might be an image and the output could be anything, such as the input image upscaled with a model, caption text, or whether the image is NSFW or not (bool).

Not to be confused with a job, which can contain multiple generations.

Source code in horde_sdk/worker/generations.py
class AlchemySingleGeneration(HordeSingleGeneration[bytes]):
    """A single alchemy generation. Alchemy is generally transformative or analytical in nature.

    'Generation' is used more broadly here than in the context of AI generation for the sake of naming consistency.

    Generally an input might be an image and the output could be anything, such as the input `image` upscaled with a \
    model, caption `text`, or whether the image is NSFW or not (`bool`).

    **Not to be confused with a job**, which can contain multiple generations.
    """

    @override
    @classmethod
    def default_generate_progress_transitions(cls) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
        """Return `default_alchemy_generate_progress_transitions`, the default alchemy transition table."""
        return default_alchemy_generate_progress_transitions

    @override
    @classmethod
    def default_generate_progress_transitions_no_submit(
        cls,
    ) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
        """Return `default_alchemy_generate_progress_no_submit_transitions`, the table used without submission."""
        return default_alchemy_generate_progress_no_submit_transitions

    @override
    @classmethod
    def does_class_require_generation(cls) -> bool:
        """Return whether this generation class requires a generation step; always False for alchemy."""
        return False

    @override
    @classmethod
    def does_class_require_safety_check(cls) -> bool:
        """Return whether this generation class requires a safety check; always False for alchemy."""
        return False

    generation_parameters: SingleAlchemyParameters
    """The parameters for the generation."""

    def __init__(
        self,
        *,
        generation_parameters: SingleAlchemyParameters,
        generation_id: str | None = None,
        dispatch_result_ids: Sequence[ID_TYPES] | None = None,
        result_ids: list[ID_TYPES] | None = None,
        requires_generation: bool = False,
        requires_post_processing: bool = True,
        requires_safety_check: bool = False,
        requires_submit: bool = True,
        safety_rules: SafetyRules = default_image_safety_rules,
        black_box_mode: bool = False,
        state_error_limits: (
            Mapping[GENERATION_PROGRESS, int] | None
        ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
        strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
        extra_logging: bool = False,
    ) -> None:
        """Initialize the generation.

        The transition table handed to the base class is chosen from `requires_safety_check` and
        `requires_submit`: the alchemy tables are used when no safety check is required and the base tables
        when one is.

        Args:
            generation_parameters (SingleAlchemyParameters): The parameters for the generation.
            generation_id (str | None): The unique identifier for the generation. \
                If None, a random UUID will be generated.
            dispatch_result_ids (Sequence[ID_TYPES] | None): Result identifiers assigned by dispatch, if
                available.
            result_ids (list[ID_TYPES] | None): The unique identifiers for the generation. \
                If None, a random UUID will be generated for each result.
            requires_generation (bool, optional): Whether the generation requires generation. \
                Defaults to False.
            requires_post_processing (bool, optional): Whether the generation requires post-processing. \
                Defaults to True.
            requires_safety_check (bool, optional): Whether the generation requires a safety check. \
                Defaults to False.
            requires_submit (bool, optional): Whether the generation requires submission. \
                Defaults to True.
            safety_rules (SafetyRules, optional): The safety rules to apply to the generation. \
                Defaults to default_image_safety_rules from `horde_sdk.safety`.
            black_box_mode (bool, optional): Whether the generation is in black box mode. \
                This removes all of the intermediate states between starting and finished states. \
                This should only be used when the backend has no observability into the generation process. \
                Defaults to False.
            state_error_limits (Mapping[GENERATION_PROGRESS, int], optional): The maximum number of times a \
                generation can be in an error state before it is considered failed. \
                Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.
            strict_transition_mode (bool, optional): Whether or not to enforce strict transition checking. \
                This prevents setting the same state multiple times in a row. \
                Defaults to HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE.
            extra_logging (bool, optional): Whether or not to enable extra debug-level logging, \
                especially for state transitions. \
                Defaults to False.
        """
        # Start from the alchemy default table; it is kept when submission is required without a safety check.
        transitions = self.default_generate_progress_transitions()
        if requires_safety_check:
            # Safety-checked alchemy falls back to the base transition tables.
            transitions = (
                base_generate_progress_transitions
                if requires_submit
                else base_generate_progress_no_submit_transitions
            )
        elif not requires_submit:
            transitions = self.default_generate_progress_transitions_no_submit()

        super().__init__(
            generation_parameters=generation_parameters,
            result_type=bytes,
            generation_id=generation_id,
            dispatch_result_ids=dispatch_result_ids,
            result_ids=result_ids,
            requires_generation=requires_generation,
            requires_post_processing=requires_post_processing,
            requires_safety_check=requires_safety_check,
            requires_submit=requires_submit,
            safety_rules=safety_rules,
            state_error_limits=state_error_limits,
            generate_progress_transitions=transitions,
            black_box_mode=black_box_mode,
            strict_transition_mode=strict_transition_mode,
            extra_logging=extra_logging,
        )

    @classmethod
    def from_template(
        cls,
        template: SingleAlchemyParametersTemplate,
        *,
        source_image: bytes | str | None = None,
        default_form: str | None = None,
        generation_id: ID_TYPES | None = None,
        dispatch_result_ids: Sequence[ID_TYPES] | None = None,
        result_id: ID_TYPES | None = None,
        allocator: ResultIdAllocator | None = None,
        seed: str = "alchemy",
        init_kwargs: AlchemyGenerationInitKwargs | None = None,
    ) -> AlchemySingleGeneration:
        """Instantiate an alchemy generation from a template.

        The template is turned into parameters via `template.to_parameters(...)`. Entries already present in
        `init_kwargs` take precedence over the values derived here, because the derived values are only
        applied with `setdefault`.

        Args:
            template (SingleAlchemyParametersTemplate): The template to build the generation parameters from.
            source_image (bytes | str | None, optional): Forwarded to `template.to_parameters`.
            default_form (str | None, optional): Forwarded to `template.to_parameters`.
            generation_id (ID_TYPES | None, optional): The generation identifier. When None, the `result_id`
                of the built parameters is used instead.
            dispatch_result_ids (Sequence[ID_TYPES] | None, optional): Identifiers supplied by dispatch; each
                is converted with `str` before being passed on.
            result_id (ID_TYPES | None, optional): Forwarded to `template.to_parameters`.
            allocator (ResultIdAllocator | None, optional): Forwarded to `template.to_parameters`.
            seed (str, optional): Forwarded to `template.to_parameters`. Defaults to "alchemy".
            init_kwargs (AlchemyGenerationInitKwargs | None, optional): Extra keyword arguments for the
                constructor.

        Returns:
            AlchemySingleGeneration: The new generation instance.
        """
        parameters = template.to_parameters(
            result_id=result_id,
            source_image=source_image,
            default_form=default_form,
            allocator=allocator,
            seed=seed,
        )

        constructor_kwargs: AlchemyGenerationInitKwargs = {}
        if init_kwargs:
            constructor_kwargs.update(init_kwargs)

        # An explicit generation_id wins over the parameters' result_id as the fallback identifier.
        id_source = generation_id if generation_id is not None else parameters.result_id
        constructor_kwargs.setdefault("generation_id", _stringify_id(id_source))

        if dispatch_result_ids is not None:
            dispatch_ids_as_text = [str(identifier) for identifier in dispatch_result_ids]
            constructor_kwargs.setdefault("dispatch_result_ids", dispatch_ids_as_text)

        constructor_kwargs.setdefault("result_ids", [parameters.result_id])
        return cls(generation_parameters=parameters, **constructor_kwargs)

generation_parameters instance-attribute

generation_parameters: SingleAlchemyParameters

The parameters for the generation.

result_type property

result_type: type[GenerationResultTypeVar]

The type of the result of the generation.

batch_size property

batch_size: int

The batch size of the generation.

result_ids property

result_ids: list[ID_TYPES]

The unique identifiers for the generations in the batch.

black_box_mode property

black_box_mode: bool

Whether or not the generation is in black box mode.

generation_id instance-attribute

generation_id: ID_TYPES

Unique identifier for the generation.

dispatch_result_ids property

dispatch_result_ids: list[ID_TYPES] | None

Identifiers supplied by the dispatch source, if any.

short_id property

short_id: str

Get a short identifier for the generation.

generation_failure_count property

generation_failure_count: int

The number of times the generation has failed during any step of the generation process.

errored_states property

errored_states: list[tuple[GENERATION_PROGRESS, float]]

Return a list of (state, time) tuples for the states which occurred just before an error state, along with the time they were set.

error_counts property

error_counts: dict[GENERATION_PROGRESS, int]

Return a dictionary of states and the number of times they occurred before an error state.

requires_post_processing property

requires_post_processing: bool

Whether or not the generation requires post-processing.

requires_generation property

requires_generation: bool

Whether or not the generation requires generation.

requires_safety_check property

requires_safety_check: bool

Whether or not the generation requires a safety check.

requires_submit property

requires_submit: bool

Whether or not the generation requires submission.

generation_results property

generation_results: OrderedDict[
    ID_TYPES, GenerationResultTypeVar | None
]

Get the result of the generation.

default_generate_progress_transitions classmethod

default_generate_progress_transitions() -> (
    dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]
)
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def default_generate_progress_transitions(cls) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
    """Return `default_alchemy_generate_progress_transitions`, the default alchemy transition table."""
    return default_alchemy_generate_progress_transitions

default_generate_progress_transitions_no_submit classmethod

default_generate_progress_transitions_no_submit() -> (
    dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]
)
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def default_generate_progress_transitions_no_submit(
    cls,
) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
    """Return `default_alchemy_generate_progress_no_submit_transitions`, the table used without submission."""
    return default_alchemy_generate_progress_no_submit_transitions

does_class_require_generation classmethod

does_class_require_generation() -> bool
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def does_class_require_generation(cls) -> bool:
    """Return whether this generation class requires a generation step; always False for alchemy."""
    return False

does_class_require_safety_check classmethod

does_class_require_safety_check() -> bool
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def does_class_require_safety_check(cls) -> bool:
    """Return whether this generation class requires a safety check; always False for alchemy."""
    return False

__init__

__init__(
    *,
    generation_parameters: SingleAlchemyParameters,
    generation_id: str | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_ids: list[ID_TYPES] | None = None,
    requires_generation: bool = False,
    requires_post_processing: bool = True,
    requires_safety_check: bool = False,
    requires_submit: bool = True,
    safety_rules: SafetyRules = default_image_safety_rules,
    black_box_mode: bool = False,
    state_error_limits: (
        Mapping[GENERATION_PROGRESS, int] | None
    ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
    strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
    extra_logging: bool = False
) -> None

Initialize the generation.

Parameters:

  • generation_parameters (SingleAlchemyParameters) –

    The parameters for the generation.

  • generation_id (str | None, default: None ) –

    The unique identifier for the generation. If None, a random UUID will be generated.

  • dispatch_result_ids (Sequence[str | UUID] | None, default: None ) –

    Result identifiers assigned by dispatch, if available.

  • result_ids (list[ID_TYPES] | None, default: None ) –

    The unique identifiers for the generation. If None, a random UUID will be generated for each result.

  • requires_generation (bool, default: False ) –

    Whether the generation requires generation. Defaults to False.

  • requires_post_processing (bool, default: True ) –

    Whether the generation requires post-processing. Defaults to True.

  • requires_safety_check (bool, default: False ) –

    Whether the generation requires a safety check. Defaults to False.

  • requires_submit (bool, default: True ) –

    Whether the generation requires submission. Defaults to True.

  • safety_rules (SafetyRules, default: default_image_safety_rules ) –

    The safety rules to apply to the generation. Defaults to default_image_safety_rules from horde_sdk.safety.

  • black_box_mode (bool, default: False ) –

    Whether the generation is in black box mode. This removes all of the intermediate states between starting and finished states. This should only be used when the backend has no observability into the generation process. Defaults to False.

  • state_error_limits (Mapping[GENERATION_PROGRESS, int], default: DEFAULT_STATE_ERROR_LIMITS ) –

    The maximum number of times a generation can be in an error state before it is considered failed. Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.

  • strict_transition_mode (bool, default: DEFAULT_GENERATION_STRICT_TRANSITION_MODE ) –

    Whether or not to enforce strict transition checking. This prevents setting the same state multiple times in a row. Defaults to True.

  • extra_logging (bool, default: False ) –

    Whether or not to enable extra debug-level logging, especially for state transitions. Defaults to False.

Source code in horde_sdk/worker/generations.py
def __init__(
    self,
    *,
    generation_parameters: SingleAlchemyParameters,
    generation_id: str | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_ids: list[ID_TYPES] | None = None,
    requires_generation: bool = False,
    requires_post_processing: bool = True,
    requires_safety_check: bool = False,
    requires_submit: bool = True,
    safety_rules: SafetyRules = default_image_safety_rules,
    black_box_mode: bool = False,
    state_error_limits: (
        Mapping[GENERATION_PROGRESS, int] | None
    ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
    strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
    extra_logging: bool = False,
) -> None:
    """Initialize the generation.

    The transition table handed to the base class is chosen from `requires_safety_check` and
    `requires_submit`: the alchemy tables are used when no safety check is required and the base tables
    when one is.

    Args:
        generation_parameters (SingleAlchemyParameters): The parameters for the generation.
        generation_id (str | None): The unique identifier for the generation. \
            If None, a random UUID will be generated.
        dispatch_result_ids (Sequence[ID_TYPES] | None): Result identifiers assigned by dispatch, if
            available.
        result_ids (list[ID_TYPES] | None): The unique identifiers for the generation. \
            If None, a random UUID will be generated for each result.
        requires_generation (bool, optional): Whether the generation requires generation. \
            Defaults to False.
        requires_post_processing (bool, optional): Whether the generation requires post-processing. \
            Defaults to True.
        requires_safety_check (bool, optional): Whether the generation requires a safety check. \
            Defaults to False.
        requires_submit (bool, optional): Whether the generation requires submission. \
            Defaults to True.
        safety_rules (SafetyRules, optional): The safety rules to apply to the generation. \
            Defaults to default_image_safety_rules from `horde_sdk.safety`.
        black_box_mode (bool, optional): Whether the generation is in black box mode. \
            This removes all of the intermediate states between starting and finished states. \
            This should only be used when the backend has no observability into the generation process. \
            Defaults to False.
        state_error_limits (Mapping[GENERATION_PROGRESS, int], optional): The maximum number of times a \
            generation can be in an error state before it is considered failed. \
            Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.
        strict_transition_mode (bool, optional): Whether or not to enforce strict transition checking. \
            This prevents setting the same state multiple times in a row. \
            Defaults to HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE.
        extra_logging (bool, optional): Whether or not to enable extra debug-level logging, \
            especially for state transitions. \
            Defaults to False.
    """
    # Start from the alchemy default table; it is kept when submission is required without a safety check.
    transitions = self.default_generate_progress_transitions()
    if requires_safety_check:
        # Safety-checked alchemy falls back to the base transition tables.
        transitions = (
            base_generate_progress_transitions
            if requires_submit
            else base_generate_progress_no_submit_transitions
        )
    elif not requires_submit:
        transitions = self.default_generate_progress_transitions_no_submit()

    super().__init__(
        generation_parameters=generation_parameters,
        result_type=bytes,
        generation_id=generation_id,
        dispatch_result_ids=dispatch_result_ids,
        result_ids=result_ids,
        requires_generation=requires_generation,
        requires_post_processing=requires_post_processing,
        requires_safety_check=requires_safety_check,
        requires_submit=requires_submit,
        safety_rules=safety_rules,
        state_error_limits=state_error_limits,
        generate_progress_transitions=transitions,
        black_box_mode=black_box_mode,
        strict_transition_mode=strict_transition_mode,
        extra_logging=extra_logging,
    )

from_template classmethod

from_template(
    template: SingleAlchemyParametersTemplate,
    *,
    source_image: bytes | str | None = None,
    default_form: str | None = None,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_id: ID_TYPES | None = None,
    allocator: ResultIdAllocator | None = None,
    seed: str = "alchemy",
    init_kwargs: AlchemyGenerationInitKwargs | None = None
) -> AlchemySingleGeneration

Instantiate an alchemy generation from a template.

Source code in horde_sdk/worker/generations.py
@classmethod
def from_template(
    cls,
    template: SingleAlchemyParametersTemplate,
    *,
    source_image: bytes | str | None = None,
    default_form: str | None = None,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_id: ID_TYPES | None = None,
    allocator: ResultIdAllocator | None = None,
    seed: str = "alchemy",
    init_kwargs: AlchemyGenerationInitKwargs | None = None,
) -> AlchemySingleGeneration:
    """Instantiate an alchemy generation from a template.

    The template is turned into parameters via `template.to_parameters(...)`. Entries already present in
    `init_kwargs` take precedence over the values derived here, because the derived values are only
    applied with `setdefault`.

    Args:
        template (SingleAlchemyParametersTemplate): The template to build the generation parameters from.
        source_image (bytes | str | None, optional): Forwarded to `template.to_parameters`.
        default_form (str | None, optional): Forwarded to `template.to_parameters`.
        generation_id (ID_TYPES | None, optional): The generation identifier. When None, the `result_id`
            of the built parameters is used instead.
        dispatch_result_ids (Sequence[ID_TYPES] | None, optional): Identifiers supplied by dispatch; each
            is converted with `str` before being passed on.
        result_id (ID_TYPES | None, optional): Forwarded to `template.to_parameters`.
        allocator (ResultIdAllocator | None, optional): Forwarded to `template.to_parameters`.
        seed (str, optional): Forwarded to `template.to_parameters`. Defaults to "alchemy".
        init_kwargs (AlchemyGenerationInitKwargs | None, optional): Extra keyword arguments for the
            constructor.

    Returns:
        AlchemySingleGeneration: The new generation instance.
    """
    parameters = template.to_parameters(
        result_id=result_id,
        source_image=source_image,
        default_form=default_form,
        allocator=allocator,
        seed=seed,
    )

    constructor_kwargs: AlchemyGenerationInitKwargs = {}
    if init_kwargs:
        constructor_kwargs.update(init_kwargs)

    # An explicit generation_id wins over the parameters' result_id as the fallback identifier.
    id_source = generation_id if generation_id is not None else parameters.result_id
    constructor_kwargs.setdefault("generation_id", _stringify_id(id_source))

    if dispatch_result_ids is not None:
        dispatch_ids_as_text = [str(identifier) for identifier in dispatch_result_ids]
        constructor_kwargs.setdefault("dispatch_result_ids", dispatch_ids_as_text)

    constructor_kwargs.setdefault("result_ids", [parameters.result_id])
    return cls(generation_parameters=parameters, **constructor_kwargs)

default_interrupt_states classmethod

default_interrupt_states() -> set[GENERATION_PROGRESS]

Get the default interrupt states.

Source code in horde_sdk/worker/generations_base.py
@classmethod
def default_interrupt_states(cls) -> set[GENERATION_PROGRESS]:
    """Get the default interrupt states.

    Returns:
        set[GENERATION_PROGRESS]: A new set containing `ABORTED`, `USER_REQUESTED_ABORT` and `ABANDONED`.
    """
    return {GENERATION_PROGRESS.ABORTED, GENERATION_PROGRESS.USER_REQUESTED_ABORT, GENERATION_PROGRESS.ABANDONED}

register_callback

register_callback(
    state: GENERATION_PROGRESS, callback: Callable[[], None]
) -> None

Register a callback to be called when the generation state is updated.

Parameters:

  • state (GENERATION_PROGRESS) –

    The state to register the callback for.

  • callback (Callable[[], None]) –

    The callback to call when the state is updated.

Source code in horde_sdk/worker/generations_base.py
def register_callback(
    self,
    state: GENERATION_PROGRESS,
    callback: Callable[[], None],
) -> None:
    """Append a zero-argument callback to the callbacks registered for `state`.

    The callback list for `state` is looked up in `self._registered_callbacks` and extended while holding
    `self._lock`.

    Args:
        state (GENERATION_PROGRESS): The state the callback is registered under.
        callback (Callable[[], None]): The callable to store for that state.
    """
    with self._lock:
        callbacks_for_state = self._registered_callbacks[state]
        callbacks_for_state.append(callback)

set_dispatch_result_ids

set_dispatch_result_ids(
    dispatch_result_ids: Sequence[ID_TYPES] | None,
) -> None

Bind the generation to the result identifiers supplied by dispatch.

Source code in horde_sdk/worker/generations_base.py
def set_dispatch_result_ids(self, dispatch_result_ids: Sequence[ID_TYPES] | None) -> None:
    """Store the result identifiers supplied by dispatch.

    A new list is created from the given sequence, so the stored value does not alias the caller's
    object. Passing None clears the stored identifiers.

    Args:
        dispatch_result_ids (Sequence[ID_TYPES] | None): The identifiers to store, or None.
    """
    with self._lock:
        if dispatch_result_ids is None:
            self._dispatch_result_ids = None
        else:
            self._dispatch_result_ids = list(dispatch_result_ids)

get_generation_progress

get_generation_progress() -> GENERATION_PROGRESS

Return the state of the generation.

Source code in horde_sdk/worker/generations_base.py
def get_generation_progress(self) -> GENERATION_PROGRESS:
    """Return the current generation state, read while holding `self._lock`."""
    with self._lock:
        current_progress = self._generation_progress
        return current_progress

get_progress_history

get_progress_history() -> (
    list[tuple[GENERATION_PROGRESS, float]]
)

Get the generation progress history.

Source code in horde_sdk/worker/generations_base.py
def get_progress_history(self) -> list[tuple[GENERATION_PROGRESS, float]]:
    """Return a shallow copy of the progress history.

    Each entry pairs a state with a float (the time it was set, per the sibling accessors). The copy is
    taken while holding `self._lock`, so mutating the returned list does not affect the internal history.
    """
    with self._lock:
        history_snapshot = self._progress_history.copy()
    return history_snapshot

get_last_non_error_state

get_last_non_error_state() -> GENERATION_PROGRESS

Get the last non-error state.

Source code in horde_sdk/worker/generations_base.py
def get_last_non_error_state(self) -> GENERATION_PROGRESS:
    """Return the most recent state in the progress history that is not `ERROR`.

    The history is scanned from newest to oldest while holding `self._lock`.

    Raises:
        RuntimeError: If the history is empty or contains only `ERROR` entries.
    """
    with self._lock:
        newest_first = reversed(self._progress_history)
        latest_entry = next(
            (entry for entry in newest_first if entry[0] != GENERATION_PROGRESS.ERROR),
            None,
        )
        if latest_entry is None:
            raise RuntimeError("No non-error state found in progress history")
        return latest_entry[0]

get_last_non_error_state_and_time

get_last_non_error_state_and_time() -> (
    tuple[GENERATION_PROGRESS, float]
)

Get the last non-error state and the time it was set.

Source code in horde_sdk/worker/generations_base.py
def get_last_non_error_state_and_time(self) -> tuple[GENERATION_PROGRESS, float]:
    """Return the most recent non-`ERROR` history entry as a `(state, time_set)` pair.

    The history is scanned from newest to oldest while holding `self._lock`.

    Raises:
        RuntimeError: If the history is empty or contains only `ERROR` entries.
    """
    with self._lock:
        newest_first = reversed(self._progress_history)
        latest_entry = next(
            (entry for entry in newest_first if entry[0] != GENERATION_PROGRESS.ERROR),
            None,
        )
        if latest_entry is None:
            raise RuntimeError("No non-error state found in progress history")
        state, time_set = latest_entry
        return state, time_set

is_next_state_valid

is_next_state_valid(
    next_state: GENERATION_PROGRESS,
) -> bool

Check if the next state is valid.

Source code in horde_sdk/worker/generations_base.py
def is_next_state_valid(
    self,
    next_state: GENERATION_PROGRESS,
) -> bool:
    """Check whether the generation may move to `next_state`.

    The checks run in this order while holding `self._lock`:

    1. Any state in `default_interrupt_states()` is always accepted.
    2. In strict transition mode, repeating the current state is rejected (this includes
       `ERROR` -> `ERROR`), as is any transition once `_any_error_count_exceeded` is set.
    3. If the current state is `ERROR` and the history has fewer than two entries, the transition is
       rejected.
    4. The transition is rejected when the error count recorded for `next_state` has reached the limit
       configured for it in `_state_error_limits`. States without a configured limit are unlimited.

    Args:
        next_state (GENERATION_PROGRESS): The state being considered.

    Returns:
        bool: True if none of the checks above reject the transition, False otherwise.
    """
    with self._lock:
        if next_state in self.default_interrupt_states():
            return True

        current_state = self._generation_progress

        if self._strict_transition_mode:
            # Also covers ERROR -> ERROR, so no separate check for that pair is needed.
            if current_state == next_state:
                return False

            if self._any_error_count_exceeded:
                return False

        if current_state == GENERATION_PROGRESS.ERROR and len(self._progress_history) < 2:
            return False

        state_error_count = self._error_counts.get(next_state, 0)
        unlimited = float("inf")
        error_limits = self._state_error_limits
        state_error_limit = error_limits.get(next_state, unlimited) if error_limits else unlimited

        return state_error_count < state_error_limit

on_error

on_error(
    *,
    failed_message: str,
    failure_exception: Exception | None = None
) -> GENERATION_PROGRESS

Call when an error occurs at any point in the generation process, safety checks, or submission.

This should be reserved for errors which make the current step impossible to complete. For example, if a sub-process is OOM killed.

Contrast with the add_metadata_entry method, which is used for non-critical errors. If there is no METADATA_TYPE for the error you encountered, you can use failed_message and failure_exception instead.

Parameters:

  • failed_message (str) –

    The reason the generation failed.

  • failure_exception (Exception, default: None ) –

    The exception that caused the generation to fail. Defaults to None.

Returns:

  • GENERATION_PROGRESS ( GENERATION_PROGRESS ) –

    The new state of the generation, which will be set to GENERATION_PROGRESS.ERROR.

Raises:

  • RuntimeError

    If the generation has exceeded the maximum number of errors for the current state.

  • RuntimeError

    If the generation is in an error state and has no previous state to transition from.

Source code in horde_sdk/worker/generations_base.py
def on_error(self, *, failed_message: str, failure_exception: Exception | None = None) -> GENERATION_PROGRESS:
    """Record a failure and request a transition to `GENERATION_PROGRESS.ERROR`.

    Call this when an error occurs at any point in the generation process, safety checks, or submission
    and the current step is **impossible** to complete, for example when a sub-process is OOM killed.

    Contrast with the `add_metadata_entry` method, which is used for non-critical errors. If there is no
    METADATA_TYPE for the error you encountered, you can use `failed_message` and `failure_exception`
    instead.

    The failure counter is incremented before the transition is requested, both under `self._lock`.

    Args:
        failed_message (str): The reason the generation failed.
        failure_exception (Exception, optional): The exception that caused the generation to fail.
            Defaults to None.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress` for the `ERROR` request.

    Raises:
        RuntimeError: If the generation has exceeded the maximum number of errors for the current state.
        RuntimeError: If the generation is in an error state and has no previous state to transition from.
            (Both are documented for the underlying transition helper, which is not shown here.)
    """
    failure_details = {
        "failed_message": failed_message,
        "failure_exception": failure_exception,
    }
    with self._lock:
        self._generation_failure_count = self._generation_failure_count + 1
        return self._set_generation_progress(GENERATION_PROGRESS.ERROR, **failure_details)

on_abort

on_abort(
    *,
    failed_message: str,
    failure_exception: Exception | None = None
) -> GENERATION_PROGRESS

Call when the generation is aborted.

Parameters:

  • failed_message (str) –

    The reason the generation was aborted.

  • failure_exception (Exception, default: None ) –

    The exception that caused the generation to be aborted. Defaults to None.

Returns:

  • GENERATION_PROGRESS ( GENERATION_PROGRESS ) –

    The new state of the generation, which will be set to GENERATION_PROGRESS.ABORTED.

Raises:

  • RuntimeError

    If the generation has exceeded the maximum number of errors for the current state.

  • RuntimeError

    If the generation is in an error state and has no previous state to transition from.

Source code in horde_sdk/worker/generations_base.py
def on_abort(self, *, failed_message: str, failure_exception: Exception | None = None) -> GENERATION_PROGRESS:
    """Request a transition to `GENERATION_PROGRESS.ABORTED`.

    Args:
        failed_message (str): The reason the generation was aborted.
        failure_exception (Exception, optional): The exception that caused the generation to be aborted.
            Defaults to None.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress` for the `ABORTED` request.

    Raises:
        RuntimeError: If the generation has exceeded the maximum number of errors for the current state.
        RuntimeError: If the generation is in an error state and has no previous state to transition from.
            (Both are documented for the underlying transition helper, which is not shown here.)
    """
    abort_details = {
        "failed_message": failed_message,
        "failure_exception": failure_exception,
    }
    return self._set_generation_progress(GENERATION_PROGRESS.ABORTED, **abort_details)

on_preloading

on_preloading() -> GENERATION_PROGRESS

Call when preloading is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_preloading(self) -> GENERATION_PROGRESS:
    """Request the `PRELOADING` state; call just before preloading begins.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.PRELOADING
    return self._set_generation_progress(target_state)

on_preloading_complete

on_preloading_complete() -> GENERATION_PROGRESS

Call after preloading is complete.

Source code in horde_sdk/worker/generations_base.py
def on_preloading_complete(self) -> GENERATION_PROGRESS:
    """Request the `PRELOADING_COMPLETE` state; call once preloading has finished.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.PRELOADING_COMPLETE
    return self._set_generation_progress(target_state)

on_generation_work_complete

on_generation_work_complete(
    result: (
        GenerationResultTypeVar
        | Collection[GenerationResultTypeVar]
        | None
    ) = None,
) -> GENERATION_PROGRESS

Call when the generation work is complete, such as when inference is done.

This does not mean the generation is finalized, as calling this function means that safety checks and submission may still be pending. Examples of when this function would be called are when comfyui has just returned an image, interrogating an image has just completed or when a text backend has just generated text.

Source code in horde_sdk/worker/generations_base.py
def on_generation_work_complete(
    self,
    result: GenerationResultTypeVar | Collection[GenerationResultTypeVar] | None = None,
) -> GENERATION_PROGRESS:
    """Call when the generation work is complete, such as when inference is done.

    This does not mean the generation is finalized: safety checks and submission may still be pending.
    Typical call sites are when comfyui has just returned an image, when interrogating an image has just
    completed, or when a text backend has just generated text.

    When post-processing is required and the generation is not in black box mode, the
    `PENDING_POST_PROCESSING` state is requested. Otherwise `_work_complete` is called and, if a result
    was supplied, it is stored afterwards via `_set_generation_work_result`.

    Args:
        result: The result (or collection of results) of the generation work, if available.

    Returns:
        GENERATION_PROGRESS: The value returned by the state transition that was performed.
    """
    awaiting_post_processing = self.requires_post_processing and not self._black_box_mode
    if awaiting_post_processing:
        # NOTE(review): a `result` passed on this path is not stored here; presumably it is expected to be
        # supplied after post-processing (e.g. via `set_work_result`). Confirm this is intended.
        return self._set_generation_progress(GENERATION_PROGRESS.PENDING_POST_PROCESSING)

    progress_after_work = self._work_complete()
    if result is None:
        return progress_after_work

    self._set_generation_work_result(result)
    return progress_after_work

on_generating

on_generating() -> GENERATION_PROGRESS

Call when the generation is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_generating(self) -> GENERATION_PROGRESS:
    """Request the `GENERATING` state; call just before generation begins.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.GENERATING
    return self._set_generation_progress(target_state)

on_post_processing

on_post_processing() -> GENERATION_PROGRESS

Call when post-processing is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_post_processing(self) -> GENERATION_PROGRESS:
    """Request the `POST_PROCESSING` state; call just before post-processing begins.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.POST_PROCESSING
    return self._set_generation_progress(target_state)

on_post_processing_complete

on_post_processing_complete() -> GENERATION_PROGRESS

Call when post-processing is complete.

Source code in horde_sdk/worker/generations_base.py
def on_post_processing_complete(self) -> GENERATION_PROGRESS:
    """Call once post-processing has finished; delegates to `_work_complete`.

    Returns:
        GENERATION_PROGRESS: The value returned by `_work_complete`.
    """
    progress_after_work = self._work_complete()
    return progress_after_work

on_pending_safety_check

on_pending_safety_check() -> GENERATION_PROGRESS

Call when the generation is pending safety check.

Source code in horde_sdk/worker/generations_base.py
def on_pending_safety_check(self) -> GENERATION_PROGRESS:
    """Request the `PENDING_SAFETY_CHECK` state; call when the generation is waiting for a safety check.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.PENDING_SAFETY_CHECK
    return self._set_generation_progress(target_state)

set_work_result

set_work_result(
    result: (
        GenerationResultTypeVar
        | Collection[GenerationResultTypeVar]
    ),
) -> None

Set the result of the generation work.

Parameters:

  • result (Any) –

    The result of the generation work.

Source code in horde_sdk/worker/generations_base.py
def set_work_result(self, result: GenerationResultTypeVar | Collection[GenerationResultTypeVar]) -> None:
    """Store the result of the generation work.

    Public entry point that forwards `result` unchanged to `_set_generation_work_result`.

    Args:
        result: A single result or a collection of results produced by the generation work.
    """
    store_result = self._set_generation_work_result
    store_result(result)

on_complete

on_complete() -> GENERATION_PROGRESS

Call when the generation is complete.

Source code in horde_sdk/worker/generations_base.py
def on_complete(self) -> GENERATION_PROGRESS:
    """Request the `COMPLETE` state; call when the generation is complete.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.COMPLETE
    return self._set_generation_progress(target_state)

on_state

on_state(state: GENERATION_PROGRESS) -> GENERATION_PROGRESS

Call when the generation state is updated.

Parameters:

  • state (GENERATION_PROGRESS) –

    The new state of the generation.

Source code in horde_sdk/worker/generations_base.py
def on_state(
    self,
    state: GENERATION_PROGRESS,
) -> GENERATION_PROGRESS:
    """Route a requested state to its dedicated `on_*` handler.

    States that have no dedicated handler in the table below are passed straight to
    `_set_generation_progress`.

    Args:
        state (GENERATION_PROGRESS): The new state of the generation.

    Returns:
        GENERATION_PROGRESS: The value returned by the handler that was selected.
    """
    # NOTE(review): PENDING_POST_PROCESSING is routed to `on_post_processing_complete` (which calls
    # `_work_complete`) rather than to something that sets PENDING_POST_PROCESSING itself; confirm this
    # mapping is intended.
    handlers = {
        GENERATION_PROGRESS.PRELOADING: self.on_preloading,
        GENERATION_PROGRESS.PRELOADING_COMPLETE: self.on_preloading_complete,
        GENERATION_PROGRESS.GENERATING: self.on_generating,
        GENERATION_PROGRESS.POST_PROCESSING: self.on_post_processing,
        GENERATION_PROGRESS.PENDING_POST_PROCESSING: self.on_post_processing_complete,
        GENERATION_PROGRESS.PENDING_SAFETY_CHECK: self.on_pending_safety_check,
        GENERATION_PROGRESS.SAFETY_CHECKING: self.on_safety_checking,
        GENERATION_PROGRESS.PENDING_SUBMIT: self.on_pending_submit,
        GENERATION_PROGRESS.SUBMITTING: self.on_submitting,
        GENERATION_PROGRESS.SUBMIT_COMPLETE: self.on_submit_complete,
        GENERATION_PROGRESS.COMPLETE: self.on_complete,
    }

    handler = handlers.get(state)
    if handler is None:
        return self._set_generation_progress(state)
    return handler()

step

step(state: GENERATION_PROGRESS) -> GENERATION_PROGRESS

Call when the generation state is updated.

Parameters:

  • state (GENERATION_PROGRESS) –

    The new state of the generation.

Source code in horde_sdk/worker/generations_base.py
def step(self, state: GENERATION_PROGRESS) -> GENERATION_PROGRESS:
    """Advance the generation to `state`; a thin alias for `on_state`.

    Args:
        state (GENERATION_PROGRESS): The new state of the generation.

    Returns:
        GENERATION_PROGRESS: The value returned by `on_state`.
    """
    resulting_state = self.on_state(state)
    return resulting_state

is_safety_checking_done_on_all_batch

is_safety_checking_done_on_all_batch() -> bool

Check whether a safety check result has been recorded for every item in the batch.

Returns:

  • bool ( bool ) –

    True if safety results exist and none of them is still None, False otherwise.

Source code in horde_sdk/worker/generations_base.py
def is_safety_checking_done_on_all_batch(self) -> bool:
    """Check whether a safety result has been recorded for every entry in `_safety_results`.

    Returns:
        bool: False when `_safety_results` is None. Otherwise True only if no entry is None, which is
        also the case for an empty list.
    """
    with self._lock:
        recorded_results = self._safety_results
        if recorded_results is None:
            return False
        return all(entry is not None for entry in recorded_results)

get_safety_check_results

get_safety_check_results() -> list[SafetyResult | None]

Get the results of the safety checks.

Returns:

  • list[SafetyResult | None]

    list[SafetyResult | None]: The results of the safety checks for each batch.

Source code in horde_sdk/worker/generations_base.py
def get_safety_check_results(self) -> list[SafetyResult | None] | None:
    """Get the results of the safety checks.

    Returns:
        list[SafetyResult | None] | None: A shallow copy of the stored safety results, taken while
        holding `self._lock`; individual entries may be None. Returns None when no safety results are
        stored (`_safety_results` is None).
    """
    with self._lock:
        if self._safety_results is None:
            return None
        return self._safety_results.copy()

on_safety_checking

on_safety_checking() -> GENERATION_PROGRESS

Call when the safety check is about to start.

Source code in horde_sdk/worker/generations_base.py
def on_safety_checking(self) -> GENERATION_PROGRESS:
    """Request the `SAFETY_CHECKING` state; call just before the safety check starts.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.SAFETY_CHECKING
    return self._set_generation_progress(target_state)

on_safety_check_complete

on_safety_check_complete(
    batch_index: int, safety_result: SafetyResult
) -> GENERATION_PROGRESS

Call when the safety check is complete.

Parameters:

  • batch_index (int) –

    The index of the batch to set the safety check result for. This is 0-indexed and must match the position of the result_ids provided during initialization.

  • safety_result (SafetyResult) –

    The result of the safety check.

Source code in horde_sdk/worker/generations_base.py
def on_safety_check_complete(self, batch_index: int, safety_result: SafetyResult) -> GENERATION_PROGRESS:
    """Record one safety check result and advance the state once every result is in.

    The result is stored via `_set_safety_check_result`. While
    `is_safety_checking_done_on_all_batch()` is still False, no transition is requested and
    `SAFETY_CHECKING` is returned. Once all results are recorded, `PENDING_SUBMIT` is requested when the
    generation requires submission, and `COMPLETE` otherwise.

    Args:
        batch_index (int): The index of the batch to set the safety check result for.
            This is 0-indexed and must match the position of the result_ids provided during initialization.
        safety_result (SafetyResult): The result of the safety check.

    Returns:
        GENERATION_PROGRESS: `SAFETY_CHECKING` while results are outstanding, otherwise the value
        returned by `_set_generation_progress`.
    """
    self._set_safety_check_result(batch_index=batch_index, safety_result=safety_result)

    if not self.is_safety_checking_done_on_all_batch():
        return GENERATION_PROGRESS.SAFETY_CHECKING

    following_state = GENERATION_PROGRESS.PENDING_SUBMIT if self._requires_submit else GENERATION_PROGRESS.COMPLETE
    return self._set_generation_progress(following_state)

on_pending_submit

on_pending_submit() -> GENERATION_PROGRESS

Call when the generation is pending submission.

Source code in horde_sdk/worker/generations_base.py
def on_pending_submit(self) -> GENERATION_PROGRESS:
    """Request the `PENDING_SUBMIT` state; call when the generation is waiting to be submitted.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.PENDING_SUBMIT
    return self._set_generation_progress(target_state)

on_submitting

on_submitting() -> GENERATION_PROGRESS

Call when an attempt is going to be made to submit the generation.

Source code in horde_sdk/worker/generations_base.py
def on_submitting(self) -> GENERATION_PROGRESS:
    """Request the `SUBMITTING` state; call just before a submission attempt is made.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.SUBMITTING
    return self._set_generation_progress(target_state)

on_submit_complete

on_submit_complete() -> GENERATION_PROGRESS

Call when the generation has been successfully submitted.

Source code in horde_sdk/worker/generations_base.py
def on_submit_complete(self) -> GENERATION_PROGRESS:
    """Request the `SUBMIT_COMPLETE` state; call after the generation was submitted successfully.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.SUBMIT_COMPLETE
    return self._set_generation_progress(target_state)

on_user_requested_abort

on_user_requested_abort() -> GENERATION_PROGRESS

Call when the user requests to abort the generation.

Source code in horde_sdk/worker/generations_base.py
def on_user_requested_abort(self) -> GENERATION_PROGRESS:
    """Request the `USER_REQUESTED_ABORT` state; call when the user asks to abort the generation.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.USER_REQUESTED_ABORT
    return self._set_generation_progress(target_state)

on_user_abort_complete

on_user_abort_complete() -> GENERATION_PROGRESS

Call when the user requested abort is complete.

Source code in horde_sdk/worker/generations_base.py
def on_user_abort_complete(self) -> GENERATION_PROGRESS:
    """Request the `USER_ABORT_COMPLETE` state; call once a user-requested abort has finished.

    Returns:
        GENERATION_PROGRESS: The value returned by `_set_generation_progress`.
    """
    target_state = GENERATION_PROGRESS.USER_ABORT_COMPLETE
    return self._set_generation_progress(target_state)

TextSingleGeneration

Bases: HordeSingleGeneration[str]

A single text generation.

Not to be confused with a job, which can contain multiple generations.

Source code in horde_sdk/worker/generations.py
class TextSingleGeneration(HordeSingleGeneration[str]):
    """A single text generation.

    **Not to be confused with a job**, which can contain multiple generations.
    """

    @override
    @classmethod
    def default_generate_progress_transitions(cls) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
        """Return `default_text_generate_progress_transitions`."""
        return default_text_generate_progress_transitions

    @override
    @classmethod
    def default_generate_progress_transitions_no_submit(
        cls,
    ) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
        """Return `default_text_generate_progress_no_submit_transitions`."""
        return default_text_generate_progress_no_submit_transitions

    @override
    @classmethod
    def does_class_require_generation(cls) -> bool:
        """Text generations always require a generation step."""
        return True

    @override
    @classmethod
    def does_class_require_safety_check(cls) -> bool:
        """Text generations do not require a safety check at the class level."""
        return False

    generation_parameters: TextGenerationParameters
    """The parameters for the generation."""

    def __init__(
        self,
        *,
        generation_parameters: TextGenerationParameters,
        generation_id: ID_TYPES | None = None,
        dispatch_result_ids: Sequence[ID_TYPES] | None = None,
        result_ids: list[ID_TYPES] | None = None,
        requires_generation: bool = True,
        requires_post_processing: bool = False,
        requires_safety_check: bool = False,
        requires_submit: bool = True,
        safety_rules: SafetyRules = default_text_safety_rules,
        black_box_mode: bool = False,
        state_error_limits: (
            Mapping[GENERATION_PROGRESS, int] | None
        ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
        strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
        extra_logging: bool = False,
    ) -> None:
        """Initialize the generation.

        The transition table handed to the base class depends on the flags:

        - safety check and submit: `base_generate_progress_transitions`
        - safety check, no submit: `base_generate_progress_no_submit_transitions`
        - no safety check, submit: `default_generate_progress_transitions()`
        - no safety check, no submit: `default_generate_progress_transitions_no_submit()`

        Args:
            generation_parameters (TextGenerationParameters): The parameters for the generation.
            generation_id (ID_TYPES | None): The unique identifier for the generation.
                If None, a random UUID will be generated.
            dispatch_result_ids (Sequence[ID_TYPES] | None): Result identifiers assigned by dispatch, if
                available.
            result_ids (list[ID_TYPES] | None): The unique identifiers for the results of the generation.
                If None, a random UUID will be generated for each result.
            requires_generation (bool, optional): Whether the generation requires generation.
                Must be True for text generations. Defaults to True.
            requires_post_processing (bool, optional): Whether the generation requires post-processing.
                Defaults to False.
            requires_safety_check (bool, optional): Whether the generation requires a safety check.
                Defaults to False.
            requires_submit (bool, optional): Whether the generation requires submission.
                Defaults to True.
            safety_rules (SafetyRules, optional): The safety rules to apply to the generation.
                Defaults to default_text_safety_rules from `horde_sdk.safety`.
            black_box_mode (bool, optional): Whether the generation is in black box mode.
                This removes all of the intermediate states between starting and finished states.
                This should only be used when the backend has no observability into the generation process.
                Defaults to False.
            state_error_limits (Mapping[GENERATION_PROGRESS, int], optional): The maximum number of times a
                generation can be in an error state before it is considered failed.
                Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.
            strict_transition_mode (bool, optional): Whether or not to enforce strict transition checking.
                This prevents setting the same state multiple times in a row.
                Defaults to HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE.
            extra_logging (bool, optional): Whether or not to enable extra debug-level logging,
                especially for state transitions.
                Defaults to False.

        Raises:
            ValueError: If `requires_generation` is False.
        """
        if requires_generation is False:
            raise ValueError("requires_generation must be True for TextSingleGeneration")

        # Start from the text defaults, then override for the other flag combinations.
        generate_progress_transitions = self.default_generate_progress_transitions()
        if requires_safety_check:
            generate_progress_transitions = (
                base_generate_progress_transitions if requires_submit else base_generate_progress_no_submit_transitions
            )
        elif not requires_submit:
            generate_progress_transitions = self.default_generate_progress_transitions_no_submit()

        super().__init__(
            result_type=str,
            generation_parameters=generation_parameters,
            generation_id=generation_id,
            dispatch_result_ids=dispatch_result_ids,
            result_ids=result_ids,
            requires_generation=True,
            requires_post_processing=requires_post_processing,
            requires_safety_check=requires_safety_check,
            requires_submit=requires_submit,
            safety_rules=safety_rules,
            state_error_limits=state_error_limits,
            generate_progress_transitions=generate_progress_transitions,
            black_box_mode=black_box_mode,
            strict_transition_mode=strict_transition_mode,
            extra_logging=extra_logging,
        )

    @classmethod
    def from_template(
        cls,
        template: TextGenerationParametersTemplate,
        *,
        generation_id: ID_TYPES | None = None,
        dispatch_result_ids: Sequence[ID_TYPES] | None = None,
        base_param_updates: BasicTextGenerationParametersTemplate | None = None,
        result_ids: Sequence[ID_TYPES] | None = None,
        allocator: ResultIdAllocator | None = None,
        seed: str = "text",
        init_kwargs: TextGenerationInitKwargs | None = None,
    ) -> TextSingleGeneration:
        """Instantiate a text generation from a template.

        The template is resolved with `template.to_parameters`, then the constructor keyword arguments are
        assembled from `init_kwargs` and the explicit arguments. Explicit arguments are applied with
        `setdefault`, so a key already present in `init_kwargs` takes precedence over them.

        Args:
            template (TextGenerationParametersTemplate): The template to resolve into generation parameters.
            generation_id (ID_TYPES | None): Identifier for the generation; only forwarded when not None.
            dispatch_result_ids (Sequence[ID_TYPES] | None): Identifiers supplied by dispatch; copied into
                a new list when not None.
            base_param_updates (BasicTextGenerationParametersTemplate | None): Forwarded to
                `template.to_parameters`.
            result_ids (Sequence[ID_TYPES] | None): Forwarded to `template.to_parameters`.
            allocator (ResultIdAllocator | None): Forwarded to `template.to_parameters`.
            seed (str): Forwarded to `template.to_parameters`.
            init_kwargs (TextGenerationInitKwargs | None): Extra constructor keyword arguments.

        Returns:
            TextSingleGeneration: The instance produced by calling `cls`.
        """
        generation_parameters = template.to_parameters(
            base_param_updates=base_param_updates,
            result_ids=result_ids,
            allocator=allocator,
            seed=seed,
        )

        resolved_kwargs: TextGenerationInitKwargs = {}
        if init_kwargs:
            resolved_kwargs.update(init_kwargs)

        if generation_id is not None:
            resolved_kwargs.setdefault("generation_id", generation_id)
        if dispatch_result_ids is not None:
            resolved_kwargs.setdefault("dispatch_result_ids", list(dispatch_result_ids))
        # The constructor receives the result ids the template resolved, unless init_kwargs set them.
        resolved_kwargs.setdefault("result_ids", generation_parameters.result_ids)

        return cls(generation_parameters=generation_parameters, **resolved_kwargs)

generation_parameters instance-attribute

generation_parameters: TextGenerationParameters

The parameters for the generation.

result_type property

result_type: type[GenerationResultTypeVar]

The type of the result of the generation.

batch_size property

batch_size: int

The batch size of the generation.

result_ids property

result_ids: list[ID_TYPES]

The unique identifiers for the generations in the batch.

black_box_mode property

black_box_mode: bool

Whether or not the generation is in black box mode.

generation_id instance-attribute

generation_id: ID_TYPES

Unique identifier for the generation.

dispatch_result_ids property

dispatch_result_ids: list[ID_TYPES] | None

Identifiers supplied by the dispatch source, if any.

short_id property

short_id: str

Get a short identifier for the generation.

generation_failure_count property

generation_failure_count: int

The number of times the generation has failed during any step of the generation process.

errored_states property

errored_states: list[tuple[GENERATION_PROGRESS, float]]

Return a list of (state, time) tuples for the states which occurred just before an error state, with the time each was set.

error_counts property

error_counts: dict[GENERATION_PROGRESS, int]

Return a dictionary of states and the number of times they occurred before an error state.

requires_post_processing property

requires_post_processing: bool

Whether or not the generation requires post-processing.

requires_generation property

requires_generation: bool

Whether or not the generation requires generation.

requires_safety_check property

requires_safety_check: bool

Whether or not the generation requires a safety check.

requires_submit property

requires_submit: bool

Whether or not the generation requires submission.

generation_results property

generation_results: OrderedDict[
    ID_TYPES, GenerationResultTypeVar | None
]

Get the result of the generation.

default_generate_progress_transitions classmethod

default_generate_progress_transitions() -> (
    dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]
)
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def default_generate_progress_transitions(cls) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
    """Return `default_text_generate_progress_transitions`."""
    return default_text_generate_progress_transitions

default_generate_progress_transitions_no_submit classmethod

default_generate_progress_transitions_no_submit() -> (
    dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]
)
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def default_generate_progress_transitions_no_submit(
    cls,
) -> dict[GENERATION_PROGRESS, list[GENERATION_PROGRESS]]:
    """Return `default_text_generate_progress_no_submit_transitions`."""
    return default_text_generate_progress_no_submit_transitions

does_class_require_generation classmethod

does_class_require_generation() -> bool
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def does_class_require_generation(cls) -> bool:
    """Report that this class always requires a generation step (always True)."""
    return True

does_class_require_safety_check classmethod

does_class_require_safety_check() -> bool
Source code in horde_sdk/worker/generations.py
@override
@classmethod
def does_class_require_safety_check(cls) -> bool:
    """Report that this class does not require a safety check at the class level (always False)."""
    return False

__init__

__init__(
    *,
    generation_parameters: TextGenerationParameters,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_ids: list[ID_TYPES] | None = None,
    requires_generation: bool = True,
    requires_post_processing: bool = False,
    requires_safety_check: bool = False,
    requires_submit: bool = True,
    safety_rules: SafetyRules = default_text_safety_rules,
    black_box_mode: bool = False,
    state_error_limits: (
        Mapping[GENERATION_PROGRESS, int] | None
    ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
    strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
    extra_logging: bool = False
) -> None

Initialize the generation.

Parameters:

  • generation_parameters (TextGenerationParameters) –

    The parameters for the generation.

  • generation_id (str | None, default: None ) –

    The unique identifier for the generation. If None, a random UUID will be generated.

  • dispatch_result_ids (Sequence[str | UUID] | None, default: None ) –

    Result identifiers assigned by dispatch, if available.

  • result_ids (list[ID_TYPES] | None, default: None ) –

    The unique identifiers for the generation. If None, a random UUID will be generated for each result.

  • requires_generation (bool, default: True ) –

    Whether the generation requires generation. Defaults to True.

  • requires_post_processing (bool, default: False ) –

    Whether the generation requires post-processing. Defaults to False.

  • requires_safety_check (bool, default: False ) –

    Whether the generation requires a safety check. Defaults to False.

  • requires_submit (bool, default: True ) –

    Whether the generation requires submission. Defaults to True.

  • safety_rules (SafetyRules, default: default_text_safety_rules ) –

    The safety rules to apply to the generation. Defaults to default_text_safety_rules from horde_sdk.safety.

  • black_box_mode (bool, default: False ) –

    Whether the generation is in black box mode. This removes all of the intermediate states between starting and finished states. This should only be used when the backend has no observability into the generation process. Defaults to False.

  • state_error_limits (Mapping[GENERATION_PROGRESS, int], default: DEFAULT_STATE_ERROR_LIMITS ) –

    The maximum number of times a generation can be in an error state before it is considered failed. Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.

  • strict_transition_mode (bool, default: DEFAULT_GENERATION_STRICT_TRANSITION_MODE ) –

    Whether or not to enforce strict transition checking. This prevents setting the same state multiple times in a row. Defaults to True.

  • extra_logging (bool, default: False ) –

    Whether or not to enable extra debug-level logging, especially for state transitions. Defaults to False.

Source code in horde_sdk/worker/generations.py
def __init__(
    self,
    *,
    generation_parameters: TextGenerationParameters,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    result_ids: list[ID_TYPES] | None = None,
    requires_generation: bool = True,
    requires_post_processing: bool = False,
    requires_safety_check: bool = False,
    requires_submit: bool = True,
    safety_rules: SafetyRules = default_text_safety_rules,
    black_box_mode: bool = False,
    state_error_limits: (
        Mapping[GENERATION_PROGRESS, int] | None
    ) = HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS,
    strict_transition_mode: bool = HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE,
    extra_logging: bool = False,
) -> None:
    """Set up a text generation and select the transition table matching its requirements.

    Args:
        generation_parameters (TextGenerationParameters): The parameters for the generation.
        generation_id (ID_TYPES | None): The unique identifier for the generation.
            If None, a random UUID will be generated.
        dispatch_result_ids (Sequence[ID_TYPES] | None): Result identifiers assigned by dispatch, if
            available.
        result_ids (list[ID_TYPES] | None): The unique identifiers for the generation.
            If None, a random UUID will be generated for each result.
        requires_generation (bool, optional): Whether the generation requires generation.
            Must not be False for this class. Defaults to True.
        requires_post_processing (bool, optional): Whether the generation requires post-processing.
            Defaults to False.
        requires_safety_check (bool, optional): Whether the generation requires a safety check.
            Defaults to False.
        requires_submit (bool, optional): Whether the generation requires submission.
            Defaults to True.
        safety_rules (SafetyRules, optional): The safety rules to apply to the generation.
            Defaults to default_text_safety_rules from `horde_sdk.safety`.
        black_box_mode (bool, optional): Whether the generation is in black box mode.
            This removes all of the intermediate states between starting and finished states.
            This should only be used when the backend has no observability into the generation process.
            Defaults to False.
        state_error_limits (Mapping[GENERATION_PROGRESS, int] | None, optional): The maximum number of
            times a generation can be in an error state before it is considered failed.
            Defaults to HordeWorkerConfigDefaults.DEFAULT_STATE_ERROR_LIMITS.
        strict_transition_mode (bool, optional): Whether or not to enforce strict transition checking.
            This prevents setting the same state multiple times in a row.
            Defaults to HordeWorkerConfigDefaults.DEFAULT_GENERATION_STRICT_TRANSITION_MODE.
        extra_logging (bool, optional): Whether or not to enable extra debug-level logging,
            especially for state transitions.
            Defaults to False.

    Raises:
        ValueError: If `requires_generation` is False.
    """
    if requires_generation is False:
        raise ValueError("requires_generation must be True for TextSingleGeneration")

    # The class default table is always fetched first and only replaced when the flags call for another one.
    transitions = self.default_generate_progress_transitions()
    if requires_safety_check:
        if requires_submit:
            transitions = base_generate_progress_transitions
        else:
            transitions = base_generate_progress_no_submit_transitions
    elif not requires_submit:
        transitions = self.default_generate_progress_transitions_no_submit()

    super().__init__(
        result_type=str,
        generation_parameters=generation_parameters,
        generation_id=generation_id,
        dispatch_result_ids=dispatch_result_ids,
        result_ids=result_ids,
        # Always True here: a False value was rejected above.
        requires_generation=True,
        requires_post_processing=requires_post_processing,
        requires_safety_check=requires_safety_check,
        requires_submit=requires_submit,
        safety_rules=safety_rules,
        state_error_limits=state_error_limits,
        generate_progress_transitions=transitions,
        black_box_mode=black_box_mode,
        strict_transition_mode=strict_transition_mode,
        extra_logging=extra_logging,
    )

from_template classmethod

from_template(
    template: TextGenerationParametersTemplate,
    *,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    base_param_updates: (
        BasicTextGenerationParametersTemplate | None
    ) = None,
    result_ids: Sequence[ID_TYPES] | None = None,
    allocator: ResultIdAllocator | None = None,
    seed: str = "text",
    init_kwargs: TextGenerationInitKwargs | None = None
) -> TextSingleGeneration

Instantiate a text generation from a template.

Source code in horde_sdk/worker/generations.py
@classmethod
def from_template(
    cls,
    template: TextGenerationParametersTemplate,
    *,
    generation_id: ID_TYPES | None = None,
    dispatch_result_ids: Sequence[ID_TYPES] | None = None,
    base_param_updates: BasicTextGenerationParametersTemplate | None = None,
    result_ids: Sequence[ID_TYPES] | None = None,
    allocator: ResultIdAllocator | None = None,
    seed: str = "text",
    init_kwargs: TextGenerationInitKwargs | None = None,
) -> TextSingleGeneration:
    """Build a text generation from a parameters template.

    The template is turned into concrete parameters with `template.to_parameters`, forwarding
    `base_param_updates`, `result_ids`, `allocator` and `seed`. Constructor keyword arguments start from
    `init_kwargs`; the explicit `generation_id` and `dispatch_result_ids` arguments are only applied when
    they are not None and the matching key is absent from `init_kwargs`. `result_ids` falls back to the
    `result_ids` of the built parameters when `init_kwargs` does not supply it.

    Returns:
        TextSingleGeneration: The instance created by calling `cls` with the resolved arguments.
    """
    parameters = template.to_parameters(
        base_param_updates=base_param_updates,
        result_ids=result_ids,
        allocator=allocator,
        seed=seed,
    )

    # Caller-supplied init kwargs take precedence; everything below only fills in missing keys.
    constructor_kwargs: TextGenerationInitKwargs = dict(init_kwargs) if init_kwargs else {}

    if generation_id is not None:
        constructor_kwargs.setdefault("generation_id", generation_id)

    if dispatch_result_ids is not None:
        dispatch_ids = list(dispatch_result_ids)
        constructor_kwargs.setdefault("dispatch_result_ids", dispatch_ids)

    constructor_kwargs.setdefault("result_ids", parameters.result_ids)

    return cls(generation_parameters=parameters, **constructor_kwargs)

default_interrupt_states classmethod

default_interrupt_states() -> set[GENERATION_PROGRESS]

Get the default interrupt states.

Source code in horde_sdk/worker/generations_base.py
@classmethod
def default_interrupt_states(cls) -> set[GENERATION_PROGRESS]:
    """Return a new set holding the default interrupt states (aborted, user-requested abort, abandoned)."""
    interrupt_states = {
        GENERATION_PROGRESS.ABORTED,
        GENERATION_PROGRESS.USER_REQUESTED_ABORT,
        GENERATION_PROGRESS.ABANDONED,
    }
    return interrupt_states

register_callback

register_callback(
    state: GENERATION_PROGRESS, callback: Callable[[], None]
) -> None

Register a callback to be called when the generation state is updated.

Parameters:

  • state (GENERATION_PROGRESS) –

    The state to register the callback for.

  • callback (Callable[[], None]) –

    The callback to call when the state is updated.

Source code in horde_sdk/worker/generations_base.py
def register_callback(
    self,
    state: GENERATION_PROGRESS,
    callback: Callable[[], None],
) -> None:
    """Attach a callback to a generation state.

    The callback is appended, under the instance lock, to the callbacks already stored for `state`.

    Args:
        state (GENERATION_PROGRESS): The state to register the callback for.
        callback (Callable[[], None]): The callback to call when the state is updated.
    """
    with self._lock:
        callbacks_for_state = self._registered_callbacks[state]
        callbacks_for_state.append(callback)

set_dispatch_result_ids

set_dispatch_result_ids(
    dispatch_result_ids: Sequence[ID_TYPES] | None,
) -> None

Bind the generation to the result identifiers supplied by dispatch.

Source code in horde_sdk/worker/generations_base.py
def set_dispatch_result_ids(self, dispatch_result_ids: Sequence[ID_TYPES] | None) -> None:
    """Store the result identifiers supplied by dispatch.

    A non-None sequence is copied into a new list; None clears the stored identifiers.
    """
    with self._lock:
        if dispatch_result_ids is None:
            self._dispatch_result_ids = None
        else:
            self._dispatch_result_ids = list(dispatch_result_ids)

get_generation_progress

get_generation_progress() -> GENERATION_PROGRESS

Return the state of the generation.

Source code in horde_sdk/worker/generations_base.py
def get_generation_progress(self) -> GENERATION_PROGRESS:
    """Read the current generation state while holding the instance lock."""
    with self._lock:
        current_progress = self._generation_progress
    return current_progress

get_progress_history

get_progress_history() -> (
    list[tuple[GENERATION_PROGRESS, float]]
)

Get the generation progress history.

Source code in horde_sdk/worker/generations_base.py
def get_progress_history(self) -> list[tuple[GENERATION_PROGRESS, float]]:
    """Return a shallow copy of the progress history, taken while holding the instance lock."""
    with self._lock:
        history_snapshot = self._progress_history.copy()
    return history_snapshot

get_last_non_error_state

get_last_non_error_state() -> GENERATION_PROGRESS

Get the last non-error state.

Source code in horde_sdk/worker/generations_base.py
def get_last_non_error_state(self) -> GENERATION_PROGRESS:
    """Return the most recent history entry whose state is not `GENERATION_PROGRESS.ERROR`.

    Raises:
        RuntimeError: If the progress history contains no non-error state.
    """
    with self._lock:
        # Walk the history newest-first and stop at the first state that is not an error.
        non_error_states = (
            progress for progress, _timestamp in reversed(self._progress_history) if progress != GENERATION_PROGRESS.ERROR
        )
        for progress in non_error_states:
            return progress
        raise RuntimeError("No non-error state found in progress history")

get_last_non_error_state_and_time

get_last_non_error_state_and_time() -> (
    tuple[GENERATION_PROGRESS, float]
)

Get the last non-error state and the time it was set.

Source code in horde_sdk/worker/generations_base.py
def get_last_non_error_state_and_time(self) -> tuple[GENERATION_PROGRESS, float]:
    """Return the most recent non-error state together with the time recorded for it.

    Raises:
        RuntimeError: If the progress history contains no non-error state.
    """
    with self._lock:
        # Walk the history newest-first and stop at the first entry that is not an error.
        non_error_entries = (
            (progress, recorded_at)
            for progress, recorded_at in reversed(self._progress_history)
            if progress != GENERATION_PROGRESS.ERROR
        )
        for progress, recorded_at in non_error_entries:
            return progress, recorded_at
        raise RuntimeError("No non-error state found in progress history")

is_next_state_valid

is_next_state_valid(
    next_state: GENERATION_PROGRESS,
) -> bool

Check if the next state is valid.

Source code in horde_sdk/worker/generations_base.py
def is_next_state_valid(
    self,
    next_state: GENERATION_PROGRESS,
) -> bool:
    """Decide whether the generation may move to `next_state`.

    The checks run in this order while holding the instance lock:

    1. Any state in `default_interrupt_states()` is always accepted.
    2. In strict transition mode, repeating the current state is rejected, as is any transition once
       `_any_error_count_exceeded` is truthy.
    3. Leaving the error state is rejected when the progress history has fewer than two entries.
    4. The transition is rejected when the error count recorded for `next_state` has reached its limit;
       a state without a configured limit is treated as unlimited.

    Args:
        next_state (GENERATION_PROGRESS): The state being requested.

    Returns:
        bool: True if the transition passes every check above, False otherwise.
    """
    with self._lock:
        if next_state in self.default_interrupt_states():
            return True

        present_state = self._generation_progress
        currently_errored = present_state == GENERATION_PROGRESS.ERROR

        if self._strict_transition_mode:
            if present_state == next_state or self._any_error_count_exceeded:
                return False

            # NOTE(review): this looks already covered by the repeated-state check above — confirm.
            if currently_errored and next_state == GENERATION_PROGRESS.ERROR:
                return False

        if currently_errored and len(self._progress_history) < 2:
            return False

        errors_so_far = self._error_counts.get(next_state, 0)
        if self._state_error_limits:
            allowed_errors = self._state_error_limits.get(next_state, float("inf"))
        else:
            allowed_errors = float("inf")

        return not (errors_so_far >= allowed_errors)

on_error

on_error(
    *,
    failed_message: str,
    failure_exception: Exception | None = None
) -> GENERATION_PROGRESS

Call when an error occurs at any point in the generation process, safety checks, or submission.

This should be reserved for errors which make the current step impossible to complete. For example, if a sub-process is OOM killed.

Contrast with the add_metadata_entry method, which is used for non-critical errors. If there is no METADATA_TYPE for the error you encountered, you can use failed_message and failure_exception instead.

Parameters:

  • failed_message (str) –

    The reason the generation failed.

  • failure_exception (Exception, default: None ) –

    The exception that caused the generation to fail. Defaults to None.

Returns:

  • GENERATION_PROGRESS ( GENERATION_PROGRESS ) –

    The new state of the generation, which will be set to GENERATION_PROGRESS.ERROR.

Raises:

  • RuntimeError

    If the generation has exceeded the maximum number of errors for the current state.

  • RuntimeError

    If the generation is in an error state and has no previous state to transition from.

Source code in horde_sdk/worker/generations_base.py
def on_error(self, *, failed_message: str, failure_exception: Exception | None = None) -> GENERATION_PROGRESS:
    """Report an error raised during generation, safety checks, or submission.

    Use this only for errors which make the current step **impossible** to complete, for example when a
    sub-process is OOM killed. The failure counter is incremented and the error state is requested while
    holding the instance lock.

    For non-critical errors prefer the `add_metadata_entry` method. If there is no METADATA_TYPE for the
    error you encountered, you can use `failed_message` and `failure_exception` instead.

    Args:
        failed_message (str): The reason the generation failed.
        failure_exception (Exception, optional): The exception that caused the generation to fail.
            Defaults to None.

    Returns:
        GENERATION_PROGRESS: The new state of the generation, which will be set to `GENERATION_PROGRESS.ERROR`.

    Raises:
        RuntimeError: If the generation has exceeded the maximum number of errors for the current state.
        RuntimeError: If the generation is in an error state and has no previous state to transition from.
    """
    with self._lock:
        self._generation_failure_count += 1
        resulting_state = self._set_generation_progress(
            GENERATION_PROGRESS.ERROR,
            failed_message=failed_message,
            failure_exception=failure_exception,
        )
        return resulting_state

on_abort

on_abort(
    *,
    failed_message: str,
    failure_exception: Exception | None = None
) -> GENERATION_PROGRESS

Call when the generation is aborted.

Parameters:

  • failed_message (str) –

    The reason the generation was aborted.

  • failure_exception (Exception, default: None ) –

    The exception that caused the generation to be aborted. Defaults to None.

Returns:

  • GENERATION_PROGRESS ( GENERATION_PROGRESS ) –

    The new state of the generation, which will be set to GENERATION_PROGRESS.ABORTED.

Raises:

  • RuntimeError

    If the generation has exceeded the maximum number of errors for the current state.

  • RuntimeError

    If the generation is in an error state and has no previous state to transition from.

Source code in horde_sdk/worker/generations_base.py
def on_abort(self, *, failed_message: str, failure_exception: Exception | None = None) -> GENERATION_PROGRESS:
    """Report that the generation has been aborted.

    Args:
        failed_message (str): The reason the generation was aborted.
        failure_exception (Exception, optional): The exception that caused the generation to be aborted.
            Defaults to None.

    Returns:
        GENERATION_PROGRESS: The new state of the generation, which will be set to `GENERATION_PROGRESS.ABORTED`.

    Raises:
        RuntimeError: If the generation has exceeded the maximum number of errors for the current state.
        RuntimeError: If the generation is in an error state and has no previous state to transition from.
    """
    resulting_state = self._set_generation_progress(
        GENERATION_PROGRESS.ABORTED,
        failed_message=failed_message,
        failure_exception=failure_exception,
    )
    return resulting_state

on_preloading

on_preloading() -> GENERATION_PROGRESS

Call when preloading is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_preloading(self) -> GENERATION_PROGRESS:
    """Request the PRELOADING state; call this right before preloading starts."""
    target_state = GENERATION_PROGRESS.PRELOADING
    return self._set_generation_progress(target_state)

on_preloading_complete

on_preloading_complete() -> GENERATION_PROGRESS

Call after preloading is complete.

Source code in horde_sdk/worker/generations_base.py
def on_preloading_complete(self) -> GENERATION_PROGRESS:
    """Request the PRELOADING_COMPLETE state; call this once preloading has finished."""
    target_state = GENERATION_PROGRESS.PRELOADING_COMPLETE
    return self._set_generation_progress(target_state)

on_generation_work_complete

on_generation_work_complete(
    result: (
        GenerationResultTypeVar
        | Collection[GenerationResultTypeVar]
        | None
    ) = None,
) -> GENERATION_PROGRESS

Call when the generation work is complete, such as when inference is done.

This does not mean the generation is finalized, as calling this function means that safety checks and submission may still be pending. Examples of when this function would be called are when comfyui has just returned an image, interrogating an image has just completed or when a text backend has just generated text.

Source code in horde_sdk/worker/generations_base.py
def on_generation_work_complete(
    self,
    result: GenerationResultTypeVar | Collection[GenerationResultTypeVar] | None = None,
) -> GENERATION_PROGRESS:
    """Signal that the core generation work (such as inference) has finished.

    The generation is not finalized at this point: safety checks and submission may still be pending.
    Typical call sites are when comfyui has just returned an image, when interrogating an image has just
    completed, or when a text backend has just generated text.

    Args:
        result: Optional result of the work. It is stored only when the generation does not move to
            pending post-processing.

    Returns:
        GENERATION_PROGRESS: The state returned by the transition that was performed.
    """
    skip_post_processing = not self.requires_post_processing or self._black_box_mode
    if skip_post_processing:
        progress_after_work = self._work_complete()
        if result is not None:
            self._set_generation_work_result(result)
        return progress_after_work

    # NOTE(review): `result` is not stored on this path — confirm callers provide it another way.
    return self._set_generation_progress(GENERATION_PROGRESS.PENDING_POST_PROCESSING)

on_generating

on_generating() -> GENERATION_PROGRESS

Call when the generation is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_generating(self) -> GENERATION_PROGRESS:
    """Request the GENERATING state; call this right before generation starts."""
    target_state = GENERATION_PROGRESS.GENERATING
    return self._set_generation_progress(target_state)

on_post_processing

on_post_processing() -> GENERATION_PROGRESS

Call when post-processing is about to begin.

Source code in horde_sdk/worker/generations_base.py
def on_post_processing(self) -> GENERATION_PROGRESS:
    """Request the POST_PROCESSING state; call this right before post-processing starts."""
    target_state = GENERATION_PROGRESS.POST_PROCESSING
    return self._set_generation_progress(target_state)

on_post_processing_complete

on_post_processing_complete() -> GENERATION_PROGRESS

Call when post-processing is complete.

Source code in horde_sdk/worker/generations_base.py
def on_post_processing_complete(self) -> GENERATION_PROGRESS:
    """Call once post-processing has finished; delegates to `_work_complete` and returns its result."""
    progress_after_work = self._work_complete()
    return progress_after_work

on_pending_safety_check

on_pending_safety_check() -> GENERATION_PROGRESS

Call when the generation is pending safety check.

Source code in horde_sdk/worker/generations_base.py
def on_pending_safety_check(self) -> GENERATION_PROGRESS:
    """Request the PENDING_SAFETY_CHECK state; call this when the generation awaits its safety check."""
    target_state = GENERATION_PROGRESS.PENDING_SAFETY_CHECK
    return self._set_generation_progress(target_state)

set_work_result

set_work_result(
    result: (
        GenerationResultTypeVar
        | Collection[GenerationResultTypeVar]
    ),
) -> None

Set the result of the generation work.

Parameters:

  • result (Any) –

    The result of the generation work.

Source code in horde_sdk/worker/generations_base.py
def set_work_result(self, result: GenerationResultTypeVar | Collection[GenerationResultTypeVar]) -> None:
    """Store the result of the generation work.

    This is a public wrapper that forwards `result` to `_set_generation_work_result`.

    Args:
        result: A single work result or a collection of results.
    """
    store_result = self._set_generation_work_result
    store_result(result)

on_complete

on_complete() -> GENERATION_PROGRESS

Call when the generation is complete.

Source code in horde_sdk/worker/generations_base.py
def on_complete(self) -> GENERATION_PROGRESS:
    """Request the COMPLETE state; call this when the generation is complete."""
    target_state = GENERATION_PROGRESS.COMPLETE
    return self._set_generation_progress(target_state)

on_state

on_state(state: GENERATION_PROGRESS) -> GENERATION_PROGRESS

Call when the generation state is updated.

Parameters:

  • state (GENERATION_PROGRESS) –

    The new state of the generation.

Source code in horde_sdk/worker/generations_base.py
def on_state(
    self,
    state: GENERATION_PROGRESS,
) -> GENERATION_PROGRESS:
    """Route a requested state to its dedicated `on_*` handler.

    States without a dedicated handler in the table below are passed directly to
    `_set_generation_progress`.

    Args:
        state (GENERATION_PROGRESS): The new state of the generation.

    Returns:
        GENERATION_PROGRESS: Whatever the selected handler returns.
    """
    # Ordered (state, handler) pairs; the first pair equal to `state` wins.
    # NOTE(review): PENDING_POST_PROCESSING is routed to `on_post_processing_complete` — confirm this is intended.
    handlers_by_state = (
        (GENERATION_PROGRESS.PRELOADING, self.on_preloading),
        (GENERATION_PROGRESS.PRELOADING_COMPLETE, self.on_preloading_complete),
        (GENERATION_PROGRESS.GENERATING, self.on_generating),
        (GENERATION_PROGRESS.POST_PROCESSING, self.on_post_processing),
        (GENERATION_PROGRESS.PENDING_POST_PROCESSING, self.on_post_processing_complete),
        (GENERATION_PROGRESS.PENDING_SAFETY_CHECK, self.on_pending_safety_check),
        (GENERATION_PROGRESS.SAFETY_CHECKING, self.on_safety_checking),
        (GENERATION_PROGRESS.PENDING_SUBMIT, self.on_pending_submit),
        (GENERATION_PROGRESS.SUBMITTING, self.on_submitting),
        (GENERATION_PROGRESS.SUBMIT_COMPLETE, self.on_submit_complete),
        (GENERATION_PROGRESS.COMPLETE, self.on_complete),
    )

    for handled_state, handler in handlers_by_state:
        if state == handled_state:
            return handler()

    return self._set_generation_progress(state)

step

step(state: GENERATION_PROGRESS) -> GENERATION_PROGRESS

Call when the generation state is updated.

Parameters:

  • state (GENERATION_PROGRESS) –

    The new state of the generation.

Source code in horde_sdk/worker/generations_base.py
def step(self, state: GENERATION_PROGRESS) -> GENERATION_PROGRESS:
    """Advance the generation to `state`; this forwards to `on_state`.

    Args:
        state (GENERATION_PROGRESS): The new state of the generation.

    Returns:
        GENERATION_PROGRESS: The value returned by `on_state`.
    """
    resulting_state = self.on_state(state)
    return resulting_state

is_safety_checking_done_on_all_batch

is_safety_checking_done_on_all_batch() -> bool

Check whether a safety check result has been recorded for every entry in the batch.

Returns:

  • bool ( bool ) –

    True if every entry in the batch has a recorded safety check result, False otherwise (including when no safety results exist yet).

Source code in horde_sdk/worker/generations_base.py
def is_safety_checking_done_on_all_batch(self) -> bool:
    """Check whether a safety check result has been recorded for every entry in the batch.

    Returns:
        bool: True if `_safety_results` is set and none of its entries is None, False otherwise.
    """
    with self._lock:
        recorded_results = self._safety_results
        if recorded_results is None:
            return False
        return all(entry is not None for entry in recorded_results)

get_safety_check_results

get_safety_check_results() -> list[SafetyResult | None]

Get the results of the safety checks.

Returns:

  • list[SafetyResult | None]

    list[SafetyResult | None]: The results of the safety checks for each batch.

Source code in horde_sdk/worker/generations_base.py
def get_safety_check_results(self) -> list[SafetyResult | None]:
    """Return a copy of the recorded safety check results.

    Returns:
        list[SafetyResult | None]: The results of the safety checks for each batch. Note that None is
        returned instead of a list when no safety results are stored.
    """
    with self._lock:
        recorded_results = self._safety_results
        if recorded_results is None:
            return None
        return recorded_results.copy()

on_safety_checking

on_safety_checking() -> GENERATION_PROGRESS

Call when the safety check is about to start.

Source code in horde_sdk/worker/generations_base.py
def on_safety_checking(self) -> GENERATION_PROGRESS:
    """Request the SAFETY_CHECKING state; call this right before the safety check starts."""
    target_state = GENERATION_PROGRESS.SAFETY_CHECKING
    return self._set_generation_progress(target_state)

on_safety_check_complete

on_safety_check_complete(
    batch_index: int, safety_result: SafetyResult
) -> GENERATION_PROGRESS

Call when the safety check is complete.

Parameters:

  • batch_index (int) –

    The index of the batch to set the safety check result for. This is 0-indexed and must match the position of the result_ids provided during initialization.

  • safety_result (SafetyResult) –

    The result of the safety check.

Source code in horde_sdk/worker/generations_base.py
def on_safety_check_complete(self, batch_index: int, safety_result: SafetyResult) -> GENERATION_PROGRESS:
    """Record one safety check result and advance the state once every batch entry has one.

    Args:
        batch_index (int): The index of the batch to set the safety check result for.
            This is 0-indexed and must match the position of the result_ids provided during initialization.
        safety_result (SafetyResult): The result of the safety check.

    Returns:
        GENERATION_PROGRESS: `SAFETY_CHECKING` while some entries still lack a result; otherwise the state
        returned after requesting `PENDING_SUBMIT` (when submission is required) or `COMPLETE`.
    """
    self._set_safety_check_result(batch_index=batch_index, safety_result=safety_result)

    if self.is_safety_checking_done_on_all_batch():
        if self._requires_submit:
            follow_up_state = GENERATION_PROGRESS.PENDING_SUBMIT
        else:
            follow_up_state = GENERATION_PROGRESS.COMPLETE
        return self._set_generation_progress(follow_up_state)

    # Results are still outstanding for other entries; no state transition is requested here.
    return GENERATION_PROGRESS.SAFETY_CHECKING

on_pending_submit

on_pending_submit() -> GENERATION_PROGRESS

Call when the generation is pending submission.

Source code in horde_sdk/worker/generations_base.py
def on_pending_submit(self) -> GENERATION_PROGRESS:
    """Request the PENDING_SUBMIT state; call this when the generation awaits submission."""
    target_state = GENERATION_PROGRESS.PENDING_SUBMIT
    return self._set_generation_progress(target_state)

on_submitting

on_submitting() -> GENERATION_PROGRESS

Call when an attempt is going to be made to submit the generation.

Source code in horde_sdk/worker/generations_base.py
def on_submitting(self) -> GENERATION_PROGRESS:
    """Request the SUBMITTING state; call this right before a submission attempt is made."""
    target_state = GENERATION_PROGRESS.SUBMITTING
    return self._set_generation_progress(target_state)

on_submit_complete

on_submit_complete() -> GENERATION_PROGRESS

Call when the generation has been successfully submitted.

Source code in horde_sdk/worker/generations_base.py
def on_submit_complete(self) -> GENERATION_PROGRESS:
    """Request the SUBMIT_COMPLETE state; call this after the generation was submitted successfully."""
    target_state = GENERATION_PROGRESS.SUBMIT_COMPLETE
    return self._set_generation_progress(target_state)

on_user_requested_abort

on_user_requested_abort() -> GENERATION_PROGRESS

Call when the user requests to abort the generation.

Source code in horde_sdk/worker/generations_base.py
def on_user_requested_abort(self) -> GENERATION_PROGRESS:
    """Request the USER_REQUESTED_ABORT state; call this when the user asks to abort the generation."""
    target_state = GENERATION_PROGRESS.USER_REQUESTED_ABORT
    return self._set_generation_progress(target_state)

on_user_abort_complete

on_user_abort_complete() -> GENERATION_PROGRESS

Call when the user requested abort is complete.

Source code in horde_sdk/worker/generations_base.py
def on_user_abort_complete(self) -> GENERATION_PROGRESS:
    """Request the USER_ABORT_COMPLETE state; call this once the user-requested abort has finished."""
    target_state = GENERATION_PROGRESS.USER_ABORT_COMPLETE
    return self._set_generation_progress(target_state)