1
0

Add phase 2 - download default branch commits for repositories collected in phase 1

This commit is contained in:
HBrahmbhatt
2026-04-23 19:16:50 -07:00
parent 96906ee00f
commit 98696ddb29
18 changed files with 991 additions and 381 deletions

121
README.md Normal file
View File

@@ -0,0 +1,121 @@
# GitHub Data Pipeline
This project builds a two-phase GitHub data pipeline:
- Phase 1 samples repositories from GitHub Search and stores repository metadata.
- Phase 2 reads the saved repositories, refreshes repository metadata, and downloads commit history from the current default branch.
The project uses Poetry for dependency management and a `src` package layout for Python modules.
## Project Layout
- `src/github_datapipe/core/`
Shared config, GitHub API, runtime, and file IO helpers.
- `src/github_datapipe/phases/phase1_repository_sampling/`
Phase 1 repository discovery and persistence.
- `src/github_datapipe/phases/phase2_commit_ingestion/`
Phase 2 commit fetching and persistence.
- `tests/`
Automated tests for the pipeline behavior.
## Prerequisites
- Python 3.12 installed locally
- Poetry installed locally
- GitHub personal access token stored in `.env` as:
```env
github_token=YOUR_TOKEN_HERE
```
## Install Dependencies
From the project root, install the local environment with:
```powershell
poetry install
```
## Run Phase 1
The default phase 1 command collects 10 repositories using the default search query from `src/github_datapipe/core/config.py`.
```powershell
poetry run github-datapipe sample-repos
```
Useful overrides:
```powershell
poetry run github-datapipe sample-repos --count 25
poetry run github-datapipe sample-repos --count 25 --query "is:public stars:>50 size:>5000 archived:false"
poetry run github-datapipe sample-repos --mode fresh
```
### Phase 1 Outputs
After phase 1 completes, check:
- `runs/<run_id>/phase1_repository_sampling/repos.jsonl`
- `runs/<run_id>/phase1_repository_sampling/manifest.json`
- `runs/seen_repo_ids.json`
The command prints the generated `run_id`, which you will use for phase 2.
## Run Phase 2
Phase 2 consumes the saved phase 1 repositories and downloads commit history from each repository's current default branch.
Run against a phase 1 run:
```powershell
poetry run github-datapipe fetch-commits --run-id <run_id>
```
Useful overrides:
```powershell
poetry run github-datapipe fetch-commits --run-id <run_id> --mode resume
poetry run github-datapipe fetch-commits --run-id <run_id> --max-pages-per-repo 3
poetry run github-datapipe fetch-commits --run-id <run_id> --per-page 50
```
### Phase 2 Defaults
- `mode=refresh`
- `per_page=100`
- `max_pages_per_repo=1`
The default `max_pages_per_repo=1` keeps the prototype small and limits commit downloads to the first page for each repository.
### Phase 2 Outputs
After phase 2 completes, check:
- `runs/<run_id>/phase2_commit_ingestion/commits/commits-0001.jsonl`
- `runs/<run_id>/phase2_commit_ingestion/repo_status.jsonl`
- `runs/<run_id>/phase2_commit_ingestion/manifest.json`
## Run Tests
Run the automated tests with:
```powershell
poetry run pytest -q
```
## Alternate CLI Invocation
If the Poetry script entrypoint is not available yet, run the CLI module directly:
```powershell
poetry run python -m github_datapipe.cli sample-repos
poetry run python -m github_datapipe.cli fetch-commits --run-id <run_id>
```
## Notes
- Phase 1 currently uses deterministic GitHub Search traversal rather than randomized sampling.
- Phase 2 stores one normalized commit per JSONL line.
- Resume mode in phase 2 skips repositories already marked `complete`.
- Truncated repositories are marked as `success_with_warning` when the page cap is reached.

View File

@@ -16,8 +16,12 @@ dependencies = [
] ]
[project.scripts] [project.scripts]
github-datapipe = "github_datapipe.extract_repos:main" github-datapipe = "github_datapipe.cli:main"
[tool.poetry]
packages = [
{ include = "github_datapipe", from = "src" }
]
[build-system] [build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"] requires = ["poetry-core>=2.0.0,<3.0.0"]

View File

@@ -0,0 +1 @@
"""GitHub data pipeline package."""

165
src/github_datapipe/cli.py Normal file
View File

@@ -0,0 +1,165 @@
from __future__ import annotations
import argparse
from pathlib import Path
from github_datapipe.core.config import GithubConfig
# Phase1 is concerned with querying the repo data
from github_datapipe.phases.phase1_repository_sampling.service import (
SampleReposOptions,
resolve_query,
sample_repositories,
)
# Phase 2 is concerned with downloading the commit data for repos collected in phase 1
from github_datapipe.phases.phase2_commit_ingestion.service import (
FetchCommitsOptions,
fetch_commits,
)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="GitHub data pipeline CLI")
subparsers = parser.add_subparsers(dest="command", required=True)
sample_parser = subparsers.add_parser(
"sample-repos",
help="Collect repositories from GitHub Search and save phase 1 outputs.",
)
sample_parser.add_argument(
"--count",
type=int,
default=GithubConfig.default_repo_count,
help=f"Number of repositories to sample. Defaults to {GithubConfig.default_repo_count}.",
)
sample_parser.add_argument(
"--query",
type=str,
default=None,
help="Optional raw GitHub repository search query. Replaces config defaults when provided.",
)
sample_parser.add_argument(
"--output-root",
type=Path,
default=Path(GithubConfig.default_output_root),
help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.",
)
sample_parser.add_argument(
"--mode",
choices=("append-deduped", "fresh"),
default="append-deduped",
help="Whether to dedupe against the persisted seen-repo index or start a fresh phase-1 run.",
)
sample_parser.add_argument(
"--per-page",
type=int,
default=GithubConfig.default_per_page,
help=f"GitHub Search page size. Defaults to {GithubConfig.default_per_page}.",
)
sample_parser.add_argument(
"--run-id",
type=str,
default=None,
help="Optional run identifier. If omitted, a run id is generated automatically.",
)
commit_parser = subparsers.add_parser(
"fetch-commits",
help="Fetch commit history for repositories collected in phase 1.",
)
commit_input = commit_parser.add_mutually_exclusive_group(required=True)
commit_input.add_argument(
"--run-id",
type=str,
help="Run identifier whose phase 1 repository dataset should be consumed.",
)
commit_input.add_argument(
"--repos-file",
type=Path,
help="Path to a phase 1 repos.jsonl file.",
)
commit_parser.add_argument(
"--output-root",
type=Path,
default=Path(GithubConfig.default_output_root),
help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.",
)
commit_parser.add_argument(
"--mode",
choices=("refresh", "resume"),
default="refresh",
help="Whether to fetch all repositories from scratch or skip those already marked complete.",
)
commit_parser.add_argument(
"--max-pages-per-repo",
type=int,
default=GithubConfig.default_max_pages_per_repo,
help=(
"Maximum number of commit pages to fetch per repository. "
f"Defaults to {GithubConfig.default_max_pages_per_repo}."
),
)
commit_parser.add_argument(
"--per-page",
type=int,
default=GithubConfig.default_per_page,
help=f"GitHub commit page size. Defaults to {GithubConfig.default_per_page}.",
)
commit_parser.add_argument(
"--retry-count",
type=int,
default=GithubConfig.default_retry_count,
help=f"Number of retries for repository metadata and commit requests. Defaults to {GithubConfig.default_retry_count}.",
)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.command == "sample-repos":
options = SampleReposOptions(
count=args.count,
output_root=args.output_root,
query=resolve_query(args.query),
per_page=args.per_page,
mode=args.mode,
run_id=args.run_id,
)
result = sample_repositories(options)
print(f"Run ID: {result['run_id']}")
print(f"Collected repositories: {result['count_collected']}")
print(f"Repositories file: {result['repos_path']}")
print(f"Manifest file: {result['manifest_path']}")
if result["seen_index_path"] is not None:
print(f"Seen repo index: {result['seen_index_path']}")
return 0
if args.command == "fetch-commits":
options = FetchCommitsOptions(
output_root=args.output_root,
run_id=args.run_id,
repos_file=args.repos_file,
mode=args.mode,
per_page=args.per_page,
max_pages_per_repo=args.max_pages_per_repo,
retry_count=args.retry_count,
)
result = fetch_commits(options)
print(f"Run ID: {result['run_id']}")
print(f"Processed repositories: {result['processed_repositories']}")
print(f"Completed repositories: {result['completed_repositories']}")
print(f"Warning repositories: {result['warning_repositories']}")
print(f"Failed repositories: {result['failed_repositories']}")
print(f"Commits file: {result['commits_path']}")
print(f"Status file: {result['status_path']}")
print(f"Manifest file: {result['manifest_path']}")
return 0
parser.error(f"Unsupported command: {args.command}")
return 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1 @@
"""Shared building blocks for GitHub pipeline phases."""

View File

@@ -4,7 +4,7 @@ from dataclasses import dataclass
from pathlib import Path from pathlib import Path
@dataclass(frozen=True) # This makes the class immutable so in the process, we do not accidently change the variable values of this @dataclass(frozen=True)
class GithubConfig: class GithubConfig:
base_url: str = "https://api.github.com" base_url: str = "https://api.github.com"
api_version: str = "2022-11-28" api_version: str = "2022-11-28"
@@ -13,9 +13,9 @@ class GithubConfig:
default_per_page: int = 100 default_per_page: int = 100
default_output_root: str = "runs" default_output_root: str = "runs"
default_query: str = "is:public stars:>10 size:>1000 archived:false fork:false created:>2025-07-15" default_query: str = "is:public stars:>10 size:>1000 archived:false fork:false created:>2025-07-15"
default_max_pages_per_repo: int = 1
default_retry_count: int = 2
user_agent: str = "github-datapipe/0.1.0" user_agent: str = "github-datapipe/0.1.0"
ROOT_DIR = Path(__file__).resolve().parents[2]
ROOT_DIR = Path(__file__).resolve().parent

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import Any
import requests
from github_datapipe.core.config import GithubConfig
class GithubApiError(RuntimeError):
"""Raised when GitHub responds with an error payload."""
class GithubApiClient:
def __init__(self, token: str, session: requests.Session | None = None) -> None:
self._session = session or requests.Session()
self._session.headers.update(
{
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": GithubConfig.api_version,
"User-Agent": GithubConfig.user_agent,
}
)
def search_repositories(self, query: str, page: int, per_page: int) -> dict[str, Any]:
return self._get_json(
GithubConfig.search_repositories_endpoint,
params={"q": query, "page": page, "per_page": per_page},
)
def get_repository(self, full_name: str) -> dict[str, Any]:
return self._get_json(f"/repos/{full_name}")
def list_commits(self, full_name: str, branch: str, page: int, per_page: int) -> list[dict[str, Any]]:
return self._get_json(
f"/repos/{full_name}/commits",
params={"sha": branch, "page": page, "per_page": per_page},
)
def _get_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
response = self._session.get(
f"{GithubConfig.base_url}{path}",
params=params,
timeout=30,
)
if response.status_code >= 400:
try:
payload = response.json()
except ValueError:
payload = {"message": response.text}
message = payload.get("message", "GitHub API request failed")
raise GithubApiError(f"GitHub request failed ({response.status_code}): {message}")
return response.json()

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Iterable
def write_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def write_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(row))
handle.write("\n")
def append_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> int:
path.parent.mkdir(parents=True, exist_ok=True)
count = 0
with path.open("a", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(row))
handle.write("\n")
count += 1
return count
def read_json(path: Path) -> Any:
return json.loads(path.read_text(encoding="utf-8"))
def read_jsonl(path: Path) -> list[dict[str, Any]]:
if not path.exists():
return []
rows: list[dict[str, Any]] = []
for line in path.read_text(encoding="utf-8").splitlines():
if line.strip():
rows.append(json.loads(line))
return rows

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
import os
from datetime import UTC, datetime
from uuid import uuid4
from dotenv import load_dotenv
load_dotenv()
def require_github_token() -> str:
github_token = os.getenv("github_token")
if not github_token:
raise ValueError("GitHub token is not available in .env as `github_token`.")
return github_token
def build_run_id() -> str:
return f"run-{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}-{uuid4().hex[:8]}"
def utc_now() -> str:
return datetime.now(tz=UTC).isoformat()

View File

@@ -1,286 +0,0 @@
from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from uuid import uuid4
import requests
from dotenv import load_dotenv
from github_datapipe.config import GithubConfig
load_dotenv()
class GithubApiError(RuntimeError):
"""Raised when GitHub responds with an error payload."""
@dataclass(frozen=True)
class SampleReposOptions:
"""
To set configuration options for the GitHub repository sampling run.
Attributes:
1. count (int): The total number of unique repositories to collect. Must be > 0. Defaults to GithubConfig.default_repo_count
2. output_root: The base directory where the run folder will be created to store the search repository data.
3. query: The github search string using github offered query params.
4. per_page: The number of results to fetch per API call. Max is 100.
5. mode: 'fresh' (starting a fresh run) or "append-deduped" (if you want to resume half-done previous run and avoid duplicates).
6. run_id: To keep a trace of current run by giving it a unique id.
"""
count: int = GithubConfig.default_repo_count
output_root: Path = Path(GithubConfig.default_output_root)
query: str = GithubConfig.default_query
per_page: int = GithubConfig.default_per_page
mode: str = "append-deduped"
run_id: str | None = None
def resolve_query(query_override: str | None) -> str:
""" To override default query params with user inserted query params. """
return query_override.strip() if query_override else GithubConfig.default_query
class GithubRepoSampler:
def __init__(self, token: str, session: requests.Session | None = None) -> None:
"""
Everytime an instance of GithubRepoSampler is created, this method will be called first to open a session and set headers.
Args:
token (str): A valid GitHub Personal Access Token (PAT).
session (requests.Session | None, optional): An existing session to use. If None, a new session is created.
"""
self._session = session or requests.Session()
self._session.headers.update(
{
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": GithubConfig.api_version,
"User-Agent": GithubConfig.user_agent,
}
)
def search_repositories(self, query: str, page: int, per_page: int) -> dict[str, Any]:
"""
Calls github's /search/repositories endpoint passing query params and other.
Args:
query (str): The GitHub-formatted search query string.
page (int): The specific page of results to fetch.
per_page (int): The number of items to return per page (max 100).
Returns:
dict[str, Any]: The raw JSON payload returned by the GitHub API.
Raises:
GithubApiError: If the API responds with an HTTP status code >= 400
"""
response = self._session.get(
f"{GithubConfig.base_url}{GithubConfig.search_repositories_endpoint}",
params={"q": query, "page": page, "per_page": per_page},
timeout=30,
)
if response.status_code >= 400:
try:
payload = response.json()
except ValueError:
payload = {"message": response.text}
message = payload.get("message", "GitHub API request failed")
raise GithubApiError(f"GitHub search failed ({response.status_code}): {message}")
return response.json()
def sample_repositories(options: SampleReposOptions) -> dict[str, Any]:
"""
Orchestrates the end-to-end execution of a GitHub repository data pull.
This function handles the complete lifecycle of a sampling run: validating credentials,
setting up the local file system for the run, managing state to prevent duplicate
downloads, paginating through the GitHub Search API, and saving the normalized data.
Effects:
- Creates a timestamped run directory inside `options.output_root`.
- Writes newly fetched repository data to `repos.jsonl`.
- Writes or updates a global `seen_repo_ids.json` tracker file.
- Writes a `manifest.json` logging the run's parameters and results.
Args:
options (SampleReposOptions): The configuration parameters for the run.
Returns:
dict[str, Any]: A summary dictionary containing the `run_id`, the total
`count_collected`, and the absolute paths to the generated output files.
"""
# 1. SETUP: Validate the token and creds
github_token = os.getenv("github_token")
if not github_token:
raise ValueError("GitHub token is not available in .env as `github_token`.")
if options.count <= 0:
raise ValueError("`count` must be greater than 0.")
# 2. FILE SYSTEM: Create isolated folders for this specific run
run_id = options.run_id or build_run_id()
output_root = options.output_root.resolve() # returns an absolute path
run_root = output_root / run_id # isolated folder created for one single execution (or "run") of this script.
phase1_root = run_root / "phase1" # the search phase is phase 1
phase1_root.mkdir(parents=True, exist_ok=True)
# Define exact file paths for where data will live
seen_index_path = output_root / "seen_repo_ids.json" # Notes the last seen repo's id
repos_path = phase1_root / "repos.jsonl" # Actual repo metadata
manifest_path = phase1_root / "manifest.json" # log entry
# 3. STATE MGMT: seen_repo_ids will make sure that duplicated data doesn't end up in the result
seen_repo_ids = set() if options.mode == "fresh" else load_seen_repo_ids(seen_index_path)
current_run_seen_repo_ids: set[int] = set()
# 4. NETWORK SETUP: Initialize the session wrapper
sampler = GithubRepoSampler(token=github_token)
accepted_repos: list[dict[str, Any]] = []
page = 1
total_count: int | None = None
# 5. THE MAIN LOOP: Keep fetching until the requested target count is hit
while len(accepted_repos) < options.count:
payload = sampler.search_repositories(
query=options.query,
page=page,
per_page=options.per_page,
)
# Github Search API response = JSON Object containing { search metadata, items: [{ repo obj1 }, { repo obj 2} ...] }
items = payload.get("items", [])
total_count = payload.get("total_count", total_count) # the number of the repos matching your query at exact moment in time
# Break safety: If GitHub returns an empty list, we've exhausted all results of current api call
if not items:
break
sampled_at = utc_now() # to keep note of current time incase we need to revise it
# Process each repository in the current result present in items
for repo in items:
repo_id = int(repo["id"])
if repo_id in seen_repo_ids or repo_id in current_run_seen_repo_ids:
continue
normalized_repo = normalize_repo_record(
repo=repo,
run_id=run_id,
sample_query=options.query,
sample_page=page,
sampled_at=sampled_at,
)
accepted_repos.append(normalized_repo)
current_run_seen_repo_ids.add(repo_id)
if len(accepted_repos) >= options.count:
break
page += 1
write_jsonl(repos_path, accepted_repos)
if options.mode != "fresh":
seen_repo_ids.update(current_run_seen_repo_ids)
write_json(seen_index_path, sorted(seen_repo_ids))
# manifest is a log object that denotes what kind of input was given to achieve certain output.
manifest = {
"run_id": run_id,
"command": "sample-repos",
"resolved_query": options.query,
"count_requested": options.count,
"count_collected": len(accepted_repos),
"per_page": options.per_page,
"mode": options.mode,
"page_count_scanned": max(page - 1, 0),
"github_total_count": total_count,
"output_files": {
"repos_jsonl": str(repos_path),
"seen_repo_ids_json": str(seen_index_path) if options.mode != "fresh" else None,
},
"sampled_at": utc_now(),
}
write_json(manifest_path, manifest)
return {
"run_id": run_id,
"count_collected": len(accepted_repos),
"repos_path": repos_path,
"manifest_path": manifest_path,
"seen_index_path": seen_index_path if options.mode != "fresh" else None,
}
def normalize_repo_record(
repo: dict[str, Any],
run_id: str,
sample_query: str,
sample_page: int,
sampled_at: str,
) -> dict[str, Any]:
"""
Extracts and flattens relevant fields from a raw GitHub repository payload.
Args:
repo (dict[str, Any]): The raw JSON dictionary for a single repository
as returned by the GitHub API.
run_id (str): The unique identifier for the current sampling run.
sample_query (str): The search string used to find this repository.
sample_page (int): The pagination page number this repository was found on.
sampled_at (str): ISO-8601 formatted timestamp of when the record was fetched.
Returns:
dict[str, Any]: A flat dictionary containing the filtered repository metrics
(e.g., repo_id, language, stargazers_count, size_kb) and traceability data.
"""
return {
"run_id": run_id,
"repo_id": repo["id"],
"full_name": repo["full_name"],
"html_url": repo["html_url"],
"api_url": repo["url"],
"default_branch": repo.get("default_branch"),
"language": repo.get("language"),
"description": repo.get("description"),
"stargazers_count": repo.get("stargazers_count"),
"size_kb": repo.get("size"),
"fork": repo.get("fork"),
"archived": repo.get("archived"),
"visibility": repo.get("visibility"),
"sample_query": sample_query,
"sample_page": sample_page,
"sampled_at": sampled_at,
}
def load_seen_repo_ids(path: Path) -> set[int]:
if not path.exists():
return set()
payload = json.loads(path.read_text(encoding="utf-8"))
return {int(repo_id) for repo_id in payload}
def write_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
def write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
for row in rows:
handle.write(json.dumps(row))
handle.write("\n")
def build_run_id() -> str:
return f"run-{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}-{uuid4().hex[:8]}"
def utc_now() -> str:
return datetime.now(tz=UTC).isoformat()

View File

@@ -1,86 +0,0 @@
from __future__ import annotations
import argparse
from pathlib import Path
from github_datapipe.config import GithubConfig
from github_datapipe.extract_repos import SampleReposOptions, resolve_query, sample_repositories
### parser that unpacks the command line args passed as queries and
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="GitHub data pipeline CLI")
subparsers = parser.add_subparsers(dest="command", required=True)
sample_parser = subparsers.add_parser(
"sample-repos",
help="Collect repositories from GitHub Search and save phase 1 outputs.",
)
sample_parser.add_argument(
"--count",
type=int,
default=GithubConfig.default_repo_count,
help=f"Number of repositories to sample. Defaults to {GithubConfig.default_repo_count}.",
)
sample_parser.add_argument(
"--query",
type=str,
default=None,
help="Optional raw GitHub repository search query. Replaces config defaults when provided.",
)
sample_parser.add_argument(
"--output-root",
type=Path,
default=Path(GithubConfig.default_output_root),
help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.",
)
sample_parser.add_argument(
"--mode",
choices=("append-deduped", "fresh"),
default="append-deduped",
help="Whether to dedupe against the persisted seen-repo index or start a fresh phase-1 run.",
)
sample_parser.add_argument(
"--per-page",
type=int,
default=GithubConfig.default_per_page,
help=f"GitHub Search page size. Defaults to {GithubConfig.default_per_page}.",
)
sample_parser.add_argument(
"--run-id",
type=str,
default=None,
help="Optional run identifier. If omitted, a run id is generated automatically.",
)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.command == "sample-repos":
options = SampleReposOptions(
count=args.count,
output_root=args.output_root,
query=resolve_query(args.query),
per_page=args.per_page,
mode=args.mode,
run_id=args.run_id,
)
# result will return output from the extract repo's sample_repositories
result = sample_repositories(options)
print(f"Run ID: {result['run_id']}")
print(f"Collected repositories: {result['count_collected']}")
print(f"Repositories file: {result['repos_path']}")
print(f"Manifest file: {result['manifest_path']}")
if result["seen_index_path"] is not None:
print(f"Seen repo index: {result['seen_index_path']}")
return 0
parser.error(f"Unsupported command: {args.command}")
return 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1 @@
"""Pipeline phases for repository sampling and commit ingestion."""

View File

@@ -0,0 +1 @@
"""Phase 1: repository discovery and persistence."""

View File

@@ -0,0 +1,148 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from github_datapipe.core.config import GithubConfig
from github_datapipe.core.github_api import GithubApiClient
from github_datapipe.core.io import read_json, write_json, write_jsonl
from github_datapipe.core.runtime import build_run_id, require_github_token, utc_now
@dataclass(frozen=True)
class SampleReposOptions:
count: int = GithubConfig.default_repo_count
output_root: Path = Path(GithubConfig.default_output_root)
query: str = GithubConfig.default_query
per_page: int = GithubConfig.default_per_page
mode: str = "append-deduped"
run_id: str | None = None
def resolve_query(query_override: str | None) -> str:
return query_override.strip() if query_override else GithubConfig.default_query
def sample_repositories(options: SampleReposOptions) -> dict[str, Any]:
token = require_github_token()
if options.count <= 0:
raise ValueError("`count` must be greater than 0.")
run_id = options.run_id or build_run_id()
output_root = options.output_root.resolve()
run_root = output_root / run_id
phase_root = run_root / "phase1_repository_sampling"
phase_root.mkdir(parents=True, exist_ok=True)
seen_index_path = output_root / "seen_repo_ids.json"
repos_path = phase_root / "repos.jsonl"
manifest_path = phase_root / "manifest.json"
seen_repo_ids = set() if options.mode == "fresh" else load_seen_repo_ids(seen_index_path)
current_run_seen_repo_ids: set[int] = set()
client = GithubApiClient(token=token)
accepted_repos: list[dict[str, Any]] = []
page = 1
total_count: int | None = None
while len(accepted_repos) < options.count:
payload = client.search_repositories(
query=options.query,
page=page,
per_page=options.per_page,
)
items = payload.get("items", [])
total_count = payload.get("total_count", total_count)
if not items:
break
sampled_at = utc_now()
for repo in items:
repo_id = int(repo["id"])
if repo_id in seen_repo_ids or repo_id in current_run_seen_repo_ids:
continue
accepted_repos.append(
normalize_repo_record(
repo=repo,
run_id=run_id,
sample_query=options.query,
sample_page=page,
sampled_at=sampled_at,
)
)
current_run_seen_repo_ids.add(repo_id)
if len(accepted_repos) >= options.count:
break
page += 1
write_jsonl(repos_path, accepted_repos)
if options.mode != "fresh":
seen_repo_ids.update(current_run_seen_repo_ids)
write_json(seen_index_path, sorted(seen_repo_ids))
manifest = {
"run_id": run_id,
"phase_name": "phase1_repository_sampling",
"command": "sample-repos",
"resolved_query": options.query,
"count_requested": options.count,
"count_collected": len(accepted_repos),
"per_page": options.per_page,
"mode": options.mode,
"page_count_scanned": max(page - 1, 0),
"github_total_count": total_count,
"output_files": {
"repos_jsonl": str(repos_path),
"seen_repo_ids_json": str(seen_index_path) if options.mode != "fresh" else None,
},
"sampled_at": utc_now(),
}
write_json(manifest_path, manifest)
return {
"run_id": run_id,
"count_collected": len(accepted_repos),
"repos_path": repos_path,
"manifest_path": manifest_path,
"seen_index_path": seen_index_path if options.mode != "fresh" else None,
}
def normalize_repo_record(
repo: dict[str, Any],
run_id: str,
sample_query: str,
sample_page: int,
sampled_at: str,
) -> dict[str, Any]:
return {
"run_id": run_id,
"repo_id": repo["id"],
"full_name": repo["full_name"],
"html_url": repo["html_url"],
"api_url": repo["url"],
"default_branch": repo.get("default_branch"),
"language": repo.get("language"),
"description": repo.get("description"),
"stargazers_count": repo.get("stargazers_count"),
"size_kb": repo.get("size"),
"fork": repo.get("fork"),
"archived": repo.get("archived"),
"visibility": repo.get("visibility"),
"sample_query": sample_query,
"sample_page": sample_page,
"sampled_at": sampled_at,
}
def load_seen_repo_ids(path: Path) -> set[int]:
if not path.exists():
return set()
payload = read_json(path)
return {int(repo_id) for repo_id in payload}

View File

@@ -0,0 +1 @@
"""Phase 2: commit history retrieval and persistence."""

View File

@@ -0,0 +1,293 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from github_datapipe.core.config import GithubConfig
from github_datapipe.core.github_api import GithubApiClient, GithubApiError
from github_datapipe.core.io import append_jsonl, read_json, read_jsonl, write_json, write_jsonl
from github_datapipe.core.runtime import require_github_token, utc_now
@dataclass(frozen=True)
class FetchCommitsOptions:
output_root: Path = Path(GithubConfig.default_output_root)
run_id: str | None = None
repos_file: Path | None = None
mode: str = "refresh"
per_page: int = GithubConfig.default_per_page
max_pages_per_repo: int = GithubConfig.default_max_pages_per_repo
retry_count: int = GithubConfig.default_retry_count
def fetch_commits(options: FetchCommitsOptions) -> dict[str, Any]:
token = require_github_token()
if options.max_pages_per_repo <= 0:
raise ValueError("`max_pages_per_repo` must be greater than 0.")
if options.per_page <= 0:
raise ValueError("`per_page` must be greater than 0.")
output_root = options.output_root.resolve()
repos_path, run_id = resolve_repo_input(options, output_root)
run_root = output_root / run_id
phase_root = run_root / "phase2_commit_ingestion"
commits_root = phase_root / "commits"
commits_path = commits_root / "commits-0001.jsonl"
status_path = phase_root / "repo_status.jsonl"
manifest_path = phase_root / "manifest.json"
repos = read_jsonl(repos_path)
if not repos:
raise ValueError(f"No repositories found in {repos_path}.")
prior_statuses = load_status_index(status_path) if options.mode == "resume" else {}
if options.mode == "refresh" and commits_path.exists():
commits_path.unlink()
status_rows: list[dict[str, Any]] = []
if options.mode == "resume":
status_rows.extend(read_jsonl(status_path))
processed_repositories = 0
completed_repositories = 0
warning_repositories = 0
failed_repositories = 0
written_commit_count = 0 if options.mode == "refresh" else count_jsonl_rows(commits_path)
client: GithubApiClient | None = None
for repo in repos:
repo_id = int(repo["repo_id"])
previous_status = prior_statuses.get(repo_id)
if options.mode == "resume" and previous_status == "complete":
continue
if client is None:
client = GithubApiClient(token=token)
processed_repositories += 1
status_record, commit_rows = process_repository(
client=client,
repo=repo,
run_id=run_id,
per_page=options.per_page,
max_pages_per_repo=options.max_pages_per_repo,
retry_count=options.retry_count,
)
status_rows.append(status_record)
written_commit_count += append_jsonl(commits_path, commit_rows)
if status_record["status"] == "complete":
completed_repositories += 1
elif status_record["status"] == "success_with_warning":
warning_repositories += 1
else:
failed_repositories += 1
write_jsonl(status_path, dedupe_status_rows(status_rows))
manifest = {
"run_id": run_id,
"phase_name": "phase2_commit_ingestion",
"command": "fetch-commits",
"mode": options.mode,
"per_page": options.per_page,
"max_pages_per_repo": options.max_pages_per_repo,
"retry_count": options.retry_count,
"processed_repositories": processed_repositories,
"completed_repositories": completed_repositories,
"warning_repositories": warning_repositories,
"failed_repositories": failed_repositories,
"written_commit_count": written_commit_count,
"input_files": {"repos_jsonl": str(repos_path)},
"output_files": {
"commits_jsonl": str(commits_path),
"repo_status_jsonl": str(status_path),
},
"fetched_at": utc_now(),
}
write_json(manifest_path, manifest)
return {
"run_id": run_id,
"processed_repositories": processed_repositories,
"completed_repositories": completed_repositories,
"warning_repositories": warning_repositories,
"failed_repositories": failed_repositories,
"commits_path": commits_path,
"status_path": status_path,
"manifest_path": manifest_path,
}
def resolve_repo_input(options: FetchCommitsOptions, output_root: Path) -> tuple[Path, str]:
if options.run_id is not None:
return (
output_root / options.run_id / "phase1_repository_sampling" / "repos.jsonl",
options.run_id,
)
assert options.repos_file is not None
repos_path = options.repos_file.resolve()
repos = read_jsonl(repos_path)
if not repos:
raise ValueError(f"No repositories found in {repos_path}.")
return repos_path, str(repos[0]["run_id"])
def process_repository(
client: GithubApiClient,
repo: dict[str, Any],
run_id: str,
per_page: int,
max_pages_per_repo: int,
retry_count: int,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
full_name = repo["full_name"]
fetched_at = utc_now()
try:
repository = with_retries(
lambda: client.get_repository(full_name),
retry_count=retry_count,
)
branch = repository["default_branch"]
commit_rows: list[dict[str, Any]] = []
page = 1
last_page_size = 0
while page <= max_pages_per_repo:
commits = with_retries(
lambda page_number=page: client.list_commits(
full_name=full_name,
branch=branch,
page=page_number,
per_page=per_page,
),
retry_count=retry_count,
)
if not commits:
break
last_page_size = len(commits)
commit_rows.extend(
normalize_commit_record(
repo=repo,
repository=repository,
commit_payload=commit_payload,
run_id=run_id,
branch=branch,
page_number=page,
fetched_at=fetched_at,
)
for commit_payload in commits
)
if len(commits) < per_page:
break
page += 1
truncated = bool(commit_rows) and page > max_pages_per_repo and last_page_size == per_page
status = "success_with_warning" if truncated else "complete"
status_record = {
"run_id": run_id,
"repo_id": repo["repo_id"],
"repo_full_name": full_name,
"status": status,
"commit_count": len(commit_rows),
"default_branch_at_fetch": branch,
"truncated": truncated,
"truncation_reason": "max_pages" if truncated else None,
"failure_reason": None,
"fetched_at": fetched_at,
}
return status_record, commit_rows
except GithubApiError as exc:
status_record = {
"run_id": run_id,
"repo_id": repo["repo_id"],
"repo_full_name": full_name,
"status": "failed",
"commit_count": 0,
"default_branch_at_fetch": None,
"truncated": False,
"truncation_reason": None,
"failure_reason": str(exc),
"fetched_at": fetched_at,
}
return status_record, []
def normalize_commit_record(
repo: dict[str, Any],
repository: dict[str, Any],
commit_payload: dict[str, Any],
run_id: str,
branch: str,
page_number: int,
fetched_at: str,
) -> dict[str, Any]:
commit = commit_payload["commit"]
author = commit.get("author") or {}
committer = commit.get("committer") or {}
return {
"run_id": run_id,
"repo_id": repo["repo_id"],
"repo_full_name": repo["full_name"],
"repo_html_url": repo["html_url"],
"default_branch_at_fetch": branch,
"sha": commit_payload["sha"],
"commit_key": f"{repo['repo_id']}:{commit_payload['sha']}",
"parent_shas": [parent["sha"] for parent in commit_payload.get("parents", [])],
"author_name": author.get("name"),
"author_email": author.get("email"),
"author_date": author.get("date"),
"committer_name": committer.get("name"),
"committer_email": committer.get("email"),
"committer_date": committer.get("date"),
"message": commit.get("message"),
"html_url": commit_payload.get("html_url"),
"fetched_at": fetched_at,
"source_endpoint": f"/repos/{repository['full_name']}/commits",
"page_number": page_number,
"truncated": False,
}
def with_retries(operation: Any, retry_count: int) -> Any:
last_error: Exception | None = None
for _ in range(retry_count + 1):
try:
return operation()
except GithubApiError as exc:
last_error = exc
assert last_error is not None
raise last_error
def load_status_index(path: Path) -> dict[int, str]:
rows = read_jsonl(path)
latest_by_repo: dict[int, str] = {}
for row in rows:
latest_by_repo[int(row["repo_id"])] = row["status"]
return latest_by_repo
def dedupe_status_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
latest_by_repo: dict[int, dict[str, Any]] = {}
order: list[int] = []
for row in rows:
repo_id = int(row["repo_id"])
if repo_id not in latest_by_repo:
order.append(repo_id)
latest_by_repo[repo_id] = row
return [latest_by_repo[repo_id] for repo_id in order]
def count_jsonl_rows(path: Path) -> int:
if not path.exists():
return 0
return len(path.read_text(encoding="utf-8").splitlines())

View File

@@ -5,8 +5,12 @@ from pathlib import Path
import requests import requests
from github_datapipe.config import GithubConfig from github_datapipe.core.config import GithubConfig
from github_datapipe.extract_repos import SampleReposOptions, resolve_query, sample_repositories from github_datapipe.phases.phase1_repository_sampling.service import (
SampleReposOptions,
resolve_query,
sample_repositories,
)
def test_resolve_query_uses_default_when_missing() -> None: def test_resolve_query_uses_default_when_missing() -> None:
@@ -49,7 +53,7 @@ def test_sample_repositories_dedupes_and_persists(monkeypatch, tmp_path: Path) -
return {"total_count": 3, "items": [fake_repo(102, "owner/repo-three")]} return {"total_count": 3, "items": [fake_repo(102, "owner/repo-three")]}
# Inject the mock sampler into the main code # Inject the mock sampler into the main code
monkeypatch.setattr("github_datapipe.extract_repos.GithubRepoSampler", FakeSampler) monkeypatch.setattr("github_datapipe.phases.phase1_repository_sampling.service.GithubApiClient", FakeSampler)
options = SampleReposOptions( options = SampleReposOptions(
count=3, count=3,

121
tests/test_phase2.py Normal file
View File

@@ -0,0 +1,121 @@
from __future__ import annotations
import json
from pathlib import Path
import requests
from github_datapipe.phases.phase2_commit_ingestion.service import (
FetchCommitsOptions,
fetch_commits,
)
def test_fetch_commits_writes_normalized_records_and_warning(monkeypatch, tmp_path: Path) -> None:
monkeypatch.setenv("github_token", "token")
repos_path = tmp_path / "run-test" / "phase1_repository_sampling" / "repos.jsonl"
repos_path.parent.mkdir(parents=True, exist_ok=True)
repos_path.write_text(json.dumps(fake_repo_record()) + "\n", encoding="utf-8")
class FakeClient:
def __init__(self, token: str, session: requests.Session | None = None) -> None:
self.token = token
def get_repository(self, full_name: str) -> dict:
return {"full_name": full_name, "default_branch": "main"}
def list_commits(self, full_name: str, branch: str, page: int, per_page: int) -> list[dict]:
if page == 1:
return [fake_commit_payload("a" * 40), fake_commit_payload("b" * 40)]
return []
monkeypatch.setattr("github_datapipe.phases.phase2_commit_ingestion.service.GithubApiClient", FakeClient)
result = fetch_commits(
FetchCommitsOptions(
output_root=tmp_path,
run_id="run-test",
mode="refresh",
per_page=2,
max_pages_per_repo=1,
retry_count=0,
)
)
commits_path = Path(result["commits_path"])
commit_rows = [json.loads(line) for line in commits_path.read_text(encoding="utf-8").splitlines()]
assert len(commit_rows) == 2
assert commit_rows[0]["commit_key"] == f"{fake_repo_record()['repo_id']}:{'a' * 40}"
status_path = Path(result["status_path"])
status_rows = [json.loads(line) for line in status_path.read_text(encoding="utf-8").splitlines()]
assert status_rows[0]["status"] == "success_with_warning"
assert status_rows[0]["truncated"] is True
def test_fetch_commits_resume_skips_completed_repos(monkeypatch, tmp_path: Path) -> None:
monkeypatch.setenv("github_token", "token")
run_root = tmp_path / "run-test"
repos_path = run_root / "phase1_repository_sampling" / "repos.jsonl"
repos_path.parent.mkdir(parents=True, exist_ok=True)
repos_path.write_text(json.dumps(fake_repo_record()) + "\n", encoding="utf-8")
phase2_root = run_root / "phase2_commit_ingestion"
phase2_root.mkdir(parents=True, exist_ok=True)
(phase2_root / "repo_status.jsonl").write_text(
json.dumps(
{
"repo_id": fake_repo_record()["repo_id"],
"status": "complete",
}
)
+ "\n",
encoding="utf-8",
)
class FakeClient:
def __init__(self, token: str, session: requests.Session | None = None) -> None:
raise AssertionError("Client should not be initialized for already completed repos")
monkeypatch.setattr("github_datapipe.phases.phase2_commit_ingestion.service.GithubApiClient", FakeClient)
result = fetch_commits(
FetchCommitsOptions(
output_root=tmp_path,
run_id="run-test",
mode="resume",
retry_count=0,
)
)
assert result["processed_repositories"] == 0
def fake_repo_record() -> dict:
return {
"run_id": "run-test",
"repo_id": 100,
"full_name": "owner/repo-one",
"html_url": "https://github.com/owner/repo-one",
}
def fake_commit_payload(sha: str) -> dict:
return {
"sha": sha,
"html_url": f"https://github.com/owner/repo-one/commit/{sha}",
"parents": [{"sha": "parent-sha"}],
"commit": {
"author": {
"name": "Alice",
"email": "alice@example.com",
"date": "2024-01-01T00:00:00Z",
},
"committer": {
"name": "Bob",
"email": "bob@example.com",
"date": "2024-01-01T00:00:00Z",
},
"message": "Initial commit",
},
}