Input Join PPS

Spec

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "input": {
    "join": [
      {
        "pfs": {
          "project": string,
          "name": string,
          "repo": string,
          "branch": string,
          "glob": string,
          "joinOn": string,
          "outerJoin": bool,
          "lazy": bool,
          "emptyFiles": bool,
          "s3": bool
        }
      },
      {
        "pfs": {
          "project": string,
          "name": string,
          "repo": string,
          "branch": string,
          "glob": string,
          "joinOn": string,
          "outerJoin": bool,
          "lazy": bool,
          "emptyFiles": bool,
          "s3": bool
        }
      }
    ]
  },
  ...
}

Behavior

  • A join input must have the glob and joinOn parameters configured to work properly. A join can combine multiple PFS inputs.
  • You can optionally add "outerJoin": true to a PFS input. This changes the join’s behavior from the default inner join (a datum is created only when there is a match) to an outer join (the repos marked "outerJoin": true produce a datum even when there is no match).
  • You can set "outerJoin": true on zero or more PFS inputs within a join.

Capture Groups

When you configure a join input (inner or outer), you must specify a glob pattern that includes a capture group. The capture group defines the specific string in the file path that is used to match files in the other joined repos. Capture groups work like regex capture groups: you define a capture group inside parentheses. Capture groups are numbered from left to right and can be nested within each other. Nested capture groups are numbered by the position of their opening parenthesis.

The examples below apply glob patterns with capture groups to the following file path:

/foo/bar-123/ABC.txt

The following glob patterns in a join input create the following capture groups:

Glob pattern        Capture groups
/(*)                foo
/*/bar-(*)          123
/(*)/*/(??)*.txt    Capture group 1: foo, capture group 2: AB
/*/(bar-(123))/*    Capture group 1: bar-123, capture group 2: 123
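
To check the examples above by hand, the following Python sketch approximates glob capture groups with regular expressions. It is not Pachyderm's matcher: the helper names glob_to_regex and capture_groups are hypothetical, and it assumes that only *, ?, and parentheses are special, that * and ? never cross a "/", and that a pattern may match a parent directory of the file path.

```python
import re


def glob_to_regex(glob):
    """Translate a simplified glob (only *, ? and parentheses) into a regex string."""
    pieces = []
    for ch in glob:
        if ch == "*":
            pieces.append("[^/]*")   # any run of characters within one path segment
        elif ch == "?":
            pieces.append("[^/]")    # exactly one character within a path segment
        elif ch in "()":
            pieces.append(ch)        # capture groups carry over unchanged
        else:
            pieces.append(re.escape(ch))
    return "".join(pieces)


def capture_groups(glob, path):
    """Return the capture groups for the first ancestor of path (or path itself)
    that the glob matches in full, or None if nothing matches."""
    regex = re.compile(glob_to_regex(glob))
    parts = path.strip("/").split("/")
    for depth in range(1, len(parts) + 1):
        candidate = "/" + "/".join(parts[:depth])
        match = regex.fullmatch(candidate)
        if match:
            return list(match.groups())
    return None
```

Calling capture_groups("/(*)/*/(??)*.txt", "/foo/bar-123/ABC.txt") under these assumptions yields the two groups listed in the table, with the nested-group pattern numbered by opening parenthesis.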

Joins also require you to specify a replacement group in the joinOn parameter to define which capture groups you want to match on.

For example, $1 indicates that you want Pachyderm to match based on capture group 1. Similarly, $2 matches the capture group 2. $1$2 means that it must match both capture groups 1 and 2.
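
As an illustration of how a replacement group turns capture groups into a single join key, here is a minimal Python sketch. The function expand_join_on is hypothetical, and modeling $1$2 as the concatenation of the two groups is an assumption made for illustration, not a statement about Pachyderm's internals.

```python
import re


def expand_join_on(join_on, groups):
    """Replace each $N in join_on with the N-th capture group (1-based)."""
    def substitute(match):
        index = int(match.group(1))
        return groups[index - 1]

    return re.sub(r"\$(\d+)", substitute, join_on)


# Capture groups taken from the glob /(*)/*/(??)*.txt applied to /foo/bar-123/ABC.txt
groups = ["foo", "AB"]
key_one = expand_join_on("$1", groups)     # join on capture group 1 only
key_both = expand_join_on("$1$2", groups)  # join on capture groups 1 and 2 together
```

Two files from different inputs would land in the same datum only when their expanded keys are equal.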

See the full join input configuration in the pipeline specification.

You can test your glob pattern and capture groups by using the pachctl list datum -f <your_pipeline_spec.json> command.

Tip
The content of the capture group defined in the joinOn parameter is available to your pipeline’s code in an environment variable: PACH_DATUM_<input.name>_JOIN_ON.
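
A pipeline's code could read that variable roughly as follows. This is a sketch: the helper join_key_for is hypothetical, and the input name "readings" is an assumed value for <input.name>.

```python
import os


def join_key_for(input_name, environ=os.environ):
    """Return the join key exposed for the given input, or None if it is not set."""
    return environ.get(f"PACH_DATUM_{input_name}_JOIN_ON")


# Inside a running datum this reads the real environment; "readings" is an assumed input name.
key = join_key_for("readings")
```

Returning None instead of raising keeps the code usable when the variable is absent, for example when running the script outside a pipeline.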

Examples

Inner Join

By default, a join input behaves as an inner join.

For example, suppose you have two repositories: one with sensor readings and the other with parameters. The repositories have the following structures:

  • readings repo:

    ├── ID1234
        ├── file1.txt
        ├── file2.txt
        ├── file3.txt
        ├── file4.txt
        ├── file5.txt
  • parameters repo:

    ├── file1.txt
    ├── file2.txt
    ├── file3.txt
    ├── file4.txt
    ├── file5.txt
    ├── file6.txt
    ├── file7.txt
    ├── file8.txt

Pachyderm runs your code only on the pairs of files that match the glob pattern and capture groups.

The following example shows how you can use joins to group matching IDs:

{
  "pipeline": {
    "name": "joins"
  },
  "input": {
    "join": [
      {
        "pfs": {
          "repo": "readings",
          "branch": "master",
          "glob": "/*/(*).txt",
          "joinOn": "$1"
        }
      },
      {
        "pfs": {
          "repo": "parameters",
          "branch": "master",
          "glob": "/(*).txt",
          "joinOn": "$1"
        }
      }
    ]
  },
  "transform": {
    "cmd": ["python3", "/joins.py"],
    "image": "joins-example"
  }
}

The glob pattern for the readings repository, /*/(*).txt, selects all .txt files in the ID subdirectory. In the parameters repository, the glob pattern /(*).txt selects all .txt files in the root directory. In both patterns, the capture group is the file name without its extension. The files with indices 1 to 5 match across the two repositories. The files with indices 6 to 8 exist only in parameters and do not match. Therefore, this job has only five datums.
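
The datum count can be reproduced with a small Python simulation. This is a sketch of the pairing logic only, not Pachyderm's implementation: the regular expressions stand in for the two glob patterns, and the names join_key and inner_join are hypothetical.

```python
import re

# File listings mirroring the two example repositories
readings = [f"/ID1234/file{i}.txt" for i in range(1, 6)]
parameters = [f"/file{i}.txt" for i in range(1, 9)]

READINGS_RE = r"/[^/]*/([^/]*)\.txt"   # stands in for the glob /*/(*).txt
PARAMETERS_RE = r"/([^/]*)\.txt"       # stands in for the glob /(*).txt


def join_key(path, pattern):
    """Return capture group 1 (the joinOn "$1" key), or None if the path does not match."""
    match = re.fullmatch(pattern, path)
    return match.group(1) if match else None


def inner_join(left, right):
    """Pair every left file with every right file that shares its join key."""
    right_by_key = {}
    for path in right:
        key = join_key(path, PARAMETERS_RE)
        if key is not None:
            right_by_key.setdefault(key, []).append(path)
    datums = []
    for path in left:
        key = join_key(path, READINGS_RE)
        for other in right_by_key.get(key, []):
            datums.append((path, other))
    return datums


datums = inner_join(readings, parameters)
```

Here datums holds five pairs, one per shared file name, and file6.txt through file8.txt appear in none of them.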

To experiment further, see the full joins example.

Outer Join

Pachyderm also supports outer joins. Outer joins include everything an inner join does plus the files that didn’t match anything. Inputs can be set to outer semantics independently. So while there isn’t an explicit notion of “left” or “right” outer joins, you can still get those semantics, and even extend them to multiway joins.

Building off the previous example, notice that there are 3 files in the parameters repo, file6.txt, file7.txt and file8.txt, which don’t match any files in the readings repo. In an inner join, those files are omitted. If you still want to see the files without a match, you can use an outer join. The change to the pipeline spec is simple:

{
  "pipeline": {
    "name": "joins"
  },
  "input": {
    "join": [
      {
        "pfs": {
          "repo": "readings",
          "branch": "master",
          "glob": "/*/(*).txt",
          "joinOn": "$1"
        }
      },
      {
        "pfs": {
          "repo": "parameters",
          "branch": "master",
          "glob": "/(*).txt",
          "joinOn": "$1",
          "outerJoin": true
        }
      }
    ]
  },
  "transform": {
    "cmd": ["python3", "/joins.py"],
    "image": "joins-example"
  }
}

Your code will see the same five joined pairs as before. In addition to those five datums, your code will also see three new ones: one for each of the files in parameters that didn’t match. This means that your code must handle, without crashing, the case where input files are missing from /pfs/readings.
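
One way to tolerate the missing input is sketched below in Python. The helper names list_input_files and describe_datum are hypothetical, and the sketch treats both an absent and an empty /pfs/readings directory as "no match", since this page does not say which of the two the code will encounter.

```python
import os


def list_input_files(input_dir):
    """Return the sorted file paths under input_dir, or [] if the directory is absent."""
    if not os.path.isdir(input_dir):
        return []
    found = []
    for root, _dirs, files in os.walk(input_dir):
        for name in files:
            found.append(os.path.join(root, name))
    return sorted(found)


def describe_datum(readings_dir="/pfs/readings", parameters_dir="/pfs/parameters"):
    """Summarize the current datum without failing when readings has no files."""
    readings = list_input_files(readings_dir)
    parameters = list_input_files(parameters_dir)
    if not readings:
        # Outer-join case: a parameters file with no matching reading
        return f"unmatched parameters only: {len(parameters)} file(s)"
    return f"joined: {len(readings)} reading(s), {len(parameters)} parameter file(s)"
```

Checking for the directory before walking it avoids an unhandled error on the three unmatched datums while leaving the five joined datums on the normal path.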

To experiment further, see the full joins example.