Package pachyderm_sdk

Expand source code
import importlib.metadata as metadata

try:
    __version__ = metadata.version(__name__)  # type: ignore
except (FileNotFoundError, ModuleNotFoundError):
    # Distribution metadata could not be found (metadata.version raises
    # PackageNotFoundError, a ModuleNotFoundError subclass, when the package
    # is not installed); report an empty version string instead.
    __version__ = ""

from .api.pfs import _additions as __pfs_additions
from .client import Client
from .datum_batching import batch_all_datums

# Public names re-exported at the package level (imported above from
# .client and .datum_batching).
__all__ = ["Client", "batch_all_datums"]

Sub-modules

pachyderm_sdk.api

Python bindings and API stubs for the Pachyderm public API.

pachyderm_sdk.client

The Client used to interact with a Pachyderm instance.

pachyderm_sdk.config

Functionality for parsing Pachyderm config files.

pachyderm_sdk.constants
pachyderm_sdk.datum_batching

A high-level decorator for pipeline code that uses the datum-batching feature.

pachyderm_sdk.errors

Errors that can be raised by this library.

pachyderm_sdk.interceptor

Implementation of a gRPC interceptor used to set request metadata and catch connection errors.

Functions

def batch_all_datums(user_code: Callable[..., None]) ‑> Callable[..., None]

A decorator that will repeatedly call the wrapped function until all datums have been processed. Before calling the wrapped function, this decorator will call the NextDatum endpoint within the worker and set any environment variables specified by the worker.

Any exceptions raised during the execution of the wrapped function will be reported back to the worker. See the pachyderm documentation for more information on how the datum batching feature works.

Note: This can only be used within a Pachyderm worker.

Examples

>>> from pachyderm_sdk import batch_all_datums
>>>
>>> @batch_all_datums
>>> def pipeline():
>>>     # process datums
>>>     pass
>>>
>>> if __name__ == '__main__':
>>>   # Perform an expensive computation here before
>>>   #   entering your datum processing function
>>>   #   e.g. initializing a model.
>>>   pipeline()

Check the following link for a more substantial example: github.com/pachyderm/examples/tree/master/object-detection

Expand source code
def batch_all_datums(user_code: PIPELINE_FUNC) -> PIPELINE_FUNC:
    """A decorator that will repeatedly call the wrapped function until
    all datums have been processed. Before calling the wrapped function,
    this decorator will call the NextDatum endpoint within the worker
    and set any environment variables specified by the worker.

    Any exceptions raised during the execution of the wrapped function
    will be reported back to the worker. See the pachyderm documentation
    for more information on how the datum batching feature works.

    Note: This can only be used within a Pachyderm worker.

    Parameters
    ----------
    user_code : PIPELINE_FUNC
        The function that processes a single datum. It is called with the
        same positional and keyword arguments the decorated function receives.

    Returns
    -------
    PIPELINE_FUNC
        A wrapper (with ``user_code``'s metadata, via ``functools.wraps``)
        that runs ``user_code`` once per datum.

    Examples
    --------
    >>> from pachyderm_sdk import batch_all_datums
    >>>
    >>> @batch_all_datums
    ... def pipeline():
    ...     # process datums
    ...     pass
    >>>
    >>> if __name__ == '__main__':
    ...     # Perform an expensive computation here before
    ...     # entering your datum processing function,
    ...     # e.g. initializing a model.
    ...     pipeline()

    Check the following link for a more substantial example:
        github.com/pachyderm/examples/tree/master/object-detection
    """

    @wraps(user_code)
    def wrapper(*args, **kwargs) -> None:
        # Build the worker stub once; every iteration reuses it.
        worker = Client().worker
        # There is no exit condition in this loop; each call to user_code is
        # bracketed by worker.batch_datum(). NOTE(review): termination appears
        # to be driven by the worker / batch_datum side — confirm.
        while True:
            with worker.batch_datum():
                user_code(*args, **kwargs)

    return wrapper

Classes

class Client (host: str = 'localhost', port: int = 30650, auth_token: Optional[str] = None, root_certs: Optional[bytes] = None, transaction_id: str = None, tls: bool = False)

The Client used to interact with a Pachyderm instance.

Examples

Connect to a pachyderm instance using your local config file:

>>> from pachyderm_sdk import Client
>>> client = Client.from_config()

Connect to a pachyderm instance using a URL/address:

>>> from pachyderm_sdk import Client
>>> client = Client.from_pachd_address("test.work.com:30080")

Creates a Pachyderm client.

Parameters

host : str, optional
The pachd host. Default is 'localhost', which is used with pachctl port-forward.
port : int, optional
The port to connect to. Default is 30650.
auth_token : str, optional
The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
The PEM-encoded root certificates as byte string.
transaction_id : str, optional
The ID of the transaction to run operations on.
tls : bool
Whether TLS should be used. If root_certs are specified, they are used. Otherwise, we use the certs provided by certifi.
Expand source code
class Client:
    """The Client used to interact with a Pachyderm instance.

    Examples
    --------
    Connect to a pachyderm instance using your local config file:
    >>> from pachyderm_sdk import Client
    >>> client = Client.from_config()

    Connect to a pachyderm instance using a URL/address:
    >>> from pachyderm_sdk import Client
    >>> client = Client.from_pachd_address("test.work.com:30080")
    """

    def __init__(
        self,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
        tls: bool = False,
    ):
        """
        Creates a Pachyderm client.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``. IPv6 literals (e.g. ``::1``) are
            bracketed automatically when building the gRPC target.
        port : int, optional
            The port to connect to. Default is 30650.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster. If None, it is read from the environment variable named by
            ``AUTH_TOKEN_ENV``. If no token is found and the environment
            variable named by ``OIDC_TOKEN_ENV`` is set, the client
            authenticates with that OIDC token.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. Supplying them
            enables TLS.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool
            Whether TLS should be used. If `root_certs` are specified, they are
            used. Otherwise, we use the certs provided by certifi.
        """
        # Falsy values (e.g. a None port from an address without one) fall
        # back to the defaults.
        host = host or DEFAULT_HOST
        port = port or DEFAULT_PORT
        if auth_token is None:
            auth_token = os.environ.get(AUTH_TOKEN_ENV)

        # Providing root certs implies TLS.
        tls = tls or (root_certs is not None)
        if tls and root_certs is None:
            # load default certs if none are specified
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        self.address = self._format_address(host, port)
        self.root_certs = root_certs
        self._auth_token = auth_token
        self._transaction_id = transaction_id
        # Worker stub is loaded when accessed through the worker property.
        self._worker: Optional[_WorkerStub] = None
        # Build request metadata, the intercepted channel and all API stubs.
        self._connect()

        if not auth_token and (oidc_token := os.environ.get(OIDC_TOKEN_ENV)):
            # NOTE(review): the return value is stored directly as the auth
            # token; confirm `authenticate` returns the token string rather
            # than a response message.
            self.auth_token = self.auth.authenticate(id_token=oidc_token)

    @staticmethod
    def _format_address(host: str, port: int) -> str:
        """Join ``host`` and ``port`` into a gRPC target string.

        IPv6 literals are wrapped in brackets (``[::1]:30650``); unbracketed,
        the trailing port cannot be distinguished from the address. Hostnames,
        IPv4 addresses and already-bracketed hosts are joined unchanged.
        """
        import ipaddress

        try:
            is_ipv6 = ipaddress.ip_address(host).version == 6
        except ValueError:
            is_ipv6 = False
        return f"[{host}]:{port}" if is_ipv6 else "{}:{}".format(host, port)

    def _connect(self) -> None:
        """(Re)create the metadata, the gRPC channel and every API stub.

        The channel is wrapped by the metadata interceptor using the current
        auth token and transaction ID. Called on construction and whenever
        either value changes, so subsequent requests carry the new metadata.
        """
        self._grpc_metadata = self._build_metadata()
        self._channel = _apply_metadata_interceptor(
            channel=_create_channel(
                self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
            ),
            metadata=self._grpc_metadata,
        )
        self._init_api()

    def _init_api(self):
        """Create one stub per pachd API service on the current channel.

        The pfs and transaction stubs read (and, for transactions, write) the
        transaction ID through callbacks into this client.
        """
        self.admin = _AdminStub(self._channel)
        self.auth = _AuthStub(self._channel)
        self.debug = _DebugStub(self._channel)
        self.enterprise = _EnterpriseStub(self._channel)
        self.identity = _IdentityStub(self._channel)
        self.license = _LicenseStub(self._channel)
        self.metadata = _MetadataApiStub(self._channel)
        self.pfs = _PfsStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
        )
        self.pps = _PpsStub(self._channel)
        self.storage = _StorageStub(self._channel)
        self.transaction = _TransactionStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
            set_transaction_id=lambda value: setattr(self, "transaction_id", value),
        )
        self._version_api = _VersionStub(self._channel)

    @classmethod
    def new_in_cluster(
        cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
    ) -> "Client":
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        If the spout config file (``CONFIG_PATH_SPOUT``) exists, the client is
        built from it instead, and `auth_token` / `transaction_id` are not
        applied.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk Client instance.

        Raises
        ------
        RuntimeError
            If the pachd service host or port environment variable is unset.
        """
        if CONFIG_PATH_SPOUT.exists():
            # TODO: Should we notify the user that we are using spout config?
            return cls.from_config(CONFIG_PATH_SPOUT)

        host = os.environ.get(PACHD_SERVICE_HOST_ENV)
        if host is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_HOST_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        port = os.environ.get(PACHD_SERVICE_PORT_ENV)
        if port is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_PORT_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )

        return cls(
            host=host,
            port=int(port),
            auth_token=auth_token,
            transaction_id=transaction_id,
        )

    @classmethod
    def from_pachd_address(
        cls,
        pachd_address: str,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
    ) -> "Client":
        """Creates a Pachyderm client from a given pachd address.

        Parameters
        ----------
        pachd_address : str
            The address of the pachd server, e.g. ``"localhost:30650"`` or
            ``"grpcs://pachd.example.com:443"``. A missing scheme defaults to
            ``grpc://``; ``grpcs`` and ``https`` enable TLS.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified
            and TLS is enabled, default certs are loaded from certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk Client instance.

        Raises
        ------
        ValueError
            If the scheme is unrecognized, or the address contains a path,
            params, query, fragment, username or password.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)

        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path or u.params or u.query or u.fragment or u.username or u.password:
            raise ValueError("invalid pachd address")

        return cls(
            host=u.hostname,
            port=u.port,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme == "grpcs" or u.scheme == "https",
        )

    @classmethod
    def from_config(cls, config_file: Union[Path, str] = DEFAULT_CONFIG) -> "Client":
        """Creates a Pachyderm client from a config file.

        The active context's pachd address, session token, server CAs and
        active transaction are used.

        Parameters
        ----------
        config_file : Union[Path, str]
            The path to a config json file.
            config_file defaults to the local config.

        Returns
        -------
        Client
            A properly configured Client.
        """
        config = ConfigFile.from_path(config_file)
        active_context = config.active_context
        client = cls.from_pachd_address(
            active_context.active_pachd_address,
            auth_token=active_context.session_token,
            root_certs=active_context.server_cas_decoded,
            transaction_id=active_context.active_transaction,
        )

        return client

    @property
    def auth_token(self):
        """The authentication token. Used if authentication is enabled on the cluster."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value):
        # Rebuilds the channel and replaces every API stub attribute; stubs
        # obtained before this assignment keep using the previous channel.
        self._auth_token = value
        self._connect()

    @property
    def transaction_id(self):
        """The ID of the transaction to run operations on."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value):
        # Rebuilds the channel and replaces every API stub attribute; stubs
        # obtained before this assignment keep using the previous channel.
        self._transaction_id = value
        self._connect()

    @property
    def worker(self) -> _WorkerStub:
        """Access the worker API stub.

        This is dynamically loaded in order to provide a helpful error message
        to the user if they try to interact with the worker API from outside a
        worker.

        Raises
        ------
        ConnectionError
            If the environment variable named by ``WORKER_PORT_ENV`` is unset.
        """
        if self._worker is None:
            port = os.environ.get(WORKER_PORT_ENV)
            if port is None:
                raise ConnectionError(
                    f"Cannot connect to the worker since {WORKER_PORT_ENV} is not set. "
                    "Are you running inside a pipeline?"
                )
            # Note: This channel does not go through the metadata interceptor.
            channel = _create_channel(
                address=f"localhost:{port}", root_certs=None, options=GRPC_CHANNEL_OPTIONS
            )
            self._worker = _WorkerStub(channel)
        return self._worker

    def _build_metadata(self):
        """Return the gRPC metadata pairs for the current token/transaction."""
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def get_version(self) -> Version:
        """Requests version information from the pachd cluster."""
        return self._version_api.get_version()

Static methods

def new_in_cluster(auth_token: Optional[str] = None, transaction_id: Optional[str] = None) ‑> Client

Creates a Pachyderm client that operates within a Pachyderm cluster.

Parameters

auth_token : str, optional
The authentication token. Used if authentication is enabled on the cluster.
transaction_id : str, optional
The ID of the transaction to run operations on.

Returns

Client
A pachyderm_sdk Client instance.
Expand source code
@classmethod
def new_in_cluster(
    cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
) -> "Client":
    """Build a client for code running inside a Pachyderm cluster.

    When the spout config file exists it takes precedence and the client is
    created from it (the arguments below are then not applied). Otherwise the
    pachd host and port are read from the service environment variables.

    Parameters
    ----------
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A pachyderm_sdk Client instance.

    Raises
    ------
    RuntimeError
        If the host or port environment variable is unset.
    """
    if CONFIG_PATH_SPOUT.exists():
        # TODO: Should we notify the user that we are using spout config?
        return cls.from_config(CONFIG_PATH_SPOUT)

    def _require_env(name: str) -> str:
        # Fetch a mandatory environment variable or fail with a hint.
        value = os.environ.get(name)
        if value is None:
            raise RuntimeError(
                f"Environment variable {name} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        return value

    service_host = _require_env(PACHD_SERVICE_HOST_ENV)
    service_port = _require_env(PACHD_SERVICE_PORT_ENV)
    return cls(
        host=service_host,
        port=int(service_port),
        auth_token=auth_token,
        transaction_id=transaction_id,
    )
def from_pachd_address(pachd_address: str, auth_token: str = None, root_certs: bytes = None, transaction_id: str = None) ‑> Client

Creates a Pachyderm client from a given pachd address.

Parameters

pachd_address : str
The address of pachd server
auth_token : str, optional
The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
The PEM-encoded root certificates as byte string. If unspecified, this will load default certs from certifi.
transaction_id : str, optional
The ID of the transaction to run operations on.

Returns

Client
A pachyderm_sdk Client instance.
Expand source code
@classmethod
def from_pachd_address(
    cls,
    pachd_address: str,
    auth_token: Optional[str] = None,
    root_certs: Optional[bytes] = None,
    transaction_id: Optional[str] = None,
) -> "Client":
    """Creates a Pachyderm client from a given pachd address.

    Parameters
    ----------
    pachd_address : str
        The address of the pachd server, e.g. ``"localhost:30650"`` or
        ``"grpcs://pachd.example.com:443"``. A missing scheme defaults to
        ``grpc://``; ``grpcs`` and ``https`` enable TLS.
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    root_certs : bytes, optional
        The PEM-encoded root certificates as byte string. If unspecified
        and TLS is enabled, default certs are loaded from certifi.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A pachyderm_sdk Client instance.

    Raises
    ------
    ValueError
        If the scheme is unrecognized, or the address contains a path,
        params, query, fragment, username or password.
    """
    if "://" not in pachd_address:
        pachd_address = "grpc://{}".format(pachd_address)

    u = urlparse(pachd_address)

    if u.scheme not in ("grpc", "http", "grpcs", "https"):
        raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
    # Only scheme://host[:port] is accepted.
    if u.path or u.params or u.query or u.fragment or u.username or u.password:
        raise ValueError("invalid pachd address")

    return cls(
        host=u.hostname,
        port=u.port,
        auth_token=auth_token,
        root_certs=root_certs,
        transaction_id=transaction_id,
        tls=u.scheme == "grpcs" or u.scheme == "https",
    )
def from_config(config_file: Union[pathlib.Path, str] = PosixPath('~/.pachyderm/config.json')) ‑> Client

Creates a Pachyderm client from a config file.

Parameters

config_file : Union[Path, str]
The path to a config json file. config_file defaults to the local config.

Returns

Client
A properly configured Client.
Expand source code
@classmethod
def from_config(cls, config_file: Union[Path, str] = DEFAULT_CONFIG) -> "Client":
    """Build a client from the active context of a Pachyderm config file.

    The context's pachd address, session token, decoded server CAs and
    active transaction are forwarded to ``from_pachd_address``.

    Parameters
    ----------
    config_file : Union[Path, str]
        Location of the config JSON file; defaults to the local config.

    Returns
    -------
    Client
        A properly configured Client.
    """
    context = ConfigFile.from_path(config_file).active_context
    return cls.from_pachd_address(
        context.active_pachd_address,
        auth_token=context.session_token,
        root_certs=context.server_cas_decoded,
        transaction_id=context.active_transaction,
    )

Instance variables

var auth_token

The authentication token. Used if authentication is enabled on the cluster.

Expand source code
@property
def auth_token(self):
    """Authentication token for the cluster (None when not set).

    Only relevant when authentication is enabled on the cluster.
    """
    token = self._auth_token
    return token
var transaction_id

The ID of the transaction to run operations on.

Expand source code
@property
def transaction_id(self):
    """Identifier of the transaction that operations run on (None if unset)."""
    current = self._transaction_id
    return current
var worker : _WorkerStub

Access the worker API stub.

This is dynamically loaded in order to provide a helpful error message to the user if they try to interact with the worker API from outside a worker.

Expand source code
@property
def worker(self) -> _WorkerStub:
    """Lazily create and return the worker API stub.

    The stub is built on first access, so code running outside a pipeline
    gets a helpful error message instead of a failed connection.

    Raises
    ------
    ConnectionError
        If the environment variable named by ``WORKER_PORT_ENV`` is unset.
    """
    if self._worker is not None:
        return self._worker

    worker_port = os.environ.get(WORKER_PORT_ENV)
    if worker_port is None:
        raise ConnectionError(
            f"Cannot connect to the worker since {WORKER_PORT_ENV} is not set. "
            "Are you running inside a pipeline?"
        )
    # The worker channel deliberately bypasses the metadata interceptor.
    self._worker = _WorkerStub(
        _create_channel(
            address=f"localhost:{worker_port}",
            root_certs=None,
            options=GRPC_CHANNEL_OPTIONS,
        )
    )
    return self._worker

Methods

def get_version(self) ‑> Version

Requests version information from the pachd cluster.

Expand source code
def get_version(self) -> Version:
    """Return the version information reported by the pachd cluster."""
    version_api = self._version_api
    return version_api.get_version()