Pipelines

Pipelines let you chain Taters steps into a repeatable recipe. Instead of running one command at a time for each file, you write a short YAML preset that says: discover my inputs, run these functions per file, then run these global steps once at the end. The runner takes care of variable substitution, concurrency, error isolation, and a manifest so you can audit what happened.


Big picture

  • Two scopes:
      • item steps run once per discovered input (fan out with threads or processes).
      • global steps run once for the whole run (good for gather/aggregate).
  • Discovery: If your preset has any item steps, you pass --root_dir and the runner finds files by type (audio, video, any). If it has only global steps, no discovery is done.
  • Templating: Values like {{input}}, {{var:device}}, and {{global.some_key}} are filled in at runtime. If a field is exactly a single template (for example text_cols: "{{var:text_cols}}"), the runner preserves the native type (list, int, bool) rather than turning it into a string.
  • Calls: Use call: potato.<namespace>.<function> to invoke Taters methods on a shared Taters() instance, or use a dotted import like package.module:function (see the sketch after this list).
  • Artifacts: Steps can save_as: name; later steps can reference {{name}} directly or pick nested fields with {{pick:name.path}}. Global steps can also save_as and are referenced via {{global.key}}.
  • Concurrency: The default engine is threads; set engine: process for CPU-heavy work and certain other conditions (see below). Each step reuses one pool across all items. Control fan-out with workers.
  • Manifest: After each step, the runner writes a JSON manifest with inputs, status, artifacts, and errors. You can control where it is written with --out-manifest.
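
For example, the two call styles look like this side by side (a sketch; the dotted-import target is a hypothetical function, not something that ships with Taters):

steps:
  - scope: item
    call: potato.audio.convert_to_wav        # method on the shared Taters() instance
    with:
      input_path: "{{input}}"

  - scope: global
    call: mypackage.postprocess:summarize    # hypothetical dotted import (package.module:function)
    with:
      root_dir: "{{cwd}}/features"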

A minimal preset, explained

This example converts each video to WAV, diarizes and transcribes it, extracts per-segment Whisper embeddings, then gathers the features at the end. Save it as my_pipelines/minimal_conversation.yaml.

# Variables you can override from the CLI with --var key=value
vars:
  device: "auto"
  overwrite_existing: false
  whisper_model: "tiny"

steps:
  # 1) ITEM step: convert each video to WAV
  - scope: item
    call: potato.audio.convert_to_wav
    save_as: wav
    with:
      input_path: "{{input}}"
      overwrite_existing: "{{var:overwrite_existing}}"

  # 2) ITEM step: diarize & transcribe each WAV
  - scope: item
    call: potato.audio.diarizer.whisper_diar_wrapper
    save_as: diar
    with:
      audio_path: "{{wav}}"            # uses artifact from step 1
      device: "{{var:device}}"
      out_dir: "{{cwd}}/transcripts"   # write transcripts here

  # 3) ITEM step: extract Whisper embeddings per transcript segment
  - scope: item
    call: potato.audio.extract_whisper_embeddings
    save_as: whisper_feats
    with:
      source_wav: "{{wav}}"
      transcript_csv: "{{pick:diar.csv}}"   # pick nested key (if diar returns dict)
      time_unit: "ms"
      model_name: "{{var:whisper_model}}"
      device: "{{var:device}}"
      overwrite_existing: "{{var:overwrite_existing}}"

  # 4) GLOBAL step: gather all Whisper embeddings into one CSV
  - scope: global
    call: taters.helpers.feature_gather.feature_gather
    save_as: whisper_gathered
    with:
      root_dir: "./features/whisper-embeddings"
      pattern: "*.csv"
      group_by: null
      out_csv: null                     # default: ./features/whisper-embeddings.csv
      overwrite_existing: false

Key ideas you can see here:

  • {{input}} expands to the current file's path.
  • {{cwd}} is the working directory.
  • {{var:…}} reads from the vars: block, which you can override later.
  • save_as artifacts can be reused by name; {{pick:…}} digs into their fields.
  • The final global step does not need discovery. It runs once.


Understanding templating

You can use expressions in any with: parameter:

  • {{input}} → current item's full path (string).
  • {{cwd}} → current working dir.
  • {{var:key}} → a variable from vars or CLI overrides.
  • {{global.something}} → value from a global artifact.
  • {{pick:artifact.nested.path}} → nested lookup inside a saved artifact.
  • {{artifact_name}} → direct artifact value from a prior step.

If the entire value is a single template, the runner returns the native type. For example:

text_cols: "{{var:text_cols}}"  # becomes a list, not the string "['text']"

This avoids the classic YAML gotcha where lists turn into strings.
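
A sketch of the contrast (the note parameter is invented for illustration, and the embedded form is assumed to render via plain string interpolation):

with:
  text_cols: "{{var:text_cols}}"        # entire value is one template: passed through as a native list
  note: "columns: {{var:text_cols}}"    # hypothetical param; template inside a longer string: rendered as text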


How discovery works

When your preset has any item steps, you must pass --root_dir and the kind of files to look for:

  • --file_type video → common video extensions like .mp4, .mov, .mkv
  • --file_type audio → common audio extensions like .wav, .mp3, .flac
  • --file_type any → all files

The runner prints how many inputs it found and proceeds. If there are no inputs, item steps are skipped.


Running a preset from the CLI

You can load a named preset that ships with Taters or point to a YAML file on disk.

A) Using a built-in preset by name

python -m taters.pipelines.run_pipeline \
  --root_dir videos \
  --file_type video \
  --preset conversation_video \
  --workers 8 \
  --var device=cuda \
  --var overwrite_existing=false \
  --var whisper_model=tiny

This uses taters/pipelines/presets/conversation_video.yaml. Variables are merged in this order: preset vars → --vars-file (if provided) → repeated --var key=value flags (highest precedence).
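
For instance, a vars file (assumed here to be a flat YAML mapping of variable names to values; the filename is arbitrary) might look like:

# run_vars.yaml: overrides preset vars, but repeated --var flags still win
device: cuda
whisper_model: small
overwrite_existing: true

Pass it with --vars-file run_vars.yaml.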

B) Using your own YAML file

python -m taters.pipelines.run_pipeline \
  --root_dir recordings \
  --file_type audio \
  --preset-file my_pipelines/minimal_conversation.yaml \
  --workers 4 \
  --var device=cpu \
  --var overwrite_existing=true \
  --out-manifest ./runs/conv_run1_manifest.json

  • Set --out-manifest to control where the JSON manifest is written.
  • Use --workers N to control concurrency for item steps; defaults to 4.
  • If your preset has only global steps, you can omit --root_dir.

Engines, workers, and performance

There are two ways to launch a Taters module: as a thread or as a process, and there is some nuance in choosing between them. For most use cases, threads are probably optimal. However, if a step uses a GPU-heavy library that loads models (e.g., PyTorch), you will absolutely want to limit that step's workers to 1: multithreading CUDA-dependent work can lead to complicated collisions between partially loaded models.

If you do want to run multiple files concurrently through a CUDA-reliant package to speed things along, process is the way to go. Note that, currently, this means any given model is loaded separately for each worker: if you run 10 archetype analyses at a time, the model driving the analysis (e.g., roberta-v1-large) is loaded 10 times. This can get both read-heavy and RAM-intensive, so make sure your stack can handle it.

There are optimizations to be made here in your pipeline. For example, it might ultimately be faster or more efficient to gather your text into a single CSV first and process that CSV with one thread or process, rather than analyzing every text separately and aggregating the output afterward. But, with the pipeline feature, the choice is yours!

  • engine: thread (default) works well for I/O-bound steps and for GPU inference (with workers: 1).
  • engine: process can help for CPU-heavy Python code; note that each worker is a separate Python process (models may load once per worker).
  • The workers field can be set per step; otherwise the CLI's --workers applies.
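
For example, a preset can pin a GPU-bound step to one worker while fanning a CPU-bound step out across processes (a sketch; the second call target is hypothetical):

steps:
  - scope: item
    call: potato.audio.diarizer.whisper_diar_wrapper
    save_as: diar
    workers: 1            # single worker: avoids concurrent CUDA model loading
    with:
      audio_path: "{{input}}"
      device: "{{var:device}}"

  - scope: item
    call: potato.text.analyze_archetypes   # hypothetical CPU-heavy step
    engine: process       # separate Python processes; the model loads once per worker
    workers: 4
    with:
      transcript_csv: "{{pick:diar.csv}}"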

Error handling and the manifest

  • If an item step fails for one file, the run continues; that item is marked "error" with a message in the manifest.
  • A failed global step stops the entire run.
  • The manifest captures inputs, variables, step status, saved artifacts (JSON-safe), and errors. It is rewritten after each step so you can monitor long runs.
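
To give a feel for the kind of information captured, the shape is roughly as follows (an illustrative sketch only: the field names here are assumptions rather than the runner's actual schema, and the real file is JSON, not YAML):

run:
  root_dir: recordings
  vars: {device: cpu, overwrite_existing: true}
steps:
  - call: potato.audio.convert_to_wav
    scope: item
    items:
      - input: recordings/a.mp3
        status: ok
        artifacts: {wav: recordings/a.wav}
      - input: recordings/b.mp3
        status: error
        error: "conversion failed"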

Common patterns

  • Gather only: You can make a preset with only global steps to gather features from prior runs. No --root_dir needed (see the sketch after this list).
  • Per-speaker flow: Diarize as an item step, then use split_wav_by_speaker as another item step to produce per-speaker WAVs for downstream modeling.
  • Mixed sources: Use --file_type any for heterogeneous folders, and guard steps with require: fields so they fail fast if a parameter is missing after templating.
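
For instance, a gather-only preset can be as small as a single global step (this sketch reuses the feature_gather call from the minimal preset above):

steps:
  - scope: global
    call: taters.helpers.feature_gather.feature_gather
    save_as: gathered
    with:
      root_dir: "./features/whisper-embeddings"
      pattern: "*.csv"
      out_csv: "./features/whisper-embeddings.csv"
      overwrite_existing: false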

An example of the require: guard mentioned under "Mixed sources":

- scope: item
  call: potato.audio.extract_whisper_embeddings
  require: ["transcript_csv"]        # fail if this key is missing or empty after templating
  with:
    source_wav: "{{wav}}"
    transcript_csv: "{{pick:diar.csv}}"
    time_unit: "ms"

The step will error cleanly for that item if diar.csv is not present.


Where to go next

  • Start with the built-in conversation_video preset and tweak variables.
  • Copy the minimal preset above and expand it with dictionary and archetype steps once your transcripts look good.
  • Keep an eye on the manifest to understand what was produced and where it lives.