Skip to content

API Reference

download_dataset is the canonical public API for downloading dataset archives. save_dataset_to_disk is still available as a deprecated alias for backward compatibility.

datacollective.datasets

download_dataset(dataset_id, download_directory=None, show_progress=True, overwrite_existing=False, enable_logging=False)

Download the dataset archive to a local directory and return the archive path. Skips download if the target file already exists (unless overwrite_existing=True).

Automatically resumes interrupted downloads if a matching .checksum file exists from a previous attempt.

Note: Previously called save_dataset_to_disk, which remains available as a deprecated alias for backward compatibility.

Parameters:

Name Type Description Default
dataset_id str

The dataset ID (as shown in MDC platform) or slug.

required
download_directory str | None

Directory where to save the downloaded archive file. If None or empty, falls back to env MDC_DOWNLOAD_PATH or default.

None
show_progress bool

Whether to show a progress bar during download.

True
overwrite_existing bool

Whether to overwrite the existing archive file.

False
enable_logging bool

Whether to enable SDK logging to console and a local log file.

False

Returns:

Type Description
Path

Path to the downloaded dataset archive.

Raises:

Type Description
ValueError

If dataset_id is empty.

FileNotFoundError

If the dataset does not exist (404).

PermissionError

If access is denied (403) or download directory is not writable.

RuntimeError

If rate limit is exceeded (429) or unexpected response format.

HTTPError

For other non-2xx responses.

Source code in src/datacollective/datasets.py
def download_dataset(
    dataset_id: str,
    download_directory: str | None = None,
    show_progress: bool = True,
    overwrite_existing: bool = False,
    enable_logging: bool = False,
) -> Path:
    """
    Download the dataset archive to a local directory and return the archive path.
    Skips download if the target file already exists (unless `overwrite_existing=True`).

    Automatically resumes interrupted downloads if a matching .checksum file exists from a
    previous attempt.

    Note: Previously called `save_dataset_to_disk`, which remains available as a
    deprecated alias for backward compatibility.

    Args:
        dataset_id: The dataset ID (as shown in MDC platform) or slug.
        download_directory: Directory where to save the downloaded archive file.
            If None or empty, falls back to env MDC_DOWNLOAD_PATH or default.
        show_progress: Whether to show a progress bar during download.
        overwrite_existing: Whether to overwrite the existing archive file.
        enable_logging: Whether to enable SDK logging to console and a local log file.

    Returns:
        Path to the downloaded dataset archive.

    Raises:
        ValueError: If dataset_id is empty.
        FileNotFoundError: If the dataset does not exist (404).
        PermissionError: If access is denied (403) or download directory is not writable.
        RuntimeError: If rate limit is exceeded (429) or unexpected response format.
        requests.HTTPError: For other non-2xx responses.
    """
    _enable_logging(enable_logging)
    # Use lazy %-style logging args (consistent with `load_dataset`) so the
    # message is only formatted when the log record is actually emitted.
    logger.info("Downloading dataset `%s`", dataset_id)

    _id = resolve_dataset_id(dataset_id)
    download_plan = _get_download_plan(
        _id,
        download_directory,
        download_source=DOWNLOAD_SOURCE_SAVE,
    )
    return _resolve_and_execute_download_plan(
        download_plan=download_plan,
        show_progress=show_progress,
        overwrite_existing=overwrite_existing,
        download_source=DOWNLOAD_SOURCE_SAVE,
    )

get_dataset_details(dataset_id)

Return dataset details from the MDC API as a dictionary.

Parameters:

Name Type Description Default
dataset_id str

The dataset ID (as shown in MDC platform) or slug.

required

Returns:

Type Description
dict[str, Any]

A dict with dataset details as returned by the API.

Raises:

Type Description
ValueError

If dataset_id is empty.

FileNotFoundError

If the dataset does not exist (404).

PermissionError

If access is denied (403).

RuntimeError

If rate limit is exceeded (429).

HTTPError

For other non-2xx responses.

Source code in src/datacollective/datasets.py
def get_dataset_details(dataset_id: str) -> dict[str, Any]:
    """
    Fetch the details of a single dataset from the MDC API.

    Args:
        dataset_id: The dataset ID (as shown in MDC platform) or slug.

    Returns:
        A dict with dataset details as returned by the API.

    Raises:
        ValueError: If dataset_id is empty.
        FileNotFoundError: If the dataset does not exist (404).
        PermissionError: If access is denied (403).
        RuntimeError: If rate limit is exceeded (429).
        requests.HTTPError: For other non-2xx responses.
    """
    # Reject empty / whitespace-only IDs before making any network call.
    if not (dataset_id and dataset_id.strip()):
        raise ValueError("`dataset_id` must be a non-empty string")

    endpoint = f"{_get_api_url()}/datasets/{dataset_id}"
    response = _send_api_request(method="GET", url=endpoint)
    return dict(response.json())

load_dataset(dataset_id, download_directory=None, show_progress=True, overwrite_existing=False, overwrite_extracted=False, enable_logging=False)

Download (if needed), extract (if not already extracted), and load the dataset into a pandas DataFrame.

If the dataset archive already exists in the download directory, it will not be re-downloaded unless overwrite_existing=True.

If there is a directory with the same name as the archive file without the suffix extension, we assume it has already been extracted, and it will not be re-extracted unless overwrite_extracted=True.

Uses the dataset schema to determine task-specific loading logic.

Automatically resumes interrupted downloads if a .checksum file exists from a previous attempt.

Parameters:

Name Type Description Default
dataset_id str

The dataset ID (as shown in MDC platform) or slug.

required
download_directory str | None

Directory where to save the downloaded archive file. If None or empty, falls back to env MDC_DOWNLOAD_PATH or default.

None
show_progress bool

Whether to show a progress bar during download.

True
overwrite_existing bool

Whether to overwrite existing archive.

False
overwrite_extracted bool

Whether to overwrite existing extracted files by re-extracting the archive file. Only makes sense when overwrite_existing is False. Will check in the download directory for existing extracted files with the default naming of the folder.

False
enable_logging bool

Whether to enable SDK logging to console and a local log file.

False

Returns: A pandas DataFrame with the loaded dataset.

Raises:

Type Description
ValueError

If dataset_id is empty.

FileNotFoundError

If the dataset does not exist (404).

PermissionError

If access is denied (403) or download directory is not writable.

RuntimeError

If the dataset schema is unsupported, rate limit is exceeded (429), or the response format is unexpected.

HTTPError

For other non-2xx responses.

Source code in src/datacollective/datasets.py
def load_dataset(
    dataset_id: str,
    download_directory: str | None = None,
    show_progress: bool = True,
    overwrite_existing: bool = False,
    overwrite_extracted: bool = False,
    enable_logging: bool = False,
) -> pd.DataFrame:
    """
    Download (if needed), extract (if not already extracted), and load the dataset into a pandas DataFrame.

    If the dataset archive already exists in the download directory, it will not be re-downloaded
    unless `overwrite_existing=True`.

    If there is a directory with the same name as the archive file without the suffix extension, we assume
    it has already been extracted, and it will not be re-extracted unless `overwrite_extracted=True`.

    Uses the dataset schema to determine task-specific loading logic.

    Automatically resumes interrupted downloads if a .checksum file exists from a
    previous attempt.

    Args:
        dataset_id: The dataset ID (as shown in MDC platform) or slug.
        download_directory: Directory where to save the downloaded archive file.
            If None or empty, falls back to env MDC_DOWNLOAD_PATH or default.
        show_progress: Whether to show a progress bar during download.
        overwrite_existing: Whether to overwrite existing archive.
        overwrite_extracted: Whether to overwrite existing extracted files by re-extracting the archive file.
            Only makes sense when overwrite_existing is False.
            Will check in the download directory for existing extracted files with the default naming of the folder.
        enable_logging: Whether to enable SDK logging to console and a local log file.

    Returns:
        A pandas DataFrame with the loaded dataset.

    Raises:
        ValueError: If dataset_id is empty.
        FileNotFoundError: If the dataset does not exist (404).
        PermissionError: If access is denied (403) or download directory is not writable.
        RuntimeError: If the dataset schema is unsupported, rate limit is
            exceeded (429), or the response format is unexpected.
        requests.HTTPError: For other non-2xx responses.
    """
    _enable_logging(enable_logging)
    logger.info("Loading dataset `%s`", dataset_id)

    _id = resolve_dataset_id(dataset_id)
    # Fail fast with guidance when no schema is registered for this dataset:
    # the raw archive can still be fetched with `download_dataset`.
    schema = _get_dataset_schema(_id)
    if schema is None:
        raise RuntimeError(
            f"Dataset '{_id}' exists but is not supported by load_dataset yet. "
            f"You can download the raw archive with: download_dataset('{_id}'). "
            f"If you are the data owner consider submitting a schema for your dataset via the registry: https://mozilla-data-collective.github.io/dataset-schema-registry/"
        )

    download_plan = _get_download_plan(
        _id,
        download_directory,
        download_source=DOWNLOAD_SOURCE_LOAD,
    )
    # Keep the checksum from the plan; it is used below to resolve the schema
    # for the extracted files.
    archive_checksum = download_plan.checksum

    archive_path = _resolve_and_execute_download_plan(
        download_plan=download_plan,
        show_progress=show_progress,
        overwrite_existing=overwrite_existing,
        download_source=DOWNLOAD_SOURCE_LOAD,
    )
    base_dir = _resolve_download_dir(download_directory)
    extract_dir = _extract_archive(
        archive_path=archive_path,
        dest_dir=base_dir,
        overwrite_extracted=overwrite_extracted,
    )

    # Re-resolve the schema against the extracted directory and archive
    # checksum before loading.
    schema = _resolve_schema(_id, extract_dir, archive_checksum)
    return _load_dataset_from_schema(schema, extract_dir)

resolve_dataset_id(dataset_id)

Resolves a dataset ID or slug to its canonical MDC ID.

Parameters:

Name Type Description Default
dataset_id str

The dataset ID (as shown in MDC platform) or slug.

required

Returns:

Type Description
str

The canonical dataset ID.

Raises:

Type Description
RuntimeError

If the dataset does not exist.

Source code in src/datacollective/datasets.py
def resolve_dataset_id(dataset_id: str) -> str:
    """
    Resolves a dataset ID or slug to its canonical MDC ID.

    Args:
        dataset_id: The dataset ID (as shown in MDC platform) or slug.

    Returns:
        The canonical dataset ID (empty string if the API response carries
        no `id` field).

    Raises:
        RuntimeError: If the dataset does not exist.
    """
    try:
        dataset_details = get_dataset_details(dataset_id)
        return dataset_details.get("id", "")
    except FileNotFoundError as err:
        # Chain the original exception so the underlying 404 context stays
        # visible in tracebacks instead of appearing as an unrelated error
        # raised "during handling" of another exception.
        raise RuntimeError(
            f"Dataset '{dataset_id}' does not exist in MDC or the ID is mistyped."
        ) from err

save_dataset_to_disk(dataset_id, download_directory=None, show_progress=True, overwrite_existing=False, enable_logging=False)

Deprecated alias for download_dataset.

Use download_dataset instead. This name is kept for backward compatibility.

Source code in src/datacollective/datasets.py
def save_dataset_to_disk(
    dataset_id: str,
    download_directory: str | None = None,
    show_progress: bool = True,
    overwrite_existing: bool = False,
    enable_logging: bool = False,
) -> Path:
    """
    Deprecated alias for `download_dataset`.

    Use `download_dataset` instead. This name is kept for backward compatibility.
    """
    deprecation_message = (
        "`save_dataset_to_disk` is deprecated and will be removed in a future "
        "release. Use `download_dataset` instead."
    )
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    # Pure delegation: identical behavior to `download_dataset`.
    return download_dataset(
        dataset_id=dataset_id,
        download_directory=download_directory,
        show_progress=show_progress,
        overwrite_existing=overwrite_existing,
        enable_logging=enable_logging,
    )

datacollective.download

datacollective.api_utils

datacollective.submissions

create_submission_draft(submission)

Create a draft dataset submission.

Parameters:

Name Type Description Default
submission DatasetSubmission

Dataset submission model containing at least name and longDescription.

required

Returns:

Type Description
dict[str, Any]

The full API response dict (contains a submission key with

dict[str, Any]

the created submission).

Source code in src/datacollective/submissions.py
def create_submission_draft(submission: DatasetSubmission) -> dict[str, Any]:
    """
    Create a draft dataset submission.

    Args:
        submission: Dataset submission model containing at least `name`
            and `longDescription`.

    Returns:
        The full API response dict (contains a ``submission`` key with
        the created submission).
    """
    model = _ensure_submission_model(submission)
    payload = _payload_for_fields(model, DRAFT_FIELDS)
    # A draft cannot be created without at least a dataset name.
    if "name" not in payload:
        raise ValueError("`submission` must include `name`")

    endpoint = f"{_get_api_url()}/submissions"
    response = _send_api_request("POST", endpoint, json_body=payload)
    return dict(response.json())

create_submission_with_upload(file_path, submission, state_path=None, enable_logging=False)

Single point function to create a submission, upload a file, update metadata, and submit for review. Allows for resuming an upload if interrupted by persisting state to a file.

Parameters:

Name Type Description Default
file_path str

Path to dataset archive.

required
submission DatasetSubmission

Dataset submission model with metadata fields.

required
state_path str | None

Optional path to persist upload state.

None
enable_logging bool

Whether to enable detailed logging during the process.

False
Source code in src/datacollective/submissions.py
def create_submission_with_upload(
    file_path: str,
    submission: DatasetSubmission,
    state_path: str | None = None,
    enable_logging: bool = False,
) -> dict[str, Any]:
    """
    Single point function to create a submission, upload a file, update metadata, and submit for review.
    Allows for resuming an upload if interrupted by persisting state to a file.

    Args:
        file_path: Path to dataset archive.
        submission: Dataset submission model with metadata fields.
        state_path: Optional path to persist upload state.
        enable_logging: Whether to enable detailed logging during the process.

    Returns:
        The full API response dict from the final submit call.
    """
    _enable_logging(enable_logging)

    submission = _ensure_submission_model(submission)

    # Validate metadata up front (fileUploadId is not known yet at this point).
    _validate_final_submission_fields(submission, require_file_upload_id=False)

    state_file, existing_upload_state = _resolve_upload_state(file_path, state_path)

    if existing_upload_state:
        # A persisted state file means a previous run was interrupted:
        # reuse its submission instead of creating a new draft.
        submission_id = existing_upload_state.submissionId
        logger.info(
            f"Found existing upload state at `{state_file}`. Resuming submission {submission_id}."
        )
    else:
        logger.info(f"Creating submission draft for '{submission.name}'...")

        draft = create_submission_draft(submission)

        # The draft response nests the submission under a "submission" key;
        # guard against unexpected payload shapes.
        submission_payload = draft.get("submission", {})
        submission_id = (
            submission_payload.get("id")
            if isinstance(submission_payload, dict)
            else None
        )
        if not submission_id:
            raise RuntimeError("Draft creation did not return a submission id")

        logger.info(f"Draft created. Submission ID: {submission_id}")

    upload_state = upload_dataset_file(
        file_path=file_path,
        submission_id=submission_id,
        state_path=state_path,
        enable_logging=enable_logging,
    )

    # Now that the file is uploaded, attach its ID and re-validate the
    # complete submission.
    submission.fileUploadId = upload_state.fileUploadId
    _validate_final_submission_fields(submission, require_file_upload_id=True)

    logger.info("Updating submission metadata...")

    update_submission(submission_id, submission)

    logger.info("Submitting dataset for review...")

    response = submit_submission(submission_id, submission)

    logger.info("Submission complete!")

    return response

submit_submission(submission_id, submission)

Submit a dataset submission for review.

Parameters:

Name Type Description Default
submission_id str

Dataset submission ID.

required
submission DatasetSubmission

Dataset submission model with agreeToSubmit=True.

required

Returns:

Type Description
dict[str, Any]

The full API response dict (contains a submission key with

dict[str, Any]

the submission whose status should be "submitted").

Source code in src/datacollective/submissions.py
def submit_submission(
    submission_id: str,
    submission: DatasetSubmission,
) -> dict[str, Any]:
    """
    Submit a dataset submission for review.

    Args:
        submission_id: Dataset submission ID.
        submission: Dataset submission model with `agreeToSubmit=True`.

    Returns:
        The full API response dict (contains a ``submission`` key with
        the submission whose status should be ``"submitted"``).
    """
    model = _ensure_submission_model(submission)

    # Run full local validation when applicable; otherwise at minimum
    # require the explicit agreement flag.
    if _should_validate_local_final_submission(model):
        _validate_final_submission_fields(model, require_file_upload_id=True)
    elif model.agreeToSubmit is not True:
        raise ValueError("`agreeToSubmit` must be True to submit a dataset")

    payload = _payload_for_fields(model, SUBMIT_FIELDS)
    endpoint = f"{_get_api_url()}/submissions/{submission_id}"
    response = _send_api_request("POST", endpoint, json_body=payload)
    return dict(response.json())

update_submission(submission_id, submission)

Update metadata on an existing dataset submission.

Parameters:

Name Type Description Default
submission_id str

Dataset submission ID.

required
submission DatasetSubmission

Dataset submission model containing update fields.

required

Returns:

Type Description
dict[str, Any]

The full API response dict (contains a submission key).

Source code in src/datacollective/submissions.py
def update_submission(
    submission_id: str,
    submission: DatasetSubmission,
) -> dict[str, Any]:
    """
    Update metadata on an existing dataset submission.

    Args:
        submission_id: Dataset submission ID.
        submission: Dataset submission model containing update fields.

    Returns:
        The full API response dict (contains a ``submission`` key).
    """
    model = _ensure_submission_model(submission)

    payload = _payload_for_fields(model, UPDATE_FIELDS)
    # Reject no-op updates early rather than sending an empty PATCH.
    if not payload:
        raise ValueError("`submission` must include at least one updatable field")

    endpoint = f"{_get_api_url()}/submissions/{submission_id}"
    response = _send_api_request("PATCH", endpoint, json_body=payload)
    return dict(response.json())

datacollective.upload

upload_dataset_file(file_path, submission_id, state_path=None, show_progress=True, enable_logging=False)

Upload a dataset file using multipart uploads with resumable state.

Uploads are limited to 80GB and use the application/gzip MIME type. Pass the submission ID of the target dataset submission. This works for both draft submissions and for uploading a new .tar.gz version to an already approved dataset submission.

Parameters:

Name Type Description Default
file_path str

Path to the dataset archive on disk.

required
submission_id str

Dataset submission ID (not the dataset ID).

required
state_path str | None

Optional path to persist upload state. Defaults to <filename>.mdc-upload.json alongside the archive.

None
enable_logging bool

Whether to enable detailed logging during the upload.

False
show_progress bool

Whether to show a progress bar during upload.

True
Source code in src/datacollective/upload.py
def upload_dataset_file(
    file_path: str,
    submission_id: str,
    state_path: str | None = None,
    show_progress: bool = True,
    enable_logging: bool = False,
) -> UploadState:
    """
    Upload a dataset file using multipart uploads with resumable state.

    Uploads are limited to 80GB and use the `application/gzip` MIME type.
    Pass the submission ID of the target dataset submission. This works for
    both draft submissions and for uploading a new `.tar.gz` version to an
    already approved dataset submission.

    Args:
        file_path: Path to the dataset archive on disk.
        submission_id: Dataset submission ID (not the dataset ID).
        state_path: Optional path to persist upload state. Defaults to
            `<filename>.mdc-upload.json` alongside the archive.
        show_progress: Whether to show a progress bar during upload.
        enable_logging: Whether to enable detailed logging during the upload.

    Returns:
        The final upload state, including `fileUploadId`, the uploaded parts,
        and the file checksum.

    Raises:
        FileNotFoundError: If `file_path` does not exist.
        ValueError: If the file is empty or exceeds the 80GB upload limit.
        RuntimeError: If the file size changes mid-upload or parts are missing
            after the upload loop.
    """
    path = Path(file_path)
    _enable_logging(enable_logging)

    if not path.exists():
        raise FileNotFoundError(f"File not found: `{file_path}`")

    file_size = path.stat().st_size
    if file_size <= 0:
        raise ValueError("`file_path` must point to a non-empty file")
    if file_size > MAX_UPLOAD_BYTES:
        raise ValueError("`file_path` exceeds the 80GB upload limit")

    final_filename = path.name

    state_file = Path(state_path) if state_path else _default_state_path(path)

    # The state file tracks the multipart session so an interrupted upload
    # can be resumed on a later call.
    state = _load_or_create_state(
        state_file=state_file,
        submission_id=submission_id,
        final_filename=final_filename,
        file_size=file_size,
    )

    expected_parts = _expected_parts(state.fileSize, state.partSize)

    parts_by_number = _normalize_parts(state)
    if parts_by_number:
        logger.info(
            f"Resuming: {len(parts_by_number)}/{expected_parts} parts already uploaded."
        )

    logger.info(f"Uploading file: {final_filename}")

    progress_bar = _init_progress_bar(
        show_progress=show_progress,
        file_size=state.fileSize,
        part_size=state.partSize,
        already_uploaded=len(parts_by_number),
    )

    # Uploads only the parts not already recorded in `parts_by_number`;
    # mutates `parts_by_number` as parts complete.
    bytes_read, checksum = _upload_missing_parts(
        path=path,
        state=state,
        parts_by_number=parts_by_number,
        expected_parts=expected_parts,
        progress_bar=progress_bar,
        state_file=state_file,
    )

    if progress_bar:
        progress_bar.finish()

    # Guard against the file being modified while the upload was in flight.
    if bytes_read != state.fileSize:
        raise RuntimeError(
            "Upload aborted because file size changed during upload "
            f"(expected {state.fileSize} bytes, read {bytes_read})."
        )

    if len(parts_by_number) != expected_parts:
        raise RuntimeError(
            "Upload incomplete. Expected "
            f"{expected_parts} parts but have {len(parts_by_number)}."
        )

    # Persist the final state before completing, so a failure in
    # `_complete_upload` can still be retried from disk.
    state.checksum = checksum
    state.parts = _parts_from_mapping(parts_by_number)
    _save_upload_state(state_file, state)

    logger.info("Completing upload...")

    _complete_upload(state.fileUploadId, state.uploadId, state.parts, state.checksum)

    logger.info(f"Upload complete. File upload ID: {state.fileUploadId}")

    # Success: the resume state is no longer needed.
    _cleanup_state_file(state_file)

    return state

datacollective.models

DatasetSubmission

Bases: NonEmptyStrModel

DatasetSubmission schema aligned with the backend DB representation used for draft creation, metadata updates, and final submission.

Note: Fields are camelCase to match the API payloads.

Source code in src/datacollective/models.py
class DatasetSubmission(NonEmptyStrModel):
    """
    DatasetSubmission schema aligned with the backend DB representation used
    for draft creation, metadata updates, and final submission.

    Note: Fields are camelCase to match the API payloads.
    """

    # Defined by the user
    name: str | None = Field(None, description="Name of the dataset.")
    shortDescription: str | None = Field(
        None, description="Brief description of the dataset."
    )
    longDescription: str | None = Field(
        None, description="Detailed description of the dataset."
    )
    locale: str | None = Field(
        None, description="Language/locale code (e.g., `en-US`, `de-DE`)."
    )
    task: Task | None = Field(
        None,
        description="ML task type — must be one of the Task enum values listed in api.md.",
    )
    format: str | None = Field(None, description="File format (e.g., `TSV`, `WAV`).")
    licenseAbbreviation: License | str | None = Field(
        None,
        description="Either one of the predefined License enum values or, optionally, a custom abbreviated license name.",
    )
    license: str | None = Field(
        None,
        description="Full license name for custom licenses.",
    )
    licenseUrl: str | None = Field(
        None,
        description="Optional URL to the license text for custom licenses.",
    )
    other: str | None = Field(None, description="The datasheet of the dataset.")
    restrictions: str | None = Field(
        None, description="Any restrictions on dataset use."
    )
    forbiddenUsage: str | None = Field(
        None, description="Explicitly forbidden use cases."
    )
    additionalConditions: str | None = Field(
        None, description="Additional conditions for use."
    )
    pointOfContactFullName: str | None = Field(
        None, description="Primary contact name."
    )
    pointOfContactEmail: str | None = Field(None, description="Primary contact email.")
    fundedByFullName: str | None = Field(None, description="Funder's name.")
    fundedByEmail: str | None = Field(None, description="Funder's email.")
    legalContactFullName: str | None = Field(None, description="Legal contact name.")
    legalContactEmail: str | None = Field(None, description="Legal contact email.")
    createdByFullName: str | None = Field(None, description="Creator's name.")
    createdByEmail: str | None = Field(None, description="Creator's email.")
    intendedUsage: str | None = Field(None, description="Intended use of the dataset.")
    ethicalReviewProcess: str | None = Field(
        None, description="Description of ethical review conducted."
    )
    exclusivityOptOut: bool | None = Field(
        None,
        description="True if dataset is non-exclusive; False if hosted exclusively on Mozilla Data Collective (see https://mozilladatacollective.com/terms/providers#appendix-1).",
    )
    agreeToSubmit: bool | None = Field(
        None,
        description="You confirm that you have the right to submit this dataset and that all information provided in the datasheet is accurate. Required to be True to complete the submission process",
    )
    # Defined by the API and not user-editable
    id: str | None = Field(
        None, description="Unique identifier for the submission as returned by the API."
    )
    organizationId: str | None = Field(
        None,
        description="Identifier for the organization associated with the submission.",
    )
    createdBy: str | None = Field(
        None, description="Identifier for the user who created the submission."
    )
    status: str | None = Field(
        None,
        description="Current status of the submission (e.g., 'draft', 'submitted'). Determined by the API.",
    )
    slug: str | None = Field(
        None,
        description="URL-friendly slug for the submission, generated from the name. Determined by the API.",
    )
    fileUploadId: str | None = Field(
        None,
        description="Identifier for the associated file upload, if any. Generated by the API when a file is uploaded.",
    )
    exclusivityOptOutAt: str | None = Field(
        None, description="Timestamp when exclusivity opt-out was set, if applicable."
    )
    submittedAt: str | None = Field(
        None,
        description="Timestamp when the submission was finalized and submitted. Set by the API upon submission.",
    )
    createdAt: str | None = Field(
        None,
        description="Timestamp when the submission was created. Set by the API upon creation.",
    )
    updatedAt: str | None = Field(
        None,
        description="Timestamp when the submission was last updated. Updated by the API on changes.",
    )

    @model_validator(mode="after")
    def _validate_license_details(self) -> DatasetSubmission:
        """Require a full `license` name whenever a custom license is implied."""
        # A non-enum `licenseAbbreviation` or an explicit `licenseUrl` both
        # indicate a custom license, which must also carry its full name.
        has_custom_license_abbreviation = (
            self.licenseAbbreviation is not None
            and not isinstance(self.licenseAbbreviation, License)
        )
        requires_license_name = (
            has_custom_license_abbreviation or self.licenseUrl is not None
        )
        if requires_license_name and self.license is None:
            raise ValueError(
                "`license` is required when providing a custom `licenseAbbreviation` or `licenseUrl`"
            )
        return self

License

Bases: str, Enum

List of pre-defined dataset licenses.

Source code in src/datacollective/models.py
class License(str, Enum):
    """List of pre-defined dataset licenses.

    SPDX-style identifier strings usable as `DatasetSubmission.licenseAbbreviation`.
    """

    APACHE_2_0 = "Apache-2.0"
    BSD_3_CLAUSE = "BSD-3-Clause"
    CC_BY_4_0 = "CC-BY-4.0"
    CC_BY_ND_4_0 = "CC-BY-ND-4.0"
    CC_BY_NC_4_0 = "CC-BY-NC-4.0"
    CC_BY_NC_SA_4_0 = "CC-BY-NC-SA-4.0"
    CC_BY_SA_4_0 = "CC-BY-SA-4.0"
    CC_SA_1_0 = "CC-SA-1.0"
    CC0_1_0 = "CC0-1.0"
    EUPL_1_2 = "EUPL-1.2"
    AGPL_3_0 = "AGPL-3.0"
    GFDL_1_3 = "GFDL-1.3"
    GPL_3_0 = "GPL-3.0"
    LGPLLR = "LGPLLR"
    MIT = "MIT"
    MPL_2_0 = "MPL-2.0"
    NLOD_2_0 = "NLOD-2.0"
    NOODL_1_0 = "NOODL-1.0"
    ODC_BY_1_0 = "ODC-By-1.0"
    ODBL_1_0 = "ODbL-1.0"
    OGL_CANADA_2_0 = "OGL-Canada-2.0"
    OGL_UK_3_0 = "OGL-UK-3.0"
    OPUBL_1_0 = "OPUBL-1.0"
    OGDL_TAIWAN_1_0 = "OGDL-Taiwan-1.0"
    UNLICENSE = "Unlicense"

NonEmptyStrModel

Bases: BaseModel

Base model that trims string fields and rejects empty values.

Source code in src/datacollective/models.py
class NonEmptyStrModel(BaseModel):
    """Base model that trims string fields and rejects empty values."""

    # Reject unknown fields so typos in payloads fail loudly.
    model_config = ConfigDict(extra="forbid")
    # Field names that are allowed to trim down to an empty string.
    _allow_empty_trimmed_strings: ClassVar[frozenset[str]] = frozenset(
        {"licenseAbbreviation"}
    )

    @field_validator("*", mode="before")
    @classmethod
    def _non_empty_strings(cls, value: Any, info: Any) -> Any:
        """Trim string values; reject those that trim to empty (unless allowed)."""
        if value is None:
            return value
        # Enum members (e.g. License, Task) pass through untouched.
        if isinstance(value, Enum):
            return value
        if isinstance(value, str):
            trimmed = value.strip()
            if not trimmed:
                if info.field_name in cls._allow_empty_trimmed_strings:
                    return trimmed
                raise ValueError(f"`{info.field_name}` must be a non-empty string")
            return trimmed
        # Non-string, non-enum values are left for the field's own validation.
        return value

Task

Bases: str, Enum

Valid ML task types for a dataset submission.

Source code in src/datacollective/models.py
class Task(str, Enum):
    """Valid ML task types for a dataset submission.

    Values are the short task codes used in API payloads; see api.md for the
    full list of meanings.
    """

    NA = "N/A"
    NLP = "NLP"
    ASR = "ASR"
    LI = "LI"
    TTS = "TTS"
    MT = "MT"
    LM = "LM"
    LLM = "LLM"
    NLU = "NLU"
    NLG = "NLG"
    CALL = "CALL"
    RAG = "RAG"
    CV = "CV"
    ML = "ML"
    OTHER = "Other"

UploadPart

Bases: BaseModel

A single multipart upload part.

Source code in src/datacollective/models.py
class UploadPart(BaseModel):
    """A single multipart upload part."""

    model_config = ConfigDict(extra="forbid")

    # 1-based index of the part within the multipart upload.
    partNumber: int = Field(..., ge=1)
    # ETag identifying the uploaded part — presumably returned by the upload
    # service; confirm against the upload flow.
    etag: str

datacollective.schema

ColumnMapping

Bases: BaseModel

A single column mapping entry inside a schema.

Used by index-based tasks to describe how columns in the index file map to logical fields and their data types.

Source code in src/datacollective/schema.py
class ColumnMapping(BaseModel):
    """
    A single column mapping entry inside a schema.

    Used by index-based tasks to describe how columns in the
    index file map to logical fields and their data types.
    """

    # Frozen: mapping entries are immutable once parsed.
    model_config = ConfigDict(frozen=True)

    source_column: str | int = Field(
        description="column name (str) or positional index (int) for headerless files"
    )
    # Logical dtype.  The special values "file_path" and "file_content"
    # trigger on-disk path resolution / content loading in the loaders.
    dtype: str = "string"
    # Optional columns are skipped (not an error) when absent from the index file.
    optional: bool = False
    # How a file_path value is matched on disk: "direct" joins the value onto
    # the search roots; "exact"/"contains" additionally fall back to searching
    # candidate files by name.
    path_match_strategy: Literal["direct", "exact", "contains"] = "direct"
    file_extension: str | None = Field(
        default=None,
        description=(
            'optional extension used when resolving file_path columns (e.g. ".wav")'
        ),
    )
    path_template: str | None = Field(
        default=None,
        description=(
            "optional template used to construct file_path values from one or more "
            "metadata columns. Supports relative paths and ${value}, e.g. "
            '"${Speaker ID}_khm_${Sentence ID}.wav" or "${Split}/${value}.wav"'
        ),
    )

ContentMapping

Bases: BaseModel

Describes how file contents / metadata map to DataFrame columns.

Used by glob-based tasks (e.g. LM) to specify how to extract text and metadata from files found via glob patterns. For example, the text content might come from the file contents, while metadata (e.g. language code) might come from the file name or parent directory.

Source code in src/datacollective/schema.py
class ContentMapping(BaseModel):
    """
    Describes how file contents / metadata map to DataFrame columns.

    Used by glob-based tasks (e.g. LM) to specify how to extract text and metadata
    from files found via glob patterns.  For example, the text content might come
    from the file contents, while metadata (e.g. language code) might come from
    the file name or parent directory.
    """

    # Frozen: mapping entries are immutable once parsed.
    model_config = ConfigDict(frozen=True)

    # Where the text column comes from, e.g. the file contents themselves.
    text: str | None = Field(default=None, description='e.g. "file_content"')
    # Where metadata values come from, e.g. the file name.
    meta_source: str | None = Field(default=None, description='e.g. "file_name"')

DatasetSchema

Bases: BaseModel

Task-agnostic representation of a dataset schema, as defined by a schema.yaml file.

Every schema must have dataset_id and task. The remaining fields depend on the task type and the root_strategy ("index" vs "glob").

New task types only need to populate the fields they care about; the loader registered for that task will decide which fields are required at load time.

Source code in src/datacollective/schema.py
class DatasetSchema(BaseModel):
    """
    Task-agnostic representation of a dataset schema, as defined by a ``schema.yaml`` file.

    Every schema **must** have ``dataset_id`` and ``task``.  The remaining
    fields depend on the task type and the ``root_strategy``
    (``"index"`` vs ``"glob"``).

    New task types only need to populate the fields they care about;
    the loader registered for that task will decide which fields are
    required at load time.
    """

    # Mutable: loaders may fill in fields (e.g. checksum) after parsing.
    model_config = ConfigDict(frozen=False)

    dataset_id: str = Field(
        description="Unique identifier for the dataset in the registry"
    )
    task: str = Field(
        description="A task as defined in the MDC Platform e.g. ASR, TTS etc"
    )

    # --- Index-based strategy (ASR / TTS) ---
    format: str | None = Field(
        default=None,
        description=(
            'optional format hint (e.g. "csv", "tsv", "pipe"); '
            "inferred from the index file when omitted"
        ),
    )
    index_file: str | None = Field(default=None, description='e.g. "train.csv"')
    base_audio_path: str | list[str] | None = Field(
        default=None,
        description=(
            # Fixed stray quote after the closing bracket in the example.
            'e.g. "clips/" or ["clips/", "wavs/"]. Entries may also use '
            'metadata placeholders such as "${Split}/clips/".'
        ),
    )
    columns: dict[str, ColumnMapping] = Field(
        default_factory=dict, description="Mapping of index columns to logical fields"
    )
    separator: str | None = Field(
        default=None, description='explicit separator override (e.g. "|")'
    )
    has_header: bool = Field(
        default=True, description="whether the index file has a header row"
    )
    encoding: str = Field(
        default="utf-8", description='file encoding (e.g. "utf-8-sig" for BOM)'
    )

    # --- Glob-based strategy (LM, paired-file TTS) ---
    root_strategy: str | None = Field(
        default=None, description='"glob" | "paired_glob" | "multi_split"'
    )
    file_pattern: str | None = Field(default=None, description='e.g. "**/*.txt"')
    audio_extension: str | None = Field(
        default=None, description='for paired-file TTS: e.g. ".webm"'
    )
    content_mapping: ContentMapping | None = Field(
        default=None, description="Mapping for glob-based content extraction"
    )

    # --- Multi-split strategy (e.g. Common Voice) ---
    splits: list[str] | None = Field(
        default=None, description='split names to load, e.g. ["train", "dev", "test"]'
    )
    splits_file_pattern: str | None = Field(
        default=None, description='glob pattern for split files, e.g. "**/*.tsv"'
    )
    # --- Multi-section strategy
    sections: list[str] | None = None
    section_root: str | None = None

    # --- Schema versioning ---
    checksum: str | None = Field(
        default=None, description="archive checksum for cache validation"
    )

    # --- Catch-all for future / unknown keys ---
    extra: dict[str, Any] = Field(
        default_factory=dict, description="Catch-all for future / unknown keys"
    )

    def to_yaml_dict(self) -> dict[str, Any]:
        """
        Serialise the schema to a plain dict suitable for YAML output.

        Excludes fields that are at their default values so that the
        generated ``schema.yaml`` stays compact and readable.  The
        ``extra`` dict is merged into the top level.
        """
        data = self.model_dump(exclude_defaults=True, exclude={"extra"})
        # Merge extra keys into the top level; on a key collision the
        # value from `extra` wins over the explicit field.
        if self.extra:
            data.update(self.extra)
        return data

to_yaml_dict()

Serialise the schema to a plain dict suitable for YAML output.

Excludes fields that are at their default values so that the generated schema.yaml stays compact and readable. The extra dict is merged into the top level.

Source code in src/datacollective/schema.py
def to_yaml_dict(self) -> dict[str, Any]:
    """
    Serialise the schema to a plain dict suitable for YAML output.

    Fields still at their default values are omitted so the generated
    ``schema.yaml`` stays compact and readable; any ``extra`` keys are
    merged into the top level of the result.
    """
    serialised: dict[str, Any] = self.model_dump(
        exclude_defaults=True, exclude={"extra"}
    )
    if not self.extra:
        return serialised
    # Merge extra keys into the top level (extra wins on collision).
    return {**serialised, **self.extra}

datacollective.schema_loaders.base

BaseSchemaLoader

Bases: ABC

Interface that every task-specific loader must implement.

Parameters:

Name Type Description Default
schema DatasetSchema

The parsed schema for the dataset.

required
extract_dir Path

The directory where the dataset files have been extracted.

required
Source code in src/datacollective/schema_loaders/base.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
class BaseSchemaLoader(abc.ABC):
    """
    Interface that every task-specific loader must implement.

    Args:
        schema (DatasetSchema): The parsed schema for the dataset.
        extract_dir (Path): The directory where the dataset files have been extracted.
    """

    def __init__(self, schema: DatasetSchema, extract_dir: Path) -> None:
        self.schema = schema
        # Normalise the extraction directory up front so all path resolution
        # below operates on an absolute path.
        self.extract_dir = extract_dir.expanduser().resolve()
        # Lazily-resolved locations, populated by _resolve_index_file().
        self._resolved_index_file: Path | None = None
        self._dataset_root: Path | None = None
        # Cache of recursive file listings keyed by (search roots, extension)
        # so repeated file_path lookups do not re-walk the directory tree.
        self._audio_file_cache: dict[
            tuple[tuple[str, ...], str | None], list[Path]
        ] = {}

    @abc.abstractmethod
    def load(self) -> pd.DataFrame:
        """Load the dataset into a pandas DataFrame according to ``self.schema``."""
        ...

    def _load_index_file(self) -> pd.DataFrame:
        """Locate the index file and read it into a raw `~pandas.DataFrame`.

        Resolves the separator from ``schema.separator`` (explicit override) or
        ``schema.format`` via `FORMAT_SEP`, then delegates the file
        lookup to `_resolve_index_file`.

        Used by all index-based loaders (ASR, TTS, ...) so that each loader
        only needs to call `_apply_column_mappings` on the result.

        Returns:
            A raw (unmapped) DataFrame exactly as read from the index file.
        """
        index_path = self._resolve_index_file()
        return self._read_delimited_file(index_path)

    def _resolve_index_file(self) -> Path:
        """Find the index file inside the extracted directory.

        The method searches recursively and returns the shallowest match.

        Used by index-based loaders.

        Raises:
            FileNotFoundError: If no matching file is found.
        """
        if self._resolved_index_file is not None:
            return self._resolved_index_file

        assert self.schema.index_file is not None
        candidates = list(self.extract_dir.rglob(self.schema.index_file))
        if not candidates:
            raise FileNotFoundError(
                f"Index file '{self.schema.index_file}' not found "
                f"under '{self.extract_dir}'"
            )
        # Prefer the shallowest match
        candidates.sort(key=lambda p: len(p.parts))
        self._resolved_index_file = candidates[0]
        # Derive the dataset root alongside the index file so relative audio
        # paths can be resolved against it later.
        self._dataset_root = self._derive_dataset_root(
            self._resolved_index_file, self.schema.index_file
        )
        return self._resolved_index_file

    def _load_multi_sections(self) -> pd.DataFrame:
        """
        Parsing logic for archives with multiple directories, and each directory
        has its own index file. The section name is inferred from the parent directory of the index file.
        """
        sections = self._resolve_sections()
        df = pd.DataFrame()
        for section_path in sections:
            section_df = self._read_delimited_file(section_path)
            section_df["section"] = section_path.parents[0].name
            df = pd.concat([df, section_df])

        return df

    def _resolve_sections(self) -> list:
        """
        Get a list of valid sections, i.e. subdirectories that include an index file.
        """

        assert self.schema.sections is not None
        assert self.schema.index_file is not None
        assert self.schema.section_root is not None
        sections = self.schema.sections
        section_paths = []
        for section in sections:
            section_path = (
                self.extract_dir
                / Path(self.schema.section_root)
                / Path(section)
                / self.schema.index_file
            )
            if not section_path.exists():
                raise FileNotFoundError(f"Index file '{section_path}' not found ")
            section_paths.append(section_path)

        return section_paths

    def _apply_column_mappings(self, raw_df: pd.DataFrame) -> pd.DataFrame:
        """Select and rename columns according to the schema, applying dtype conversions.

        Used by index-based loaders.

        Raises:
            KeyError: If a required column is not found in *raw_df*.
        """
        result_cols: dict[str, pd.Series] = {}

        for logical_name, col_map in self.schema.columns.items():
            source = col_map.source_column
            resolved_source = self._resolve_source_column(raw_df, source)

            if resolved_source is None:
                if col_map.optional:
                    logger.debug(f"Optional column '{source}' not found — skipping.")
                    continue
                raise KeyError(
                    f"Required column '{source}' not found in index file. "
                    f"Available columns: {list(raw_df.columns)}"
                )

            series = raw_df[resolved_source]

            # file_path / file_content need the whole row (for ${...} path
            # templates), hence the row-wise apply instead of Series.map.
            if col_map.dtype == "file_path":
                series = raw_df.apply(
                    lambda row, _col_map=col_map, _source=resolved_source: (
                        self._resolve_file_path(row[_source], _col_map, row)
                    ),
                    axis=1,
                )
            elif col_map.dtype == "file_content":
                series = raw_df.apply(
                    lambda row, _col_map=col_map, _source=resolved_source: (
                        self._load_file_content(row[_source], _col_map, row)
                    ),
                    axis=1,
                )
            elif col_map.dtype == "category":
                series = series.astype("category")
            elif col_map.dtype == "int":
                series = pd.to_numeric(series, errors="coerce").astype("Int64")
            elif col_map.dtype == "float":
                series = pd.to_numeric(series, errors="coerce")
            else:
                # default: treat as string
                series = series.astype(str)

            result_cols[logical_name] = series

        return pd.DataFrame(result_cols)

    def _read_delimited_file(self, file_path: Path) -> pd.DataFrame:
        """Read *file_path* with the resolved separator, retrying with a sniffed one.

        Falls back to ``csv.Sniffer`` when the first parse collapses into a
        single column and required schema columns are missing (see
        `_maybe_sniff_separator`), then normalises the column names.
        """
        sep = self._resolve_separator(file_path)
        header = "infer" if self.schema.has_header else None

        logger.debug(f"Reading delimited file: {file_path} (sep={sep!r})")
        df = self._read_csv(file_path, sep=sep, header=header)

        sniffed_sep = self._maybe_sniff_separator(file_path, df, sep)
        if sniffed_sep is not None and sniffed_sep != sep:
            logger.debug(
                "Retrying %s with sniffed separator %r instead of %r",
                file_path,
                sniffed_sep,
                sep,
            )
            df = self._read_csv(file_path, sep=sniffed_sep, header=header)

        return self._normalize_dataframe_columns(df)

    def _read_csv(
        self, file_path: Path, sep: str | None, header: str | None
    ) -> pd.DataFrame:
        """Invoke ``pandas.read_csv`` with the schema's encoding.

        A ``sep`` of ``None`` switches to the python engine so pandas
        auto-detects the delimiter.
        """
        kwargs: dict[str, object] = {
            "header": header,
            "encoding": self.schema.encoding,
            "skipinitialspace": True,
        }
        if sep is None:
            kwargs["sep"] = None
            kwargs["engine"] = "python"
        else:
            kwargs["sep"] = sep
        return pd.read_csv(file_path, **kwargs)

    def _resolve_separator(self, file_path: Path | None = None) -> str | None:
        """Pick a separator: explicit schema override, then format hint,
        then the file suffix (via ``SUFFIX_SEP``); ``None`` when unknown.
        """
        if self.schema.separator:
            return self.schema.separator
        if self.schema.format:
            return FORMAT_SEP.get(self.schema.format.casefold())
        index_file_path = (
            Path(self.schema.index_file) if self.schema.index_file else None
        )
        for candidate in (file_path, index_file_path):
            if not candidate:
                continue
            suffix = candidate.suffix.casefold()
            if suffix in SUFFIX_SEP:
                return SUFFIX_SEP[suffix]
        return None

    def _maybe_sniff_separator(
        self, file_path: Path, raw_df: pd.DataFrame, initial_sep: str | None
    ) -> str | None:
        """Sniff an alternative delimiter when the first parse looks wrong.

        Only triggers when no explicit separator is set, the parse produced a
        single column, and at least one required schema column is missing.
        Returns the sniffed delimiter, or ``None`` when sniffing is
        unnecessary or fails.
        """
        if self.schema.separator or len(raw_df.columns) != 1 or not self.schema.columns:
            return None

        required_sources = [
            col_map.source_column
            for col_map in self.schema.columns.values()
            if not col_map.optional
        ]
        if not required_sources:
            return None
        if all(
            self._resolve_source_column(raw_df, source) is not None
            for source in required_sources
        ):
            return None

        with file_path.open(
            "r", encoding=self.schema.encoding, errors="ignore"
        ) as handle:
            sample = handle.read(4096)

        delimiters = "".join(
            delim
            for delim in (",", "\t", "|", ";")
            if delim in sample and delim != initial_sep
        )
        if not delimiters:
            return None

        try:
            dialect = csv.Sniffer().sniff(sample, delimiters=delimiters)
        except csv.Error:
            return None

        return dialect.delimiter

    def _normalize_dataframe_columns(self, raw_df: pd.DataFrame) -> pd.DataFrame:
        """Strip BOM characters and surrounding whitespace from string column names."""
        if raw_df.empty and not len(raw_df.columns):
            return raw_df

        normalized_columns: list[str | int] = []
        for column in raw_df.columns:
            if isinstance(column, str):
                normalized_columns.append(column.replace("\ufeff", "").strip())
            else:
                normalized_columns.append(column)

        result = raw_df.copy()
        result.columns = normalized_columns
        return result

    def _resolve_source_column(
        self, raw_df: pd.DataFrame, source: str | int
    ) -> str | int | None:
        """Map a schema ``source_column`` to an actual DataFrame column.

        Tries an exact match, then a stripped match, then a case- and
        whitespace-normalised match.  Returns ``None`` when nothing matches.

        Raises:
            KeyError: If normalisation matches more than one column.
        """
        if source in raw_df.columns:
            return source
        if isinstance(source, int):
            return source if source in raw_df.columns else None

        stripped_source = source.strip()
        if stripped_source in raw_df.columns:
            return stripped_source

        normalized_source = self._normalize_column_key(stripped_source)
        matches = [
            column
            for column in raw_df.columns
            if isinstance(column, str)
            and self._normalize_column_key(column) == normalized_source
        ]
        if len(matches) == 1:
            return matches[0]
        if len(matches) > 1:
            raise KeyError(
                f"Column '{source}' matched multiple index columns after normalization: {matches}"
            )
        return None

    def _normalize_column_key(self, column: str) -> str:
        """Normalise a column name: drop BOM, collapse whitespace, casefold."""
        cleaned = column.replace("\ufeff", "").strip()
        return " ".join(cleaned.split()).casefold()

    def _resolve_file_path(
        self, value: object, col_map: ColumnMapping, row: pd.Series | None = None
    ) -> str:
        """Resolve a ``file_path`` cell value to an on-disk path string.

        Direct candidates under the audio search roots are probed first; for
        non-"direct" strategies a name-based search is attempted as a
        fallback.  NaN values are returned as their string form unchanged.

        Raises:
            FileNotFoundError: If a non-"direct" strategy finds no match.
        """
        if pd.isna(value):
            return str(value)

        source_value = str(value).strip()
        raw_value = source_value
        if row is not None and col_map.path_template:
            raw_value = self._render_path_template(
                source_value, row, col_map.path_template
            )
        if not raw_value:
            return raw_value

        direct_candidates = self._build_direct_file_candidates(
            raw_value,
            col_map.file_extension,
            row=row,
            template_value=source_value,
        )
        for candidate in direct_candidates:
            if candidate.exists():
                return str(candidate)

        if col_map.path_match_strategy != "direct":
            matched_path = self._search_audio_file(
                raw_value,
                col_map,
                row=row,
                template_value=source_value,
            )
            if matched_path is not None:
                return str(matched_path)
            raise FileNotFoundError(
                f"Could not resolve file_path value '{raw_value}' using "
                f"path_match_strategy='{col_map.path_match_strategy}' "
                f"under base_audio_path={self.schema.base_audio_path!r}"
            )

        # "direct" strategy with no existing file: return the first candidate
        # path (or the raw value) without raising.
        if direct_candidates:
            return str(direct_candidates[0])
        return raw_value

    def _load_file_content(
        self, value: object, col_map: ColumnMapping, row: pd.Series | None = None
    ) -> str:
        """Resolve a file path (like ``file_path`` dtype) and return its text content."""
        if pd.isna(value):  # if missing value, skip loading
            return str(value)

        # Remove whitespaces in the path
        raw = str(value).strip()
        parts = Path(raw).parts
        if parts:
            raw = str(Path(*[p.strip() for p in parts]))

        resolved = self._resolve_file_path(raw, col_map, row)
        path = Path(resolved)
        if path.is_file():
            return path.read_text(encoding=self.schema.encoding).strip()
        # Not a readable file: fall back to returning the resolved path string.
        return resolved

    def _build_direct_file_candidates(
        self,
        raw_value: str,
        file_extension: str | None,
        row: pd.Series | None = None,
        template_value: str | None = None,
    ) -> list[Path]:
        """Build the ordered, de-duplicated list of absolute paths to probe
        for *raw_value*, optionally appending the schema's file extension
        when the value has no suffix.
        """
        relative_candidates = [Path(raw_value)]
        normalized_extension = self._normalize_extension(file_extension)
        if normalized_extension is not None and not Path(raw_value).suffix:
            relative_candidates.append(
                Path(raw_value).with_suffix(normalized_extension)
            )

        candidates: list[Path] = []
        seen: set[str] = set()
        for relative_candidate in relative_candidates:
            if relative_candidate.is_absolute():
                path_candidates = [relative_candidate]
            else:
                path_candidates = list(
                    root / relative_candidate
                    for root in self._get_audio_search_roots(
                        row=row, template_value=template_value or raw_value
                    )
                )
                # Also try the dataset root and extraction directory directly.
                dataset_root = self._get_dataset_root()
                path_candidates.append(dataset_root / relative_candidate)
                if dataset_root != self.extract_dir:
                    path_candidates.append(self.extract_dir / relative_candidate)

            for candidate in path_candidates:
                key = str(candidate)
                if key in seen:
                    continue
                seen.add(key)
                candidates.append(candidate)

        return candidates

    def _get_audio_search_roots(
        self,
        row: pd.Series | None = None,
        template_value: str | None = None,
    ) -> list[Path]:
        """Return the directories to search for audio files.

        Honours ``schema.base_audio_path`` entries (rendering ``${...}``
        placeholders from *row* when present) and falls back to the dataset
        root for empty entries; the result is de-duplicated and never empty.
        """
        raw_paths = self.schema.base_audio_path
        dataset_root = self._get_dataset_root()
        if raw_paths is None or raw_paths == "":
            return [dataset_root]

        path_values = raw_paths if isinstance(raw_paths, list) else [raw_paths]
        roots: list[Path] = []
        seen: set[str] = set()
        for raw_path in path_values:
            if raw_path in (None, ""):
                root = dataset_root
            else:
                rendered_path = raw_path
                if row is not None and "${" in raw_path:
                    rendered_path = self._render_path_template(
                        template_value or "",
                        row,
                        raw_path,
                        template_name="base_audio_path",
                    )

                if rendered_path in (None, ""):
                    root = dataset_root
                else:
                    path = Path(rendered_path)
                    root = path if path.is_absolute() else dataset_root / path

            key = str(root)
            if key in seen:
                continue
            seen.add(key)
            roots.append(root)

        return roots or [dataset_root]

    def _search_audio_file(
        self,
        raw_value: str,
        col_map: ColumnMapping,
        row: pd.Series | None = None,
        template_value: str | None = None,
    ) -> Path | None:
        """Search for a single file matching *raw_value* under the search roots.

        Matching depends on ``col_map.path_match_strategy`` ("exact" compares
        names/stems/relative paths; "contains" does casefolded substring
        matching).  Returns ``None`` when no candidate matches.

        Raises:
            ValueError: If more than one distinct file matches.
        """
        search_roots = self._get_audio_search_roots(
            row=row, template_value=template_value or raw_value
        )
        search_files = self._get_searchable_audio_files(
            search_roots, col_map.file_extension
        )
        normalized_extension = self._normalize_extension(col_map.file_extension)
        raw_path = Path(raw_value)
        expected_name = raw_path.name
        expected_stem = raw_path.stem if raw_path.suffix else raw_path.name
        normalized_value = raw_value.casefold()
        normalized_relative_value = raw_path.as_posix().casefold()
        normalized_relative_with_extension = None
        if not raw_path.suffix and normalized_extension is not None:
            normalized_relative_with_extension = (
                f"{normalized_relative_value}{normalized_extension.casefold()}"
            )
        matches: list[Path] = []
        seen_matches: set[str] = set()

        for candidate in search_files:
            is_match = False
            relative_paths = self._candidate_relative_paths(candidate, search_roots)
            if col_map.path_match_strategy == "exact":
                if candidate.name == expected_name:
                    is_match = True
                elif not raw_path.suffix and candidate.stem == expected_stem:
                    is_match = True
                elif (
                    not raw_path.suffix
                    and normalized_extension is not None
                    and candidate.name == f"{expected_name}{normalized_extension}"
                ):
                    is_match = True
                elif normalized_relative_value in relative_paths:
                    is_match = True
                elif (
                    normalized_relative_with_extension is not None
                    and normalized_relative_with_extension in relative_paths
                ):
                    is_match = True
            elif col_map.path_match_strategy == "contains":
                relative_strings = [
                    candidate.name.casefold(),
                    candidate.stem.casefold(),
                ]
                relative_strings.extend(relative_paths)
                if any(
                    normalized_value in relative_string
                    for relative_string in relative_strings
                ):
                    is_match = True

            if not is_match:
                continue

            candidate_key = str(candidate)
            if candidate_key in seen_matches:
                continue
            seen_matches.add(candidate_key)
            matches.append(candidate)

        if len(matches) > 1:
            raise ValueError(
                f"Ambiguous file_path value '{raw_value}' using "
                f"path_match_strategy='{col_map.path_match_strategy}'. "
                f"Matches: {[str(match) for match in matches[:5]]}"
            )
        return matches[0] if matches else None

    def _candidate_relative_paths(
        self, candidate: Path, search_roots: list[Path]
    ) -> list[str]:
        """Casefolded POSIX paths of *candidate* relative to each root that contains it."""
        relative_paths: list[str] = []
        for root in search_roots:
            try:
                relative_paths.append(candidate.relative_to(root).as_posix().casefold())
            except ValueError:
                # candidate is not under this root
                continue
        return relative_paths

    def _get_searchable_audio_files(
        self, search_roots: list[Path], file_extension: str | None
    ) -> list[Path]:
        """List files under the search roots, shallowest first, filtered by
        extension, and cache the result per (roots, extension) pair.
        """
        normalized_extension = self._normalize_extension(file_extension)
        cache_key = (tuple(str(root) for root in search_roots), normalized_extension)
        cached = self._audio_file_cache.get(cache_key)
        if cached is not None:
            return cached

        files: list[Path] = []
        for root in search_roots:
            if root.is_file():
                if self._is_searchable_audio_file(root, normalized_extension):
                    files.append(root)
                continue
            if not root.exists():
                continue

            root_files = [
                path
                for path in root.rglob("*")
                if self._is_searchable_audio_file(path, normalized_extension)
            ]
            # Sort shallowest first, then lexicographically, for deterministic results.
            root_files.sort(
                key=lambda path: (len(path.relative_to(root).parts), str(path))
            )
            files.extend(root_files)

        self._audio_file_cache[cache_key] = files
        return files

    def _matches_extension(self, path: Path, extension: str | None) -> bool:
        """True when *path* has the given suffix (case-insensitive) or no filter is set."""
        if extension is None:
            return True
        return path.suffix.casefold() == extension.casefold()

    def _is_searchable_audio_file(self, path: Path, extension: str | None) -> bool:
        """True for regular files that are not "._" resource-fork files and
        match the extension filter.
        """
        return (
            path.is_file()
            and not path.name.startswith("._")
            and self._matches_extension(path, extension)
        )

    def _normalize_extension(self, extension: str | None) -> str | None:
        """Ensure a non-empty extension starts with a dot; ``None`` when absent."""
        if extension is None or extension == "":
            return None
        return extension if extension.startswith(".") else f".{extension}"

    def _get_dataset_root(self) -> Path:
        """Root derived from the resolved index file, falling back to ``extract_dir``."""
        return self._dataset_root or self.extract_dir

    def _derive_dataset_root(
        self, resolved_path: Path, relative_path: str | None
    ) -> Path:
        """Walk up from *resolved_path* by the depth of *relative_path* to
        find the directory the schema's relative paths are anchored at.
        """
        if not relative_path:
            return resolved_path.parent

        relative = Path(relative_path)
        if relative.is_absolute():
            return relative.parent

        num_parts = len(relative.parts)
        if num_parts <= 1:
            return resolved_path.parent

        return resolved_path.parents[num_parts - 1]

    def _render_path_template(
        self,
        raw_value: str,
        row: pd.Series,
        template: str,
        template_name: str = "path_template",
    ) -> str:
        """Substitute ``${...}`` placeholders in *template* from *row*.

        The special placeholder ``${value}`` expands to *raw_value*; NaN
        cells expand to the empty string.

        Raises:
            KeyError: If a placeholder names a column missing from *row*.
        """
        def replace(match: re.Match[str]) -> str:
            placeholder = match.group(1).strip()
            if placeholder == "value":
                return raw_value

            row_key = self._resolve_row_column(row, placeholder)
            if row_key is None:
                raise KeyError(
                    f"Could not render {template_name} placeholder '{placeholder}'. "
                    f"Available columns: {list(row.index)}"
                )

            cell_value = row[row_key]
            if pd.isna(cell_value):
                return ""
            return str(cell_value).strip()

        return re.sub(r"\$\{([^}]+)\}", replace, template)

    def _resolve_row_column(
        self, row: pd.Series, source: str | int
    ) -> str | int | None:
        """Like `_resolve_source_column`, but against a row's index labels.

        Raises:
            KeyError: If normalisation matches more than one label.
        """
        if source in row.index:
            return source
        if isinstance(source, int):
            return source if source in row.index else None

        stripped_source = source.strip()
        if stripped_source in row.index:
            return stripped_source

        normalized_source = self._normalize_column_key(stripped_source)
        matches = [
            column
            for column in row.index
            if isinstance(column, str)
            and self._normalize_column_key(column) == normalized_source
        ]
        if len(matches) == 1:
            return matches[0]
        if len(matches) > 1:
            raise KeyError(
                f"Column '{source}' matched multiple row columns after normalization: {matches}"
            )
        return None

load() abstractmethod

Load the dataset into a pandas DataFrame according to self.schema.

Source code in src/datacollective/schema_loaders/base.py
@abc.abstractmethod
def load(self) -> pd.DataFrame:
    """Load the dataset into a pandas DataFrame according to ``self.schema``."""
    ...

Strategy

Bases: StrEnum

Loading strategies recognised by schema loaders.

Source code in src/datacollective/schema_loaders/base.py
class Strategy(StrEnum):
    """Loading strategies recognised by schema loaders."""

    # One delimited index file per named split (e.g. train/dev/test TSVs);
    # see ASRLoader._load_multi_split.
    MULTI_SPLIT = "multi_split"
    # Dataset split into multiple sections loaded separately — presumably
    # handled by loaders' _load_multi_sections; implementation not shown here.
    MULTI_SECTIONS = "multi_sections"
    # Each text file is paired with an audio file sharing the same stem;
    # see TTSLoader._load_paired_glob.
    PAIRED_GLOB = "paired_glob"
    # NOTE(review): semantics not visible in this file — presumably a single
    # glob pattern selects all data files; confirm against loader code.
    GLOB = "glob"

datacollective.schema_loaders.registry

datacollective.schema_loaders.cache_schema

datacollective.schema_loaders.tasks.asr

ASRLoader

Bases: BaseSchemaLoader

Load an ASR dataset described by a DatasetSchema.

Source code in src/datacollective/schema_loaders/tasks/asr.py
class ASRLoader(BaseSchemaLoader):
    """Load an ASR dataset described by a `DatasetSchema`."""

    def __init__(self, schema: DatasetSchema, extract_dir: Path) -> None:
        super().__init__(schema, extract_dir)
        # Validate the schema up front so load() can assume a well-formed one.
        if schema.root_strategy == Strategy.MULTI_SPLIT:
            if not schema.splits:
                raise ValueError(
                    "ASR multi_split schema must specify 'splits' (list of split names)"
                )
            return
        if not schema.index_file:
            raise ValueError("ASR schema must specify 'index_file'")
        if not schema.columns:
            raise ValueError(
                "ASR schema must specify at least two column mappings for audio and transcription"
            )

    def load(self) -> pd.DataFrame:
        """Dispatch to the loading routine matching the schema's root strategy."""
        if self.schema.root_strategy == Strategy.MULTI_SPLIT:
            return self._load_multi_split()
        return self._apply_column_mappings(self._load_index_file())

    def _load_multi_split(self) -> pd.DataFrame:
        """
        Load all split TSV/CSV files whose stems match the ``splits`` list,
        add a ``split`` column to each, apply column mappings, and concatenate.
        """
        assert self.schema.splits is not None

        pattern = self.schema.splits_file_pattern or "**/*.tsv"
        allowed_splits = set(self.schema.splits)

        split_files: dict[str, Path] = {}
        for candidate in self.extract_dir.rglob(pattern):
            stem = candidate.stem
            if stem not in allowed_splits:
                continue
            current = split_files.get(stem)
            # Prefer the shallowest match per split name.
            if current is None or len(candidate.parts) < len(current.parts):
                split_files[stem] = candidate

        if not split_files:
            raise RuntimeError(
                f"No split files matching pattern '{pattern}' with stems in "
                f"{sorted(allowed_splits)} found under '{self.extract_dir}'"
            )

        frames: list[pd.DataFrame] = []
        for split_name, file_path in sorted(split_files.items()):
            logger.debug(f"Reading split '{split_name}' from {file_path}")
            frame = self._read_delimited_file(file_path)
            frame["split"] = split_name

            if self.schema.columns:
                # Re-attach 'split' after mapping, which may drop the column.
                frame = self._apply_column_mappings(frame)
                frame["split"] = split_name

            frames.append(frame)

        return pd.concat(frames, ignore_index=True)

datacollective.schema_loaders.tasks.tts

TTSLoader

Bases: BaseSchemaLoader

Load a TTS dataset described by a DatasetSchema.

See docs/loaders/tts.md for details on supported loading strategies and schema fields.

Source code in src/datacollective/schema_loaders/tasks/tts.py
class TTSLoader(BaseSchemaLoader):
    """Load a TTS dataset described by a `DatasetSchema`.

    See docs/loaders/tts.md for details on supported loading strategies and schema fields.
    """

    def __init__(self, schema: DatasetSchema, extract_dir: Path) -> None:
        super().__init__(schema, extract_dir)

    def load(self) -> pd.DataFrame:
        """Dispatch to the loading routine matching the schema's root strategy."""
        strategy = self.schema.root_strategy
        if strategy == Strategy.PAIRED_GLOB:
            return self._load_paired_glob()
        if strategy == Strategy.MULTI_SECTIONS:
            return self._load_multi_sections()
        return self._load_based_on_index()

    def _load_based_on_index(self) -> pd.DataFrame:
        """
        Load a TTS dataset using the "index" strategy, where an index file (e.g. CSV) maps audio paths to transcriptions.
        """
        if not self.schema.index_file:
            raise ValueError("TTS index-based schema must specify 'index_file'")

        frame = self._load_index_file()

        # No column mapping -> return the raw dataframe as-is
        if not self.schema.columns:
            return frame

        return self._apply_column_mappings(frame)

    def _load_paired_glob(self) -> pd.DataFrame:
        """
        Load a TTS dataset using the "paired_glob" strategy, where each audio file has a
        matching `.txt` file containing the transcription. The loader searches
        recursively for all text files matching the specified `file_pattern`,
        reads their contents, and pairs them with the corresponding audio files based
        on the same filename stem. The parent directory name of each text/audio pair
        is captured as a `split` column in the resulting DataFrame.
        """
        if not self.schema.file_pattern:
            raise ValueError("TTS paired_glob schema must specify 'file_pattern'")
        if not self.schema.audio_extension:
            raise ValueError("TTS paired_glob schema must specify 'audio_extension'")

        text_files = sorted(self.extract_dir.rglob(self.schema.file_pattern))
        if not text_files:
            raise FileNotFoundError(
                f"No files matching '{self.schema.file_pattern}' "
                f"found under '{self.extract_dir}'"
            )

        logger.debug(
            f"Found {len(text_files)} text files matching '{self.schema.file_pattern}'"
        )

        audio_ext = self.schema.audio_extension
        records: list[dict[str, str]] = []

        for text_path in text_files:
            # Pair on filename stem: swap the text suffix for the audio one.
            audio_candidate = text_path.with_suffix(audio_ext)
            if not audio_candidate.exists():
                logger.debug(
                    f"No matching audio file for '{text_path.name}' — skipping."
                )
                continue

            record: dict[str, str] = {
                "audio_path": str(audio_candidate),
                "transcription": text_path.read_text(
                    encoding=self.schema.encoding
                ).strip(),
            }

            # Derive domain / split from parent directory name if present
            section = text_path.parent.name
            if section:
                record["split"] = section

            records.append(record)

        if not records:
            raise FileNotFoundError(
                f"No paired (text + {audio_ext}) files found under '{self.extract_dir}'"
            )

        return pd.DataFrame(records)