
Utilities & Helpers

taters.helpers.feature_gather

AggregationPlan dataclass

AggregationPlan(
    group_by,
    per_file=True,
    stats=("mean", "std"),
    exclude_cols=(),
    include_regex=None,
    exclude_regex=None,
    dropna=True,
)

Plan describing how numeric feature columns should be aggregated.

Parameters:

Name Type Description Default
group_by Sequence[str]

One or more column names used as grouping keys (e.g., ["speaker"]).

required
per_file bool

If True, include "source" in the grouping keys to aggregate within each input file; if False, aggregate across all files globally.

True
stats Sequence[str]

Statistical reductions to compute for each numeric feature column. Values are passed to pandas.DataFrame.agg (e.g., "mean", "std", "median", etc.).

("mean", "std")
exclude_cols Sequence[str]

Columns to drop before filtering/selecting numeric features (e.g., timestamps or free text).

()
include_regex str or None

Optional regex; if provided, only columns matching this pattern are kept (after excluding exclude_cols).

None
exclude_regex str or None

Optional regex; if provided, columns matching this pattern are removed (after applying include_regex, if any).

None
dropna bool

Whether to drop rows with NA in any of the group-by keys before grouping.

True
Notes

This plan is consumed by aggregate_features. Column filtering happens before numeric selection; only columns that remain and can be coerced to numeric will be aggregated.
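
A minimal usage sketch (the "speaker" column and the regex below are illustrative, not part of the API):

>>> plan = AggregationPlan(
...     group_by=["speaker"],
...     per_file=True,
...     stats=("mean", "std", "median"),
...     exclude_regex=r"^(start|end)_time$",
... )
>>> # pass the plan to aggregate_features(root_dir=..., plan=plan)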

aggregate_features

aggregate_features(
    *,
    root_dir,
    pattern="*.csv",
    recursive=True,
    delimiter=",",
    encoding="utf-8-sig",
    add_source_path=False,
    plan,
    out_csv=None,
    overwrite_existing=False
)

Discover files, read, concatenate, and aggregate numeric columns per plan.

This function consolidates CSVs from a single folder, filters columns, coerces candidate features to numeric, groups by the specified keys, and computes the requested statistics. Output columns for aggregated features are flattened with the pattern "{column}__{stat}".

Parameters:

Name Type Description Default
root_dir PathLike

Folder containing per-item CSVs, or a single CSV file.

required
pattern str

Glob pattern for selecting files.

"*.csv"
recursive bool

Recurse into subdirectories when True.

True
delimiter str

CSV delimiter.

","
encoding str

CSV encoding for read/write.

"utf-8-sig"
add_source_path bool

If True, include absolute path in "source_path" prior to filtering.

False
plan AggregationPlan

Aggregation configuration (group keys, stats, filters, NA handling).

required
out_csv PathLike or None

Output path. If None, defaults to <root_dir_parent>/<root_dir_name>.csv.

None
overwrite_existing bool

If False and out_csv exists, return it without recomputation.

False

Returns:

Type Description
Path

Path to the written CSV of aggregated features.

Raises:

Type Description
FileNotFoundError

If no files match the pattern under root_dir.

RuntimeError

If files were found but none could be read successfully.

ValueError

If required group-by columns are missing, or if no numeric columns remain after filtering, or if per-file grouping is requested but the "source" column is absent.

Notes

Group keys are preserved as leading columns in the output. The output places "source" (and optionally "source_path") first when present.
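
Example (a sketch; the folder layout and the "speaker" column are assumptions for illustration):

>>> plan = make_plan(group_by=["speaker"], per_file=True, stats=("mean", "std"))
>>> aggregate_features(
...     root_dir="features/lexical",
...     plan=plan,
...     out_csv="features/lexical_by_speaker.csv",
...     overwrite_existing=True,
... )

The result holds one row per (source, speaker) pair; a hypothetical feature column word_count would appear as word_count__mean and word_count__std per the "{column}__{stat}" pattern.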

Source code in src\taters\helpers\feature_gather.py
def aggregate_features(
    *,
    root_dir: PathLike,
    pattern: str = "*.csv",
    recursive: bool = True,
    delimiter: str = ",",
    encoding: str = "utf-8-sig",
    add_source_path: bool = False,
    plan: AggregationPlan,
    out_csv: Optional[PathLike] = None,
    overwrite_existing: bool = False,
) -> Path:
    """
    Discover files, read, concatenate, and aggregate numeric columns per plan.

    This function consolidates CSVs from a single folder, filters columns,
    coerces candidate features to numeric, groups by the specified keys,
    and computes the requested statistics. Output columns for aggregated
    features are flattened with the pattern ``"{column}__{stat}"``.

    Parameters
    ----------
    root_dir : PathLike
        Folder containing per-item CSVs, or a single CSV file.
    pattern : str, default="*.csv"
        Glob pattern for selecting files.
    recursive : bool, default=True
        Recurse into subdirectories when True.
    delimiter : str, default=","
        CSV delimiter.
    encoding : str, default="utf-8-sig"
        CSV encoding for read/write.
    add_source_path : bool, default=False
        If True, include absolute path in ``"source_path"`` prior to filtering.
    plan : AggregationPlan
        Aggregation configuration (group keys, stats, filters, NA handling).
    out_csv : PathLike or None, default=None
        Output path. If None, defaults to
        ``<root_dir_parent>/<root_dir_name>.csv``.
    overwrite_existing : bool, default=False
        If False and `out_csv` exists, return it without recomputation.

    Returns
    -------
    pathlib.Path
        Path to the written CSV of aggregated features.

    Raises
    ------
    FileNotFoundError
        If no files match the pattern under `root_dir`.
    RuntimeError
        If files were found but none could be read successfully.
    ValueError
        If required group-by columns are missing,
        or if no numeric columns remain after filtering,
        or if per-file grouping is requested but the ``"source"`` column is absent.

    Notes
    -----
    Group keys are preserved as leading columns in the output. The output places
    ``"source"`` (and optionally ``"source_path"``) first when present.
    """

    root = Path(root_dir)
    if out_csv is None:
        out_csv = root.parent / f"{root.name}.csv"
    out_csv = Path(out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    if out_csv.exists() and not overwrite_existing:
        print(f"Aggregated feature output file already exists; returning existing file: {out_csv}")
        return out_csv

    files = list(_iter_csv_files(root, pattern=pattern, recursive=recursive))
    if not files:
        raise FileNotFoundError(f"No files matched {pattern} under {root}")

    frames = []
    for fp in files:
        try:
            frames.append(
                _read_csv_add_source(
                    fp,
                    delimiter=delimiter,
                    encoding=encoding,
                    add_source_path=add_source_path,
                )
            )
        except Exception as e:
            print(f"[aggregate] WARNING: failed to read {fp}: {e}")

    if not frames:
        raise RuntimeError("No CSVs could be read successfully.")

    df = pd.concat(frames, axis=0, ignore_index=True)

    # Filter columns (remove known non-feature columns, optional regex filters)
    df_f = _filter_columns(
        df,
        exclude_cols=tuple(plan.exclude_cols) + ("source_path",),
        include_regex=plan.include_regex,
        exclude_regex=plan.exclude_regex,
    )

    # Build group keys
    group_keys = list(plan.group_by)
    if plan.per_file:
        if "source" not in df_f.columns:
            raise ValueError("source column is missing; cannot group per_file.")
        group_keys = ["source"] + group_keys

    if plan.dropna:
        df_f = df_f.dropna(subset=[k for k in group_keys if k in df_f.columns], how="any")

    missing = [k for k in group_keys if k not in df_f.columns]
    if missing:
        raise ValueError(f"Missing group-by columns in data: {missing}")

    # Candidate numeric features
    feature_cols = [c for c in df_f.columns if c not in set(group_keys)]
    numeric_df = _numeric_subframe(df_f[feature_cols])
    if numeric_df.empty:
        raise ValueError("No numeric columns available for aggregation after filtering.")

    # Reattach group keys for grouping
    gdf = pd.concat([df_f[group_keys].reset_index(drop=True),
                     numeric_df.reset_index(drop=True)], axis=1)

    agg_ops = {c: list(plan.stats) for c in numeric_df.columns}
    grouped = gdf.groupby(group_keys, dropna=False).agg(agg_ops)

    # Flatten MultiIndex columns and order 'source' first
    grouped.columns = [f"{c}__{stat}" for (c, stat) in grouped.columns]
    grouped = grouped.reset_index()

    # Ensure 'source' (and 'source_path' if present) lead the output
    cols = list(grouped.columns)
    lead = [c for c in ("source", "source_path") if c in cols]
    rest = [c for c in cols if c not in lead]
    grouped = grouped[lead + rest]

    grouped.to_csv(out_csv, index=False, encoding=encoding)
    return out_csv

feature_gather

feature_gather(
    *,
    root_dir,
    pattern="*.csv",
    recursive=True,
    delimiter=",",
    encoding="utf-8-sig",
    add_source_path=False,
    aggregate=False,
    plan=None,
    group_by=None,
    per_file=True,
    stats=("mean", "std"),
    exclude_cols=(),
    include_regex=None,
    exclude_regex=None,
    dropna=True,
    out_csv=None,
    overwrite_existing=False
)

Single entry point to concatenate or aggregate feature CSVs from one folder.

If aggregate=False, CSVs are concatenated with origin metadata (see gather_csvs_to_one). If aggregate=True, numeric feature columns are aggregated per the provided or constructed plan (see aggregate_features).

Parameters:

Name Type Description Default
root_dir PathLike

Folder containing per-item CSVs (or a single CSV file).

required
pattern str

Glob pattern for selecting CSV files.

"*.csv"
recursive bool

Recurse into subdirectories when True.

True
delimiter str

CSV delimiter.

","
encoding str

CSV encoding.

"utf-8-sig"
add_source_path bool

If True, include a "source_path" column in outputs.

False
aggregate bool

Toggle aggregation mode. If False, files are concatenated.

False
plan AggregationPlan or None

Explicit plan for aggregation. Required if aggregate=True and group_by is not given.

None
group_by Sequence[str] or None

Quick-plan keys. Used only when aggregate=True and plan is None.

None
per_file bool

Quick-plan flag; include "source" in grouping keys to aggregate per file.

True
stats Sequence[str]

Quick-plan statistics to compute per numeric column.

("mean", "std")
exclude_cols Sequence[str]

Quick-plan columns to drop before numeric selection.

()
include_regex str or None

Quick-plan regex to include feature columns by name.

None
exclude_regex str or None

Quick-plan regex to exclude feature columns by name.

None
dropna bool

Quick-plan NA handling for group keys.

True
out_csv PathLike or None

Output CSV path. If None, defaults to <root_dir_parent>/<root_dir_name>.csv.

None
overwrite_existing bool

If False and out_csv exists, the existing path is returned without recomputation.

False

Returns:

Type Description
Path

Path to the resulting CSV.

Raises:

Type Description
ValueError

If aggregate=True and neither plan nor group_by is provided.

See Also

gather_csvs_to_one : Concatenate CSVs with origin metadata.
aggregate_features : Aggregate numeric columns according to a plan.
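
Quick-plan example (a sketch; paths and the "speaker" key are illustrative):

>>> # plain concatenation of all CSVs under the folder
>>> feature_gather(root_dir="features/acoustic")
>>> # per-file, per-speaker aggregation without building a plan by hand
>>> feature_gather(
...     root_dir="features/acoustic",
...     aggregate=True,
...     group_by=["speaker"],
...     stats=("mean", "std"),
... )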

Source code in src\taters\helpers\feature_gather.py
def feature_gather(
    *,
    root_dir: PathLike,
    pattern: str = "*.csv",
    recursive: bool = True,
    delimiter: str = ",",
    encoding: str = "utf-8-sig",
    add_source_path: bool = False,
    # toggle aggregation; when True you must pass a plan (or plan_args below)
    aggregate: bool = False,
    plan: Optional[AggregationPlan] = None,
    # optional “quick plan” args (only used if plan=None and aggregate=True)
    group_by: Optional[Sequence[str]] = None,
    per_file: bool = True,
    stats: Sequence[str] = ("mean", "std"),
    exclude_cols: Sequence[str] = (),
    include_regex: Optional[str] = None,
    exclude_regex: Optional[str] = None,
    dropna: bool = True,
    # output
    out_csv: Optional[PathLike] = None,
    overwrite_existing: bool = False,
) -> Path:
    """
    Single entry point to concatenate or aggregate feature CSVs from one folder.

    If ``aggregate=False``, CSVs are concatenated with origin metadata
    (see :func:`gather_csvs_to_one`). If ``aggregate=True``, numeric feature
    columns are aggregated per the provided or constructed plan
    (see :func:`aggregate_features`).

    Parameters
    ----------
    root_dir : PathLike
        Folder containing per-item CSVs (or a single CSV file).
    pattern : str, default="*.csv"
        Glob pattern for selecting CSV files.
    recursive : bool, default=True
        Recurse into subdirectories when True.
    delimiter : str, default=","
        CSV delimiter.
    encoding : str, default="utf-8-sig"
        CSV encoding.
    add_source_path : bool, default=False
        If True, include a ``"source_path"`` column in outputs.
    aggregate : bool, default=False
        Toggle aggregation mode. If False, files are concatenated.
    plan : AggregationPlan or None, default=None
        Explicit plan for aggregation. Required if ``aggregate=True`` and
        ``group_by`` is not given.
    group_by : Sequence[str] or None, default=None
        Quick-plan keys. Used only when ``aggregate=True`` and ``plan`` is None.
    per_file : bool, default=True
        Quick-plan flag; include ``"source"`` in grouping keys to aggregate per file.
    stats : Sequence[str], default=("mean", "std")
        Quick-plan statistics to compute per numeric column.
    exclude_cols : Sequence[str], default=()
        Quick-plan columns to drop before numeric selection.
    include_regex : str or None, default=None
        Quick-plan regex to include feature columns by name.
    exclude_regex : str or None, default=None
        Quick-plan regex to exclude feature columns by name.
    dropna : bool, default=True
        Quick-plan NA handling for group keys.
    out_csv : PathLike or None, default=None
        Output CSV path. If None, defaults to
        ``<root_dir_parent>/<root_dir_name>.csv``.
    overwrite_existing : bool, default=False
        If False and `out_csv` exists, the existing path is returned without
        recomputation.

    Returns
    -------
    pathlib.Path
        Path to the resulting CSV.

    Raises
    ------
    ValueError
        If ``aggregate=True`` and neither ``plan`` nor ``group_by`` is provided.

    See Also
    --------
    gather_csvs_to_one : Concatenate CSVs with origin metadata.
    aggregate_features : Aggregate numeric columns according to a plan.
    """

    if not aggregate:
        return gather_csvs_to_one(
            root_dir=root_dir,
            pattern=pattern,
            recursive=recursive,
            delimiter=delimiter,
            encoding=encoding,
            add_source_path=add_source_path,
            out_csv=out_csv,
            overwrite_existing=overwrite_existing,
        )

    # aggregate=True
    if plan is None:
        if not group_by:
            raise ValueError("When aggregate=True, you must provide 'plan' or 'group_by'.")
        plan = make_plan(
            group_by=group_by,
            per_file=per_file,
            stats=stats,
            exclude_cols=exclude_cols,
            include_regex=include_regex,
            exclude_regex=exclude_regex,
            dropna=dropna,
        )

    return aggregate_features(
        root_dir=root_dir,
        pattern=pattern,
        recursive=recursive,
        delimiter=delimiter,
        encoding=encoding,
        add_source_path=add_source_path,
        plan=plan,
        out_csv=out_csv,
        overwrite_existing=overwrite_existing,
    )

gather_csvs_to_one

gather_csvs_to_one(
    *,
    root_dir,
    pattern="*.csv",
    recursive=True,
    delimiter=",",
    encoding="utf-8-sig",
    add_source_path=False,
    out_csv=None,
    overwrite_existing=False
)

Concatenate many CSVs into a single CSV with origin metadata.

Each input CSV is loaded (all columns as object dtype), a leading "source" column is inserted (and optionally "source_path"), and rows are appended. The final CSV ensures "source" (and, if present, "source_path") lead the column order.

Parameters:

Name Type Description Default
root_dir PathLike

Folder containing CSVs, or a single CSV file.

required
pattern str

Glob pattern for selecting files.

"*.csv"
recursive bool

Recurse into subdirectories when True.

True
delimiter str

CSV delimiter.

","
encoding str

CSV encoding for read/write.

"utf-8-sig"
add_source_path bool

If True, include absolute path in "source_path".

False
out_csv PathLike or None

Output path. If None, defaults to <root_dir_parent>/<root_dir_name>.csv.

None
overwrite_existing bool

If False and out_csv exists, return it without recomputation.

False

Returns:

Type Description
Path

Path to the written CSV.

Raises:

Type Description
FileNotFoundError

If no files match the pattern under root_dir.

RuntimeError

If files were found but none could be read successfully.

Notes

Input rows are not type-coerced beyond object dtype. Column order from inputs is preserved after the leading origin columns.
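
Example (illustrative paths):

>>> gather_csvs_to_one(
...     root_dir="features/liwc",
...     add_source_path=True,
...     out_csv="features/liwc_all.csv",
... )

Each output row carries the originating filename in "source" and, because add_source_path=True, its absolute path in "source_path".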

Source code in src\taters\helpers\feature_gather.py
def gather_csvs_to_one(
    *,
    root_dir: PathLike,
    pattern: str = "*.csv",
    recursive: bool = True,
    delimiter: str = ",",
    encoding: str = "utf-8-sig",
    add_source_path: bool = False,
    out_csv: Optional[PathLike] = None,
    overwrite_existing: bool = False,
) -> Path:
    """
    Concatenate many CSVs into a single CSV with origin metadata.

    Each input CSV is loaded (all columns as object dtype), a leading
    ``"source"`` column is inserted (and optionally ``"source_path"``), and
    rows are appended. The final CSV ensures ``"source"`` (and, if present,
    ``"source_path"``) lead the column order.

    Parameters
    ----------
    root_dir : PathLike
        Folder containing CSVs, or a single CSV file.
    pattern : str, default="*.csv"
        Glob pattern for selecting files.
    recursive : bool, default=True
        Recurse into subdirectories when True.
    delimiter : str, default=","
        CSV delimiter.
    encoding : str, default="utf-8-sig"
        CSV encoding for read/write.
    add_source_path : bool, default=False
        If True, include absolute path in ``"source_path"``.
    out_csv : PathLike or None, default=None
        Output path. If None, defaults to
        ``<root_dir_parent>/<root_dir_name>.csv``.
    overwrite_existing : bool, default=False
        If False and `out_csv` exists, return it without recomputation.

    Returns
    -------
    pathlib.Path
        Path to the written CSV.

    Raises
    ------
    FileNotFoundError
        If no files match the pattern under `root_dir`.
    RuntimeError
        If files were found but none could be read successfully.

    Notes
    -----
    Input rows are not type-coerced beyond object dtype. Column order from
    inputs is preserved after the leading origin columns.
    """

    root = Path(root_dir)
    if out_csv is None:
        out_csv = root.parent / f"{root.name}.csv"

    out_csv = Path(out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    if out_csv.exists() and not overwrite_existing:
        print(f"Gathered output file already exists; returning existing file: {out_csv}")
        return out_csv

    files = list(_iter_csv_files(root, pattern=pattern, recursive=recursive))
    if not files:
        raise FileNotFoundError(f"No files matched {pattern} under {root}")

    frames = []
    for fp in files:
        try:
            frames.append(
                _read_csv_add_source(
                    fp,
                    delimiter=delimiter,
                    encoding=encoding,
                    add_source_path=add_source_path,
                )
            )
        except Exception as e:
            print(f"[gather] WARNING: failed to read {fp}: {e}")

    if not frames:
        raise RuntimeError("No CSVs could be read successfully.")

    merged = pd.concat(frames, axis=0, ignore_index=True)

    # Ensure 'source' is first (and 'source_path' next if present)
    cols = list(merged.columns)
    if "source" in cols:
        lead = ["source"] + (["source_path"] if "source_path" in cols else [])
        rest = [c for c in cols if c not in lead]
        merged = merged[lead + rest]

    merged.to_csv(out_csv, index=False, encoding=encoding)
    return out_csv

main

main()

Entry point for the command-line interface.

Parses arguments, dispatches to gather_csvs_to_one, aggregate_features, or feature_gather depending on the selected subcommand, and prints the resulting output path.

Notes

This function is invoked when the module is executed as a script:

python -m taters.helpers.feature_gather <subcommand> [options]
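
A hedged sketch of the three subcommands (gather, aggregate, run); the option spellings below are assumptions, since the authoritative flags are defined in _build_parser, which is not shown here:

python -m taters.helpers.feature_gather gather --root-dir features/liwc
python -m taters.helpers.feature_gather aggregate --root-dir features/liwc --group-by speaker --stats mean std
python -m taters.helpers.feature_gather run --root-dir features/liwc --aggregate --group-by speaker
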
Source code in src\taters\helpers\feature_gather.py
def main():
    """
    Entry point for the command-line interface.

    Parses arguments, dispatches to :func:`gather_csvs_to_one`,
    :func:`aggregate_features`, or :func:`feature_gather` depending on the
    selected subcommand, and prints the resulting output path.

    Notes
    -----
    This function is invoked when the module is executed as a script::

        python -m taters.helpers.feature_gather <subcommand> [options]
    """

    parser = _build_parser()
    args = parser.parse_args()

    if args.cmd == "gather":
        out = gather_csvs_to_one(
            root_dir=args.root_dir,
            pattern=args.pattern,
            recursive=not args.no_recursive,
            delimiter=args.delimiter,
            encoding=args.encoding,
            add_source_path=args.add_source_path,
            out_csv=args.out_csv,
            overwrite_existing=args.overwrite_existing,
        )
        print(str(out))
        return

    if args.cmd == "aggregate":
        plan = AggregationPlan(
            group_by=args.group_by,
            per_file=args.per_file,
            stats=tuple(args.stats),
            exclude_cols=tuple(args.exclude_cols or []),
            include_regex=args.include_regex,
            exclude_regex=args.exclude_regex,
        )
        out = aggregate_features(
            root_dir=args.root_dir,
            pattern=args.pattern,
            recursive=not args.no_recursive,
            delimiter=args.delimiter,
            encoding=args.encoding,
            add_source_path=args.add_source_path,
            plan=plan,
            out_csv=args.out_csv,
            overwrite_existing=args.overwrite_existing,
        )
        print(str(out))
        return

    if args.cmd == "run":
        out = feature_gather(
            root_dir=args.root_dir,
            pattern=args.pattern,
            recursive=not args.no_recursive,
            delimiter=args.delimiter,
            encoding=args.encoding,
            add_source_path=args.add_source_path,
            aggregate=args.aggregate,
            group_by=args.group_by,          # may be None if aggregate=False
            per_file=args.per_file,
            stats=tuple(args.stats),
            exclude_cols=tuple(args.exclude_cols or []),
            include_regex=args.include_regex,
            exclude_regex=args.exclude_regex,
            out_csv=args.out_csv,
            overwrite_existing=args.overwrite_existing,
        )
        print(str(out))
        return

make_plan

make_plan(
    *,
    group_by,
    per_file=True,
    stats=("mean", "std"),
    exclude_cols=(),
    include_regex=None,
    exclude_regex=None,
    dropna=True
)

Create an AggregationPlan from simple arguments.

Parameters:

Name Type Description Default
group_by Sequence[str]

Grouping key(s) to use (e.g., ["speaker"]).

required
per_file bool

If True, group within files by including "source" in group keys.

True
stats Sequence[str]

Statistical reductions to compute per numeric column.

("mean", "std")
exclude_cols Sequence[str]

Columns to drop prior to feature selection.

()
include_regex str or None

Regex to include feature columns by name.

None
exclude_regex str or None

Regex to exclude feature columns by name.

None
dropna bool

Drop rows with NA in any group key.

True

Returns:

Type Description
AggregationPlan

A configured plan instance for aggregate_features.
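
Example (a sketch; the column names are illustrative):

>>> plan = make_plan(group_by=["speaker"], stats=("mean", "median"), exclude_cols=["timestamp"])
>>> plan.per_file
True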

Source code in src\taters\helpers\feature_gather.py
def make_plan(
    *,
    group_by: Sequence[str],
    per_file: bool = True,
    stats: Sequence[str] = ("mean", "std"),
    exclude_cols: Sequence[str] = (),
    include_regex: Optional[str] = None,
    exclude_regex: Optional[str] = None,
    dropna: bool = True,
) -> AggregationPlan:
    """
    Create an :class:`AggregationPlan` from simple arguments.

    Parameters
    ----------
    group_by : Sequence[str]
        Grouping key(s) to use (e.g., ``["speaker"]``).
    per_file : bool, default=True
        If True, group within files by including ``"source"`` in group keys.
    stats : Sequence[str], default=("mean", "std")
        Statistical reductions to compute per numeric column.
    exclude_cols : Sequence[str], default=()
        Columns to drop prior to feature selection.
    include_regex : str or None, default=None
        Regex to include feature columns by name.
    exclude_regex : str or None, default=None
        Regex to exclude feature columns by name.
    dropna : bool, default=True
        Drop rows with NA in any group key.

    Returns
    -------
    AggregationPlan
        A configured plan instance for :func:`aggregate_features`.
    """

    return AggregationPlan(
        group_by=tuple(group_by),
        per_file=per_file,
        stats=tuple(stats),
        exclude_cols=tuple(exclude_cols),
        include_regex=include_regex,
        exclude_regex=exclude_regex,
        dropna=dropna,
    )

taters.helpers.find_files

find_files

find_files(
    root_dir,
    *,
    file_type="video",
    extensions=None,
    recursive=True,
    follow_symlinks=False,
    include_hidden=False,
    include_globs=None,
    exclude_globs=None,
    absolute=True,
    sort=True,
    ffprobe_verify=False
)

Discover media files under a folder using smart, FFmpeg-friendly filters.

You can either (a) choose a built-in group of extensions via file_type ("audio"|"video"|"image"|"subtitle"|"archive"|"any") or (b) pass an explicit list of extensions to match. Matching is case-insensitive; dots are optional (e.g., ".wav" and "wav" are equivalent). Hidden files and directories are excluded by default.

For audio/video, ffprobe_verify=True additionally checks that at least one corresponding stream is present (e.g., exclude MP4s with no audio when file_type="audio"). This is slower but robust when your dataset contains “container only” files.

Parameters:

Name Type Description Default
root_dir str | PathLike

Folder to scan.

required
file_type str

Built-in group selector. Ignored if extensions is provided.

'video'
extensions Optional[Sequence[str]]

Explicit extensions to include (e.g., [".wav",".flac"]). Overrides file_type.

None
recursive bool

Recurse into subfolders. Default: True.

True
follow_symlinks bool

Follow directory symlinks during traversal. Default: False.

False
include_hidden bool

Include dot-files and dot-dirs. Default: False.

False
include_globs Optional[Sequence[str]]

Additional glob filters applied after extension filtering; include_globs uses OR-semantics (e.g., ["**/*session*"]).

None
exclude_globs Optional[Sequence[str]]

Glob patterns whose matches are removed after include_globs are applied (e.g., ["**/tmp/**"]).

None
absolute bool

Return absolute paths when True (default) else relative to root_dir.

True
sort bool

Sort lexicographically (case-insensitive). Default: True.

True
ffprobe_verify bool

For audio/video, keep only files where ffprobe reports ≥1 matching stream.

False

Returns:

Type Description
list[Path]

The matched files.

Raises:

Type Description
FileNotFoundError

If root_dir does not exist.

ValueError

If file_type is not one of the supported groups.

Examples:

Find all videos (recursive), as absolute paths:

>>> find_files("dataset", file_type="video")

Use explicit extensions and keep paths relative:

>>> find_files("dataset", extensions=[".wav",".flac"], absolute=False)

Only include files matching a glob and exclude temp folders:

>>> find_files("dataset", file_type="audio",
...            include_globs=["**/*session*"], exclude_globs=["**/tmp/**"])

Verify playable audio streams exist:

>>> find_files("dataset", file_type="audio", ffprobe_verify=True)
Source code in src\taters\helpers\find_files.py
def find_files(
    root_dir: str | os.PathLike,
    *,
    file_type: str = "video",                            # 'audio' | 'video' | 'image' | 'subtitle' | 'archive' | 'any'
    extensions: Optional[Sequence[str]] = None,     # explicit extensions override group (e.g., ['.wav','.flac'])
    recursive: bool = True,
    follow_symlinks: bool = False,
    include_hidden: bool = False,
    include_globs: Optional[Sequence[str]] = None,  # e.g., ['**/*session*']
    exclude_globs: Optional[Sequence[str]] = None,  # e.g., ['**/temp/**']
    absolute: bool = True,
    sort: bool = True,
    ffprobe_verify: bool = False,                   # confirm stream presence via ffprobe (audio/video only)
) -> List[Path]:
    """
    Discover media files under a folder using smart, FFmpeg-friendly filters.

    You can either (a) choose a built-in **group** of extensions via `file_type`
    (`"audio"|"video"|"image"|"subtitle"|"archive"|"any"`) or (b) pass an explicit
    list of `extensions` to match. Matching is case-insensitive; dots are optional
    (e.g., `".wav"` and `"wav"` are equivalent). Hidden files and directories are
    excluded by default.

    For audio/video, `ffprobe_verify=True` additionally checks that at least one
    corresponding stream is present (e.g., exclude MP4s with no audio when
    `file_type="audio"`). This is slower but robust when your dataset contains
    “container only” files.

    Parameters
    ----------
    root_dir
        Folder to scan.
    file_type
        Built-in group selector. Ignored if `extensions` is provided.
    extensions
        Explicit extensions to include (e.g., `[".wav",".flac"]`). Overrides `file_type`.
    recursive
        Recurse into subfolders. Default: `True`.
    follow_symlinks
        Follow directory symlinks during traversal. Default: `False`.
    include_hidden
        Include dot-files and dot-dirs. Default: `False`.
    include_globs / exclude_globs
        Additional glob filters applied after extension filtering; `include_globs`
        uses OR-semantics, then `exclude_globs` removes matches.
    absolute
        Return absolute paths when `True` (default) else relative to `root_dir`.
    sort
        Sort lexicographically (case-insensitive). Default: `True`.
    ffprobe_verify
        For `audio`/`video`, keep only files where `ffprobe` reports ≥1 matching
        stream.

    Returns
    -------
    list[pathlib.Path]
        The matched files.

    Raises
    ------
    FileNotFoundError
        If `root_dir` does not exist.
    ValueError
        If `file_type` is not one of the supported groups.

    Examples
    --------
    Find all videos (recursive), as absolute paths:

    >>> find_files("dataset", file_type="video")

    Use explicit extensions and keep paths relative:

    >>> find_files("dataset", extensions=[".wav",".flac"], absolute=False)

    Only include files matching a glob and exclude temp folders:

    >>> find_files("dataset", file_type="audio",
    ...            include_globs=["**/*session*"], exclude_globs=["**/tmp/**"])

    Verify playable audio streams exist:

    >>> find_files("dataset", file_type="audio", ffprobe_verify=True)
    """
    root_dir = Path(root_dir)
    if not root_dir.exists():
        raise FileNotFoundError(f"Root path not found: {root_dir}")

    if extensions:
        allowed = {_norm_ext(e) for e in extensions}
    else:
        if file_type not in GROUPS:
            raise ValueError(f"Unknown kind '{file_type}'. Choose from {', '.join(GROUPS.keys())}.")
        allowed = set(GROUPS[file_type])

    cand = (
        p for p in _iter_files(root_dir, recursive=recursive, follow_symlinks=follow_symlinks, include_hidden=include_hidden)
        if p.is_file() and _match_ext(p, allowed)
    )

    cand = _glob_filter(
        cand,
        includes=include_globs or [],
        excludes=exclude_globs or [],
    )

    out: List[Path] = []
    for p in cand:
        if ffprobe_verify and file_type in ("audio", "video"):
            if not _ffprobe_has_stream(p, file_type):
                continue
        out.append(p.resolve() if absolute else p)

    if sort:
        out.sort(key=lambda x: str(x).lower())
    return out

taters.helpers.text_gather

csv_to_analysis_ready_csv

csv_to_analysis_ready_csv(
    *,
    csv_path,
    out_csv=None,
    overwrite_existing=False,
    text_cols,
    id_cols=None,
    mode="concat",
    group_by=None,
    delimiter=None,
    encoding=DEFAULT_ENCODING,
    joiner=DEFAULT_JOINER,
    num_buckets=1024,
    max_open_bucket_files=64,
    tmp_root=None
)

Stream a (possibly huge) CSV into a compact analysis-ready CSV with a stable schema and optional external grouping.

Output schema

Always writes a header and enforces a consistent column order:

• No grouping: text_id,text (plus source_col if mode="separate")
• With grouping: text_id,text,group_count (plus source_col if mode="separate")

Where:
- text_id is either the composed ID from id_cols or row_<n> when id_cols=None.
- mode="concat" joins all text_cols using joiner per row or group.
- mode="separate" emits one row per (row_or_group, text_col) and fills source_col with the contributing column name.

Grouping at scale

If group_by is provided, the function performs a two-pass external grouping that does not require presorting:
1) Hash-partition rows to on-disk “bucket” CSVs (bounded writers with an LRU cache of open files).
2) Aggregate each bucket into final rows (concat or separate mode), writing group_count to record how many pieces contributed.
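
For instance, grouping an unsorted transcript CSV by speaker in "separate" mode (a sketch; the column names are illustrative):

>>> csv_to_analysis_ready_csv(
...     csv_path="transcripts.csv",
...     text_cols=["prompt", "response"],
...     group_by=["speaker"],
...     mode="separate",
... )

This yields rows with columns text_id, text, group_count, source_col: one row per (speaker, text column), with group_count recording how many source rows contributed.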

Parameters:

Name Type Description Default
csv_path PathLike

Source CSV with at least the columns in text_cols (and group_by if grouping).

required
out_csv PathLike | None

Destination CSV. If None, a name is derived from the input and options (e.g., <stem>_grouped_<group_by>.csv or <stem>_concat_<cols>.csv).

None
overwrite_existing bool

If False (default) and out_csv exists, the function returns early.

False
text_cols Sequence[str]

One or more text fields to concatenate or emit separately.

required
id_cols Sequence[str] | None

Optional columns to compose text_id when not grouping. When omitted, a synthetic row_<n> is used.

None
mode str

"concat" (default) or "separate". See schema above.

'concat'
group_by Sequence[str] | None

Optional list of columns to aggregate by; works on unsorted CSVs.

None
delimiter str | None

CSV delimiter. If None, the delimiter is sniffed from a sample of the file.

None
encoding str

CSV encoding for read/write.

DEFAULT_ENCODING
joiner str

String used to join text pieces when concatenating rows or columns.

DEFAULT_JOINER
num_buckets int

External grouping: number of on-disk hash partitions (tune up for many groups or very large files).

1024
max_open_bucket_files int

External grouping: cap on simultaneously open bucket writers (LRU).

64
tmp_root PathLike | None

External grouping: directory for temporary partitions (default: system tmp).

None

Returns:

Type Description
Path

Path to the analysis-ready CSV.

Raises:

Type Description
ValueError

If required columns are missing or mode is invalid.

Examples:

Concatenate two text fields per row:

>>> csv_to_analysis_ready_csv(
...     csv_path="transcripts.csv",
...     text_cols=["prompt","response"],
...     id_cols=["speaker"],
... )

Group by speaker and join rows:

>>> csv_to_analysis_ready_csv(
...     csv_path="transcripts.csv",
...     text_cols=["text"],
...     group_by=["speaker"],
... )
Source code in src\taters\helpers\text_gather.py
def csv_to_analysis_ready_csv(
    *,
    csv_path: PathLike,
    out_csv: PathLike | None = None,
    overwrite_existing: bool = False,  # if the file already exists, let's not overwrite by default
    text_cols: Sequence[str],
    id_cols: Sequence[str] | None = None,
    mode: str = "concat",                 # "concat" or "separate"
    group_by: Sequence[str] | None = None,
    delimiter: str | None = None,
    encoding: str = DEFAULT_ENCODING,
    joiner: str = DEFAULT_JOINER,
    # external grouping params
    num_buckets: int = 1024,               # tune up if many groups / very large file
    max_open_bucket_files: int = 64,      # file descriptor cap (LRU)
    tmp_root: PathLike | None = None,     # where to place partitions (default: system tmp)
) -> Path:
    """
    Stream a (possibly huge) CSV into a compact **analysis-ready** CSV with a
    stable schema and optional external grouping.

    Output schema
    -------------
    Always writes a header and enforces a consistent column order:

    • No grouping:
        `text_id,text`                            (plus `source_col` if `mode="separate"`)
    • With grouping:
        `text_id,text,group_count`                (plus `source_col` if `mode="separate"`)

    Where:
      - `text_id` is either the composed ID from `id_cols` or `row_<n>` when
        `id_cols=None`.
      - `mode="concat"` joins all `text_cols` using `joiner` per row or group.
      - `mode="separate"` emits one row per (`row_or_group`, `text_col`) and
        fills `source_col` with the contributing column name.

    Grouping at scale
    -----------------
    If `group_by` is provided, the function performs a **two-pass external
    grouping** that does not require presorting:
      1) Hash-partition rows to on-disk “bucket” CSVs (bounded writers with LRU).
      2) Aggregate each bucket into final rows (concat or separate mode), writing
         `group_count` to record how many pieces contributed.

    Parameters
    ----------
    csv_path
        Source CSV with at least the columns in `text_cols` (and `group_by` if
        grouping).
    out_csv
        Destination CSV. If `None`, a name is derived from the input and options
        (e.g., `<stem>_grouped_<group_by>.csv` or `<stem>_concat_<cols>.csv`).
    overwrite_existing
        If `False` (default) and `out_csv` exists, the function returns early.
    text_cols
        One or more text fields to concatenate or emit separately.
    id_cols
        Optional columns to compose `text_id` when not grouping. When omitted, a
        synthetic `row_<n>` is used.
    mode
        `"concat"` (default) or `"separate"`. See schema above.
    group_by
        Optional list of columns to aggregate by; works on unsorted CSVs.
    delimiter, encoding, joiner
        Parsing/formatting options. If `delimiter=None`, sniffs from a sample.
    num_buckets, max_open_bucket_files, tmp_root
        External grouping controls (partition count, LRU limit, temp root).

    Returns
    -------
    Path
        Path to the analysis-ready CSV.

    Raises
    ------
    ValueError
        If required columns are missing or `mode` is invalid.

    Examples
    --------
    Concatenate two text fields per row:

    >>> csv_to_analysis_ready_csv(
    ...     csv_path="transcripts.csv",
    ...     text_cols=["prompt","response"],
    ...     id_cols=["speaker"],
    ... )

    Group by speaker and join rows:

    >>> csv_to_analysis_ready_csv(
    ...     csv_path="transcripts.csv",
    ...     text_cols=["text"],
    ...     group_by=["speaker"],
    ... )
    """
    in_path = _ensure_path(csv_path)

    # Detect delimiter if not provided
    if delimiter is None:
        with in_path.open("rb") as fb:
            sample = fb.read(8192)
        delimiter = _detect_delimiter(sample, default=DEFAULT_DELIM)

    text_cols = list(text_cols)
    if not text_cols:
        raise ValueError("text_cols must be non-empty")
    mode = mode.strip().lower()
    if mode not in ("concat", "separate"):
        raise ValueError("mode must be 'concat' or 'separate'")

    include_source_col = (mode == "separate")
    include_source_path = False  # this function deals with CSV; folder variant uses this flag

    # Decide output path (default next to input if not specified)
    out_path = _ensure_path(out_csv) if out_csv is not None else _default_csv_out_path(
        in_csv=in_path, mode=mode, text_cols=text_cols, group_by=group_by)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    if not overwrite_existing and Path(out_path).is_file():
        print("File with gathered text already exists; returning existing file.")
        return out_path

    # If no grouping, we can stream straight to the output
    if not group_by:
        writer, fh, _ = _open_out_csv(out_path, include_source_col, include_source_path)
        try:
            with in_path.open("r", newline="", encoding=encoding) as f:
                rdr = csv.DictReader(f, delimiter=delimiter)
                headers = rdr.fieldnames or []
                missing = [c for c in (id_cols or []) + text_cols if c not in headers]
                if missing:
                    raise ValueError(f"Missing columns: {missing}. Try specifying the delimiter manually if you see this error.")

                for idx, row in enumerate(rdr, start=1):
                    text_id = _compose_id([row.get(c, "") for c in (id_cols or [])]) if id_cols else f"row_{idx}"
                    if mode == "concat":
                        parts = [row.get(c, "") for c in text_cols if row.get(c, "")]
                        if not parts:
                            continue
                        writer.writerow([text_id, joiner.join(parts)])
                    else:
                        for col in text_cols:
                            val = row.get(col, "")
                            if not val:
                                continue
                            writer.writerow([text_id, val, col])
        finally:
            fh.close()
        return out_path

    # Otherwise, do external grouping (two-pass)
    group_by = list(group_by)

    # Phase 1: partition into hash buckets
    tmp_base = Path(tempfile.mkdtemp(prefix="gather_partitions_", dir=str(tmp_root) if tmp_root else None))
    part_dir = tmp_base / "parts"
    part_dir.mkdir(parents=True, exist_ok=True)

    # Bucket writer cache
    header_small = group_by + text_cols
    cache = _LRUFileCache(
        max_open=max_open_bucket_files,
        newline="",
        encoding=encoding,
        delimiter=delimiter,             # pass the sniffed/explicit delimiter through to bucket writers
    )


    try:
        with in_path.open("r", newline="", encoding=encoding) as f:
            rdr = csv.DictReader(f, delimiter=delimiter)
            headers = rdr.fieldnames or []
            missing = [c for c in group_by + text_cols if c not in headers]
            if missing:
                raise ValueError(f"Missing columns: {missing}. Try specifying the delimiter manually if you see this error.")

            for row in rdr:
                key_tuple = tuple(row[g] for g in group_by)
                bucket = _bucket_of_key(key_tuple, num_buckets)
                bpath = part_dir / f"bucket_{bucket:05d}.csv"
                w = cache.get(bucket, bpath, header_small)
                # write only needed fields to keep partitions lean
                w.writerow([row.get(c, "") for c in header_small])
    finally:
        cache.close_all()

    # Phase 2: per-bucket aggregation → final writer
    writer, out_fh, _ = _open_out_csv(
                                        out_path,
                                        include_source_col,
                                        include_source_path,
                                        include_group_count=True,
                                    )

    try:
        for bfile in sorted(part_dir.glob("bucket_*.csv")):
            # Aggregate this bucket in memory
            if mode == "concat":
                # key -> list[text]
                agg: Dict[Tuple[str, ...], List[str]] = {}
                with bfile.open("r", newline="", encoding=encoding) as bf:
                    br = csv.DictReader(bf, delimiter=delimiter)
                    for row in br:
                        key = tuple(row[g] for g in group_by)
                        parts = [row.get(c, "") for c in text_cols if row.get(c, "")]
                        if not parts:
                            continue
                        agg.setdefault(key, []).append(joiner.join(parts))
                # Emit
                for key, pieces in agg.items():
                    text_id = _compose_id(key) or "group"
                    writer.writerow([text_id, joiner.join(pieces), len(pieces)])  # <- add count
            else:
                # key -> col -> list[text]
                agg: Dict[Tuple[str, ...], Dict[str, List[str]]] = {}
                with bfile.open("r", newline="", encoding=encoding) as bf:
                    br = csv.DictReader(bf, delimiter=delimiter)
                    for row in br:
                        key = tuple(row[g] for g in group_by)
                        box = agg.setdefault(key, {})
                        for col in text_cols:
                            val = row.get(col, "")
                            if val:
                                box.setdefault(col, []).append(val)
                # Emit
                for key, per_col in agg.items():
                    text_id = _compose_id(key) or "group"
                    for col in text_cols:
                        vals = per_col.get(col, [])
                        if not vals:
                            continue
                        writer.writerow([text_id, joiner.join(vals), len(vals), col])  # <- count before source_col

    finally:
        out_fh.close()
        # Clean up partitions
        try:
            for p in part_dir.glob("bucket_*.csv"):
                p.unlink(missing_ok=True)
            part_dir.rmdir()
            tmp_base.rmdir()
        except Exception:
            pass

    return out_path

txt_folder_to_analysis_ready_csv

txt_folder_to_analysis_ready_csv(
    *,
    root_dir,
    out_csv=None,
    recursive=False,
    pattern="*.txt",
    encoding="utf-8",
    id_from="stem",
    include_source_path=True,
    overwrite_existing=False
)

Stream a folder of .txt files into an analysis-ready CSV with predictable, reproducible IDs.

For each file matching pattern, the emitted row contains:
- text_id: the basename (stem), full filename, or relative path (see id_from)
- text: the file contents
- source_path: optional column with path relative to root_dir

Parameters:

Name Type Description Default
root_dir PathLike

Folder containing .txt files.

required
out_csv PathLike | None

Destination CSV. If None, a descriptive default is created next to root_dir (e.g., <folder>_txt_recursive_*.csv).

None
recursive bool

Recurse into subfolders. Default: False.

False
pattern str

Glob for matching text files. Default: "*.txt".

'*.txt'
encoding str

File decoding. Default: "utf-8".

'utf-8'
id_from str

How to derive text_id: "stem" (basename without extension), "name" (filename), or "path" (relative path).

'stem'
include_source_path bool

If True (default), add a source_path column showing the relative path.

True
overwrite_existing bool

If False (default) and out_csv exists, returns the existing file.

False

Returns:

Type Description
Path

Path to the analysis-ready CSV.

Examples:

>>> txt_folder_to_analysis_ready_csv(root_dir="notes", recursive=True, id_from="path")
Source code in src\taters\helpers\text_gather.py
def txt_folder_to_analysis_ready_csv(
    *,
    root_dir: PathLike,
    out_csv: PathLike | None = None,
    recursive: bool = False,
    pattern: str = "*.txt",
    encoding: str = "utf-8",
    id_from: str = "stem",            # "stem" | "name" | "path"
    include_source_path: bool = True, # writes 'source_path' column
    overwrite_existing: bool = False  # if the file already exists, let's not overwrite by default

) -> Path:
    """
    Stream a folder of `.txt` files into an analysis-ready CSV with predictable,
    reproducible IDs.

    For each file matching `pattern`, the emitted row contains:
      - `text_id`: the basename (stem), full filename, or relative path (see
        `id_from`), and
      - `text`: the file contents.
      - `source_path`: optional column with path relative to `root_dir`.

    Parameters
    ----------
    root_dir
        Folder containing `.txt` files.
    out_csv
        Destination CSV. If `None`, a descriptive default is created next to
        `root_dir` (e.g., `<folder>_txt_recursive_*.csv`).
    recursive
        Recurse into subfolders. Default: `False`.
    pattern
        Glob for matching text files. Default: `"*.txt"`.
    encoding
        File decoding. Default: `"utf-8"`.
    id_from
        How to derive `text_id`: `"stem"` (basename without extension),
        `"name"` (filename), or `"path"` (relative path).
    include_source_path
        If `True` (default), add a `source_path` column showing the relative path.
    overwrite_existing
        If `False` (default) and `out_csv` exists, returns the existing file.

    Returns
    -------
    Path
        Path to the analysis-ready CSV.

    Examples
    --------
    >>> txt_folder_to_analysis_ready_csv(root_dir="notes", recursive=True, id_from="path")
    """
    root = _ensure_path(root_dir)
    out_path = _ensure_path(out_csv) if out_csv is not None else _default_txt_out_path(
        root, id_from=id_from, recursive=recursive, pattern=pattern)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    if not overwrite_existing and Path(out_path).is_file():
        print("File with gathered text already exists; returning existing file.")
        return out_path

    writer, fh, _ = _open_out_csv(out_path, include_source_col=False, include_source_path=include_source_path)
    try:
        files = root.rglob(pattern) if recursive else root.glob(pattern)
        for p in files:
            if not p.is_file():
                continue
            if id_from == "stem":
                text_id = p.stem
            elif id_from == "name":
                text_id = p.name
            elif id_from == "path":
                text_id = str(p.relative_to(root))
            else:
                raise ValueError("id_from must be 'stem', 'name', or 'path'")
            text = p.read_text(encoding=encoding, errors="ignore")
            if include_source_path:
                writer.writerow([text_id, text, str(p.relative_to(root))])
            else:
                writer.writerow([text_id, text])
    finally:
        fh.close()

    return out_path