Spout

A spout is a type of pipeline that ingests streaming data. Spouts are best suited for situations where new data arrives at long or sporadic intervals but processing must start with low latency. In such cases, a regular pipeline with a cron input that polls for new data might not be an optimal solution.

Examples of streaming data sources include message queues, database transaction logs, and event notifications. In a spout, your code runs continuously and writes its results to the pipeline's output location, pfs/out. Every time you write a complete .tar archive, Pachyderm creates a new commit and triggers downstream processing.

One main difference from regular pipelines is that spouts ingest their data from outside sources. Therefore, they do not take an input.

Another important aspect is that in spouts, pfs/out is a named pipe, also known as a First In, First Out (FIFO), and is not a directory as in standard pipelines. Unlike the anonymous pipes familiar to most Linux users, a named pipe enables two system processes to access it simultaneously, giving one process read-only and the other process write-only access. Therefore, the two processes can read from and write to the same pipe at the same time.
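
For illustration, here is a minimal Go sketch of this difference. It only opens the pipe and assumes it runs inside a spout container:

package main

import (
    "log"
    "os"
)

func main() {
    // In a regular pipeline, /pfs/out is a directory, and you create
    // files inside it. In a spout, /pfs/out is itself the pipe, so you
    // open it for writing directly. The open call blocks until the
    // reader on the other end, Pachyderm, opens its side.
    out, err := os.OpenFile("/pfs/out", os.O_WRONLY, 0644)
    if err != nil {
        log.Fatalf("open /pfs/out: %v", err)
    }
    defer out.Close()
}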

To create a spout pipeline, you need the following items:

  • A source of streaming data
  • A Docker container with your spout code that reads from the data source
  • A spout pipeline specification file that uses the container

Your spout code performs the following actions (see the Go sketch after this list):

  1. Connects to the specified streaming data source.
  2. Opens /pfs/out as a named pipe.
  3. Reads the data from the streaming data source.
  4. Packages the data into a tar stream.
  5. Writes the tar stream to the /pfs/out pipe, retrying the write if a transient error occurs, for example, an error caused by the closing of a previous write to the pipe.
  6. Closes the tar stream and connection to /pfs/out, which produces the commit.
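
The following Go sketch walks through these steps. The nextMessage function is a hypothetical placeholder for your actual streaming source, and the file name inside the archive is arbitrary:

package main

import (
    "archive/tar"
    "log"
    "os"
    "time"
)

// nextMessage is a hypothetical stand-in for reading one record from
// your real streaming source (a message queue, a transaction log, ...).
func nextMessage() []byte {
    time.Sleep(5 * time.Second)
    return []byte("example record\n")
}

func main() {
    for {
        data := nextMessage()

        // Step 2: open the named pipe for writing.
        out, err := os.OpenFile("/pfs/out", os.O_WRONLY, 0644)
        if err != nil {
            log.Printf("open /pfs/out: %v; retrying", err)
            continue
        }

        // Steps 4-5: package the data into a tar stream and write it
        // to the pipe.
        tw := tar.NewWriter(out)
        hdr := &tar.Header{
            Name: "records/message.txt",
            Mode: 0600,
            Size: int64(len(data)),
        }
        if err := tw.WriteHeader(hdr); err != nil {
            log.Printf("write header: %v; retrying", err)
            out.Close()
            continue
        }
        if _, err := tw.Write(data); err != nil {
            log.Printf("write data: %v; retrying", err)
        }

        // Step 6: closing the tar stream and the pipe produces the
        // commit.
        if err := tw.Close(); err != nil {
            log.Printf("close tar stream: %v", err)
        }
        out.Close()
    }
}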

A minimum spout specification must include the following parameters:

Parameter   Description
----------  -------------------------------------------------------------
name        The name of your data pipeline and the output repository. You
            can set an arbitrary name that is meaningful to the code you
            want to run.
transform   The code that you want to run against your data, such as a
            Python or Go script, and the Docker image that you want to
            use to run that script.
overwrite   (Optional) Whether to overwrite the existing content of the
            file from previous commits or previous calls to the put file
            command within this commit. The default value is false.

The following text is an example of a minimum specification:

Note

The env property is an optional argument; you can instead define your data stream source from within the container in which you run your script. For simplicity, this example uses env to specify the Kafka host, topic, and port.

{
  "pipeline": {
    "name": "my-spout"
  },
  "transform": {
    "cmd": [ "go", "run", "./main.go" ],
    "image": "myaccount/myimage:0.1"
    "env": {
        "HOST": "kafkahost",
        "TOPIC": "mytopic",
        "PORT": "9092"
    },
  },
  "spout": {
    "overwrite": false
  }
}
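
After you save this specification, for example as spout.json, you create the spout pipeline as you would any other pipeline:

pachctl create pipeline -f spout.json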

Resuming Spout Progress

When a spout container crashes, any operation that was in progress before the crash is lost, and the spout must restart the interrupted work from scratch. To keep a history of changes, so that the spout can continue where it left off after a restart, you can configure a record-tracking marker file for your spout.

When you specify the marker parameter as a subfield in the spout section of your pipeline, Pachyderm creates the marker file or directory. The file or directory is named according to the provided value. For example, if you specify "marker": "offset", Pachyderm stores the current marker in pfs/out/offset and the previous marker in pfs/offset. If a spout container crashes and then starts again, it can read the marker file and resume where it left off instead of starting over.
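
Continuing the Go sketch above, and assuming the "marker": "offset" setting from the previous paragraph, the spout might read and update its marker like this (the record-consumption logic is elided):

package main

import (
    "archive/tar"
    "log"
    "os"
    "strconv"
    "strings"
)

// readLastOffset returns the marker left by a previous run, or 0 if the
// spout is starting fresh. With "marker": "offset", the previous marker
// is available at /pfs/offset.
func readLastOffset() int64 {
    b, err := os.ReadFile("/pfs/offset")
    if err != nil {
        return 0
    }
    n, _ := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
    return n
}

func main() {
    offset := readLastOffset()
    log.Printf("resuming from offset %d", offset)

    // ... read records starting at offset and write them to the tar
    // stream as in the earlier example ...

    out, err := os.OpenFile("/pfs/out", os.O_WRONLY, 0644)
    if err != nil {
        log.Fatalf("open /pfs/out: %v", err)
    }

    // Write the updated marker into the same commit as the data so
    // that the two always stay consistent.
    tw := tar.NewWriter(out)
    marker := []byte(strconv.FormatInt(offset+1, 10))
    hdr := &tar.Header{Name: "offset", Mode: 0600, Size: int64(len(marker))}
    if err := tw.WriteHeader(hdr); err != nil {
        log.Fatalf("write marker header: %v", err)
    }
    if _, err := tw.Write(marker); err != nil {
        log.Fatalf("write marker: %v", err)
    }
    tw.Close()
    out.Close()
}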

Markers are useful if you want to leverage the record-tracking functionality of an external messaging system, such as Apache® Kafka offset management or a similar mechanism.

To see how a marker works in Pachyderm, see the Resuming a Spout Pipeline example.

Last updated: February 28, 2020