Pipelines

taters.pipelines.run_pipeline

Taters Pipeline Runner (robust templating + flexible call resolution)

  • ITEM steps run once per input (fan-out concurrently).
  • GLOBAL steps run once (barrier before/after).
  • Templating preserves native types when the entire value is a single template (e.g., {{var:text_cols}} → list, not "['text']").
  • Calls:
    • "potato.*" → call via a Taters() instance (e.g., potato.text.analyze_with_dictionaries)
    • dotted path → import and call any function (e.g., taters.helpers.feature_gather.aggregate_features)

Usage example:

    python -m taters.pipelines.run_pipeline --root_dir videos --file_type video \
        --preset conversation_video --workers 4 --var device=cuda --var overwrite_existing=true

discover_inputs

discover_inputs(root_dir, kind)

Recursively discover input files under a root folder.

The preset's ITEM-scoped steps operate over a list of inputs. This function builds that list by scanning root_dir and selecting files by type:

  • kind == "video": only common video extensions (e.g., .mp4, .mov, .mkv)
  • kind == "audio": only common audio extensions (e.g., .wav, .mp3, .flac)
  • kind == "any": all files

Parameters:

    root_dir : Path (required)
        Directory to scan (will be resolved to an absolute path).
    kind : {"audio", "video", "any"} (required)
        Filter that determines which file extensions are included.

Returns:

    List[Path]
        Sorted list of absolute file paths.

Raises:

    FileNotFoundError
        If root_dir does not exist.
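
A minimal usage sketch, assuming a ./videos folder like the one in the CLI example above:

    from pathlib import Path
    from taters.pipelines.run_pipeline import discover_inputs

    # Recursively collect video files under ./videos (hypothetical folder).
    inputs = discover_inputs(Path("videos"), kind="video")
    print(f"{len(inputs)} video file(s) found")  # paths are absolute and sorted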

Source code in src\taters\pipelines\run_pipeline.py
def discover_inputs(root_dir: Path, kind: str) -> List[Path]:
    """
    Recursively discover input files under a root folder.

    The preset's ITEM-scoped steps operate over a list of inputs. This
    function builds that list by scanning `root_dir` and selecting files by
    type:

    - kind == "video": only common video extensions (e.g., .mp4, .mov, .mkv)
    - kind == "audio": only common audio extensions (e.g., .wav, .mp3, .flac)
    - kind == "any":   all files

    Parameters
    ----------
    root_dir : Path
        Directory to scan (will be resolved to an absolute path).
    kind : {"audio","video","any"}
        Filter that determines which file extensions are included.

    Returns
    -------
    List[Path]
        Sorted list of absolute file paths.

    Raises
    ------
    FileNotFoundError
        If `root_dir` does not exist.
    """
    root_dir = root_dir.resolve()
    if not root_dir.exists():
        raise FileNotFoundError(f"root_dir not found: {root_dir}")
    out: List[Path] = []
    for p in root_dir.rglob("*"):
        if not p.is_file():
            continue
        ext = p.suffix.lower()
        if kind == "video" and ext in _VIDEO_EXTS:
            out.append(p)
        elif kind == "audio" and ext in _AUDIO_EXTS:
            out.append(p)
        elif kind == "any":
            out.append(p)
    return sorted(out)

load_preset_by_name

load_preset_by_name(name)

Load a named pipeline preset from taters/pipelines/presets/<name>.yaml.

Parameters:

    name : str (required)
        Basename of a YAML preset in the presets/ directory.

Returns:

    dict
        Parsed YAML as a Python dictionary. Returns {} for an empty file.

Raises:

    FileNotFoundError
        If the preset file does not exist.
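
For example, assuming the conversation_video preset from the CLI example above ships in presets/:

    from taters.pipelines.run_pipeline import load_preset_by_name

    preset = load_preset_by_name("conversation_video")
    print(preset.get("steps", []))  # the step list executed by main()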

Source code in src\taters\pipelines\run_pipeline.py
def load_preset_by_name(name: str) -> dict:
    """
    Load a named pipeline preset from `taters/pipelines/presets/<name>.yaml`.

    Parameters
    ----------
    name : str
        Basename of a YAML preset in the `presets/` directory.

    Returns
    -------
    dict
        Parsed YAML as a Python dictionary. Returns `{}` for an empty file.

    Raises
    ------
    FileNotFoundError
        If the preset file does not exist.
    """
    here = Path(__file__).parent
    path = here / "presets" / f"{name}.yaml"
    if not path.exists():
        raise FileNotFoundError(f"Preset not found: {path}")
    with path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}

load_yaml_file

load_yaml_file(path)

Load a YAML file into a Python dictionary.

Parameters:

    path : Path (required)
        Full path to a YAML file.

Returns:

    dict
        Parsed YAML contents. Empty files yield {}.
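
A short sketch, assuming a hypothetical my_vars.yaml in the working directory:

    from pathlib import Path
    from taters.pipelines.run_pipeline import load_yaml_file

    overrides = load_yaml_file(Path("my_vars.yaml"))  # hypothetical file
    print(overrides)  # {} if the file is empty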

Source code in src\taters\pipelines\run_pipeline.py
def load_yaml_file(path: Path) -> dict:
    """
    Load a YAML file into a Python dictionary.

    Parameters
    ----------
    path : Path
        Full path to a YAML file.

    Returns
    -------
    dict
        Parsed YAML contents. Empty files yield `{}`.
    """
    with path.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f) or {}

main

main()

Entry point for the Taters Pipeline Runner.

Responsibilities
  • Parse CLI arguments (--preset or --preset-file, optional --vars-file and repeated --var key=value overrides, --workers, etc.).
  • Load the preset YAML and merge variables from three sources, in order:
    1) the preset's vars block
    2) --vars-file (YAML)
    3) repeated --var CLI flags
  • Decide whether input discovery is required:
    • If the preset has any ITEM-scoped steps, --root_dir is required and files are discovered with discover_inputs(...).
    • If there are only GLOBAL steps, discovery is skipped entirely.
  • Build a run manifest skeleton (preset name, inputs, vars, globals).
  • Create a single Taters() instance (shared across all steps in the run).
  • Execute each step in order:
    • ITEM steps: fan out across discovered inputs using a thread or process pool (configurable per step). A given step reuses one pool for all items to amortize worker startup.
    • GLOBAL steps: run once, in order, with a barrier between steps.
  • After each step, update and persist the JSON manifest so long-running runs are observable and resumable.
  • Print the final manifest path on completion.
Concurrency Notes
  • The default executor for ITEM steps is a ThreadPoolExecutor (good for I/O-bound steps and for GPU inference that releases the GIL).
  • For heavy Python/CPU work, presets may set engine: process on a step to use a ProcessPoolExecutor. In that case, be mindful that a new Python process is spawned for each worker (model weights may be reloaded once per worker); the sketch after these notes shows where those keys sit in a preset.
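
A sketch of a hypothetical two-step preset, shown as the dict load_preset_by_name would return. The step keys (scope, call, with, save_as, engine, workers) match the source below; the call targets are borrowed from examples elsewhere on this page, and the parameter names inside the with: blocks are illustrative only.

    # Hypothetical parsed preset; not a preset that ships with Taters.
    preset = {
        "vars": {"device": "cuda"},
        "steps": [
            {
                "scope": "item",                        # fans out over discovered inputs
                "call": "potato.audio.convert_to_wav",  # example Taters method path
                "with": {"source": "{{input}}", "device": "{{var:device}}"},
                "save_as": "wav",
                "engine": "process",                    # ProcessPoolExecutor for CPU-heavy work
                "workers": 2,                           # overrides --workers for this step
            },
            {
                "scope": "global",                      # runs once, after the barrier
                "call": "taters.helpers.feature_gather.aggregate_features",
                "with": {"manifest": "{{var:run_manifest}}"},
                "save_as": "features",
            },
        ],
    }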
Error Handling
  • Individual ITEM step failures do not crash the pipeline; they mark that item as "error" in the manifest and continue.
  • GLOBAL step failures are terminal for the run (the loop breaks).

Returns:

    None
        The function exits the process after writing the manifest.

Source code in src\taters\pipelines\run_pipeline.py
def main():
    """
    Entry point for the Taters Pipeline Runner.

    Responsibilities
    ----------------
    - Parse CLI arguments (`--preset` or `--preset-file`, optional `--vars-file`
      and repeated `--var key=value` overrides, `--workers`, etc.).
    - Load the preset YAML and merge variables from three sources in order:
        1) preset `vars` block
        2) `--vars-file` (YAML)
        3) repeated `--var` CLI flags
    - Decide whether input discovery is required:
        * If the preset has any ITEM-scoped steps, `--root_dir` is required and
          files are discovered with `discover_inputs(...)`.
        * If there are only GLOBAL steps, discovery is skipped entirely.
    - Build a run manifest skeleton (preset name, inputs, vars, globals).
    - Create a single `Taters()` instance (shared across all steps in the run).
    - Execute each step in order:
        * ITEM steps: fan out across discovered inputs using a thread or process
          pool (configurable per step). A given step reuses one pool for all
          items to amortize worker startup.
        * GLOBAL steps: run once, in order, with a barrier between steps.
    - After each step, update and persist the JSON manifest so long-running runs
      are observable and resumable.
    - Print the final manifest path on completion.

    Concurrency Notes
    -----------------
    - The default executor for ITEM steps is a `ThreadPoolExecutor` (good for
      I/O-bound steps and for GPU inference that releases the GIL).
    - For heavy Python/CPU work, presets may set `engine: process` on a step to
      use a `ProcessPoolExecutor`. In that case, be mindful that a new Python
      process is spawned for each worker (model weights may be reloaded once per
      worker).

    Error Handling
    --------------
    - Individual ITEM step failures do not crash the pipeline; they mark that
      item as `"error"` in the manifest and continue.
    - GLOBAL step failures are terminal for the run (the loop breaks).

    Returns
    -------
    None
        The function exits the process after writing the manifest.
    """
    # ---------------------------
    # CLI
    # ---------------------------
    ap = argparse.ArgumentParser(
        description="Taters Pipeline Runner (robust templating + flexible calls)"
    )
    ap.add_argument("--root_dir", default=None,
                    help="Folder to scan for inputs (required only if preset has ITEM steps)")
    ap.add_argument("--file_type", default="any", choices=["audio", "video", "any"],
                    help="Input type filter for discovery")

    # NOTE: not required here — we enforce after handling list/describe.
    group = ap.add_mutually_exclusive_group(required=False)
    group.add_argument("--preset", help="Preset name (taters/pipelines/presets/<name>.yaml)")
    group.add_argument("--preset-file", dest="preset_file", help="Path to preset YAML")

    ap.add_argument("--vars-file", dest="vars_file", help="YAML file with 'vars' overrides")
    ap.add_argument("--var", action="append", default=[], help="Single override key=value (repeatable)")
    ap.add_argument("--workers", type=int, default=4, help="Concurrency for ITEM steps")
    ap.add_argument("--out-manifest", dest="out_manifest", default=None,
                    help="Run manifest (JSON). Default: ./run_manifest.json")

    # discovery / docs helpers
    ap.add_argument("--list-presets", action="store_true",
                    help="List all discovered presets and exit")
    ap.add_argument("--describe-preset", metavar="NAME",
                    help="Show metadata for a preset (by id or filename) and exit")

    args = ap.parse_args()

    # Early-exit helpers (no preset required)
    if args.list_presets:
        _cmd_list_presets()
        sys.exit(0)

    if args.describe_preset:
        _cmd_describe_preset(args.describe_preset)
        sys.exit(0)

    # Now enforce that one of --preset/--preset-file is present
    if not (args.preset or args.preset_file):
        ap.error("one of --preset or --preset-file is required "
                 "unless using --list-presets or --describe-preset")

    # ---------------------------
    # Load preset and vars first
    # ---------------------------
    preset = load_preset_by_name(args.preset) if args.preset else load_yaml_file(Path(args.preset_file))
    steps: List[dict] = preset.get("steps", []) or []
    if not steps:
        raise ValueError("Preset has no steps")

    vars_ctx: Dict[str, Any] = dict(preset.get("vars", {}) or {})
    if args.vars_file:
        vars_ctx = merge_vars(vars_ctx, load_yaml_file(Path(args.vars_file)))
    vars_ctx = merge_vars(vars_ctx, parse_var_overrides(args.var))

    # ---------------------------
    # Decide if discovery is needed
    # ---------------------------
    has_item_steps = any((step.get("scope", "item") == "item") for step in steps)

    inputs: List[Path] = []
    root_dir: Path | None = None
    if has_item_steps:
        if not args.root_dir:
            raise ValueError("--root_dir is required because this preset contains ITEM-scoped steps.")
        root_dir = Path(args.root_dir).resolve()
        inputs = discover_inputs(root_dir, args.file_type)
        print(f"[pipeline] Found {len(inputs)} '{args.file_type}' input(s) under {root_dir}")
        if not inputs:
            print("[pipeline] No inputs found; ITEM steps will be skipped.")
    else:
        print("[pipeline] Preset has only GLOBAL steps; skipping input discovery.")

    # ---------------------------
    # Build manifest skeleton
    # ---------------------------
    manifest: Dict[str, Any] = {
        "preset": args.preset or str(args.preset_file),
        "root_dir": str(root_dir) if root_dir else None,
        "file_type": args.file_type if has_item_steps else None,
        "vars": _json_safe(vars_ctx),
        "items": [{"input": str(p), "artifacts": {}, "status": "pending", "errors": []} for p in inputs],
        "globals": {},
        "errors": [],
    }
    out_manifest_path = Path(args.out_manifest or (Path.cwd() / "run_manifest.json"))

    # Create a single Taters instance for the whole run (correct class import)
    from taters.Taters import Taters  # ← IMPORTANT: import the class, not the module
    potato = Taters()
    globals_ctx: Dict[str, Any] = {}

    # ---------------------------
    # Execute steps
    # ---------------------------
    for idx, step in enumerate(steps, 1):
        scope = step.get("scope", "item")
        call_name = step.get("call")
        print(f"[pipeline] Step {idx}/{len(steps)}: {call_name}  (scope={scope})")

        if scope == "item":
            if not inputs:
                print(f"[pipeline] No inputs; skipping ITEM step: {call_name}")
                continue

            step_engine = step.get("engine", "thread")  # "thread" (default) or "process"
            step_workers = max(1, int(step.get("workers", args.workers)))

            def _run_one(ix_and_path: Tuple[int, Path]):
                i, p = ix_and_path
                itm = manifest["items"][i]
                status, new_artifacts, err = run_item_step_for_one_input(
                    step=step,
                    input_path=p,
                    potato=potato,
                    item_artifacts=itm["artifacts"],
                    globals_ctx=globals_ctx,
                    vars_ctx=vars_ctx,
                )
                return i, status, new_artifacts, err

            results: List[Tuple[int, str, Dict[str, Any], Dict[str, Any]]] = []
            Executor = cf.ProcessPoolExecutor if step_engine == "process" else cf.ThreadPoolExecutor
            with Executor(max_workers=step_workers) as pool:
                futures = [pool.submit(_run_one, (i, p)) for i, p in enumerate(inputs)]
                for fut in cf.as_completed(futures):
                    results.append(fut.result())

            for i, status, new_artifacts, err in results:
                itm = manifest["items"][i]
                if status == "ok":
                    for k, v in (new_artifacts or {}).items():
                        itm["artifacts"][k] = _json_safe(v)
                    if itm["status"] != "error":
                        itm["status"] = "ok"
                else:
                    itm["status"] = "error"
                    itm["errors"].append(err.get("error", "unknown error"))

        elif scope == "global":
            status, new_globals, err = run_global_step(
                step=step,
                potato=potato,
                globals_ctx=globals_ctx,
                vars_ctx=vars_ctx,
                manifest_path=out_manifest_path,
            )
            if status != "ok":
                print(f"[pipeline] GLOBAL step failed: {err.get('error')}")
                manifest["errors"].append(err.get("error", "unknown error"))
                break
            for k, v in (new_globals or {}).items():
                globals_ctx[k] = v
                manifest["globals"][k] = _json_safe(v)

        else:
            raise ValueError(f"Invalid scope: {scope}")

        # Persist manifest after each step
        out_manifest_path.parent.mkdir(parents=True, exist_ok=True)
        with out_manifest_path.open("w", encoding="utf-8") as f:
            json.dump(_json_safe(manifest), f, indent=2, ensure_ascii=False)

    print(f"[pipeline] Manifest written to: {out_manifest_path}")

merge_vars

merge_vars(base, overlay)

Shallow-merge two variable dictionaries.

Later sources of variables (e.g., --vars-file, then repeated --var overrides) should replace keys from earlier sources. This helper applies a simple dict.update(...) and returns a new dictionary.

Parameters:

Name Type Description Default
base dict

The starting dictionary of variables.

required
overlay dict

The dictionary whose keys override entries in base.

required

Returns:

    dict
        A new dictionary with merged keys/values.
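
For example:

    from taters.pipelines.run_pipeline import merge_vars

    merged = merge_vars({"device": "cpu", "workers": 4}, {"device": "cuda"})
    # merged == {"device": "cuda", "workers": 4}; overlay keys win on conflicts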

Source code in src\taters\pipelines\run_pipeline.py
def merge_vars(base: dict, overlay: dict) -> dict:
    """
    Shallow-merge two variable dictionaries.

    Later sources of variables (e.g., `--vars-file`, then repeated `--var`
    overrides) should replace keys from earlier sources. This helper
    applies a simple `dict.update(...)` and returns a new dictionary.

    Parameters
    ----------
    base : dict
        The starting dictionary of variables.
    overlay : dict
        The dictionary whose keys override entries in `base`.

    Returns
    -------
    dict
        A new dictionary with merged keys/values.
    """
    out = dict(base or {})
    out.update(overlay or {})
    return out

parse_var_overrides

parse_var_overrides(pairs)

Parse --var key=value CLI overrides into typed Python values.

Typing rules:
  • "true"/"false" (case-insensitive) → bool
  • "null"/"none" (case-insensitive) → None
  • integer or float strings → numeric
  • all else → raw string

Parameters:

    pairs : List[str] (required)
        CLI arguments of the form ["k1=v1", "k2=v2", ...].

Returns:

    dict
        Mapping from variable name to parsed value.

Raises:

    ValueError
        If any entry does not contain an '=' separator.
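
For example, applying the typing rules above:

    from taters.pipelines.run_pipeline import parse_var_overrides

    parse_var_overrides(["device=cuda", "overwrite_existing=true", "rate=0.5", "limit=none"])
    # → {"device": "cuda", "overwrite_existing": True, "rate": 0.5, "limit": None}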

Source code in src\taters\pipelines\run_pipeline.py
def parse_var_overrides(pairs: List[str]) -> dict:
    """
    Parse `--var key=value` CLI overrides into typed Python values.

    Typing rules:
      - "true"/"false" (case-insensitive) → bool
      - "null"/"none" (case-insensitive)  → None
      - integer or float strings → numeric
      - all else → raw string

    Parameters
    ----------
    pairs : List[str]
        CLI arguments of the form `["k1=v1", "k2=v2", ...]`.

    Returns
    -------
    dict
        Mapping from variable name to parsed value.

    Raises
    ------
    ValueError
        If any entry does not contain an '=' separator.
    """
    out: Dict[str, Any] = {}
    for s in pairs:
        if "=" not in s:
            raise ValueError(f"--var expects key=value, got: {s}")
        k, v = s.split("=", 1)
        vs = v.strip()
        if vs.lower() in {"true", "false"}:
            out[k] = (vs.lower() == "true")
        elif vs.lower() in {"null", "none"}:
            out[k] = None
        else:
            try:
                out[k] = float(vs) if "." in vs else int(vs)
            except Exception:
                out[k] = v
    return out

render_value

render_value(
    val, *, item_ctx, globals_ctx, vars_ctx, input_path
)

Render templating expressions within a value (str, list, or dict).

Behavior
  • Dicts/lists/tuples: render recursively.
  • If a string is exactly one template token (e.g., "{{var:text_cols}}"), return the native value of that expression (list, int, bool, ...).
  • Otherwise, perform string substitution for every {{...}} occurrence and return the resulting string.
Resolution rules (summary)
  • {{input}} / {{cwd}}
  • {{var:key}}
  • {{global.path}} (explicit globals)
  • {{pick:name.path}} → search item, then globals
  • {{name}} → bare name; search item, then globals
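
A small sketch of the native-type behavior, assuming the resolution rules summarized above (the wav artifact and text_cols variable are hypothetical):

    from pathlib import Path
    from taters.pipelines.run_pipeline import render_value

    rendered = render_value(
        {"audio": "{{wav}}", "cols": "{{var:text_cols}}", "tag": "run-{{var:device}}"},
        item_ctx={"wav": "/tmp/a.wav"},  # bare {{wav}} searches the item first
        globals_ctx={},
        vars_ctx={"text_cols": ["text"], "device": "cuda"},
        input_path=Path("a.mp4"),
    )
    # Expected: {"audio": "/tmp/a.wav", "cols": ["text"], "tag": "run-cuda"}
    # "cols" keeps its native list type because the string was a single template token.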
Source code in src\taters\pipelines\run_pipeline.py
def render_value(
    val: Any,
    *,
    item_ctx: dict,
    globals_ctx: dict,
    vars_ctx: dict,
    input_path: Path
) -> Any:
    """
    Render templating expressions within a value (str, list, or dict).

    Behavior
    --------
      - Dicts/lists/tuples: render recursively.
      - If a string is exactly one template token (e.g., "{{var:text_cols}}"),
        return the *native* value of that expression (list, int, bool, ...).
      - Otherwise, perform string substitution for every {{...}} occurrence and
        return the resulting string.

    Resolution rules (summary)
    --------------------------
      - {{input}} / {{cwd}}
      - {{var:key}}
      - {{global.path}} (explicit globals)
      - {{pick:name.path}}  → search item, then globals
      - {{name}}            → bare name; search item, then globals
    """
    if isinstance(val, dict):
        return {
            k: render_value(
                v,
                item_ctx=item_ctx,
                globals_ctx=globals_ctx,
                vars_ctx=vars_ctx,
                input_path=input_path,
            )
            for k, v in val.items()
        }
    if isinstance(val, (list, tuple)):
        return [
            render_value(
                v,
                item_ctx=item_ctx,
                globals_ctx=globals_ctx,
                vars_ctx=vars_ctx,
                input_path=input_path,
            )
            for v in val
        ]
    if not isinstance(val, str):
        return val

    # Entire string is a single template → return native type
    m = _VAR_RE.fullmatch(val.strip())
    if m:
        return _eval_expr(
            m.group(1),
            item_ctx=item_ctx,
            globals_ctx=globals_ctx,
            vars_ctx=vars_ctx,
            input_path=input_path,
        )

    # Otherwise substitute each token as a string
    def _subst(match: re.Match) -> str:
        expr = match.group(1)
        v = _eval_expr(
            expr,
            item_ctx=item_ctx,
            globals_ctx=globals_ctx,
            vars_ctx=vars_ctx,
            input_path=input_path,
        )
        return str(v)

    return _VAR_RE.sub(_subst, val)

resolve_call

resolve_call(call_name, potato)

Resolve a call target from a preset step into an actual callable.

Supported forms

1) Taters instance methods (recommended):
   • "potato.audio.convert_to_wav"
   • "potato.text.analyze_with_dictionaries"
   The function is resolved via attribute chaining on a single Taters() instance created for the whole run.

2) Dotted import paths:
   • "package.module:function"
   • "package.module.func"
   • "package.module.Class.method"
   The target is imported and attributes are resolved. The final target must be callable.

Parameters:

    call_name : str (required)
        Call string from the preset step's call: field.
    potato : Taters (required)
        The shared Taters instance for resolving "potato.*" calls.

Returns:

    Callable
        The function/object that will be invoked for the step.

Raises:

    AttributeError, KeyError, TypeError
        If the target cannot be resolved or is not callable.
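
The dotted-path form is easy to verify directly, using a function documented on this page:

    from taters.Taters import Taters
    from taters.pipelines.run_pipeline import resolve_call, load_preset_by_name

    potato = Taters()
    fn = resolve_call("taters.pipelines.run_pipeline.load_preset_by_name", potato)
    assert fn is load_preset_by_name  # resolves to the same function object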

Source code in src\taters\pipelines\run_pipeline.py
def resolve_call(call_name: str, potato: Taters):
    """
    Resolve a call target from a preset step into an actual callable.

    Supported forms
    ---------------
    1) Taters instance methods (recommended):
       - `"potato.audio.convert_to_wav"`
       - `"potato.text.analyze_with_dictionaries"`
       The function is resolved via attribute chaining on a single
       `Taters()` instance created for the whole run.

    2) Dotted import paths:
       - `"package.module:function"`
       - `"package.module.func"`
       - `"package.module.Class.method"`
       The target is imported and attributes are resolved. The final target
       must be callable.

    Parameters
    ----------
    call_name : str
        Call string from the preset step's `call:` field.
    potato : Taters
        The shared `Taters` instance for resolving `"potato.*"` calls.

    Returns
    -------
    Callable
        The function/object that will be invoked for the step.

    Raises
    ------
    AttributeError, KeyError, TypeError
        If the target cannot be resolved or is not callable.
    """
    if call_name.startswith("potato."):
        obj: Any = potato
        for part in call_name.split(".")[1:]:
            if not hasattr(obj, part):
                raise AttributeError(f"{call_name}: '{part}' not found on {obj}")
            obj = getattr(obj, part)
        if not callable(obj):
            raise TypeError(f"{call_name} is not callable")
        return obj

    # Allow dotted import paths
    # Support both "pkg.mod:func" and "pkg.mod.func"
    mod_path, sep, tail = call_name.partition(":")
    if not sep:
        # split at last dot for function
        parts = call_name.rsplit(".", 1)
        if len(parts) == 2:
            mod_path, tail = parts
        else:
            raise KeyError(f"Cannot resolve call target: {call_name}")
    module = importlib.import_module(mod_path)
    target = module
    for attr in tail.split("."):
        if not hasattr(target, attr):
            raise AttributeError(f"{call_name}: '{attr}' not found in {target}")
        target = getattr(target, attr)
    if not callable(target):
        raise TypeError(f"{call_name} resolved to non-callable: {target}")
    return target

run_global_step

run_global_step(
    *, step, potato, globals_ctx, vars_ctx, manifest_path
)

Execute a single GLOBAL-scoped step (runs once per pipeline).

Differences from ITEM steps
  • The templating item_ctx is empty.
  • The run manifest path is injected into vars as run_manifest, so presets can reference it in GLOBAL stages.
  • On success, any values from save_as: are merged into the globals artifact map.

Parameters:

Name Type Description Default
step dict

The step definition block from the preset.

required
potato Taters

Shared Taters instance used to call potato.* targets.

required
globals_ctx Dict[str, Any]

Accumulated global artifacts (readable by later steps).

required
vars_ctx Dict[str, Any]

Merged variables.

required
manifest_path Path

Path where the JSON run manifest is (or will be) saved.

required

Returns:

    Tuple[str, Dict[str, Any], Dict[str, Any]]
        A tuple (status, new_globals, err) mirroring the ITEM step shape.
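
A minimal sketch using a dotted-path target (os.makedirs), so no Taters method needs to be involved:

    from pathlib import Path
    from taters.Taters import Taters
    from taters.pipelines.run_pipeline import run_global_step

    step = {"call": "os.makedirs", "with": {"name": "analysis_out", "exist_ok": True}}
    status, new_globals, err = run_global_step(
        step=step, potato=Taters(), globals_ctx={}, vars_ctx={},
        manifest_path=Path("run_manifest.json"),
    )
    # status == "ok"; new_globals is {} because the step has no save_as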

Source code in src\taters\pipelines\run_pipeline.py
def run_global_step(
    *, step: dict, potato: Taters, globals_ctx: Dict[str, Any], vars_ctx: Dict[str, Any], manifest_path: Path
) -> Tuple[str, Dict[str, Any], Dict[str, Any]]:
    """
    Execute a single GLOBAL-scoped step (runs once per pipeline).

    Differences from ITEM steps
    ---------------------------
    - The templating `item_ctx` is empty.
    - The run manifest path is injected into `vars` as `run_manifest`,
      so presets can reference it in GLOBAL stages.
    - On success, any values from `save_as:` are merged into the `globals`
      artifact map.

    Parameters
    ----------
    step : dict
        The step definition block from the preset.
    potato : Taters
        Shared Taters instance used to call `potato.*` targets.
    globals_ctx : Dict[str, Any]
        Accumulated global artifacts (readable by later steps).
    vars_ctx : Dict[str, Any]
        Merged variables.
    manifest_path : Path
        Path where the JSON run manifest is (or will be) saved.

    Returns
    -------
    Tuple[str, Dict[str, Any], Dict[str, Any]]
        A tuple `(status, new_globals, err)` mirroring the ITEM step shape.
    """
    call = step["call"]
    params = step.get("with", {})

    # Expose manifest path via vars
    vars_aug = dict(vars_ctx)
    vars_aug["run_manifest"] = str(manifest_path)

    rendered = render_value(params, item_ctx={}, globals_ctx=globals_ctx, vars_ctx=vars_aug, input_path=manifest_path)

    func = resolve_call(call, potato)
    try:
        result = func(**rendered)
    except Exception as e:
        return ("error", {}, {"error": f"{call} failed: {e}"})

    out: Dict[str, Any] = {}
    if "save_as" in step:
        out[step["save_as"]] = result
    return ("ok", out, {})

run_item_step_for_one_input

run_item_step_for_one_input(
    *,
    step,
    input_path,
    potato,
    item_artifacts,
    globals_ctx,
    vars_ctx
)

Execute a single ITEM-scoped step for one input path.

Lifecycle

1) Template the step's with: parameters using render_value(...).
2) Validate any require: keys after templating (fail fast if missing).
3) Resolve the callable (Taters method or import path).
4) Invoke with keyword arguments.
5) If the step specified save_as: <name>, store the return value under that name in the item's artifacts dict.

Parameters:

    step : dict (required)
        The step definition block from the preset.
    input_path : Path (required)
        The current input file for ITEM scope.
    potato : Taters (required)
        Shared Taters instance used to call potato.* targets.
    item_artifacts : Dict[str, Any] (required)
        The current item's artifact dictionary (mutated across steps).
    globals_ctx : Dict[str, Any] (required)
        Global artifacts (from GLOBAL steps).
    vars_ctx : Dict[str, Any] (required)
        Merged variables.

Returns:

    Tuple[str, Dict[str, Any], Dict[str, Any]]
        A tuple (status, new_artifacts, err) where:
          • status is "ok" or "error".
          • new_artifacts is a (possibly empty) dict of artifacts to merge.
          • err contains an "error" message on failure.
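
A self-contained sketch using shutil.copyfile as a stand-in step; clip.mp4 is a hypothetical input and must exist for the call to succeed:

    from pathlib import Path
    from taters.Taters import Taters
    from taters.pipelines.run_pipeline import run_item_step_for_one_input

    step = {
        "call": "shutil.copyfile",                             # dotted import path form
        "with": {"src": "{{input}}", "dst": "{{input}}.bak"},  # templated per input
        "save_as": "backup",
        "require": ["src"],                                    # checked after templating
    }
    status, new_artifacts, err = run_item_step_for_one_input(
        step=step, input_path=Path("clip.mp4"), potato=Taters(),
        item_artifacts={}, globals_ctx={}, vars_ctx={},
    )
    # On success: status == "ok" and new_artifacts == {"backup": ...}.
    # Exceptions are caught and reported via err rather than raised.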

Source code in src\taters\pipelines\run_pipeline.py
def run_item_step_for_one_input(
    *, step: dict, input_path: Path, potato: Taters, item_artifacts: Dict[str, Any],
    globals_ctx: Dict[str, Any], vars_ctx: Dict[str, Any]
) -> Tuple[str, Dict[str, Any], Dict[str, Any]]:
    """
    Execute a single ITEM-scoped step for one input path.

    Lifecycle
    ---------
    1) Template the step's `with:` parameters using `render_value(...)`.
    2) Validate any `require:` keys after templating (fail fast if missing).
    3) Resolve the callable (Taters method or import path).
    4) Invoke with keyword arguments.
    5) If the step specified `save_as: <name>`, store the return value under
       that name in the item's `artifacts` dict.

    Parameters
    ----------
    step : dict
        The step definition block from the preset.
    input_path : Path
        The current input file for ITEM scope.
    potato : Taters
        Shared Taters instance used to call `potato.*` targets.
    item_artifacts : Dict[str, Any]
        The current item's artifact dictionary (mutated across steps).
    globals_ctx : Dict[str, Any]
        Global artifacts (from GLOBAL steps).
    vars_ctx : Dict[str, Any]
        Merged variables.

    Returns
    -------
    Tuple[str, Dict[str, Any], Dict[str, Any]]
        A tuple `(status, new_artifacts, err)` where:
          - `status` is `"ok"` or `"error"`.
          - `new_artifacts` is a (possibly empty) dict of artifacts to merge.
          - `err` contains an `"error"` message on failure.
    """
    call = step["call"]
    params = step.get("with", {})

    rendered = render_value(params, item_ctx=item_artifacts, globals_ctx=globals_ctx, vars_ctx=vars_ctx, input_path=input_path)

    # Required keys check (post-templating)
    for key in step.get("require", []):
        if key not in rendered or rendered[key] in (None, "", []):
            return ("error", {}, {"error": f"Missing required parameter '{key}' after templating"})

    func = resolve_call(call, potato)
    try:
        result = func(**rendered)
    except Exception as e:
        return ("error", {}, {"error": f"{call} failed: {e}"})

    out: Dict[str, Any] = {}
    if "save_as" in step:
        out[step["save_as"]] = result
    return ("ok", out, {})