Datum Batching
By default, Pachyderm processes each datum independently. This means that your user code is called once for each datum. This can be inefficient and costly if you have a large number of small datums or if your user code is slow to start.
When you have a large number of datums, you can batch them to optimize performance. Pachyderm provides a next datum command that you can use to batch datums.
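To make the cost argument concrete, here is a rough back-of-the-envelope model in Python. It is only a sketch: the function name `total_runtime`, the cost model itself (a fixed startup cost plus a fixed per-datum cost on a single worker), and all of the numbers are assumptions chosen for illustration, not measurements of Pachyderm.

```python
def total_runtime(num_datums: int, startup_s: float, work_s: float, batched: bool) -> float:
    """Estimate wall-clock seconds for one worker (hypothetical cost model)."""
    # Without batching, the user code starts once per datum.
    # With batching, it starts once and then loops over the datums.
    startups = 1 if batched else num_datums
    return startups * startup_s + num_datums * work_s


if __name__ == "__main__":
    # Illustrative numbers only: 10,000 small datums, 2 s startup, 0.05 s of work each.
    print("unbatched:", total_runtime(10_000, 2.0, 0.05, batched=False))
    print("batched:  ", total_runtime(10_000, 2.0, 0.05, batched=True))
```

With these made-up inputs the unbatched estimate is roughly 20,500 seconds and the batched estimate roughly 502 seconds, because the startup cost is paid once instead of 10,000 times.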
Flow Diagram
flowchart LR
    user_code(User Code)
    ndsuccess(NextDatum)
    nderror("NextDatum(error)")
    response(NextDatumResponse)
    process_datum{process datum}
    cmd_err(Run cmd_err)
    kill[Kill User Code]
    datum?{datum exists?}
    retry?{retry?}
    cmd_err?{cmd_err defined}

    user_code ==>ndsuccess
    ndsuccess =====> datum?
    datum? ==>|yes| process_datum
    process_datum ==>|success| response
    response ==> user_code

    datum? -->|no| kill
    process_datum -->|fail| nderror
    nderror --> cmd_err?
    cmd_err? -->|yes| cmd_err
    cmd_err? -->|no|kill
    cmd_err --> retry?
    retry? -->|yes| response
    retry? -->|no| kill
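The control flow in the flowchart above can be traced with a small pure-Python simulation. This is a sketch, not Pachyderm's implementation: `run_batched`, its parameters, and the event strings are hypothetical, and the "retry?" decision is modeled as the boolean returned by a user-supplied `cmd_err` callable, which is an assumption about how that branch behaves.

```python
from typing import Callable, Iterable, List, Optional


def run_batched(
    datums: Iterable[str],
    process: Callable[[str], None],
    cmd_err: Optional[Callable[[str], bool]] = None,
) -> List[str]:
    """Walk the flowchart and return a log of events (all names hypothetical)."""
    log: List[str] = []
    for datum in datums:                     # NextDatum: a datum exists
        try:
            process(datum)                   # process datum
            log.append(f"ok:{datum}")        # NextDatumResponse, back to user code
        except Exception:                    # NextDatum(error)
            if cmd_err is None:              # cmd_err not defined: kill user code
                log.append(f"kill:{datum}")
                return log
            if not cmd_err(datum):           # cmd_err ran, retry? no: kill user code
                log.append(f"kill:{datum}")
                return log
            log.append(f"continue:{datum}")  # retry? yes: NextDatumResponse
    log.append("kill:no-more-datums")        # datum exists? no: kill user code
    return log


if __name__ == "__main__":
    def flaky(datum: str) -> None:
        if datum == "b":
            raise ValueError("simulated failure")

    print(run_batched(["a", "b", "c"], flaky))
    print(run_batched(["a", "b", "c"], flaky, cmd_err=lambda d: True))
```

The first call stops at the failing datum because no `cmd_err` is defined; the second continues past it because the stand-in `cmd_err` answers "yes" to the retry decision.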
How to Batch Datums
- Define your user code and build a Docker image. Your user code must call pachctl next datum to get the next datum to process.
- Create a repo (e.g., pachctl create repo repoName).
- Define a pipeline spec in YAML or JSON that references your Docker image and repo.
- Add the following to the transform section of your pipeline spec: datum_batching: true

  pipeline:
    name: p_datum_batching_example
  input:
    pfs:
      repo: repoName
      glob: "/*"
  transform:
    datum_batching: true
    image: user/docker-image:tag

- Create the pipeline (e.g., pachctl update pipeline -f pipeline.yaml).
- Monitor the pipeline’s state either via Console or via pachctl list pipeline.
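Since the pipeline spec may be written in JSON as well as YAML, one way to produce it is to generate it from a short script. The following is a minimal sketch using only the Python standard library; the helper name `build_pipeline_spec` is hypothetical, and the field values simply mirror the example spec above.

```python
import json


def build_pipeline_spec(name: str, repo: str, image: str, glob: str = "/*") -> dict:
    """Return a pipeline spec dict mirroring the fields of the example spec."""
    return {
        "pipeline": {"name": name},
        "input": {"pfs": {"repo": repo, "glob": glob}},
        # datum_batching lives in the transform section, next to the image.
        "transform": {"datum_batching": True, "image": image},
    }


if __name__ == "__main__":
    spec = build_pipeline_spec(
        "p_datum_batching_example", "repoName", "user/docker-image:tag"
    )
    # Print the spec as JSON; redirect this output to a file to use it.
    print(json.dumps(spec, indent=2))
```

Redirecting the printed output to a file (for example, a hypothetical pipeline.json) gives a spec that can be passed with the -f flag in the pipeline creation step.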
FAQ
Q: My pipeline started but no files from my input repo are present. Where are they?
A: Files from the first datum are mounted following the first call to NextDatum or, when using the Python client, when code execution enters the decorated function.
Q: How can I set environment variables when the datum runs?
A: You can use the .env file accessible from the /pfs directory. To easily locate your .env file, you can do the following:
import glob
import os

def find_files(pattern):
    return glob.glob(os.path.join("/pfs", "**", pattern), recursive=True)

# find_files returns a list of matching paths (empty if none are found).
env_file = find_files(".env")
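Locating the file is only the first half of the answer; its contents still have to be read into the process environment. The sketch below shows one way to do that with the standard library. It assumes the file holds simple KEY=VALUE lines with optional quotes and # comments, which may not match your .env file, and the helper name `load_env_file` is hypothetical.

```python
import os
from typing import Dict


def load_env_file(path: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines and export them into os.environ."""
    loaded: Dict[str, str] = {}
    with open(path, encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            # Skip blank lines, comments, and lines without an "=".
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            # Drop surrounding whitespace and one layer of optional quotes.
            loaded[key.strip()] = value.strip().strip('"').strip("'")
    os.environ.update(loaded)
    return loaded
```

Under these assumptions, passing each path returned by find_files(".env") to load_env_file would make the variables available to the rest of the user code for the current datum.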