Module pachyderm_sdk.api.worker.extension

Expand source code
from contextlib import contextmanager
from typing import ContextManager, Optional

from dotenv import load_dotenv

from ...constants import DOTENV_PATH_WORKER
from . import WorkerStub as _GeneratedWorkerStub


class WorkerStub(_GeneratedWorkerStub):
    """Worker API stub extended with a context manager for datum batching."""

    # ``repr`` of the exception raised by user code while processing the
    # previous datum, or None. It is sent to the worker on the next entry of
    # ``batch_datum`` (the only method that reads or writes it).
    __error: Optional[str] = None

    @contextmanager
    def batch_datum(self) -> ContextManager:
        """A ContextManager that, when entered, calls the NextDatum
        endpoint within the worker to step forward during datum batching.
        This context manager will also prepare the environment for the user
        and report errors that occur back to the worker.

        On entry, the error recorded for the previous datum (if any) is
        passed to NextDatum, and the worker's dotenv file
        (``DOTENV_PATH_WORKER``) is loaded into the process environment,
        overriding variables that are already set.

        Any ``Exception`` raised inside the ``with`` body is caught and NOT
        re-raised: its ``repr`` is printed and reported to the worker on the
        next entry. ``BaseException`` subclasses that are not ``Exception``
        (e.g. ``KeyboardInterrupt``) propagate normally.

        This context manager expects to be entered within an infinite while
        loop -- see the examples section. This context can only be entered
        from within a Pachyderm worker and the worker will terminate your
        code when all datums have been processed.

        Note: The API stub must have an open gRPC channel with the worker
        for NextDatum to function correctly. The ``Client`` object
        should automatically do this for the user.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>>
        >>> worker = Client().worker
        >>>
        >>> # Perform an expensive computation here before
        >>> #   you begin processing your datums,
        >>> #   e.g. initializing a model.
        >>>
        >>> while True:
        ...     with worker.batch_datum():
        ...         # process datums
        ...         pass
        """
        # Report the previous datum's error ("" when there was none, or on
        # the first call) and step forward to the next datum.
        self.next_datum(error=self.__error or "")
        # Load the worker-provided dotenv file; override=True lets its values
        # replace environment variables that are already set.
        load_dotenv(DOTENV_PATH_WORKER, override=True)

        self.__error = None
        try:
            yield
        except Exception as error:
            # Deliberately suppressed: the error is recorded and reported to
            # the worker on the next entry instead of crashing the loop.
            self.__error = repr(error)
            # Flush so the message is not lost in a block-buffered stdout if
            # the worker terminates the process after the last datum.
            print(
                f"{self.__error}\nReporting above error to worker.",
                flush=True,
            )

Classes

class WorkerStub (channel: grpc.Channel)
Expand source code
class WorkerStub(_GeneratedWorkerStub):
    """Worker API stub extended with a context manager for datum batching."""

    # ``repr`` of the exception raised by user code while processing the
    # previous datum, or None. It is sent to the worker on the next entry of
    # ``batch_datum`` (the only method that reads or writes it).
    __error: Optional[str] = None

    @contextmanager
    def batch_datum(self) -> ContextManager:
        """A ContextManager that, when entered, calls the NextDatum
        endpoint within the worker to step forward during datum batching.
        This context manager will also prepare the environment for the user
        and report errors that occur back to the worker.

        On entry, the error recorded for the previous datum (if any) is
        passed to NextDatum, and the worker's dotenv file
        (``DOTENV_PATH_WORKER``) is loaded into the process environment,
        overriding variables that are already set.

        Any ``Exception`` raised inside the ``with`` body is caught and NOT
        re-raised: its ``repr`` is printed and reported to the worker on the
        next entry. ``BaseException`` subclasses that are not ``Exception``
        (e.g. ``KeyboardInterrupt``) propagate normally.

        This context manager expects to be entered within an infinite while
        loop -- see the examples section. This context can only be entered
        from within a Pachyderm worker and the worker will terminate your
        code when all datums have been processed.

        Note: The API stub must have an open gRPC channel with the worker
        for NextDatum to function correctly. The ``Client`` object
        should automatically do this for the user.

        Examples
        --------
        >>> from pachyderm_sdk import Client
        >>>
        >>> worker = Client().worker
        >>>
        >>> # Perform an expensive computation here before
        >>> #   you begin processing your datums,
        >>> #   e.g. initializing a model.
        >>>
        >>> while True:
        ...     with worker.batch_datum():
        ...         # process datums
        ...         pass
        """
        # Report the previous datum's error ("" when there was none, or on
        # the first call) and step forward to the next datum.
        self.next_datum(error=self.__error or "")
        # Load the worker-provided dotenv file; override=True lets its values
        # replace environment variables that are already set.
        load_dotenv(DOTENV_PATH_WORKER, override=True)

        self.__error = None
        try:
            yield
        except Exception as error:
            # Deliberately suppressed: the error is recorded and reported to
            # the worker on the next entry instead of crashing the loop.
            self.__error = repr(error)
            # Flush so the message is not lost in a block-buffered stdout if
            # the worker terminates the process after the last datum.
            print(
                f"{self.__error}\nReporting above error to worker.",
                flush=True,
            )

Ancestors

Methods

def batch_datum(self) -> ContextManager

A ContextManager that, when entered, calls the NextDatum endpoint within the worker to step forward during datum batching. This context manager will also prepare the environment for the user and report errors that occur back to the worker.

This context manager expects to be called within an infinite while loop – see the examples section. This context can only be entered from within a Pachyderm worker and the worker will terminate your code when all datums have been processed.

Note: The API stub must have an open gRPC channel with the worker for NextDatum to function correctly. The Client object should automatically do this for the user.

Examples

>>> from pachyderm_sdk import Client
>>>
>>> worker = Client().worker
>>>
>>> # Perform an expensive computation here before
>>> #   you begin processing your datums,
>>> #   e.g. initializing a model.
>>>
>>> while True:
...     with worker.batch_datum():
...         # process datums
...         pass
Expand source code
@contextmanager
def batch_datum(self) -> ContextManager:
    """A ContextManager that, when entered, calls the NextDatum
    endpoint within the worker to step forward during datum batching.
    This context manager will also prepare the environment for the user
    and report errors that occur back to the worker.

    On entry, the error recorded for the previous datum (if any) is
    passed to NextDatum, and the worker's dotenv file
    (``DOTENV_PATH_WORKER``) is loaded into the process environment,
    overriding variables that are already set.

    Any ``Exception`` raised inside the ``with`` body is caught and NOT
    re-raised: its ``repr`` is printed and reported to the worker on the
    next entry. ``BaseException`` subclasses that are not ``Exception``
    (e.g. ``KeyboardInterrupt``) propagate normally.

    This context manager expects to be entered within an infinite while
    loop -- see the examples section. This context can only be entered
    from within a Pachyderm worker and the worker will terminate your
    code when all datums have been processed.

    Note: The API stub must have an open gRPC channel with the worker
    for NextDatum to function correctly. The ``Client`` object
    should automatically do this for the user.

    Examples
    --------
    >>> from pachyderm_sdk import Client
    >>>
    >>> worker = Client().worker
    >>>
    >>> # Perform an expensive computation here before
    >>> #   you begin processing your datums,
    >>> #   e.g. initializing a model.
    >>>
    >>> while True:
    ...     with worker.batch_datum():
    ...         # process datums
    ...         pass
    """
    # Report the previous datum's error ("" when there was none, or on
    # the first call) and step forward to the next datum.
    self.next_datum(error=self.__error or "")
    # Load the worker-provided dotenv file; override=True lets its values
    # replace environment variables that are already set.
    load_dotenv(DOTENV_PATH_WORKER, override=True)

    self.__error = None
    try:
        yield
    except Exception as error:
        # Deliberately suppressed: the error is recorded and reported to
        # the worker on the next entry instead of crashing the loop.
        self.__error = repr(error)
        # Flush so the message is not lost in a block-buffered stdout if
        # the worker terminates the process after the last datum.
        print(
            f"{self.__error}\nReporting above error to worker.",
            flush=True,
        )