Datum Batching

By default, Pachyderm processes each datum independently. This means that your user code is called once for each datum. This can be inefficient and costly if you have a large number of small datums or if your user code is slow to start.

When you have a large number of datums, you can batch them to optimize performance. With batching enabled, your user code keeps running and requests each datum in turn by calling the next datum command that Pachyderm provides.

Flow Diagram

flowchart LR
    user_code(User Code)
    ndsuccess(NextDatum)
    nderror("NextDatum(error)")
    response(NextDatumResponse)
    process_datum{process datum}

    cmd_err(Run cmd_err)
    kill[Kill User Code]  
    datum?{datum exists?}
    retry?{retry?}
    cmd_err?{cmd_err defined}

    user_code ==>ndsuccess
    ndsuccess =====> datum?
    datum? ==>|yes| process_datum
    process_datum ==>|success| response
    response ==> user_code

    datum? -->|no| kill
    process_datum -->|fail| nderror
    nderror --> cmd_err?
    cmd_err? -->|yes| cmd_err
    cmd_err? -->|no|kill
    cmd_err --> retry?
    retry? -->|yes| response
    retry? -->|no| kill
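
The control flow in the diagram can be read as a loop. The following is a minimal sketch of that reading in plain Python. It does not call Pachyderm, and every name in it (run_batching_loop, process, cmd_err, retry) is hypothetical and chosen only for illustration. It returns a trace of the steps taken so the branches are easy to follow.

```python
def run_batching_loop(datums, process, cmd_err=None, retry=lambda datum: False):
    """Model the diagram's control flow and return a trace of the steps.

    Hypothetical helper: it simulates the flow only and makes no
    Pachyderm calls.
    """
    trace = []
    for datum in datums:
        # User code asks for the next datum, and one exists.
        trace.append(f"NextDatum -> {datum}")
        try:
            process(datum)
        except Exception:
            # Processing failed: the error is reported back.
            trace.append(f"NextDatum(error) for {datum}")
            if cmd_err is None:
                trace.append("kill user code (no cmd_err)")
                return trace
            cmd_err(datum)
            trace.append("ran cmd_err")
            if not retry(datum):
                trace.append("kill user code (no retry)")
                return trace
        # Success, or a failure followed by a retry decision of "yes".
        trace.append("NextDatumResponse")
    # No datum exists for the next request.
    trace.append("kill user code (no datums left)")
    return trace
```

Calling run_batching_loop(["a", "b"], lambda d: None) walks the bold path of the diagram twice and then ends on the "datum exists? no" branch.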
  
  

How to Batch Datums

  1. Define your user code and build a Docker image. Your user code must call pachctl next datum to get the next datum to process.

  2. Create a repo (e.g., pachctl create repo repoName).

  3. Define a pipeline spec in YAML or JSON that references your Docker image and repo.

  4. Add the following to the transform section of your pipeline spec:

    • datum_batching: true

    pipeline:
      name: p_datum_batching_example
    input:
      pfs:
        repo: repoName
        glob: "/*"
    transform:
      datum_batching: true
      image: user/docker-image:tag

  5. Create the pipeline (e.g., pachctl update pipeline -f pipeline.yaml).

  6. Monitor the pipeline’s state either via Console or via pachctl list pipeline.
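
Step 3 allows the spec to be written in YAML or JSON. As a minimal sketch, the example spec from step 4 can also be built as a Python dictionary and serialized to JSON with the standard json module. The names PIPELINE_SPEC and render_spec are illustrative only, and the field values are the placeholders used in the steps above.

```python
import json

# Same fields as the YAML example; the values are placeholders.
PIPELINE_SPEC = {
    "pipeline": {"name": "p_datum_batching_example"},
    "input": {"pfs": {"repo": "repoName", "glob": "/*"}},
    "transform": {
        "datum_batching": True,
        "image": "user/docker-image:tag",
    },
}


def render_spec(spec):
    """Return the spec as an indented JSON string."""
    return json.dumps(spec, indent=2)


if __name__ == "__main__":
    print(render_spec(PIPELINE_SPEC))
```

The printed JSON can be saved to a file and passed to pachctl with -f in the same way as the YAML version.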

Tip
You can view the printed confirmation of “Next datum called” in the logs of your pipeline’s job.

FAQ

Q: My pipeline started but no files from my input repo are present. Where are they?

A: Files from the first datum are mounted following the first call to NextDatum or, when using the Python client, when code execution enters the decorated function.

Q: How can I set environment variables when the datum runs?

A: You can use the .env file accessible from the /pfs directory. To easily locate your .env file, you can do the following:

import glob
import os

def find_files(pattern):
    # Recursively search /pfs; returns a list of matching paths.
    return glob.glob(os.path.join("/pfs", "**", pattern), recursive=True)

env_file = find_files(".env")
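
Once the .env file is located, its contents still need to be read into the process environment. The following is a minimal sketch of one way to do that with the standard library only. It assumes the file holds simple KEY=VALUE lines with optional # comments, which the text above does not specify. The helper names parse_env_text and load_env_file are hypothetical.

```python
import os


def parse_env_text(text):
    """Parse KEY=VALUE lines into a dict (assumed file format)."""
    values = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments, and lines without an '='.
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first '=' only, so values may contain '='.
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def load_env_file(path):
    """Read the file at path and copy its variables into os.environ."""
    with open(path, encoding="utf-8") as handle:
        values = parse_env_text(handle.read())
    os.environ.update(values)
    return values
```

Because the search returns a list of paths, a caller would pass one element of that list, for example its first entry, to load_env_file after checking that the list is not empty.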