Module pachyderm_sdk.client
The Client used to interact with a Pachyderm instance.
Expand source code
"""The Client used to interact with a Pachyderm instance."""
import os
from pathlib import Path
from typing import Optional, Union
from urllib.parse import urlparse
import grpc
from .api.admin.extension import ApiStub as _AdminStub
from .api.auth import ApiStub as _AuthStub
from .api.debug.extension import ApiStub as _DebugStub
from .api.enterprise import ApiStub as _EnterpriseStub
from .api.identity import ApiStub as _IdentityStub
from .api.license import ApiStub as _LicenseStub
from .api.metadata import ApiStub as _MetadataApiStub
from .api.pfs.extension import ApiStub as _PfsStub
from .api.pps.extension import ApiStub as _PpsStub
from .api.storage.extension import ApiStub as _StorageStub
from .api.transaction.extension import ApiStub as _TransactionStub
from .api.version import ApiStub as _VersionStub, Version
from .api.worker.extension import WorkerStub as _WorkerStub
from .config import ConfigFile
from .constants import (
AUTH_TOKEN_ENV,
CONFIG_PATH_SPOUT,
DEFAULT_CONFIG,
DEFAULT_HOST,
DEFAULT_PORT,
GRPC_CHANNEL_OPTIONS,
OIDC_TOKEN_ENV,
PACHD_SERVICE_HOST_ENV,
PACHD_SERVICE_PORT_ENV,
WORKER_PORT_ENV,
)
from .interceptor import MetadataClientInterceptor, MetadataType
# Public API of this module: only the Client class is exported by name.
__all__ = ("Client",)
class Client:
    """The Client used to interact with a Pachyderm instance.

    Examples
    --------
    Connect to a pachyderm instance using your local config file:

    >>> from pachyderm_sdk import Client
    >>> client = Client.from_config()

    Connect to a pachyderm instance using a URL/address:

    >>> from pachyderm_sdk import Client
    >>> client = Client.from_pachd_address("test.work.com:30080")
    """

    def __init__(
        self,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
        tls: bool = False,
    ):
        """
        Creates a Pachyderm client.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``. A falsy value falls back to the default.
        port : int, optional
            The port to connect to. Default is 30650. A falsy value falls
            back to the default.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster. When omitted, the value of the ``AUTH_TOKEN_ENV``
            environment variable is used if it is set.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. Supplying
            certificates implies TLS.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool
            Whether TLS should be used. If `root_certs` are specified, they are
            used. Otherwise, we use the certs provided by certifi.
        """
        if auth_token is None:
            auth_token = os.environ.get(AUTH_TOKEN_ENV)

        tls = tls or (root_certs is not None)
        if tls and root_certs is None:
            # Load default certs if none are specified. certifi is imported
            # lazily so it is only needed for TLS connections.
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        self.address = "{}:{}".format(host or DEFAULT_HOST, port or DEFAULT_PORT)
        self.root_certs = root_certs
        self._auth_token = auth_token
        self._transaction_id = transaction_id

        # Worker stub is loaded when accessed through the worker property.
        self._worker = None

        # Builds the metadata, the intercepted channel and all API stubs.
        self._rebuild_channel()

        if not auth_token and (oidc_token := os.environ.get(OIDC_TOKEN_ENV)):
            # No token was supplied or found in the environment: exchange the
            # OIDC token for one. Going through the property setter rebuilds
            # the channel so subsequent calls carry the new token.
            # NOTE(review): the value returned by auth.authenticate is stored
            # as the token unchanged -- confirm it is the token string itself.
            self.auth_token = self.auth.authenticate(id_token=oidc_token)

    def _rebuild_channel(self) -> None:
        """(Re)creates the gRPC metadata, the intercepted channel and the stubs.

        Called on construction and whenever the auth token or transaction ID
        changes, since both are baked into the channel's metadata interceptor.
        """
        # NOTE(review): a previously created channel is simply replaced here,
        # it is not closed explicitly -- confirm whether that is intended.
        self._grpc_metadata = self._build_metadata()
        self._channel = _apply_metadata_interceptor(
            channel=_create_channel(
                self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
            ),
            metadata=self._grpc_metadata,
        )
        # See implementation for api layout.
        self._init_api()

    def _init_api(self):
        """Instantiates one stub per API on the current channel."""
        self.admin = _AdminStub(self._channel)
        self.auth = _AuthStub(self._channel)
        self.debug = _DebugStub(self._channel)
        self.enterprise = _EnterpriseStub(self._channel)
        self.identity = _IdentityStub(self._channel)
        self.license = _LicenseStub(self._channel)
        self.metadata = _MetadataApiStub(self._channel)
        # The callbacks go through the transaction_id property so the stubs
        # always read (and write) the client's current transaction.
        self.pfs = _PfsStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
        )
        self.pps = _PpsStub(self._channel)
        self.storage = _StorageStub(self._channel)
        self.transaction = _TransactionStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
            set_transaction_id=lambda value: setattr(self, "transaction_id", value),
        )
        self._version_api = _VersionStub(self._channel)
        # Annotation only (no assignment): an already-loaded worker stub is
        # left untouched when the other stubs are re-created.
        self._worker: Optional[_WorkerStub]

    @classmethod
    def new_in_cluster(
        cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
    ) -> "Client":
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        If the spout config file (``CONFIG_PATH_SPOUT``) exists, the client is
        built from that file and the arguments below are not used. Otherwise
        the pachd host and port are read from the environment.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        RuntimeError
            If the pachd host or port environment variable is not set.
        """
        if CONFIG_PATH_SPOUT.exists():
            # TODO: Should we notify the user that we are using spout config?
            return cls.from_config(CONFIG_PATH_SPOUT)

        host = os.environ.get(PACHD_SERVICE_HOST_ENV)
        if host is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_HOST_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        port = os.environ.get(PACHD_SERVICE_PORT_ENV)
        if port is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_PORT_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )

        return cls(
            host=host,
            port=int(port),
            auth_token=auth_token,
            transaction_id=transaction_id,
        )

    @classmethod
    def from_pachd_address(
        cls,
        pachd_address: str,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
    ) -> "Client":
        """Creates a Pachyderm client from a given pachd address.

        Parameters
        ----------
        pachd_address : str
            The address of pachd server. An address without a scheme is
            treated as ``grpc://``. Accepted schemes are ``grpc``, ``http``,
            ``grpcs`` and ``https``; the last two enable TLS.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified
            and the address uses a TLS scheme, default certs are loaded from
            certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        ValueError
            If the scheme is not recognized, or the address carries a path,
            params, query, fragment, username or password.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)

        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path or u.params or u.query or u.fragment or u.username or u.password:
            raise ValueError("invalid pachd address")

        # A missing host or port is passed as None; the constructor then
        # substitutes its defaults.
        return cls(
            host=u.hostname,
            port=u.port,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme in ("grpcs", "https"),
        )

    @classmethod
    def from_config(cls, config_file: Union[Path, str] = DEFAULT_CONFIG) -> "Client":
        """Creates a Pachyderm client from a config file.

        The address, session token, server CAs and active transaction are all
        taken from the config file's active context.

        Parameters
        ----------
        config_file : Union[Path, str]
            The path to a config json file.
            config_file defaults to the local config.

        Returns
        -------
        Client
            A properly configured Client.
        """
        config = ConfigFile.from_path(config_file)
        active_context = config.active_context
        return cls.from_pachd_address(
            active_context.active_pachd_address,
            auth_token=active_context.session_token,
            root_certs=active_context.server_cas_decoded,
            transaction_id=active_context.active_transaction,
        )

    @property
    def auth_token(self):
        """The authentication token. Used if authentication is enabled on the cluster."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value):
        # The token is part of the channel metadata, so the channel and the
        # stubs bound to it are rebuilt.
        self._auth_token = value
        self._rebuild_channel()

    @property
    def transaction_id(self):
        """The ID of the transaction to run operations on."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value):
        # The transaction ID is part of the channel metadata, so the channel
        # and the stubs bound to it are rebuilt.
        self._transaction_id = value
        self._rebuild_channel()

    @property
    def worker(self) -> _WorkerStub:
        """Access the worker API stub.

        This is dynamically loaded in order to provide a helpful error message
        to the user if they try to interact with the worker API from outside a
        worker.

        Raises
        ------
        ConnectionError
            If the worker port environment variable is not set.
        """
        if self._worker is None:
            port = os.environ.get(WORKER_PORT_ENV)
            if port is None:
                raise ConnectionError(
                    f"Cannot connect to the worker since {WORKER_PORT_ENV} is not set. "
                    "Are you running inside a pipeline?"
                )
            # Note: This channel does not go through the metadata interceptor.
            channel = _create_channel(
                address=f"localhost:{port}", root_certs=None, options=GRPC_CHANNEL_OPTIONS
            )
            self._worker = _WorkerStub(channel)
        return self._worker

    def _build_metadata(self):
        """Returns the (key, value) metadata pairs for the current token/transaction."""
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def get_version(self) -> Version:
        """Requests version information from the pachd cluster."""
        return self._version_api.get_version()
def _apply_metadata_interceptor(
    channel: grpc.Channel, metadata: MetadataType
) -> grpc.Channel:
    """Returns `channel` wrapped with an interceptor built from `metadata`."""
    return grpc.intercept_channel(channel, MetadataClientInterceptor(metadata))
def _create_channel(
    address: str,
    root_certs: Optional[bytes],
    options: MetadataType,
) -> grpc.Channel:
    """Opens a gRPC channel to `address`.

    The channel is secure when `root_certs` is given and insecure otherwise;
    `options` is forwarded to grpc in both cases.
    """
    if root_certs is None:
        return grpc.insecure_channel(address, options=options)
    credentials = grpc.ssl_channel_credentials(root_certificates=root_certs)
    return grpc.secure_channel(address, credentials, options=options)
Classes
class Client (host: str = 'localhost', port: int = 30650, auth_token: Optional[str] = None, root_certs: Optional[bytes] = None, transaction_id: str = None, tls: bool = False)
-
The Client used to interact with a Pachyderm instance.
Examples
Connect to a pachyderm instance using your local config file:
>>> from pachyderm_sdk import Client
>>> client = Client.from_config()
Connect to a pachyderm instance using a URL/address:
>>> from pachyderm_sdk import Client
>>> client = Client.from_pachd_address("test.work.com:30080")
Creates a Pachyderm client.
Parameters
host
:str
, optional- The pachd host. Default is 'localhost', which is used with
pachctl port-forward
. port
:int
, optional- The port to connect to. Default is 30650.
auth_token
:str
, optional- The authentication token. Used if authentication is enabled on the cluster.
root_certs
:bytes
, optional- The PEM-encoded root certificates as byte string.
transaction_id
:str
, optional- The ID of the transaction to run operations on.
tls
:bool
- Whether TLS should be used. If
root_certs
are specified, they are used. Otherwise, we use the certs provided by certifi.
Expand source code
class Client:
    """The Client used to interact with a Pachyderm instance.

    Examples
    --------
    Connect to a pachyderm instance using your local config file:

    >>> from pachyderm_sdk import Client
    >>> client = Client.from_config()

    Connect to a pachyderm instance using a URL/address:

    >>> from pachyderm_sdk import Client
    >>> client = Client.from_pachd_address("test.work.com:30080")
    """

    def __init__(
        self,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
        tls: bool = False,
    ):
        """
        Creates a Pachyderm client.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``. A falsy value falls back to the default.
        port : int, optional
            The port to connect to. Default is 30650. A falsy value falls
            back to the default.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster. When omitted, the value of the ``AUTH_TOKEN_ENV``
            environment variable is used if it is set.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. Supplying
            certificates implies TLS.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool
            Whether TLS should be used. If `root_certs` are specified, they are
            used. Otherwise, we use the certs provided by certifi.
        """
        if auth_token is None:
            auth_token = os.environ.get(AUTH_TOKEN_ENV)

        tls = tls or (root_certs is not None)
        if tls and root_certs is None:
            # Load default certs if none are specified. certifi is imported
            # lazily so it is only needed for TLS connections.
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        self.address = "{}:{}".format(host or DEFAULT_HOST, port or DEFAULT_PORT)
        self.root_certs = root_certs
        self._auth_token = auth_token
        self._transaction_id = transaction_id

        # Worker stub is loaded when accessed through the worker property.
        self._worker = None

        # Builds the metadata, the intercepted channel and all API stubs.
        self._rebuild_channel()

        if not auth_token and (oidc_token := os.environ.get(OIDC_TOKEN_ENV)):
            # No token was supplied or found in the environment: exchange the
            # OIDC token for one. Going through the property setter rebuilds
            # the channel so subsequent calls carry the new token.
            # NOTE(review): the value returned by auth.authenticate is stored
            # as the token unchanged -- confirm it is the token string itself.
            self.auth_token = self.auth.authenticate(id_token=oidc_token)

    def _rebuild_channel(self) -> None:
        """(Re)creates the gRPC metadata, the intercepted channel and the stubs.

        Called on construction and whenever the auth token or transaction ID
        changes, since both are baked into the channel's metadata interceptor.
        """
        # NOTE(review): a previously created channel is simply replaced here,
        # it is not closed explicitly -- confirm whether that is intended.
        self._grpc_metadata = self._build_metadata()
        self._channel = _apply_metadata_interceptor(
            channel=_create_channel(
                self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
            ),
            metadata=self._grpc_metadata,
        )
        # See implementation for api layout.
        self._init_api()

    def _init_api(self):
        """Instantiates one stub per API on the current channel."""
        self.admin = _AdminStub(self._channel)
        self.auth = _AuthStub(self._channel)
        self.debug = _DebugStub(self._channel)
        self.enterprise = _EnterpriseStub(self._channel)
        self.identity = _IdentityStub(self._channel)
        self.license = _LicenseStub(self._channel)
        self.metadata = _MetadataApiStub(self._channel)
        # The callbacks go through the transaction_id property so the stubs
        # always read (and write) the client's current transaction.
        self.pfs = _PfsStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
        )
        self.pps = _PpsStub(self._channel)
        self.storage = _StorageStub(self._channel)
        self.transaction = _TransactionStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
            set_transaction_id=lambda value: setattr(self, "transaction_id", value),
        )
        self._version_api = _VersionStub(self._channel)
        # Annotation only (no assignment): an already-loaded worker stub is
        # left untouched when the other stubs are re-created.
        self._worker: Optional[_WorkerStub]

    @classmethod
    def new_in_cluster(
        cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
    ) -> "Client":
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        If the spout config file (``CONFIG_PATH_SPOUT``) exists, the client is
        built from that file and the arguments below are not used. Otherwise
        the pachd host and port are read from the environment.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        RuntimeError
            If the pachd host or port environment variable is not set.
        """
        if CONFIG_PATH_SPOUT.exists():
            # TODO: Should we notify the user that we are using spout config?
            return cls.from_config(CONFIG_PATH_SPOUT)

        host = os.environ.get(PACHD_SERVICE_HOST_ENV)
        if host is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_HOST_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        port = os.environ.get(PACHD_SERVICE_PORT_ENV)
        if port is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_PORT_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )

        return cls(
            host=host,
            port=int(port),
            auth_token=auth_token,
            transaction_id=transaction_id,
        )

    @classmethod
    def from_pachd_address(
        cls,
        pachd_address: str,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
    ) -> "Client":
        """Creates a Pachyderm client from a given pachd address.

        Parameters
        ----------
        pachd_address : str
            The address of pachd server. An address without a scheme is
            treated as ``grpc://``. Accepted schemes are ``grpc``, ``http``,
            ``grpcs`` and ``https``; the last two enable TLS.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified
            and the address uses a TLS scheme, default certs are loaded from
            certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        ValueError
            If the scheme is not recognized, or the address carries a path,
            params, query, fragment, username or password.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)

        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path or u.params or u.query or u.fragment or u.username or u.password:
            raise ValueError("invalid pachd address")

        # A missing host or port is passed as None; the constructor then
        # substitutes its defaults.
        return cls(
            host=u.hostname,
            port=u.port,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme in ("grpcs", "https"),
        )

    @classmethod
    def from_config(cls, config_file: Union[Path, str] = DEFAULT_CONFIG) -> "Client":
        """Creates a Pachyderm client from a config file.

        The address, session token, server CAs and active transaction are all
        taken from the config file's active context.

        Parameters
        ----------
        config_file : Union[Path, str]
            The path to a config json file.
            config_file defaults to the local config.

        Returns
        -------
        Client
            A properly configured Client.
        """
        config = ConfigFile.from_path(config_file)
        active_context = config.active_context
        return cls.from_pachd_address(
            active_context.active_pachd_address,
            auth_token=active_context.session_token,
            root_certs=active_context.server_cas_decoded,
            transaction_id=active_context.active_transaction,
        )

    @property
    def auth_token(self):
        """The authentication token. Used if authentication is enabled on the cluster."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value):
        # The token is part of the channel metadata, so the channel and the
        # stubs bound to it are rebuilt.
        self._auth_token = value
        self._rebuild_channel()

    @property
    def transaction_id(self):
        """The ID of the transaction to run operations on."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value):
        # The transaction ID is part of the channel metadata, so the channel
        # and the stubs bound to it are rebuilt.
        self._transaction_id = value
        self._rebuild_channel()

    @property
    def worker(self) -> _WorkerStub:
        """Access the worker API stub.

        This is dynamically loaded in order to provide a helpful error message
        to the user if they try to interact with the worker API from outside a
        worker.

        Raises
        ------
        ConnectionError
            If the worker port environment variable is not set.
        """
        if self._worker is None:
            port = os.environ.get(WORKER_PORT_ENV)
            if port is None:
                raise ConnectionError(
                    f"Cannot connect to the worker since {WORKER_PORT_ENV} is not set. "
                    "Are you running inside a pipeline?"
                )
            # Note: This channel does not go through the metadata interceptor.
            channel = _create_channel(
                address=f"localhost:{port}", root_certs=None, options=GRPC_CHANNEL_OPTIONS
            )
            self._worker = _WorkerStub(channel)
        return self._worker

    def _build_metadata(self):
        """Returns the (key, value) metadata pairs for the current token/transaction."""
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def get_version(self) -> Version:
        """Requests version information from the pachd cluster."""
        return self._version_api.get_version()
Static methods
def new_in_cluster(auth_token: Optional[str] = None, transaction_id: Optional[str] = None) ‑> Client
-
Creates a Pachyderm client that operates within a Pachyderm cluster.
Parameters
auth_token
:str
, optional- The authentication token. Used if authentication is enabled on the cluster.
transaction_id
:str
, optional- The ID of the transaction to run operations on.
Returns
Client
- A pachyderm_sdk client instance.
Expand source code
@classmethod
def new_in_cluster(
    cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
) -> "Client":
    """Builds a client for code running inside a Pachyderm cluster.

    When the spout config file exists the client is created from it and the
    arguments are not consulted; otherwise the pachd host and port are taken
    from the environment.

    Parameters
    ----------
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A client connected to the in-cluster pachd service.

    Raises
    ------
    RuntimeError
        If the pachd host or port environment variable is not set.
    """
    if CONFIG_PATH_SPOUT.exists():
        # TODO: Should we notify the user that we are using spout config?
        return cls.from_config(CONFIG_PATH_SPOUT)

    # Resolve host first, then port, failing on the first one that is unset.
    resolved = {}
    for env_name in (PACHD_SERVICE_HOST_ENV, PACHD_SERVICE_PORT_ENV):
        env_value = os.environ.get(env_name)
        if env_value is None:
            raise RuntimeError(
                f"Environment variable {env_name} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        resolved[env_name] = env_value

    return cls(
        host=resolved[PACHD_SERVICE_HOST_ENV],
        port=int(resolved[PACHD_SERVICE_PORT_ENV]),
        auth_token=auth_token,
        transaction_id=transaction_id,
    )
def from_pachd_address(pachd_address: str, auth_token: str = None, root_certs: bytes = None, transaction_id: str = None) ‑> Client
-
Creates a Pachyderm client from a given pachd address.
Parameters
pachd_address
:str
- The address of pachd server
auth_token
:str
, optional- The authentication token. Used if authentication is enabled on the cluster.
root_certs
:bytes
, optional- The PEM-encoded root certificates as byte string. If unspecified and the address uses a TLS scheme (grpcs or https), default certs are loaded from certifi.
transaction_id
:str
, optional- The ID of the transaction to run operations on.
Returns
Client
- A pachyderm_sdk client instance.
Expand source code
@classmethod
def from_pachd_address(
    cls,
    pachd_address: str,
    auth_token: Optional[str] = None,
    root_certs: Optional[bytes] = None,
    transaction_id: Optional[str] = None,
) -> "Client":
    """Creates a Pachyderm client from a given pachd address.

    Parameters
    ----------
    pachd_address : str
        The address of pachd server. An address without a scheme is treated
        as ``grpc://``. Accepted schemes are ``grpc``, ``http``, ``grpcs``
        and ``https``; the last two enable TLS.
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    root_certs : bytes, optional
        The PEM-encoded root certificates as byte string. If unspecified and
        the address uses a TLS scheme, default certs are loaded from certifi.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A pachyderm_sdk client instance.

    Raises
    ------
    ValueError
        If the scheme is not recognized, or the address carries a path,
        params, query, fragment, username or password.
    """
    if "://" not in pachd_address:
        pachd_address = "grpc://{}".format(pachd_address)

    u = urlparse(pachd_address)

    if u.scheme not in ("grpc", "http", "grpcs", "https"):
        raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
    if u.path or u.params or u.query or u.fragment or u.username or u.password:
        raise ValueError("invalid pachd address")

    # A missing host or port is passed as None; the constructor then
    # substitutes its defaults.
    return cls(
        host=u.hostname,
        port=u.port,
        auth_token=auth_token,
        root_certs=root_certs,
        transaction_id=transaction_id,
        tls=u.scheme in ("grpcs", "https"),
    )
def from_config(config_file: Union[pathlib.Path, str] = PosixPath('~/.pachyderm/config.json')) ‑> Client
-
Creates a Pachyderm client from a config file.
Parameters
config_file
:Union[Path, str]
- The path to a config json file. config_file defaults to the local config.
Returns
Client
- A properly configured Client.
Expand source code
@classmethod
def from_config(cls, config_file: Union[Path, str] = DEFAULT_CONFIG) -> "Client":
    """Builds a client from the active context of a config file.

    Parameters
    ----------
    config_file : Union[Path, str]
        Path to a config json file. Defaults to the local config.

    Returns
    -------
    Client
        A client using the active context's pachd address, session token,
        server CAs and active transaction.
    """
    context = ConfigFile.from_path(config_file).active_context
    return cls.from_pachd_address(
        context.active_pachd_address,
        auth_token=context.session_token,
        root_certs=context.server_cas_decoded,
        transaction_id=context.active_transaction,
    )
Instance variables
var auth_token
-
The authentication token. Used if authentication is enabled on the cluster.
Expand source code
@property
def auth_token(self):
    """The authentication token. Used if authentication is enabled on the cluster.

    Returns the value currently stored on the client (``self._auth_token``).
    """
    return self._auth_token
var transaction_id
-
The ID of the transaction to run operations on.
Expand source code
@property
def transaction_id(self):
    """The ID of the transaction to run operations on.

    Returns the value currently stored on the client
    (``self._transaction_id``).
    """
    return self._transaction_id
var worker : WorkerStub
-
Access the worker API stub.
This is dynamically loaded in order to provide a helpful error message to the user if they try to interact with the worker API from outside a worker.
Expand source code
@property
def worker(self) -> _WorkerStub:
    """Lazily created stub for the worker API.

    The stub is built on first access and cached, so that a clear error can
    be raised when the property is used outside of a worker.

    Raises
    ------
    ConnectionError
        If the worker port environment variable is not set.
    """
    if self._worker is not None:
        return self._worker

    worker_port = os.environ.get(WORKER_PORT_ENV)
    if worker_port is None:
        raise ConnectionError(
            f"Cannot connect to the worker since {WORKER_PORT_ENV} is not set. "
            "Are you running inside a pipeline?"
        )

    # Note: This channel does not go through the metadata interceptor.
    self._worker = _WorkerStub(
        _create_channel(
            address=f"localhost:{worker_port}",
            root_certs=None,
            options=GRPC_CHANNEL_OPTIONS,
        )
    )
    return self._worker
Methods
def get_version(self) ‑> Version
-
Requests version information from the pachd cluster.
Expand source code
def get_version(self) -> Version:
    """Requests version information from the pachd cluster.

    Delegates to the version API stub's ``get_version`` call and returns its
    result.
    """
    return self._version_api.get_version()