1
0

Refactored to scale cli with click library

This commit is contained in:
HBrahmbhatt
2026-04-24 13:08:24 -07:00
parent 98696ddb29
commit f2677cb1ad
10 changed files with 377 additions and 145 deletions

View File

@@ -36,6 +36,21 @@ From the project root, install the local environment with:
poetry install poetry install
``` ```
## CLI Help
To see the available commands and their descriptions, run:
```powershell
poetry run github-datapipe --help
```
To see the available arguments and options for a specific command, use the `--help` flag with that command:
```powershell
poetry run github-datapipe sample-repos --help
poetry run github-datapipe fetch-commits --help
```
## Run Phase 1 ## Run Phase 1
The default phase 1 command collects 10 repositories using the default search query from `src/github_datapipe/core/config.py`. The default phase 1 command collects 10 repositories using the default search query from `src/github_datapipe/core/config.py`.

19
poetry.lock generated
View File

@@ -151,6 +151,21 @@ files = [
{file = "charset_normalizer-3.4.7.tar.gz", hash = "sha256:ae89db9e5f98a11a4bf50407d4363e7b09b31e55bc117b4f7d80aab97ba009e5"}, {file = "charset_normalizer-3.4.7.tar.gz", hash = "sha256:ae89db9e5f98a11a4bf50407d4363e7b09b31e55bc117b4f7d80aab97ba009e5"},
] ]
[[package]]
name = "click"
version = "8.3.3"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "click-8.3.3-py3-none-any.whl", hash = "sha256:a2bf429bb3033c89fa4936ffb35d5cb471e3719e1f3c8a7c3fff0b8314305613"},
{file = "click-8.3.3.tar.gz", hash = "sha256:398329ad4837b2ff7cbe1dd166a4c0f8900c3ca3a218de04466f38f6497f18a2"},
]
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]] [[package]]
name = "colorama" name = "colorama"
version = "0.4.6" version = "0.4.6"
@@ -158,7 +173,7 @@ description = "Cross-platform colored terminal text."
optional = false optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
groups = ["main"] groups = ["main"]
markers = "sys_platform == \"win32\"" markers = "sys_platform == \"win32\" or platform_system == \"Windows\""
files = [ files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
@@ -606,4 +621,4 @@ zstd = ["backports-zstd (>=1.0.0) ; python_version < \"3.14\""]
[metadata] [metadata]
lock-version = "2.1" lock-version = "2.1"
python-versions = ">=3.12" python-versions = ">=3.12"
content-hash = "3c8765a0ae32ec30eec764e892bef0c0e87524d351f88387e91b78c4b604d112" content-hash = "43a523cd40c7fc6a64072c377740862a41c1fd19a2be575974aa319838191e02"

View File

@@ -12,7 +12,8 @@ dependencies = [
"pyarrow (>=23.0.1,<24.0.0)", "pyarrow (>=23.0.1,<24.0.0)",
"python-dotenv (>=1.2.2,<2.0.0)", "python-dotenv (>=1.2.2,<2.0.0)",
"pytest (>=9.0.3,<10.0.0)", "pytest (>=9.0.3,<10.0.0)",
"requests-mock (>=1.12.1,<2.0.0)" "requests-mock (>=1.12.1,<2.0.0)",
"click (>=8.3.3,<9.0.0)"
] ]
[project.scripts] [project.scripts]

View File

@@ -1,6 +1,13 @@
"""
Command-line interface for the GitHub data pipeline.
This module provides the entry point for the CLI, allowing users to run different
phases of the pipeline, such as repository sampling and commit ingestion.
"""
from __future__ import annotations from __future__ import annotations
import argparse import click
from pathlib import Path from pathlib import Path
from github_datapipe.core.config import GithubConfig from github_datapipe.core.config import GithubConfig
@@ -17,79 +24,113 @@ from github_datapipe.phases.phase2_commit_ingestion.service import (
) )
def build_parser() -> argparse.ArgumentParser: @click.group()
parser = argparse.ArgumentParser(description="GitHub data pipeline CLI") def main() -> None:
subparsers = parser.add_subparsers(dest="command", required=True) """
GitHub data pipeline CLI.
sample_parser = subparsers.add_parser( Use the subcommands to run different phases of the data extraction process.
"sample-repos", """
help="Collect repositories from GitHub Search and save phase 1 outputs.", pass
)
sample_parser.add_argument(
"""
"""
@main.command(name="sample-repos")
@click.option(
"--count", "--count",
type=int, type=int,
default=GithubConfig.default_repo_count, default=GithubConfig.default_repo_count,
help=f"Number of repositories to sample. Defaults to {GithubConfig.default_repo_count}.", help=f"Number of repositories to sample. Defaults to {GithubConfig.default_repo_count}.",
) )
sample_parser.add_argument( @click.option(
"--query", "--query",
type=str, type=str,
default=None, default=GithubConfig.default_query,
help="Optional raw GitHub repository search query. Replaces config defaults when provided.", help="Optional raw GitHub repository search query. Replaces config defaults when provided.",
) )
sample_parser.add_argument( @click.option(
"--output-root", "--output-root",
type=Path, type=click.Path(path_type=Path),
default=Path(GithubConfig.default_output_root), default=Path(GithubConfig.default_output_root),
help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.", help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.",
) )
sample_parser.add_argument( @click.option(
"--mode", "--mode",
choices=("append-deduped", "fresh"), type=click.Choice(["append-deduped", "fresh"]),
default="append-deduped", default="fresh",
help="Whether to dedupe against the persisted seen-repo index or start a fresh phase-1 run.", help="Whether to dedupe against the persisted seen-repo index or start a fresh phase-1 run.",
) )
sample_parser.add_argument( @click.option(
"--per-page", "--per-page",
type=int, type=int,
default=GithubConfig.default_per_page, default=GithubConfig.default_per_page,
help=f"GitHub Search page size. Defaults to {GithubConfig.default_per_page}.", help=f"GitHub Search page size. Defaults to {GithubConfig.default_per_page}.",
) )
sample_parser.add_argument( @click.option(
"--run-id", "--run-id",
type=str, type=str,
default=None, default=None,
help="Optional run identifier. If omitted, a run id is generated automatically.", help="Optional run identifier. If omitted, a run id is generated automatically.",
) )
def sample_repos(
count: int,
query: str | None,
output_root: Path,
mode: str,
per_page: int,
run_id: str | None,
) -> None:
"""
Collect repositories from GitHub Search and save phase 1 outputs.
commit_parser = subparsers.add_parser( This command searches for repositories matching criteria and saves their metadata
"fetch-commits", to a JSONL file for further processing.
help="Fetch commit history for repositories collected in phase 1.", """
options = SampleReposOptions(
count=count,
output_root=output_root,
query=resolve_query(query),
per_page=per_page,
mode=mode,
run_id=run_id,
) )
commit_input = commit_parser.add_mutually_exclusive_group(required=True) result = sample_repositories(options)
commit_input.add_argument( click.echo(f"Run ID: {result['run_id']}")
click.echo(f"Collected repositories: {result['count_collected']}")
click.echo(f"Repositories file: {result['repos_path']}")
click.echo(f"Manifest file: {result['manifest_path']}")
if result["seen_index_path"] is not None:
click.echo(f"Seen repo index: {result['seen_index_path']}")
@main.command(name="fetch-commits")
@click.option(
"--run-id", "--run-id",
type=str, type=str,
required = True,
help="Run identifier whose phase 1 repository dataset should be consumed.", help="Run identifier whose phase 1 repository dataset should be consumed.",
) )
commit_input.add_argument( @click.option(
"--repos-file", "--repos-file",
type=Path, type=click.Path(exists=True, path_type=Path),
help="Path to a phase 1 repos.jsonl file.", help="Path to a phase 1 repos.jsonl file.",
) )
commit_parser.add_argument( @click.option(
"--output-root", "--output-root",
type=Path, type=click.Path(path_type=Path),
default=Path(GithubConfig.default_output_root), default=Path(GithubConfig.default_output_root),
help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.", help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.",
) )
commit_parser.add_argument( @click.option(
"--mode", "--mode",
choices=("refresh", "resume"), type=click.Choice(["refresh", "resume"]),
default="refresh", default="refresh",
help="Whether to fetch all repositories from scratch or skip those already marked complete.", help="Whether to fetch all repositories from scratch or skip those already marked complete.",
) )
commit_parser.add_argument( @click.option(
"--max-pages-per-repo", "--max-pages-per-repo",
type=int, type=int,
default=GithubConfig.default_max_pages_per_repo, default=GithubConfig.default_max_pages_per_repo,
@@ -97,69 +138,58 @@ def build_parser() -> argparse.ArgumentParser:
"Maximum number of commit pages to fetch per repository. " "Maximum number of commit pages to fetch per repository. "
f"Defaults to {GithubConfig.default_max_pages_per_repo}." f"Defaults to {GithubConfig.default_max_pages_per_repo}."
), ),
) )
commit_parser.add_argument( @click.option(
"--per-page", "--per-page",
type=int, type=int,
default=GithubConfig.default_per_page, default=GithubConfig.default_per_page,
help=f"GitHub commit page size. Defaults to {GithubConfig.default_per_page}.", help=f"GitHub commit page size. Defaults to {GithubConfig.default_per_page}.",
) )
commit_parser.add_argument( @click.option(
"--retry-count", "--retry-count",
type=int, type=int,
default=GithubConfig.default_retry_count, default=GithubConfig.default_retry_count,
help=f"Number of retries for repository metadata and commit requests. Defaults to {GithubConfig.default_retry_count}.", help=f"Number of retries for repository metadata and commit requests. Defaults to {GithubConfig.default_retry_count}.",
) )
def fetch_commits_cmd(
run_id: str | None,
repos_file: Path | None,
output_root: Path,
mode: str,
max_pages_per_repo: int,
per_page: int,
retry_count: int,
) -> None:
"""
Fetch commit history for repositories collected in phase 1.
return parser This command reads a list of repositories and downloads their commit history
using the GitHub API.
"""
if not run_id and not repos_file:
raise click.UsageError("Must provide either --run-id or --repos-file")
if run_id and repos_file:
raise click.UsageError("Cannot provide both --run-id and --repos-file")
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.command == "sample-repos":
options = SampleReposOptions(
count=args.count,
output_root=args.output_root,
query=resolve_query(args.query),
per_page=args.per_page,
mode=args.mode,
run_id=args.run_id,
)
result = sample_repositories(options)
print(f"Run ID: {result['run_id']}")
print(f"Collected repositories: {result['count_collected']}")
print(f"Repositories file: {result['repos_path']}")
print(f"Manifest file: {result['manifest_path']}")
if result["seen_index_path"] is not None:
print(f"Seen repo index: {result['seen_index_path']}")
return 0
if args.command == "fetch-commits":
options = FetchCommitsOptions( options = FetchCommitsOptions(
output_root=args.output_root, output_root=output_root,
run_id=args.run_id, run_id=run_id,
repos_file=args.repos_file, repos_file=repos_file,
mode=args.mode, mode=mode,
per_page=args.per_page, per_page=per_page,
max_pages_per_repo=args.max_pages_per_repo, max_pages_per_repo=max_pages_per_repo,
retry_count=args.retry_count, retry_count=retry_count,
) )
result = fetch_commits(options) result = fetch_commits(options)
print(f"Run ID: {result['run_id']}") click.echo(f"Run ID: {result['run_id']}")
print(f"Processed repositories: {result['processed_repositories']}") click.echo(f"Processed repositories: {result['processed_repositories']}")
print(f"Completed repositories: {result['completed_repositories']}") click.echo(f"Completed repositories: {result['completed_repositories']}")
print(f"Warning repositories: {result['warning_repositories']}") click.echo(f"Warning repositories: {result['warning_repositories']}")
print(f"Failed repositories: {result['failed_repositories']}") click.echo(f"Failed repositories: {result['failed_repositories']}")
print(f"Commits file: {result['commits_path']}") click.echo(f"Commits file: {result['commits_path']}")
print(f"Status file: {result['status_path']}") click.echo(f"Status file: {result['status_path']}")
print(f"Manifest file: {result['manifest_path']}") click.echo(f"Manifest file: {result['manifest_path']}")
return 0
parser.error(f"Unsupported command: {args.command}")
return 1
if __name__ == "__main__": if __name__ == "__main__":
raise SystemExit(main()) main()

View File

@@ -1,3 +1,10 @@
"""
Configuration settings for the GitHub data pipeline.
This module defines the GithubConfig dataclass, which holds default values
and endpoints for the GitHub API and the local execution environment.
"""
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@@ -6,6 +13,21 @@ from pathlib import Path
@dataclass(frozen=True) @dataclass(frozen=True)
class GithubConfig: class GithubConfig:
"""
Centralized configuration for the GitHub data pipeline.
Attributes:
base_url: The root URL for the GitHub API.
api_version: The version of the GitHub API to use.
search_repositories_endpoint: The endpoint for searching repositories.
default_repo_count: Default number of repositories to sample.
default_per_page: Default results per page for API requests.
default_output_root: Default root directory for output files.
default_query: Default search query for repository sampling.
default_max_pages_per_repo: Default maximum pages of commits to fetch per repository.
default_retry_count: Default number of retries for failed API requests.
user_agent: User agent string to send with GitHub API requests.
"""
base_url: str = "https://api.github.com" base_url: str = "https://api.github.com"
api_version: str = "2022-11-28" api_version: str = "2022-11-28"
search_repositories_endpoint: str = "/search/repositories" search_repositories_endpoint: str = "/search/repositories"
@@ -18,4 +40,5 @@ class GithubConfig:
user_agent: str = "github-datapipe/0.1.0" user_agent: str = "github-datapipe/0.1.0"
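# Directory two levels above this file's folder (parents[2]). NOTE(review): for src/github_datapipe/core/config.py this resolves to `src/`, not the repository root — confirm which was intended.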
ROOT_DIR = Path(__file__).resolve().parents[2] ROOT_DIR = Path(__file__).resolve().parents[2]

View File

@@ -1,3 +1,10 @@
"""
GitHub API client for the data pipeline.
This module provides a wrapper around the GitHub REST API, handling authentication,
headers, and common requests such as searching repositories and listing commits.
"""
from __future__ import annotations from __future__ import annotations
from typing import Any from typing import Any
@@ -12,7 +19,20 @@ class GithubApiError(RuntimeError):
class GithubApiClient: class GithubApiClient:
"""
A client for interacting with the GitHub REST API.
Handles session management, authentication headers, and provides methods
for specific API endpoints.
"""
def __init__(self, token: str, session: requests.Session | None = None) -> None: def __init__(self, token: str, session: requests.Session | None = None) -> None:
"""
Initialize the client with a GitHub token.
Args:
token: GitHub Personal Access Token (PAT).
session: Optional pre-configured requests session.
"""
self._session = session or requests.Session() self._session = session or requests.Session()
self._session.headers.update( self._session.headers.update(
{ {
@@ -24,21 +44,63 @@ class GithubApiClient:
) )
def search_repositories(self, query: str, page: int, per_page: int) -> dict[str, Any]: def search_repositories(self, query: str, page: int, per_page: int) -> dict[str, Any]:
return self._get_json( """
Search for repositories matching a query.
Args:
query: Raw GitHub search query string.
page: Page number to retrieve.
per_page: Number of items per page.
Returns:
The JSON response payload from GitHub.
"""
return self._get_api_resp(
GithubConfig.search_repositories_endpoint, GithubConfig.search_repositories_endpoint,
params={"q": query, "page": page, "per_page": per_page}, params={"q": query, "page": page, "per_page": per_page},
) )
def get_repository(self, full_name: str) -> dict[str, Any]: def get_repository(self, full_name: str) -> dict[str, Any]:
return self._get_json(f"/repos/{full_name}") """
Retrieve metadata for a single repository.
Args:
full_name: The full name of the repository (e.g., 'owner/repo').
Returns:
The JSON response payload from GitHub.
"""
return self._get_api_resp(f"/repos/{full_name}")
def list_commits(self, full_name: str, branch: str, page: int, per_page: int) -> list[dict[str, Any]]: def list_commits(self, full_name: str, branch: str, page: int, per_page: int) -> list[dict[str, Any]]:
return self._get_json( """
List commits for a specific repository and branch.
Args:
full_name: The full name of the repository.
branch: The branch name (or SHA) to list commits from.
page: Page number to retrieve.
per_page: Number of items per page.
Returns:
A list of commit data payloads.
"""
return self._get_api_resp(
f"/repos/{full_name}/commits", f"/repos/{full_name}/commits",
params={"sha": branch, "page": page, "per_page": per_page}, params={"sha": branch, "page": page, "per_page": per_page},
) )
def _get_json(self, path: str, params: dict[str, Any] | None = None) -> Any: def _get_api_resp(self, path: str, params: dict[str, Any] | None = None) -> Any:
"""
Perform a GET request and return the JSON response.
Args:
path: The API path (excluding the base URL).
params: Optional query parameters.
Raises:
GithubApiError: If the request fails or returns an error status code.
"""
response = self._session.get( response = self._session.get(
f"{GithubConfig.base_url}{path}", f"{GithubConfig.base_url}{path}",
params=params, params=params,

View File

@@ -1,3 +1,10 @@
"""
Input/Output utilities for the data pipeline.
Provides helper functions for reading and writing JSON and JSONL (JSON Lines) files,
including support for appending to JSONL files.
"""
from __future__ import annotations from __future__ import annotations
import json import json
@@ -6,11 +13,13 @@ from typing import Any, Iterable
def write_json(path: Path, payload: Any) -> None: def write_json(path: Path, payload: Any) -> None:
"""Write a Python object to a file as formatted JSON."""
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8") path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def write_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> None: def write_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> None:
"""Write an iterable of dictionaries to a file in JSONL format."""
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle: with path.open("w", encoding="utf-8") as handle:
for row in rows: for row in rows:
@@ -19,6 +28,12 @@ def write_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> None:
def append_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> int: def append_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> int:
"""
Append an iterable of dictionaries to a JSONL file, creating the file and its parent directories if they do not exist.
Returns:
The number of rows successfully appended.
"""
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
count = 0 count = 0
with path.open("a", encoding="utf-8") as handle: with path.open("a", encoding="utf-8") as handle:
@@ -30,10 +45,16 @@ def append_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> int:
def read_json(path: Path) -> Any: def read_json(path: Path) -> Any:
"""Read and parse a JSON file."""
return json.loads(path.read_text(encoding="utf-8")) return json.loads(path.read_text(encoding="utf-8"))
def read_jsonl(path: Path) -> list[dict[str, Any]]: def read_jsonl(path: Path) -> list[dict[str, Any]]:
"""
Read a JSONL file and return a list of dictionaries.
Returns an empty list if the file does not exist.
"""
if not path.exists(): if not path.exists():
return [] return []
rows: list[dict[str, Any]] = [] rows: list[dict[str, Any]] = []

View File

@@ -1,3 +1,10 @@
"""
Runtime environment and utility helpers.
Handles loading environment variables (like GitHub tokens), generating
unique run identifiers, and providing consistent UTC timestamps.
"""
from __future__ import annotations from __future__ import annotations
import os import os
@@ -10,6 +17,12 @@ load_dotenv()
def require_github_token() -> str: def require_github_token() -> str:
"""
Ensure a GitHub token is available in the environment.
Raises:
ValueError: If the `github_token` environment variable is missing or empty.
"""
github_token = os.getenv("github_token") github_token = os.getenv("github_token")
if not github_token: if not github_token:
raise ValueError("GitHub token is not available in .env as `github_token`.") raise ValueError("GitHub token is not available in .env as `github_token`.")
@@ -17,8 +30,10 @@ def require_github_token() -> str:
def build_run_id() -> str: def build_run_id() -> str:
"""Generate a unique run ID based on the current timestamp and a random suffix."""
return f"run-{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}-{uuid4().hex[:8]}" return f"run-{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}-{uuid4().hex[:8]}"
def utc_now() -> str: def utc_now() -> str:
"""Return the current UTC time in ISO 8601 format."""
return datetime.now(tz=UTC).isoformat() return datetime.now(tz=UTC).isoformat()

View File

@@ -1,3 +1,10 @@
"""
Phase 1: Repository Sampling Service.
This service is responsible for searching GitHub for repositories based on a query,
filtering out previously seen repositories (if requested), and saving the results.
"""
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@@ -12,6 +19,7 @@ from github_datapipe.core.runtime import build_run_id, require_github_token, utc
@dataclass(frozen=True) @dataclass(frozen=True)
class SampleReposOptions: class SampleReposOptions:
"""Options for the repository sampling phase."""
count: int = GithubConfig.default_repo_count count: int = GithubConfig.default_repo_count
output_root: Path = Path(GithubConfig.default_output_root) output_root: Path = Path(GithubConfig.default_output_root)
query: str = GithubConfig.default_query query: str = GithubConfig.default_query
@@ -21,10 +29,23 @@ class SampleReposOptions:
def resolve_query(query_override: str | None) -> str: def resolve_query(query_override: str | None) -> str:
"""Resolve the search query, using the whitespace-stripped override if provided, else the default."""
return query_override.strip() if query_override else GithubConfig.default_query return query_override.strip() if query_override else GithubConfig.default_query
def sample_repositories(options: SampleReposOptions) -> dict[str, Any]: def sample_repositories(options: SampleReposOptions) -> dict[str, Any]:
"""
Execute Phase 1: Sample repositories from GitHub.
Searches for repositories, handles pagination and deduplication, and saves
the collected repository data and a run manifest.
Args:
options: Configuration options for sampling.
Returns:
A dictionary containing the run ID and paths to generated files.
"""
token = require_github_token() token = require_github_token()
if options.count <= 0: if options.count <= 0:
raise ValueError("`count` must be greater than 0.") raise ValueError("`count` must be greater than 0.")
@@ -121,6 +142,7 @@ def normalize_repo_record(
sample_page: int, sample_page: int,
sampled_at: str, sampled_at: str,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Normalize a raw GitHub repository payload into our internal schema."""
return { return {
"run_id": run_id, "run_id": run_id,
"repo_id": repo["id"], "repo_id": repo["id"],
@@ -142,6 +164,7 @@ def normalize_repo_record(
def load_seen_repo_ids(path: Path) -> set[int]: def load_seen_repo_ids(path: Path) -> set[int]:
"""Load the set of repository IDs that have already been sampled."""
if not path.exists(): if not path.exists():
return set() return set()
payload = read_json(path) payload = read_json(path)

View File

@@ -1,3 +1,10 @@
"""
Phase 2: Commit Ingestion Service.
This service reads a list of repositories (usually from Phase 1), and for each one,
fetches its commit history from the GitHub API and saves it in JSONL format.
"""
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@@ -12,6 +19,7 @@ from github_datapipe.core.runtime import require_github_token, utc_now
@dataclass(frozen=True) @dataclass(frozen=True)
class FetchCommitsOptions: class FetchCommitsOptions:
"""Options for the commit ingestion phase."""
output_root: Path = Path(GithubConfig.default_output_root) output_root: Path = Path(GithubConfig.default_output_root)
run_id: str | None = None run_id: str | None = None
repos_file: Path | None = None repos_file: Path | None = None
@@ -22,6 +30,18 @@ class FetchCommitsOptions:
def fetch_commits(options: FetchCommitsOptions) -> dict[str, Any]: def fetch_commits(options: FetchCommitsOptions) -> dict[str, Any]:
"""
Execute Phase 2: Fetch commit history for multiple repositories.
Iterates through repositories, downloads commits, handles errors and retries,
and updates the run manifest and status records.
Args:
options: Configuration options for ingestion.
Returns:
A dictionary containing processing statistics and file paths.
"""
token = require_github_token() token = require_github_token()
if options.max_pages_per_repo <= 0: if options.max_pages_per_repo <= 0:
raise ValueError("`max_pages_per_repo` must be greater than 0.") raise ValueError("`max_pages_per_repo` must be greater than 0.")
@@ -123,6 +143,7 @@ def fetch_commits(options: FetchCommitsOptions) -> dict[str, Any]:
def resolve_repo_input(options: FetchCommitsOptions, output_root: Path) -> tuple[Path, str]: def resolve_repo_input(options: FetchCommitsOptions, output_root: Path) -> tuple[Path, str]:
"""Resolve the input repository list from either a run ID or a specific file path."""
if options.run_id is not None: if options.run_id is not None:
return ( return (
output_root / options.run_id / "phase1_repository_sampling" / "repos.jsonl", output_root / options.run_id / "phase1_repository_sampling" / "repos.jsonl",
@@ -145,6 +166,7 @@ def process_repository(
max_pages_per_repo: int, max_pages_per_repo: int,
retry_count: int, retry_count: int,
) -> tuple[dict[str, Any], list[dict[str, Any]]]: ) -> tuple[dict[str, Any], list[dict[str, Any]]]:
"""Fetch all commits for a single repository, handling pagination and errors."""
full_name = repo["full_name"] full_name = repo["full_name"]
fetched_at = utc_now() fetched_at = utc_now()
@@ -230,6 +252,7 @@ def normalize_commit_record(
page_number: int, page_number: int,
fetched_at: str, fetched_at: str,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Normalize a raw GitHub commit payload into our internal schema."""
commit = commit_payload["commit"] commit = commit_payload["commit"]
author = commit.get("author") or {} author = commit.get("author") or {}
committer = commit.get("committer") or {} committer = commit.get("committer") or {}
@@ -258,6 +281,7 @@ def normalize_commit_record(
def with_retries(operation: Any, retry_count: int) -> Any: def with_retries(operation: Any, retry_count: int) -> Any:
"""Execute a function with a specified number of retries on GithubApiError."""
last_error: Exception | None = None last_error: Exception | None = None
for _ in range(retry_count + 1): for _ in range(retry_count + 1):
try: try:
@@ -269,6 +293,7 @@ def with_retries(operation: Any, retry_count: int) -> Any:
def load_status_index(path: Path) -> dict[int, str]: def load_status_index(path: Path) -> dict[int, str]:
"""Load an index of repository processing statuses to support resumption."""
rows = read_jsonl(path) rows = read_jsonl(path)
latest_by_repo: dict[int, str] = {} latest_by_repo: dict[int, str] = {}
for row in rows: for row in rows:
@@ -277,6 +302,7 @@ def load_status_index(path: Path) -> dict[int, str]:
def dedupe_status_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: def dedupe_status_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Deduplicate status records, keeping only the latest record for each repository."""
latest_by_repo: dict[int, dict[str, Any]] = {} latest_by_repo: dict[int, dict[str, Any]] = {}
order: list[int] = [] order: list[int] = []
for row in rows: for row in rows:
@@ -288,6 +314,7 @@ def dedupe_status_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
def count_jsonl_rows(path: Path) -> int: def count_jsonl_rows(path: Path) -> int:
"""Return the number of lines in a JSONL file, or 0 if the file does not exist."""
if not path.exists(): if not path.exists():
return 0 return 0
return len(path.read_text(encoding="utf-8").splitlines()) return len(path.read_text(encoding="utf-8").splitlines())