
Audio Modules

convert_audio_to_wav

convert_audio_to_wav(
    input_path,
    *,
    output_path=None,
    output_dir=None,
    sample_rate=16000,
    bit_depth=16,
    channels=1,
    overwrite_existing=False
)

Convert any FFmpeg-readable audio/video file to a linear PCM WAV.

Parameters:

Name Type Description Default
input_path str | Path

Source media file (audio or video container). FFmpeg must be able to read it.

required
output_path str | Path | None

Target WAV path. If None, defaults to <cwd>/audio/<input_stem>.wav.

None
output_dir str | Path | None

Directory for the output WAV when output_path is not given; mutually exclusive with output_path. If None, defaults to <cwd>/audio.

None
sample_rate int

Desired sample rate (Hz).

16000
bit_depth (16, 24, 32)

Output PCM bit depth; maps to pcm_s{bit_depth}le codec.

16
channels int

Number of output channels: 1 (mono) or 2 (stereo).

1
overwrite_existing bool

Overwrite output_path if it already exists.

False

Returns:

Type Description
Path

Path to the written WAV file.

Raises:

Type Description
FileNotFoundError

If input_path does not exist.

RuntimeError

If FFmpeg/FFprobe are missing or the conversion fails.

Notes
  • Video inputs are supported: the audio stream is extracted and converted.
  • The requested channels count is enforced; FFmpeg downmixes or upmixes the source as needed.
  • We run FFmpeg with -nostdin to avoid TTY issues in pipelines.
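A minimal usage sketch (the import path follows the source location shown below; the input path is illustrative):

from taters.audio.convert_to_wav import convert_audio_to_wav

# Pull the audio out of a screen recording and normalize it to 16 kHz mono
# PCM, the settings most ASR front-ends expect. The WAV lands in ./audio/.
wav_path = convert_audio_to_wav(
    "recordings/meeting.mp4",   # illustrative input; any FFmpeg-readable file works
    sample_rate=16000,
    bit_depth=16,
    channels=1,
)
print(wav_path)  # e.g. <cwd>/audio/meeting.wav
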
Source code in src\taters\audio\convert_to_wav.py
def convert_audio_to_wav(
    input_path: Union[str, Path],
    *,
    output_path: Optional[Union[str, Path]] = None,
    output_dir: Optional[Union[str, Path]] = None,
    sample_rate: int = 16000,          # common for ASR
    bit_depth: int = 16,               # 16/24/32 signed PCM
    channels: int = 1,                 # 1=mono, 2=stereo
    overwrite_existing: bool = False,  # if the file already exists, let's not overwrite by default
) -> Path:
    """
    Convert any FFmpeg-readable audio/video file to a linear PCM WAV.

    Parameters
    ----------
    input_path : str | Path
        Source media file (audio or video container). FFmpeg must be able to read it.
    output_path : str | Path | None, optional
        Target WAV path. If None, defaults to
        ``<cwd>/audio/<input_stem>.wav``.
    output_dir : str | Path | None, optional
        Directory for the output WAV when `output_path` is not given;
        mutually exclusive with `output_path`. If None, defaults to
        ``<cwd>/audio``.
    sample_rate : int, default 16000
        Desired sample rate (Hz).
    bit_depth : {16,24,32}, default 16
        Output PCM bit depth; maps to ``pcm_s{bit_depth}le`` codec.
    channels : {1, 2}, default 1
        Number of output channels (1=mono, 2=stereo).
    overwrite_existing : bool, default False
        Overwrite `output_path` if it already exists.

    Returns
    -------
    Path
        Path to the written WAV file.

    Raises
    ------
    FileNotFoundError
        If `input_path` does not exist.
    RuntimeError
        If FFmpeg/FFprobe are missing or the conversion fails.

    Notes
    -----
    - Video inputs are supported: the audio stream is extracted and converted.
    - The requested `channels` count is enforced; FFmpeg downmixes or upmixes the source as needed.
    - We run FFmpeg with ``-nostdin`` to avoid TTY issues in pipelines.
    """

    _check_ffmpeg()

    in_path = Path(input_path).resolve()
    if not in_path.exists():
        raise FileNotFoundError(f"Input file not found: {in_path}")

    if output_path and output_dir:
        raise ValueError("Provide at most one of output_path or output_dir.")

    if output_path:
        out_path = Path(output_path).resolve()
    else:
        base = in_path.stem + ".wav"
        out_dir = Path(output_dir).resolve() if output_dir else Path.cwd() / "audio"
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / base

    if not overwrite_existing and Path(out_path).is_file():
        print("WAV file already exists; returning existing file.")
        return out_path

    pcm_map = {16: "pcm_s16le", 24: "pcm_s24le", 32: "pcm_s32le"}
    if bit_depth not in pcm_map:
        raise ValueError("bit_depth must be one of {16, 24, 32}.")
    if channels not in (1, 2):
        raise ValueError("channels must be 1 (mono) or 2 (stereo).")

    cmd = [
        "ffmpeg",
        "-nostdin",
        "-hide_banner", "-loglevel", "error",
        "-y" if overwrite_existing else "-n",
        "-i", str(in_path),
        "-vn",                        # ignore video
        "-acodec", pcm_map[bit_depth],
        "-ar", str(sample_rate),
        "-ac", str(channels),
        str(out_path),
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, stdin=subprocess.DEVNULL)
    if result.returncode != 0:
        if not overwrite_existing and out_path.exists():
            raise FileExistsError(f"Target exists (use overwrite=True): {out_path}")
        raise RuntimeError(f"ffmpeg failed: {result.stderr.strip()}")

    return out_path

Thin CLI shim for the vendored Whisper diarization wrapper.

This module exists so you can run:

python -m taters.audio.diarize_with_thirdparty --audio_path ...

It simply delegates to the real implementation in taters/audio/diarizer/whisper_diar_wrapper.py.
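If you prefer to stay in Python, the same shim can be launched with subprocess; only the --audio_path flag shown above is assumed here, and any other options are whatever the vendored wrapper defines.

import subprocess
import sys

# Run the diarization shim as a child process, exactly as the CLI line above
# would, so its CUDA/Torch state stays out of the parent interpreter.
subprocess.run(
    [sys.executable, "-m", "taters.audio.diarize_with_thirdparty",
     "--audio_path", "audio/session.wav"],
    check=True,
)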

Extract all audio streams from a video/container into standalone WAV files.

This utility probes the container with ffprobe, lists audio streams (with index and tags), and then maps each stream with ffmpeg to a separate PCM WAV. It is useful for multi-track recordings (e.g., Zoom, OBS, ProRes with stems).

split_audio_streams_to_wav

split_audio_streams_to_wav(
    input_path,
    output_dir=None,
    sample_rate=48000,
    bit_depth=16,
    overwrite=True,
)

Extract each audio stream in a container to its own WAV file.

Parameters:

Name Type Description Default
input_path str | PathLike

Video or audio container readable by FFmpeg.

required
output_dir str | PathLike | None

Destination directory. If None, defaults to ./audio in the current working directory (predictable write location).

None
sample_rate int

Target sample rate for the output WAVs (Hz).

48000
bit_depth (16, 24, 32)

Output PCM bit depth (little-endian).

16
overwrite bool

If True, overwrite existing files. If False and a target exists, raises FileExistsError.

True

Returns:

Type Description
list[str]

Absolute paths to the created WAVs.

Behavior
  • Output file names are constructed from the input base name and stream metadata: <stem>_a<index>[_<lang>][_<title>].wav with safe slugs.
  • Uses -map 0:a:<N> to select the N-th audio stream in the container.
  • Runs FFmpeg with -nostdin and quiet loglevel to avoid TTY lockups.

Examples:

>>> split_audio_streams_to_wav("session.mp4")
['.../audio/session_a0_eng.wav', '.../audio/session_a1_eng.wav']
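A hedged follow-on sketch: feed each extracted stream to convert_audio_to_wav to downmix it to 16 kHz mono (import paths follow the source locations listed on this page; "session.mp4" is illustrative):

from taters.audio.extract_wav_from_video import split_audio_streams_to_wav
from taters.audio.convert_to_wav import convert_audio_to_wav

# One WAV per audio track, then a 16 kHz mono copy of each for downstream ASR.
stream_paths = split_audio_streams_to_wav("session.mp4", sample_rate=48000)
mono_paths = [convert_audio_to_wav(p, sample_rate=16000, channels=1) for p in stream_paths]
print(mono_paths)
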
Source code in src\taters\audio\extract_wav_from_video.py
def split_audio_streams_to_wav(
    input_path: str | os.PathLike,
    output_dir: str | os.PathLike | None = None,     # <-- now optional
    sample_rate: int = 48000,
    bit_depth: int = 16,
    overwrite: bool = True,
) -> List[str]:
    """
    Extract each audio stream in a container to its own WAV file.

    Parameters
    ----------
    input_path : str | os.PathLike
        Video or audio container readable by FFmpeg.
    output_dir : str | os.PathLike | None, optional
        Destination directory. If None, defaults to ``./audio`` in the current
        working directory (predictable write location).
    sample_rate : int, default 48000
        Target sample rate for the output WAVs (Hz).
    bit_depth : {16,24,32}, default 16
        Output PCM bit depth (little-endian).
    overwrite : bool, default True
        If True, overwrite existing files. If False and a target exists,
        raises :class:`FileExistsError`.

    Returns
    -------
    list[str]
        Absolute paths to the created WAVs.

    Behavior
    --------
    - Output file names are constructed from the input base name and stream
      metadata: ``<stem>_a<index>[_<lang>][_<title>].wav`` with safe slugs.
    - Uses ``-map 0:a:<N>`` to select the N-th audio stream in the container.
    - Runs FFmpeg with ``-nostdin`` and quiet loglevel to avoid TTY lockups.

    Examples
    --------
    >>> split_audio_streams_to_wav("session.mp4")
    ['.../audio/session_a0_eng.wav', '.../audio/session_a1_eng.wav']
    """

    _check_binaries()

    in_path = Path(input_path)
    if not in_path.exists():
        raise FileNotFoundError(f"Input file not found: {in_path}")

    # Default predictable location when none is provided
    if output_dir is None:
        out_dir = Path.cwd() / "audio"
    else:
        out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"Extracting audio streams from {in_path} to {out_dir} at {sample_rate} Hz, bit depth: {bit_depth}")

    streams = _probe_audio_streams(in_path)
    if not streams:
        raise ValueError("No audio streams found in input.")

    pcm_fmt_map = {16: "pcm_s16le", 24: "pcm_s24le", 32: "pcm_s32le"}
    if bit_depth not in pcm_fmt_map:
        raise ValueError("bit_depth must be one of {16, 24, 32}.")
    pcm_codec = pcm_fmt_map[bit_depth]

    created_files: List[str] = []
    base = in_path.stem

    for s in streams:
        idx = s.get("index")
        tags = s.get("tags", {}) or {}
        lang = tags.get("language")
        title = tags.get("title")

        print(f"Extracting audio stream:\n"
              f"index: {idx}\n"
              f"tags: {tags}\n"
              f"language: {lang}\n"
              f"title: {title}\n")

        out_name = _build_wav_name(base, idx, lang, title)
        out_path = out_dir / out_name

        ffmpeg_cmd = [
            "ffmpeg",
            "-nostdin",
            "-hide_banner",
            "-loglevel", "error",
            "-y" if overwrite else "-n",
            "-i", str(in_path),
            "-map", f"0:a:{streams.index(s)}",  # Nth audio stream
            "-acodec", pcm_codec,
            "-ar", str(sample_rate),
            str(out_path),
        ]

        result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, stdin=subprocess.DEVNULL)
        if result.returncode != 0:
            if not overwrite and out_path.exists():
                raise FileExistsError(f"Target exists (use overwrite=True): {out_path}")
            raise RuntimeError(f"ffmpeg failed for stream {idx}: {result.stderr.strip()}")

        created_files.append(str(out_path))

    return created_files

High-level, environment-safe wrapper for exporting Whisper encoder embeddings.

This module provides a single entry point, extract_whisper_embeddings, which (by default) launches a subprocess to extract embeddings using a dedicated worker module. The subprocess approach avoids CUDA/Torch collisions with other parts of your pipeline.

Two modes are supported:

1) Transcript-driven mode: pass transcript_csv to compute one embedding vector per transcript row (e.g., per diarized segment). The output is a CSV with columns start_time,end_time,speaker,e0..e{D-1}.

2) General-audio mode: omit transcript_csv to analyze the raw WAV. You can segment by fixed windows or by non-silent regions; optionally aggregate to a single mean row.

extract_whisper_embeddings

extract_whisper_embeddings(
    *,
    source_wav,
    transcript_csv=None,
    time_unit="auto",
    strategy="windows",
    window_s=30.0,
    hop_s=15.0,
    min_seg_s=1.0,
    top_db=30.0,
    aggregate="none",
    output_dir=None,
    overwrite_existing=False,
    model_name="base",
    device="auto",
    compute_type="float16",
    run_in_subprocess=True,
    extra_env=None,
    verbose=True,
    extractor_module="taters.audio.extract_whisper_embeddings_subproc"
)

Export Whisper encoder embeddings to a CSV file, using a subprocess by default.

Parameters:

Name Type Description Default
source_wav str | Path

Path to the input WAV. Must be readable by librosa.

required
transcript_csv str | Path | None

If provided, enables transcript-driven mode. The CSV is expected to contain timestamp columns and (optionally) a speaker column. A row is emitted per transcript segment.

None
time_unit ('auto', 'ms', 's', 'samples')

How to interpret timestamps in transcript_csv. In "auto", the worker heuristically infers the unit from max end time vs audio duration.

"auto","ms","s","samples"
strategy ('windows', 'nonsilent')

General-audio mode only. "windows" uses fixed-size windows with overlap; "nonsilent" uses an energy-based splitter (librosa.effects.split).

"windows"
window_s float

General-audio mode only. Window length (seconds).

30.0
hop_s float

General-audio mode only. Hop between windows (seconds).

15.0
min_seg_s float

General-audio mode only. Skip segments shorter than this many seconds.

1.0
top_db float

General-audio mode only ("nonsilent"). Threshold (dB) below reference to consider as silence. Smaller → more segments; larger → fewer.

30.0
aggregate ('none', 'mean')

General-audio mode only. If "mean", a single pooled row is written covering the entire file; otherwise one row per segment.

"none","mean"
output_dir str | Path | None

Directory for the output CSV. If None, defaults to ./features/whisper-embeddings.

None
overwrite_existing bool

If False and the output CSV already exists, return the existing file without recomputing.

False
model_name str

Model identifier passed through to the worker (e.g., "tiny", "base", "small", "large-v3" or a local CTranslate2 model directory).

"base"
device ('auto', 'cuda', 'cpu')

Runtime device. If "cpu", environment variables are set to disable CUDA in the child process.

"auto","cuda","cpu"
compute_type str

CTranslate2 compute type (e.g., "float16", "int8", "float32"); passed to the worker module.

"float16"
run_in_subprocess bool

If True (recommended), runs extraction in a separate Python process to isolate Torch/CUDA state from the parent process.

True
extra_env dict | None

Additional environment variables to inject into the child process.

None
verbose bool

If True, print the launched command and the child's stdout.

True
extractor_module str

Dotted module path whose __main__ implements the extractor CLI.

"chopshop.audio.extract_whisper_embeddings_subproc"

Returns:

Type Description
Path

Path to the written embeddings CSV. Pattern: <output_dir>/<source_stem>_embeddings.csv.

Notes
  • The subprocess writes and exits. The parent returns once the file exists.
  • If transcript_csv is supplied, the worker runs in transcript mode; otherwise general-audio mode is used with the given segmentation strategy.
  • Failures in the child process are re-raised with the captured stdout/stderr to ease debugging.

Examples:

Transcript per-segment embeddings:

>>> extract_whisper_embeddings(
...     source_wav="audio/session.wav",
...     transcript_csv="transcripts/session.csv",
...     time_unit="ms",
...     model_name="small",
...     device="cuda",
... )

Whole-file mean embedding:

>>> extract_whisper_embeddings(
...     source_wav="audio/session.wav",
...     strategy="nonsilent",
...     aggregate="mean",
...     output_dir="features/whisper-embeddings",
... )
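Once the CSV exists, the embedding columns can be pulled into a matrix. A small sketch, assuming pandas is installed and the file follows the start_time,end_time,speaker,e0..e{D-1} layout described above (the path is illustrative):

import pandas as pd

df = pd.read_csv("features/whisper-embeddings/session_embeddings.csv")
# Select only the e0..e{D-1} columns; "end_time" also starts with "e",
# so require a numeric suffix.
embed_cols = [c for c in df.columns if c.startswith("e") and c[1:].isdigit()]
X = df[embed_cols].to_numpy()   # shape: (n_segments, D)
print(X.shape, df["speaker"].nunique())
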
Source code in src\taters\audio\extract_whisper_embeddings.py
def extract_whisper_embeddings(
    *,
    # required
    source_wav: Union[str, Path],

    # optional transcript-driven mode
    transcript_csv: Optional[Union[str, Path]] = None,
    time_unit: Literal["auto", "ms", "s", "samples"] = "auto",

    # general-audio mode (used when transcript_csv is None)
    strategy: Literal["windows", "nonsilent"] = "windows",
    window_s: float = 30.0,
    hop_s: float = 15.0,
    min_seg_s: float = 1.0,
    top_db: float = 30.0,
    aggregate: Literal["none", "mean"] = "none",

    # outputs
    output_dir: Optional[Union[str, Path]] = None,
    overwrite_existing: bool = False,  # if the file already exists, let's not overwrite by default

    # model/runtime
    model_name: str = "base",
    device: Literal["auto", "cuda", "cpu"] = "auto",
    compute_type: str = "float16",

    # execution strategy
    run_in_subprocess: bool = True,
    extra_env: Optional[dict] = None,
    verbose: bool = True,

    # where the extractor lives (python -m <module>)
    extractor_module: str = "taters.audio.extract_whisper_embeddings_subproc",
) -> Path:
    """
    Export Whisper encoder embeddings to a CSV file, using a subprocess by default.

    Parameters
    ----------
    source_wav : str | Path
        Path to the input WAV. Must be readable by `librosa`.
    transcript_csv : str | Path | None, optional
        If provided, enables transcript-driven mode. The CSV is expected to contain
        timestamp columns and (optionally) a speaker column. A row is emitted per
        transcript segment.
    time_unit : {"auto","ms","s","samples"}, default "auto"
        How to interpret timestamps in `transcript_csv`. In "auto", the worker
        heuristically infers the unit from max end time vs audio duration.
    strategy : {"windows","nonsilent"}, default "windows"
        General-audio mode only. "windows" uses fixed sized windows with overlap;
        "nonsilent" uses an energy-based splitter (librosa.effects.split).
    window_s, hop_s : float, default 30.0, 15.0
        General-audio mode only. Window length and hop (seconds).
    min_seg_s : float, default 1.0
        General-audio mode only. Skip segments shorter than this many seconds.
    top_db : float, default 30.0
        General-audio mode only ("nonsilent"). Threshold (dB) below reference to
        consider as silence. Smaller → more segments; larger → fewer.
    aggregate : {"none","mean"}, default "none"
        General-audio mode only. If "mean", a single pooled row is written covering
        the entire file; otherwise one row per segment.
    output_dir : str | Path | None, optional
        Directory for the output CSV. If None, defaults to
        ``./features/whisper-embeddings``.
    overwrite_existing : bool, default False
        If False and the output CSV already exists, return the existing file
        without recomputing.
    model_name : str, default "base"
        Model identifier passed through to the worker (e.g., "tiny", "base",
        "small", "large-v3" or a local CTranslate2 model directory).
    device : {"auto","cuda","cpu"}, default "auto"
        Runtime device. If "cpu", environment variables are set to disable CUDA
        in the child process.
    compute_type : str, default "float16"
        CTranslate2 compute type (e.g., "float16", "int8", "float32"); passed to
        the worker module.
    run_in_subprocess : bool, default True
        If True (recommended), runs extraction in a separate Python process to
        isolate Torch/CUDA state from the parent process.
    extra_env : dict | None, optional
        Additional environment variables to inject into the child process.
    verbose : bool, default True
        If True, print the launched command and the child's stdout.
    extractor_module : str, default "taters.audio.extract_whisper_embeddings_subproc"
        Dotted module path whose ``__main__`` implements the extractor CLI.

    Returns
    -------
    Path
        Path to the written embeddings CSV. Pattern:
        ``<output_dir>/<source_stem>_embeddings.csv``.

    Notes
    -----
    - The subprocess writes and exits. The parent returns once the file exists.
    - If `transcript_csv` is supplied, the worker runs in transcript mode; otherwise
      general-audio mode is used with the given segmentation strategy.
    - Failures in the child process are re-raised with the captured stdout/stderr
      to ease debugging.

    Examples
    --------
    Transcript per-segment embeddings:

    >>> extract_whisper_embeddings(
    ...     source_wav="audio/session.wav",
    ...     transcript_csv="transcripts/session.csv",
    ...     time_unit="ms",
    ...     model_name="small",
    ...     device="cuda",
    ... )

    Whole-file mean embedding:

    >>> extract_whisper_embeddings(
    ...     source_wav="audio/session.wav",
    ...     strategy="nonsilent",
    ...     aggregate="mean",
    ...     output_dir="features/whisper-embeddings",
    ... )
    """

    source_wav = Path(source_wav).resolve()
    # default to ./features/whisper-embeddings when not provided
    out_dir_final = (
        Path(output_dir).resolve()
        if output_dir
        else (Path.cwd() / "features" / "whisper-embeddings")
    )

    out_dir_final.mkdir(parents=True, exist_ok=True)
    output_csv = out_dir_final / f"{source_wav.stem}_embeddings.csv"

    if not overwrite_existing and Path(output_csv).is_file():
        print("Whisper embedding feature output file already exists; returning existing file.")
        return output_csv

    if not run_in_subprocess:
        # ---- In-process path (only when you’re sure no Torch/CUDA conflicts) ----
        from .extract_whisper_embeddings_subproc import (  # type: ignore
            export_segment_embeddings_csv,
            export_audio_embeddings_csv,
            EmbedConfig,
        )
        cfg = EmbedConfig(model_name=model_name, device=device, compute_type=compute_type, time_unit=time_unit)
        if transcript_csv is not None:
            transcript_csv = Path(transcript_csv).resolve()
            return Path(
                export_segment_embeddings_csv(
                    transcript_csv=transcript_csv,
                    source_wav=source_wav,
                    output_dir=out_dir_final,
                    config=cfg,
                )
            )
        else:
            return Path(
                export_audio_embeddings_csv(
                    source_wav=source_wav,
                    output_dir=out_dir_final,
                    config=cfg,
                    strategy=strategy,
                    window_s=window_s,
                    hop_s=hop_s,
                    min_seg_s=min_seg_s,
                    top_db=top_db,
                    aggregate=aggregate,
                )
            )

    # ---- Subprocess path (recommended) ----
    env = os.environ.copy()
    # Keep Transformers from importing heavy backends in the child
    env.setdefault("TRANSFORMERS_NO_TORCH", "1")
    env.setdefault("TRANSFORMERS_NO_TF", "1")
    env.setdefault("TRANSFORMERS_NO_FLAX", "1")

    if extra_env:
        env.update({k: str(v) for k, v in extra_env.items()})

    if device == "cpu":
        # Make sure the child won’t try CUDA
        env.update({"CUDA_VISIBLE_DEVICES": "", "USE_CUDA": "0", "FORCE_CPU": "1"})
    else:
        # Best-effort: prepend cuDNN wheel's lib dir if available
        try:
            import nvidia.cudnn, pathlib  # type: ignore
            cudnn_lib = str(pathlib.Path(nvidia.cudnn.__file__).with_name("lib"))
            env["LD_LIBRARY_PATH"] = cudnn_lib + ":" + env.get("LD_LIBRARY_PATH", "")
        except Exception:
            pass

    cmd = [
        sys.executable, "-m", extractor_module,
        "--source_wav", str(source_wav),
        "--output_dir", str(out_dir_final),
        "--model_name", model_name,
        "--device", device,
        "--compute_type", compute_type,
    ]

    if transcript_csv is not None:
        transcript_csv = Path(transcript_csv).resolve()
        cmd += ["--transcript_csv", str(transcript_csv), "--time_unit", time_unit]
    else:
        cmd += [
            "--strategy", strategy,
            "--window_s", str(window_s),
            "--hop_s", str(hop_s),
            "--min_seg_s", str(min_seg_s),
            "--top_db", str(top_db),
            "--aggregate", aggregate,
        ]

    if verbose:
        print("Launching embedding subprocess:")
        print(" ", shlex.join(cmd))

    try:
        res = subprocess.run(cmd, check=True, env=env, capture_output=True, text=True, stdin=subprocess.DEVNULL)
        if verbose and res.stdout:
            print(res.stdout.strip())
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Embedding subprocess failed with code {e.returncode}\n"
            f"CMD: {shlex.join(cmd)}\n"
            f"STDOUT:\n{(e.stdout or '').strip()}\n\n"
            f"STDERR:\n{(e.stderr or '').strip()}"
        ) from e

    if not output_csv.exists():
        raise FileNotFoundError(f"Expected embeddings CSV not found: {output_csv}")

    if verbose:
        print(f"Embeddings CSV written to: {output_csv}")

    return output_csv

Subprocess worker that computes Whisper encoder embeddings.

This module is meant to be executed with python -m ... by the wrapper in extract_whisper_embeddings.py. It avoids importing heavyweight torch packages in the parent process and keeps CUDA state isolated.

Two entry functions implement I/O and shape-handling:

  • export_segment_embeddings_csv — transcript-driven, one vector per row.
  • export_audio_embeddings_csv — general WAVs; segmentation + optional pooling.

Both functions use faster-whisper (CTranslate2 backend) and WhisperFeatureExtractor to produce encoder features, then pool the encoder outputs into fixed-length vectors.
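The pooling step itself is small: encoder states for one segment are averaged over time to give a fixed-length vector. A minimal sketch under assumed shapes (the real layout depends on the CTranslate2 encoder output):

import numpy as np

def mean_pool(encoder_out: np.ndarray) -> np.ndarray:
    """Average encoder states of shape (time, dim) into a single (dim,) vector."""
    return encoder_out.mean(axis=0)

# Toy demonstration: 1500 encoder frames with 512-dimensional states.
states = np.random.rand(1500, 512).astype("float32")
print(mean_pool(states).shape)  # (512,)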

export_audio_embeddings_csv

export_audio_embeddings_csv(
    source_wav,
    output_dir=None,
    *,
    config=EmbedConfig(),
    sr=16000,
    strategy="windows",
    window_s=30.0,
    hop_s=15.0,
    min_seg_s=1.0,
    top_db=30.0,
    apply_l2_normalization=False,
    aggregate="none"
)

Compute Whisper encoder embeddings for an arbitrary WAV (no transcript).

Parameters:

Name Type Description Default
source_wav str | Path

Input audio (any format librosa can read).

required
output_dir str | Path | None

Directory for the output CSV. Defaults to the WAV's parent if None.

None
config (EmbedConfig, keyword-only)

Model/device/compute configuration.

EmbedConfig()
sr int

Resample rate used by the feature extractor.

16000
strategy ('windows', 'nonsilent')

  • "windows": fixed windows with hop (overlap allowed).
  • "nonsilent": energy-based voice activity detection via librosa.

"windows"
window_s float

Window length (seconds). Used by both strategies.

30.0
hop_s float

Hop between window starts (seconds). Used by both strategies.

15.0
min_seg_s float

Discard segments shorter than this length (seconds).

1.0
top_db float

Silence threshold for "nonsilent". Higher → fewer segments.

30.0
apply_l2_normalization bool

If True, L2-normalize the pooled vector before writing (applies to the aggregated "mean" row).

False
aggregate ('none', 'mean')

If "mean", write a single pooled vector over the whole file.

"none"

Returns:

Type Description
Path

CSV path: <output_dir>/<wav_stem>_embeddings.csv.

Notes
  • When aggregate="none", rows are start_time,end_time,SEGMENT_i,e0...
  • When aggregate="mean", a single row 0.000,<dur>,GLOBAL_MEAN,e0.. is written.
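A minimal in-process sketch (normally this runs inside the wrapper's subprocess; the import path follows the source location shown below, and file paths are illustrative):

from taters.audio.extract_whisper_embeddings_subproc import (
    EmbedConfig,
    export_audio_embeddings_csv,
)

# One pooled GLOBAL_MEAN row for the whole file, segmented on non-silent regions.
csv_path = export_audio_embeddings_csv(
    "audio/session.wav",
    output_dir="features/whisper-embeddings",
    config=EmbedConfig(),
    strategy="nonsilent",
    aggregate="mean",
)
print(csv_path)
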
Source code in src\taters\audio\extract_whisper_embeddings_subproc.py
def export_audio_embeddings_csv(
    source_wav: str | Path,
    output_dir: Optional[str | Path] = None,
    *,
    config: EmbedConfig = EmbedConfig(),
    sr: int = 16000,
    strategy: Literal["windows", "nonsilent"] = "windows",
    window_s: float = 30.0,
    hop_s: float = 15.0,
    min_seg_s: float = 1.0,
    top_db: float = 30.0,
    apply_l2_normalization: bool = False,
    aggregate: Literal["none", "mean"] = "none",
) -> Path:
    """
    Compute Whisper encoder embeddings for an arbitrary WAV (no transcript).

    Parameters
    ----------
    source_wav : str | Path
        Input audio (any format `librosa` can read).
    output_dir : str | Path | None, optional
        Directory for the output CSV. Defaults to the WAV's parent if None.
    config : EmbedConfig, keyword-only
        Model/device/compute configuration.
    sr : int, default 16000
        Resample rate used by the feature extractor.
    strategy : {"windows","nonsilent"}, default "windows"
        - "windows": fixed windows with hop (overlap allowed).
        - "nonsilent": energy-based voice activity detection via librosa.
    window_s, hop_s : float
        Window length and hop size (seconds). Used by both strategies.
    min_seg_s : float
        Discard segments shorter than this length (seconds).
    top_db : float
        Silence threshold for "nonsilent". Higher → fewer segments.
    aggregate : {"none","mean"}, default "none"
        If "mean", write a single pooled vector over the whole file.

    Returns
    -------
    Path
        CSV path: ``<output_dir>/<wav_stem>_embeddings.csv``.

    Notes
    -----
    - When `aggregate="none"`, rows are ``start_time,end_time,SEGMENT_i,e0..``.
    - When `aggregate="mean"`, a single row ``0.000,<dur>,GLOBAL_MEAN,e0..`` is written.
    """

    source_wav = Path(source_wav)
    if output_dir is None:
        output_dir = source_wav.parent
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    out_csv = output_dir / f"{source_wav.stem}_embeddings.csv"

    # 1) Load audio
    y, in_sr = librosa.load(str(source_wav), sr=sr, mono=True)
    n = len(y)
    if n == 0:
        # Write an empty header-only file
        with out_csv.open("w", encoding="utf-8", newline="") as f:
            csv.writer(f).writerow(["start_time", "end_time", "speaker"])
        return out_csv

    # 2) Load faster-whisper + ct2 + feature extractor (same as transcript path)
    fw = WhisperModel(config.model_name, device=config.device, compute_type=config.compute_type)
    try:
        ct2_model: ctranslate2.models.Whisper = fw.model  # type: ignore[attr-defined]
    except AttributeError:
        model_dir = getattr(fw, "model_dir", None) or getattr(fw, "_model_dir", None)
        if not model_dir:
            raise RuntimeError(
                "Could not access the underlying CTranslate2 model from faster-whisper. "
                "Consider passing a local CTranslate2 model directory as model_name."
            )
        ct2_model = ctranslate2.models.Whisper(str(model_dir), device=config.device, compute_type=config.compute_type)

    fe = WhisperFeatureExtractor.from_pretrained(_hf_repo_for(config.model_name))

    # 3) Build segments (in samples)
    segs: list[tuple[int, int]] = []
    win = max(1, int(round(window_s * sr)))
    hop = max(1, int(round(hop_s * sr)))
    min_len = max(1, int(round(min_seg_s * sr)))

    if strategy == "windows":
        if n <= win:
            segs = [(0, n)]
        else:
            s = 0
            while s < n:
                e = min(n, s + win)
                segs.append((s, e))
                if e >= n:
                    break
                s += hop
    elif strategy == "nonsilent":
        # basic energy-based VAD; torch-free and fast
        intervals = librosa.effects.split(y, top_db=top_db)
        for s, e in intervals:
            if e - s < min_len:
                continue
            # subdivide very long spans into ~window_s chunks
            cur = s
            while cur < e:
                nxt = min(e, cur + win)
                if nxt - cur >= min_len:
                    segs.append((cur, nxt))
                cur = nxt
        if not segs:
            # fallback: whole file as one segment
            segs = [(0, n)]
    else:
        raise ValueError("strategy must be 'windows' or 'nonsilent'")

    # 4) Encode each segment
    rows_out: list[list[Any]] = []
    embed_dim: Optional[int] = None
    vectors: list[np.ndarray] = []

    for i, (s, e) in enumerate(segs):
        clip = y[s:e]
        feats = fe(clip, sampling_rate=sr, return_tensors="np")["input_features"]
        vec = _encode_features_any_layout(ct2_model, feats)
        if vec is None:
            continue
        if embed_dim is None:
            embed_dim = int(vec.shape[-1])
        vectors.append(vec)
        # keep per-chunk row unless we're aggregating
        if aggregate == "none":
            t0 = s / float(sr)
            t1 = e / float(sr)
            rows_out.append([f"{t0:.3f}", f"{t1:.3f}", f"SEGMENT_{i}"] + vec.tolist())

    # 5) Aggregate if requested
    if vectors and aggregate == "mean":
        vec = np.vstack(vectors).mean(axis=0)
        if apply_l2_normalization:
            vec = l2_normalize(vec)
        embed_dim = int(vec.shape[-1])
        rows_out = [["0.000", f"{n/float(sr):.3f}", "GLOBAL_MEAN"] + vec.tolist()]

    # 6) Write CSV (header even if empty)
    if embed_dim is None:
        header = ["start_time", "end_time", "speaker"]
    else:
        header = ["start_time", "end_time", "speaker"] + [f"e{i}" for i in range(embed_dim)]

    with out_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows_out)

    if _os.environ.get("TATERS_DEBUG") == "1":
        print(f"[emb-any] segments={len(segs)}, kept={len(rows_out)}, aggregate={aggregate}")
        print(f"[emb-any] wrote: {out_csv}")

    return out_csv

export_segment_embeddings_csv

export_segment_embeddings_csv(
    transcript_csv,
    source_wav,
    output_dir=None,
    *,
    config=EmbedConfig(),
    start_col="start_time",
    end_col="end_time",
    speaker_col="speaker",
    apply_l2_normalization=False,
    sr=16000
)

Compute Whisper encoder embeddings for each transcript segment and write a CSV.

Expected transcript columns (auto-resolved with fallbacks):
  • start_time (or: start, from, t0, start_ms, start_sec)
  • end_time (or: end, to, t1, end_ms, end_sec)
  • speaker (optional; fallbacks include speaker_label, spk, speaker_id, ...)

Parameters:

Name Type Description Default
transcript_csv str | Path

CSV with segment timings (and optionally speaker labels).

required
source_wav str | Path

Audio file to slice. Will be resampled to sr.

required
output_dir str | Path | None

Directory for the output CSV. If None, defaults to the WAV's parent.

None
config (EmbedConfig, keyword-only)

Configuration for model name, device, compute type, and time unit.

EmbedConfig()
start_col str

Column name hints. The function will fall back to common aliases if the exact names are not present.

'start_time'
end_col str

Column name hints. The function will fall back to common aliases if the exact names are not present.

'end_time'
speaker_col str

Column name hints. The function will fall back to common aliases if the exact names are not present.

'speaker'
apply_l2_normalization bool

If True, L2-normalize each segment's pooled vector before writing.

False
sr int

Sample rate for feature extraction (audio is resampled as needed).

16000

Returns:

Type Description
Path

Path to the written CSV: <output_dir>/<wav_stem>_embeddings.csv

Behavior
  • Attempts to infer time units ("s", "ms", "samples") when config.time_unit == "auto".
  • Skips invalid or tiny segments (< 2 samples after rounding).
  • Pools encoder outputs to a fixed-length vector (mean over time).
  • Writes header even if no valid segments remain (empty payload).
See Also

export_audio_embeddings_csv : transcript-free embeddings.
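A minimal usage sketch (import path follows the source location shown below; file paths are illustrative):

from taters.audio.extract_whisper_embeddings_subproc import (
    EmbedConfig,
    export_segment_embeddings_csv,
)

# One embedding row per transcript segment; timestamps are interpreted
# according to config.time_unit.
csv_path = export_segment_embeddings_csv(
    transcript_csv="transcripts/session.csv",
    source_wav="audio/session.wav",
    output_dir="features/whisper-embeddings",
    config=EmbedConfig(),
)
print(csv_path)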

Source code in src\taters\audio\extract_whisper_embeddings_subproc.py
def export_segment_embeddings_csv(
    transcript_csv: str | Path,
    source_wav: str | Path,
    output_dir: Optional[str | Path] = None,
    *,
    config: EmbedConfig = EmbedConfig(),
    start_col: str = "start_time",
    end_col: str = "end_time",
    speaker_col: str = "speaker",
    apply_l2_normalization: bool = False,
    sr: int = 16000,
) -> Path:
    """
    Compute Whisper encoder embeddings for each transcript segment and write a CSV.

    Expected transcript columns (auto-resolved with fallbacks):
    - start_time (or: start, from, t0, start_ms, start_sec)
    - end_time   (or: end, to, t1, end_ms, end_sec)
    - speaker    (optional; fallbacks include speaker_label, spk, speaker_id, ...)

    Parameters
    ----------
    transcript_csv : str | Path
        CSV with segment timings (and optionally speaker labels).
    source_wav : str | Path
        Audio file to slice. Will be resampled to `sr`.
    output_dir : str | Path | None, optional
        Directory for the output CSV. If None, defaults to the WAV's parent.
    config : EmbedConfig, keyword-only
        Configuration for model name, device, compute type, and time unit.
    start_col, end_col, speaker_col : str
        Column name hints. The function will fall back to common aliases if the
        exact names are not present.
    sr : int, default 16000
        Sample rate for feature extraction (audio is resampled as needed).

    Returns
    -------
    Path
        Path to the written CSV: ``<output_dir>/<wav_stem>_embeddings.csv``

    Behavior
    --------
    - Attempts to infer time units ("s", "ms", "samples") when config.time_unit == "auto".
    - Skips invalid or tiny segments (< 2 samples after rounding).
    - Pools encoder outputs to a fixed-length vector (mean over time).
    - Writes header even if no valid segments remain (empty payload).

    See Also
    --------
    export_audio_embeddings_csv : transcript-free embeddings.
    """

    transcript_csv = Path(transcript_csv)
    source_wav = Path(source_wav)

    # Decide output directory
    if output_dir is None:
        output_dir = source_wav.parent
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Final output path
    output_csv = output_dir / f"{source_wav.stem}_embeddings.csv"

    # 1) Load audio (mono, sr)
    audio, in_sr = librosa.load(str(source_wav), sr=sr, mono=True)
    n_samples = len(audio)
    dur_s = n_samples / float(sr)

    # 2) Load faster-whisper and ct2 model
    fw = WhisperModel(config.model_name, device=config.device, compute_type=config.compute_type)
    try:
        ct2_model: ctranslate2.models.Whisper = fw.model  # type: ignore[attr-defined]
    except AttributeError:
        model_dir = getattr(fw, "model_dir", None) or getattr(fw, "_model_dir", None)
        if not model_dir:
            raise RuntimeError(
                "Could not access the underlying CTranslate2 model from faster-whisper. "
                "Consider passing a local CTranslate2 model directory as model_name."
            )
        ct2_model = ctranslate2.models.Whisper(str(model_dir), device=config.device, compute_type=config.compute_type)

    # 3) Feature extractor
    fe = WhisperFeatureExtractor.from_pretrained(_hf_repo_for(config.model_name))

    # 4) Read transcript and decide time unit
    if not transcript_csv.exists():
        raise FileNotFoundError(f"Transcript CSV not found: {transcript_csv}")

    rows_out: list[list[Any]] = []
    embed_dim: Optional[int] = None

    # First pass: inspect header and a few rows to guess units if needed
    with transcript_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fields = reader.fieldnames or []
        sc, ec, pc = _resolve_columns(fields, start_col, end_col, speaker_col)

        # Peek up to 100 rows to find a reasonable max end time
        sample_vals: List[float] = []
        for i, row in enumerate(reader):
            try:
                sample_vals.append(float(row[ec]))
            except Exception:
                pass
            if i >= 99:
                break

        # Re-open for the real pass
    # Decide unit
    if config.time_unit not in {"auto", "ms", "s", "samples"}:
        raise ValueError("config.time_unit must be 'auto', 'ms', 's', or 'samples'")

    guessed_unit = None
    if config.time_unit == "auto":
        max_end = max(sample_vals) if sample_vals else 0.0
        guessed_unit = _guess_time_unit(max_end, dur_s, n_samples)
        unit = guessed_unit
    else:
        unit = config.time_unit

    if _os.environ.get("TATERS_DEBUG") == "1":
        print(f"[emb] audio duration: {dur_s:.3f}s @ {sr}Hz (samples={n_samples})")
        if guessed_unit:
            print(f"[emb] time unit guessed -> {guessed_unit}")
        print(f"[emb] time unit in use -> {unit}")

    # Conversion lambdas
    if unit == "s":
        to_sec = lambda x: float(x)
        to_idx = lambda t: int(round(float(t) * sr))
    elif unit == "ms":
        to_sec = lambda x: float(x) * 0.001
        to_idx = lambda t: int(round(float(t) * sr * 0.001))
    elif unit == "samples":
        to_sec = lambda x: float(x) / float(sr)
        to_idx = lambda t: int(round(float(t)))
    else:
        raise RuntimeError("Unexpected time unit.")

    # Real pass
    n_total = n_parsed = n_kept = 0
    n_oob = n_too_short = n_shape_skip = 0

    with transcript_csv.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fields = reader.fieldnames or []
        sc, ec, pc = _resolve_columns(fields, start_col, end_col, speaker_col)

        for row in reader:
            n_total += 1
            try:
                t0_sec = to_sec(row[sc])
                t1_sec = to_sec(row[ec])
            except Exception:
                continue
            if not (t1_sec > t0_sec):
                continue
            n_parsed += 1

            s = max(0, min(n_samples, to_idx(row[sc])))
            e = max(0, min(n_samples, to_idx(row[ec])))
            if e <= s:
                n_oob += 1
                continue

            # Slice; skip ultra tiny after rounding (< 2 samples)
            if e - s < 2:
                n_too_short += 1
                continue

            clip = audio[s:e]

            # Build input features (float32, no torch)
            feats = fe(clip, sampling_rate=sr, return_tensors="np")["input_features"]

            # Encode with CT2, trying both layouts; pool to [D]
            vec = _encode_features_any_layout(ct2_model, feats)

            # --- Debug: show raw candidate shapes for the first few rows ---
            if _os.environ.get("TATERS_DEBUG") == "1" and n_parsed <= 3:
                try:
                    a = np.ascontiguousarray(feats.astype("float32", copy=False))
                    a1 = a if a.ndim == 3 else a[None, ...]
                    a2 = np.transpose(a1, (0, 2, 1))
                    print(f"[emb] feats shapes tried: {getattr(a1, 'shape', None)} and {getattr(a2, 'shape', None)}")
                except Exception:
                    pass
            # ----------------------------------------------------------------

            if vec is None:
                n_shape_skip += 1
                continue

            if apply_l2_normalization:
                vec = l2_normalize(vec)

            if embed_dim is None:
                embed_dim = int(vec.shape[-1])

            speaker = row.get(pc, "SPEAKER_0")
            rows_out.append([row[sc], row[ec], speaker] + vec.tolist())
            n_kept += 1


    # 5) Write CSV (header even if empty)
    if embed_dim is None:
        header = ["start_time", "end_time", "speaker"]
    else:
        header = ["start_time", "end_time", "speaker"] + [f"e{i}" for i in range(embed_dim)]

    with output_csv.open("w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows_out)

    if _os.environ.get("TATERS_DEBUG") == "1":
        print(f"[emb] rows: total={n_total}, parsed={n_parsed}, kept={n_kept}, oob={n_oob}, tiny={n_too_short}, shape_skip={n_shape_skip}")
        print(f"[emb] columns: {header}")
        print(f"[emb] wrote: {output_csv}")

    return output_csv

make_speaker_wavs_from_csv

make_speaker_wavs_from_csv(
    source_wav,
    transcript_csv_path,
    output_dir=None,
    *,
    overwrite_existing=False,
    start_col="start_time",
    end_col="end_time",
    speaker_col="speaker",
    time_unit="ms",
    silence_ms=1000,
    pre_silence_ms=None,
    post_silence_ms=None,
    sr=16000,
    mono=True,
    min_dur_ms=50,
    merge_consecutive=True
)

Concatenate speaker-specific segments into per-speaker WAV files.

If merge_consecutive=True (default), adjacent transcript rows with the same speaker are merged into a single, longer segment spanning from the first start to the last end — including any silence between those turns. If you need the strict per-row behavior, set merge_consecutive=False.

Parameters:

Name Type Description Default
source_wav str | Path

Path to the source WAV.

required
transcript_csv_path str | Path

CSV with timing and speaker columns (e.g., diarization output).

required
output_dir str | Path | None

Where to write the per-speaker files. If None, defaults to ./audio_split/<source_stem>/.

None
overwrite_existing bool

If False and a per-speaker output WAV already exists, keep it and return its path without re-rendering.

False
start_col str

Column names in the transcript CSV.

'start_time'
end_col str

Column names in the transcript CSV.

'end_time'
speaker_col str

Column names in the transcript CSV.

'speaker'
time_unit ('ms', 's')

Units for start/end columns.

"ms","s"
silence_ms int

If pre_silence_ms/post_silence_ms are None, use this for both sides.

1000
pre_silence_ms int | None

Explicit padding (ms) before/after each segment; overrides silence_ms.

None
post_silence_ms int | None

Explicit padding (ms) before/after each segment; overrides silence_ms.

None
sr int | None

Resample output to this rate. If None, keep original rate.

16000
mono bool

Downmix to mono if True.

True
min_dur_ms int

Skip segments shorter than this duration (ms).

50
merge_consecutive bool

Merge back-to-back turns for the same speaker into one segment span (including any inter-turn silence). If False, emit one clip per row.

True

Returns:

Type Description
dict[str, Path]

Mapping from friendly speaker label → output WAV path.

Behavior
  • Input speaker labels are sanitized for filenames but a more readable label (without path-hostile characters) is preserved for naming.
  • Segments are sorted by start time per speaker before concatenation.
  • If a speaker ends up with zero valid segments, no file is written.

Examples:

>>> make_speaker_wavs_from_csv(
...     source_wav="audio/session.wav",
...     transcript_csv_path="transcripts/session.csv",
...     time_unit="ms",
...     silence_ms=0,  # no padding
...     sr=16000,
...     mono=True,
... )
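The returned mapping makes it easy to hand each speaker's WAV to the next stage; a short sketch continuing the example above (import path follows the source location shown below):

from taters.audio.split_wav_by_speaker import make_speaker_wavs_from_csv

speaker_wavs = make_speaker_wavs_from_csv(
    source_wav="audio/session.wav",
    transcript_csv_path="transcripts/session.csv",
    time_unit="ms",
)
for speaker, wav_path in speaker_wavs.items():
    print(f"{speaker}: {wav_path}")
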
Source code in src\taters\audio\split_wav_by_speaker.py
def make_speaker_wavs_from_csv(
    source_wav: Union[str, Path],
    transcript_csv_path: Union[str, Path],
    output_dir: Union[str, Path, None] = None,
    *,
    overwrite_existing: bool = False,
    start_col: str = "start_time",
    end_col: str = "end_time",
    speaker_col: str = "speaker",
    time_unit: str = "ms",             # "ms" or "s"
    silence_ms: int = 1000,
    pre_silence_ms: Optional[int] = None,
    post_silence_ms: Optional[int] = None,
    sr: Optional[int] = 16000,
    mono: bool = True,
    min_dur_ms: int = 50,
    merge_consecutive: bool = True,    # NEW: merge back-to-back turns by same speaker
) -> Dict[str, Path]:
    """
    Concatenate speaker-specific segments into per-speaker WAV files.

    If `merge_consecutive=True` (default), adjacent transcript rows with the same
    speaker are merged into a single, longer segment spanning from the first
    start to the last end — including any silence between those turns. If you
    need the strict per-row behavior, set `merge_consecutive=False`.

    Parameters
    ----------
    source_wav : str | Path
        Path to the source WAV.
    transcript_csv_path : str | Path
        CSV with timing and speaker columns (e.g., diarization output).
    output_dir : str | Path | None, optional
        Where to write the per-speaker files. If None, defaults to
        ``./audio_split/<source_stem>/``.
    overwrite_existing : bool, default False
        If False and a per-speaker output WAV already exists, keep it and
        return its path without re-rendering.
    start_col, end_col, speaker_col : str
        Column names in the transcript CSV.
    time_unit : {"ms","s"}, default "ms"
        Units for start/end columns.
    silence_ms : int, default 1000
        If `pre_silence_ms`/`post_silence_ms` are None, use this for both sides.
    pre_silence_ms, post_silence_ms : int | None
        Explicit padding (ms) before/after each segment; overrides `silence_ms`.
    sr : int | None, default 16000
        Resample output to this rate. If None, keep original rate.
    mono : bool, default True
        Downmix to mono if True.
    min_dur_ms : int, default 50
        Skip segments shorter than this duration (ms).
    merge_consecutive : bool, default True
        Merge back-to-back turns for the same speaker into one segment span
        (including any inter-turn silence). If False, emit one clip per row.

    Returns
    -------
    dict[str, Path]
        Mapping from friendly speaker label → output WAV path.

    Behavior
    --------
    - Input speaker labels are sanitized for filenames but a more readable label
      (without path-hostile characters) is preserved for naming.
    - Segments are sorted by start time per speaker before concatenation.
    - If a speaker ends up with zero valid segments, no file is written.

    Examples
    --------
    >>> make_speaker_wavs_from_csv(
    ...     source_wav="audio/session.wav",
    ...     transcript_csv_path="transcripts/session.csv",
    ...     time_unit="ms",
    ...     silence_ms=0,  # no padding
    ...     sr=16000,
    ...     mono=True,
    ... )
    """
    if time_unit not in ("ms", "s"):
        raise ValueError("time_unit must be 'ms' or 's'")

    def _friendly_filename_label(name: str) -> str:
        s = (name or "").strip()
        s = s.replace("/", "_").replace("\\", "_")
        s = re.sub(r'[<>:"|?*]', "", s)
        s = re.sub(r"\s+", " ", s)
        return s or "SPEAKER_0"

    source_wav = Path(source_wav)
    transcript_csv_path = Path(transcript_csv_path)
    out_dir = Path(output_dir) if output_dir is not None else (Path.cwd() / "audio_split" / source_wav.stem)
    out_dir.mkdir(parents=True, exist_ok=True)
    base_stem = source_wav.stem

    audio = AudioSegment.from_file(source_wav)
    if sr:
        audio = audio.set_frame_rate(sr)
    if mono:
        audio = audio.set_channels(1)

    factor = 1000.0 if time_unit == "s" else 1.0
    audio_len_ms = len(audio)

    with transcript_csv_path.open(newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))

    segs_by_spk: Dict[str, List[tuple[int, int]]] = {}
    label_for_key: Dict[str, str] = {}

    # Build segments with awareness of original row order so that we can merge
    # adjacent turns for the same speaker when requested.
    prev_spk_key: Optional[str] = None
    for row in rows:
        try:
            start_raw = float(row[start_col])
            end_raw   = float(row[end_col])
            raw_spk   = str(row.get(speaker_col, "SPEAKER_0"))
        except Exception:
            continue

        start_ms = int(round(start_raw * factor))
        end_ms   = int(round(end_raw   * factor))
        if end_ms <= start_ms:
            continue

        start_ms = _clamp(start_ms, 0, audio_len_ms)
        end_ms   = _clamp(end_ms,   0, audio_len_ms)
        if end_ms <= start_ms:
            continue

        spk_key = _sanitize_speaker(raw_spk)
        label_for_key.setdefault(spk_key, _friendly_filename_label(raw_spk))

        if merge_consecutive and prev_spk_key == spk_key and segs_by_spk.get(spk_key):
            # Extend the last segment for this speaker to cover the new end
            s0, e0 = segs_by_spk[spk_key][-1]
            # Keep the earliest start, extend to the latest end
            s_new = min(s0, start_ms)
            e_new = max(e0, end_ms)
            segs_by_spk[spk_key][-1] = (s_new, e_new)
        else:
            # Strictly append a new segment
            segs_by_spk.setdefault(spk_key, []).append((start_ms, end_ms))

        prev_spk_key = spk_key

    # Optional: drop very short segments after merging
    for spk_key, segs in list(segs_by_spk.items()):
        segs_by_spk[spk_key] = [(s, e) for (s, e) in segs if (e - s) >= min_dur_ms]

    pre_ms  = silence_ms if pre_silence_ms  is None else pre_silence_ms
    post_ms = silence_ms if post_silence_ms is None else post_silence_ms
    pre_sil  = AudioSegment.silent(duration=max(0, pre_ms),  frame_rate=audio.frame_rate)
    post_sil = AudioSegment.silent(duration=max(0, post_ms), frame_rate=audio.frame_rate)
    if mono:
        pre_sil  = pre_sil.set_channels(1)
        post_sil = post_sil.set_channels(1)

    results: Dict[str, Path] = {}
    for spk_key, segs in segs_by_spk.items():
        if not segs:
            continue

        friendly = label_for_key.get(spk_key, spk_key)
        out_path = out_dir / f"{base_stem}_{friendly}.wav"

        if (not overwrite_existing) and out_path.is_file():
            results[friendly] = out_path
            continue

        out = AudioSegment.silent(duration=0, frame_rate=audio.frame_rate)
        if mono:
            out = out.set_channels(1)

        for (s, e) in segs:
            clip = audio[s:e]
            if len(clip) < min_dur_ms:
                continue
            out += pre_sil + clip + post_sil

        if len(out) == 0:
            continue

        out.export(out_path, format="wav", codec="pcm_s16le")
        results[friendly] = out_path

    return results