Pipelines¶
Pipelines let you chain Taters steps into a repeatable recipe. Instead of running one command at a time for each file, you write a short YAML preset that says: discover my inputs, run these functions per file, then run these global steps once at the end. The runner takes care of variable substitution, concurrency, error isolation, and a manifest so you can audit what happened.
Big picture¶
- Two scopes:
    - `item` steps run once per discovered input (fan out with threads or processes).
    - `global` steps run once for the whole run (good for gather/aggregate).
- Discovery: If your preset has any `item` steps, you pass `--root_dir` and the runner finds files by type (audio, video, any). If it has only `global` steps, no discovery is done.
- Templating: Values like `{{input}}`, `{{var:device}}`, and `{{global.some_key}}` are filled in at runtime. If a field is exactly a single template (for example `text_cols: "{{var:text_cols}}"`), the runner preserves the native type (list, int, bool) rather than turning it into a string.
- Calls: Use `call: potato.<namespace>.<function>` to invoke Taters methods on a shared `Taters()` instance, or use a dotted import like `package.module:function`.
- Artifacts: Steps can `save_as: name`; later steps can reference `{{name}}` (directly) or pick nested fields with `{{pick:name.path}}`. Global steps can also `save_as` and are referenced via `{{global.key}}`.
- Concurrency: Default engine is threads; you can set `engine: process` for CPU-heavy work and other specific conditions (see below). Each step reuses one pool across all items. Control fan-out with `workers`.
- Manifest: After each step the runner writes a JSON manifest with inputs, status, artifacts, and errors. You can point to it with `--out-manifest`.
A minimal preset, explained¶
This example converts videos to WAV, runs diarization, then gathers features at the end. Save it as `my_pipelines/minimal_conversation.yaml`.
```yaml
# Variables you can override from the CLI with --var key=value
vars:
  device: "auto"
  overwrite_existing: false
  whisper_model: "tiny"

steps:
  # 1) ITEM step: convert each video to WAV
  - scope: item
    call: potato.audio.convert_to_wav
    save_as: wav
    with:
      input_path: "{{input}}"
      overwrite_existing: "{{var:overwrite_existing}}"

  # 2) ITEM step: diarize & transcribe each WAV
  - scope: item
    call: potato.audio.diarizer.whisper_diar_wrapper
    save_as: diar
    with:
      audio_path: "{{wav}}"            # uses artifact from step 1
      device: "{{var:device}}"
      out_dir: "{{cwd}}/transcripts"   # write transcripts here

  # 3) ITEM step: extract Whisper embeddings per transcript segment
  - scope: item
    call: potato.audio.extract_whisper_embeddings
    save_as: whisper_feats
    with:
      source_wav: "{{wav}}"
      transcript_csv: "{{pick:diar.csv}}"  # pick nested key (if diar returns dict)
      time_unit: "ms"
      model_name: "{{var:whisper_model}}"
      device: "{{var:device}}"
      overwrite_existing: "{{var:overwrite_existing}}"

  # 4) GLOBAL step: gather all Whisper embeddings into one CSV
  - scope: global
    call: taters.helpers.feature_gather.feature_gather
    save_as: whisper_gathered
    with:
      root_dir: "./features/whisper-embeddings"
      pattern: "*.csv"
      group_by: null
      out_csv: null        # default: ./features/whisper-embeddings.csv
      overwrite_existing: false
```
Key ideas you can see here:
- `{{input}}` expands to the current file's path. `{{cwd}}` is the working directory.
- `{{var:…}}` reads from the `vars:` block, which you can override later.
- `save_as` artifacts can be reused by name; `{{pick:…}}` digs into their fields.
- The final `global` step does not need discovery. It runs once.
Understanding templating¶
You can use expressions in any `with:` parameter:

- `{{input}}` → current item's full path (string).
- `{{cwd}}` → current working dir.
- `{{var:key}}` → a variable from `vars` or CLI overrides.
- `{{global.something}}` → value from a global artifact.
- `{{pick:artifact.nested.path}}` → nested lookup inside a saved artifact.
- `{{artifact_name}}` → direct artifact value from a prior step.
If the entire value is a single template, the runner returns the native type. For example:
text_cols: "{{var:text_cols}}" # becomes a list, not the string "['text']"
This avoids the classic YAML gotcha where lists turn into strings.
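For instance, if a variable is declared as a real YAML list, it stays a list all the way into the call. A minimal sketch; the `analyze_text` step and its `text_cols` parameter are hypothetical, for illustration only:

```yaml
vars:
  text_cols: ["text"]                  # a real YAML list, not a string

steps:
  - scope: item
    call: your.module:analyze_text     # hypothetical step
    with:
      text_cols: "{{var:text_cols}}"   # resolves to ["text"], list type preserved
```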
How discovery works¶
When your preset has any `item` steps, you must pass `--root_dir` and the kind of files to look for:

- `--file_type video` → common video extensions like .mp4, .mov, .mkv
- `--file_type audio` → common audio extensions like .wav, .mp3, .flac
- `--file_type any` → all files

The runner prints how many inputs it found and proceeds. If there are no inputs, `item` steps are skipped.
Running a preset from the CLI¶
You can load a named preset that ships with Taters or point to a YAML file on disk.
A) Using a built-in preset by name¶
```bash
python -m taters.pipelines.run_pipeline \
  --root_dir videos \
  --file_type video \
  --preset conversation_video \
  --workers 8 \
  --var device=cuda \
  --var overwrite_existing=false \
  --var whisper_model=tiny
```
This uses `taters/pipelines/presets/conversation_video.yaml`. Variables are merged in this order: preset `vars` → `--vars-file` (if provided) → repeated `--var key=value` flags (highest precedence).
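For example, a vars file sits in the middle of that chain. A sketch, assuming `--vars-file` accepts a YAML file; `run_vars.yaml` is a hypothetical name:

```bash
# run_vars.yaml (hypothetical) contains:
#   device: "cpu"
#   whisper_model: "base"
python -m taters.pipelines.run_pipeline \
  --root_dir videos \
  --file_type video \
  --preset conversation_video \
  --vars-file run_vars.yaml \
  --var device=cuda
# Result: whisper_model comes from the vars file, but device resolves
# to "cuda" because --var has the highest precedence.
```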
B) Using your own YAML file¶
```bash
python -m taters.pipelines.run_pipeline \
  --root_dir recordings \
  --file_type audio \
  --preset-file my_pipelines/minimal_conversation.yaml \
  --workers 4 \
  --var device=cpu \
  --var overwrite_existing=true \
  --out-manifest ./runs/conv_run1_manifest.json
```
- Set `--out-manifest` to control where the JSON manifest is written.
- Use `--workers N` to control concurrency for `item` steps; defaults to 4.
- If your preset has only `global` steps, you can omit `--root_dir` (a sketch follows below).
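A minimal sketch of such a global-only run; the preset filename here is hypothetical:

```bash
# No --root_dir needed: the preset contains only global steps
python -m taters.pipelines.run_pipeline \
  --preset-file my_pipelines/gather_only.yaml \
  --out-manifest ./runs/gather_manifest.json
```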
Engines, workers, and performance¶
There are two ways to launch a Taters module: as a `thread` or as a `process`. There is some nuance in choosing between them. For most use cases, `thread` is probably optimal. However, if you are using a GPU-heavy library (e.g., `pytorch`) that loads models, you will absolutely need to limit the workers for that step to 1. As I understand it, you can run into complicated partially-loaded model collisions if you try to multithread work that requires CUDA.

However, if you want to run multiple files concurrently with a CUDA-reliant package to speed things along, `process` is the way to go. Note that, currently, this means any given model is loaded N times concurrently; for example, if you want to run 10 `archetype` analyses at a time, the model driving the analysis (e.g., `roberta-v1-large`) will be loaded separately for each process. This can get both read-heavy and RAM-intensive, so make sure your stack can handle it.
There are optimizations to be made here in your pipeline. For example, it might ultimately be faster or more efficient to `gather` your text into a single CSV first, then process that CSV with a single `thread` or `process`, rather than analyzing all of your texts separately and aggregating the resulting output later (a sketch of this shape follows below). But, with the `pipeline` feature... the choice is yours!
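A sketch of that gather-first shape; the `analyze_csv` call and its `input_csv` parameter are hypothetical placeholders for your own analysis step:

```yaml
steps:
  # Run once: collect many per-file CSVs into a single CSV
  - scope: global
    call: taters.helpers.feature_gather.feature_gather
    save_as: gathered
    with:
      root_dir: "./transcripts"
      pattern: "*.csv"
      out_csv: "./features/all_transcripts.csv"

  # Run once: analyze the combined CSV with a single worker,
  # so any model is loaded only one time
  - scope: global
    call: your.module:analyze_csv     # hypothetical analysis step
    with:
      input_csv: "./features/all_transcripts.csv"
```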
- `engine: thread` (default) works well for I/O and GPU inference.
- `engine: process` can help for CPU-heavy Python code; note that each worker is a separate Python process (models may load once per worker).
- The `workers` field can be set per step; otherwise the CLI's `--workers` applies. A per-step sketch follows below.
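Per-step settings look roughly like this (a minimal sketch reusing calls from the preset above; whether a given step actually benefits from `process` is workload-dependent, so tune `workers` to your hardware):

```yaml
steps:
  # Fan out across separate Python processes
  - scope: item
    call: potato.audio.convert_to_wav
    engine: process
    workers: 8
    save_as: wav
    with:
      input_path: "{{input}}"

  # GPU inference: threads, serialized to one worker to avoid
  # partially-loaded model collisions on CUDA
  - scope: item
    call: potato.audio.diarizer.whisper_diar_wrapper
    engine: thread     # default
    workers: 1
    with:
      audio_path: "{{wav}}"
      device: "{{var:device}}"
```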
Error handling and the manifest¶
- If an `item` step fails for one file, the run continues; that item is marked `"error"` with a message in the manifest.
- A failed `global` step stops the entire run.
- The manifest captures inputs, variables, step status, saved artifacts (JSON-safe), and errors. It is rewritten after each step so you can monitor long runs; an illustrative sketch follows below.
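The exact manifest schema may differ by version; the sketch below is only an illustrative shape built from the fields named above, not the real output:

```json
{
  "inputs": ["recordings/a.wav", "recordings/b.wav"],
  "vars": {"device": "cpu", "overwrite_existing": true},
  "steps": [
    {"call": "potato.audio.convert_to_wav", "status": "ok",
     "artifacts": {"wav": "recordings/a.wav"}},
    {"call": "potato.audio.diarizer.whisper_diar_wrapper", "status": "error",
     "error": "CUDA out of memory"}
  ]
}
```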
Common patterns¶
- Gather only: You can make a preset with only `global` steps to gather features from prior runs. No `--root_dir` needed (a gather-only preset is sketched after the `require:` example below).
- Per-speaker flow: Diarize as an `item` step, then use `split_wav_by_speaker` as another `item` step to produce per-speaker WAVs for downstream modeling.
- Mixed sources: Use `--file_type any` for heterogeneous folders, and guard steps with `require:` fields so they fail fast if a parameter is missing after templating.
An example of `require:`:

```yaml
- scope: item
  call: potato.audio.extract_whisper_embeddings
  require: ["transcript_csv"]  # fail if this key is missing or empty after templating
  with:
    source_wav: "{{wav}}"
    transcript_csv: "{{pick:diar.csv}}"
    time_unit: "ms"
```
The step will error cleanly for that item if `diar.csv` is not present.
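And the gather-only preset promised above. A minimal sketch; the filename and paths are illustrative, and the parameters mirror the gather step from the minimal preset:

```yaml
# my_pipelines/gather_only.yaml (illustrative)
steps:
  - scope: global
    call: taters.helpers.feature_gather.feature_gather
    with:
      root_dir: "./features/whisper-embeddings"
      pattern: "*.csv"
      group_by: null
      out_csv: "./features/whisper-embeddings.csv"
      overwrite_existing: false
```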
Where to go next¶
- Start with the built-in `conversation_video` preset and tweak variables.
- Copy the minimal preset above and expand it with dictionary and archetype steps once your transcripts look good.
- Keep an eye on the manifest to understand what was produced and where it lives.