diff --git a/README.md b/README.md new file mode 100644 index 0000000..f1dc622 --- /dev/null +++ b/README.md @@ -0,0 +1,121 @@ +# GitHub Data Pipeline + +This project builds a two-phase GitHub data pipeline: + +- Phase 1 samples repositories from GitHub Search and stores repository metadata. +- Phase 2 reads the saved repositories, refreshes repository metadata, and downloads commit history from the current default branch. + +The project uses Poetry for dependency management and a `src` package layout for Python modules. + +## Project Layout + +- `src/github_datapipe/core/` + Shared config, GitHub API, runtime, and file IO helpers. +- `src/github_datapipe/phases/phase1_repository_sampling/` + Phase 1 repository discovery and persistence. +- `src/github_datapipe/phases/phase2_commit_ingestion/` + Phase 2 commit fetching and persistence. +- `tests/` + Automated tests for the pipeline behavior. + +## Prerequisites + +- Python 3.12 installed locally +- Poetry installed locally +- GitHub personal access token stored in `.env` as: + +```env +github_token=YOUR_TOKEN_HERE +``` + +## Install Dependencies + +From the project root, install the local environment with: + +```powershell +poetry install +``` + +## Run Phase 1 + +The default phase 1 command collects 10 repositories using the default search query from `src/github_datapipe/core/config.py`. + +```powershell +poetry run github-datapipe sample-repos +``` + +Useful overrides: + +```powershell +poetry run github-datapipe sample-repos --count 25 +poetry run github-datapipe sample-repos --count 25 --query "is:public stars:>50 size:>5000 archived:false" +poetry run github-datapipe sample-repos --mode fresh +``` + +### Phase 1 Outputs + +After phase 1 completes, check: + +- `runs/<run_id>/phase1_repository_sampling/repos.jsonl` +- `runs/<run_id>/phase1_repository_sampling/manifest.json` +- `runs/seen_repo_ids.json` + +The command prints the generated `run_id`, which you will use for phase 2. 
+ +## Run Phase 2 + +Phase 2 consumes the saved phase 1 repositories and downloads commit history from each repository's current default branch. + +Run against a phase 1 run: + +```powershell +poetry run github-datapipe fetch-commits --run-id <run_id> +``` + +Useful overrides: + +```powershell +poetry run github-datapipe fetch-commits --run-id <run_id> --mode resume +poetry run github-datapipe fetch-commits --run-id <run_id> --max-pages-per-repo 3 +poetry run github-datapipe fetch-commits --run-id <run_id> --per-page 50 +``` + +### Phase 2 Defaults + +- `mode=refresh` +- `per_page=100` +- `max_pages_per_repo=1` + +The default `max_pages_per_repo=1` keeps the prototype small and limits commit downloads to the first page for each repository. + +### Phase 2 Outputs + +After phase 2 completes, check: + +- `runs/<run_id>/phase2_commit_ingestion/commits/commits-0001.jsonl` +- `runs/<run_id>/phase2_commit_ingestion/repo_status.jsonl` +- `runs/<run_id>/phase2_commit_ingestion/manifest.json` + +## Run Tests + +Run the automated tests with: + +```powershell +poetry run pytest -q +``` + +## Alternate CLI Invocation + +If the Poetry script entrypoint is not available yet, run the CLI module directly: + +```powershell +poetry run python -m github_datapipe.cli sample-repos +poetry run python -m github_datapipe.cli fetch-commits --run-id <run_id> +``` + +## Notes + +- Phase 1 currently uses deterministic GitHub Search traversal rather than randomized sampling. +- Phase 2 stores one normalized commit per JSONL line. +- Resume mode in phase 2 skips repositories already marked `complete`. +- Truncated repositories are marked as `success_with_warning` when the page cap is reached. 
diff --git a/pyproject.toml b/pyproject.toml index eaf7165..3911a03 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,12 +16,16 @@ dependencies = [ ] [project.scripts] -github-datapipe = "github_datapipe.extract_repos:main" +github-datapipe = "github_datapipe.cli:main" +[tool.poetry] +packages = [ + { include = "github_datapipe", from = "src" } +] [build-system] requires = ["poetry-core>=2.0.0,<3.0.0"] build-backend = "poetry.core.masonry.api" [tool.pytest.ini_options] -pythonpath = ["src"] \ No newline at end of file +pythonpath = ["src"] diff --git a/src/github_datapipe/__init__.py b/src/github_datapipe/__init__.py index e69de29..0b52f1b 100644 --- a/src/github_datapipe/__init__.py +++ b/src/github_datapipe/__init__.py @@ -0,0 +1 @@ +"""GitHub data pipeline package.""" diff --git a/src/github_datapipe/cli.py b/src/github_datapipe/cli.py new file mode 100644 index 0000000..5f499c7 --- /dev/null +++ b/src/github_datapipe/cli.py @@ -0,0 +1,165 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +from github_datapipe.core.config import GithubConfig +# Phase1 is concerned with querying the repo data +from github_datapipe.phases.phase1_repository_sampling.service import ( + SampleReposOptions, + resolve_query, + sample_repositories, +) +# Phase 2 is concerned with downloading the commit data for repos collected in phase 1 +from github_datapipe.phases.phase2_commit_ingestion.service import ( + FetchCommitsOptions, + fetch_commits, +) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="GitHub data pipeline CLI") + subparsers = parser.add_subparsers(dest="command", required=True) + + sample_parser = subparsers.add_parser( + "sample-repos", + help="Collect repositories from GitHub Search and save phase 1 outputs.", + ) + sample_parser.add_argument( + "--count", + type=int, + default=GithubConfig.default_repo_count, + help=f"Number of repositories to sample. 
Defaults to {GithubConfig.default_repo_count}.", + ) + sample_parser.add_argument( + "--query", + type=str, + default=None, + help="Optional raw GitHub repository search query. Replaces config defaults when provided.", + ) + sample_parser.add_argument( + "--output-root", + type=Path, + default=Path(GithubConfig.default_output_root), + help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.", + ) + sample_parser.add_argument( + "--mode", + choices=("append-deduped", "fresh"), + default="append-deduped", + help="Whether to dedupe against the persisted seen-repo index or start a fresh phase-1 run.", + ) + sample_parser.add_argument( + "--per-page", + type=int, + default=GithubConfig.default_per_page, + help=f"GitHub Search page size. Defaults to {GithubConfig.default_per_page}.", + ) + sample_parser.add_argument( + "--run-id", + type=str, + default=None, + help="Optional run identifier. If omitted, a run id is generated automatically.", + ) + + commit_parser = subparsers.add_parser( + "fetch-commits", + help="Fetch commit history for repositories collected in phase 1.", + ) + commit_input = commit_parser.add_mutually_exclusive_group(required=True) + commit_input.add_argument( + "--run-id", + type=str, + help="Run identifier whose phase 1 repository dataset should be consumed.", + ) + commit_input.add_argument( + "--repos-file", + type=Path, + help="Path to a phase 1 repos.jsonl file.", + ) + commit_parser.add_argument( + "--output-root", + type=Path, + default=Path(GithubConfig.default_output_root), + help=f"Directory where run outputs are stored. 
Defaults to `{GithubConfig.default_output_root}`.", + ) + commit_parser.add_argument( + "--mode", + choices=("refresh", "resume"), + default="refresh", + help="Whether to fetch all repositories from scratch or skip those already marked complete.", + ) + commit_parser.add_argument( + "--max-pages-per-repo", + type=int, + default=GithubConfig.default_max_pages_per_repo, + help=( + "Maximum number of commit pages to fetch per repository. " + f"Defaults to {GithubConfig.default_max_pages_per_repo}." + ), + ) + commit_parser.add_argument( + "--per-page", + type=int, + default=GithubConfig.default_per_page, + help=f"GitHub commit page size. Defaults to {GithubConfig.default_per_page}.", + ) + commit_parser.add_argument( + "--retry-count", + type=int, + default=GithubConfig.default_retry_count, + help=f"Number of retries for repository metadata and commit requests. Defaults to {GithubConfig.default_retry_count}.", + ) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + + if args.command == "sample-repos": + options = SampleReposOptions( + count=args.count, + output_root=args.output_root, + query=resolve_query(args.query), + per_page=args.per_page, + mode=args.mode, + run_id=args.run_id, + ) + result = sample_repositories(options) + print(f"Run ID: {result['run_id']}") + print(f"Collected repositories: {result['count_collected']}") + print(f"Repositories file: {result['repos_path']}") + print(f"Manifest file: {result['manifest_path']}") + if result["seen_index_path"] is not None: + print(f"Seen repo index: {result['seen_index_path']}") + return 0 + + if args.command == "fetch-commits": + options = FetchCommitsOptions( + output_root=args.output_root, + run_id=args.run_id, + repos_file=args.repos_file, + mode=args.mode, + per_page=args.per_page, + max_pages_per_repo=args.max_pages_per_repo, + retry_count=args.retry_count, + ) + result = fetch_commits(options) + print(f"Run ID: {result['run_id']}") + print(f"Processed 
repositories: {result['processed_repositories']}") + print(f"Completed repositories: {result['completed_repositories']}") + print(f"Warning repositories: {result['warning_repositories']}") + print(f"Failed repositories: {result['failed_repositories']}") + print(f"Commits file: {result['commits_path']}") + print(f"Status file: {result['status_path']}") + print(f"Manifest file: {result['manifest_path']}") + return 0 + + parser.error(f"Unsupported command: {args.command}") + return 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/github_datapipe/core/__init__.py b/src/github_datapipe/core/__init__.py new file mode 100644 index 0000000..db1fbb7 --- /dev/null +++ b/src/github_datapipe/core/__init__.py @@ -0,0 +1 @@ +"""Shared building blocks for GitHub pipeline phases.""" diff --git a/src/github_datapipe/config.py b/src/github_datapipe/core/config.py similarity index 74% rename from src/github_datapipe/config.py rename to src/github_datapipe/core/config.py index ac85ebf..0ca16d2 100644 --- a/src/github_datapipe/config.py +++ b/src/github_datapipe/core/config.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from pathlib import Path -@dataclass(frozen=True) # This makes the class immutable so in the process, we do not accidently change the variable values of this +@dataclass(frozen=True) class GithubConfig: base_url: str = "https://api.github.com" api_version: str = "2022-11-28" @@ -13,9 +13,9 @@ class GithubConfig: default_per_page: int = 100 default_output_root: str = "runs" default_query: str = "is:public stars:>10 size:>1000 archived:false fork:false created:>2025-07-15" + default_max_pages_per_repo: int = 1 + default_retry_count: int = 2 user_agent: str = "github-datapipe/0.1.0" - - -ROOT_DIR = Path(__file__).resolve().parent +ROOT_DIR = Path(__file__).resolve().parents[2] diff --git a/src/github_datapipe/core/github_api.py b/src/github_datapipe/core/github_api.py new file mode 100644 index 0000000..9f91df3 --- /dev/null +++ 
b/src/github_datapipe/core/github_api.py
@@ -0,0 +1,69 @@
+from __future__ import annotations
+
+from typing import Any
+
+import requests
+
+from github_datapipe.core.config import GithubConfig
+
+
+class GithubApiError(RuntimeError):
+    """Raised when GitHub responds with an error payload."""
+
+
+class GithubApiClient:
+    """Client for the GitHub REST endpoints used by the pipeline phases."""
+
+    def __init__(self, token: str, session: requests.Session | None = None) -> None:
+        """Install the auth, API-version, and user-agent headers on the session."""
+        self._session = session or requests.Session()
+        self._session.headers.update(
+            {
+                "Accept": "application/vnd.github+json",
+                "Authorization": f"Bearer {token}",
+                "X-GitHub-Api-Version": GithubConfig.api_version,
+                "User-Agent": GithubConfig.user_agent,
+            }
+        )
+
+    def search_repositories(self, query: str, page: int, per_page: int) -> dict[str, Any]:
+        """Return one page of repository search results for `query`."""
+        return self._get_json(
+            GithubConfig.search_repositories_endpoint,
+            params={"q": query, "page": page, "per_page": per_page},
+        )
+
+    def get_repository(self, full_name: str) -> dict[str, Any]:
+        """Return the JSON payload of `/repos/{full_name}`."""
+        return self._get_json(f"/repos/{full_name}")
+
+    def list_commits(self, full_name: str, branch: str, page: int, per_page: int) -> list[dict[str, Any]]:
+        """Return one page of `/repos/{full_name}/commits`, requested with `sha=branch`."""
+        return self._get_json(
+            f"/repos/{full_name}/commits",
+            params={"sha": branch, "page": page, "per_page": per_page},
+        )
+
+    def _get_json(self, path: str, params: dict[str, Any] | None = None) -> Any:
+        """GET `path` relative to `GithubConfig.base_url` and return the decoded JSON body.
+
+        Raises:
+            GithubApiError: If the response status code is 400 or higher.
+        """
+        response = self._session.get(
+            f"{GithubConfig.base_url}{path}",
+            params=params,
+            timeout=30,
+        )
+        if response.status_code >= 400:
+            try:
+                payload = response.json()
+            except ValueError:
+                payload = {"message": response.text}
+            # A decoded error body is not guaranteed to be a JSON object (it can be a
+            # list, string, or null); only a dict supports `.get`.
+            if not isinstance(payload, dict):
+                payload = {"message": response.text}
+            message = payload.get("message", "GitHub API request failed")
+            raise GithubApiError(f"GitHub request failed ({response.status_code}): {message}")
+        return response.json()
diff --git a/src/github_datapipe/core/io.py b/src/github_datapipe/core/io.py
new file mode 100644
index 0000000..55e6f6a
--- /dev/null
+++ b/src/github_datapipe/core/io.py
@@ -0,0 +1,51 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any, Iterable
+
+
+def write_json(path: Path, payload: Any) -> None:
+    """Write `payload` to `path` as indented JSON, creating parent directories."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+    path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
+
+
+def write_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> None:
+    """Overwrite `path` with one JSON document per line, one line per row."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+    with path.open("w", encoding="utf-8") as handle:
+        for row in rows:
+            handle.write(json.dumps(row))
+            handle.write("\n")
+
+
+def append_jsonl(path: Path, rows: Iterable[dict[str, Any]]) -> int:
+    """Append one JSON line per row to `path` and return the number of rows written."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+    count = 0
+    with path.open("a", encoding="utf-8") as handle:
+        for row in rows:
+            handle.write(json.dumps(row))
+            handle.write("\n")
+            count += 1
+    return count
+
+
+def read_json(path: Path) -> Any:
+    """Return the decoded JSON document stored at `path`."""
+    return json.loads(path.read_text(encoding="utf-8"))
+
+
+def read_jsonl(path: Path) -> list[dict[str, Any]]:
+    """Return the decoded JSON value of every non-blank line in `path`.
+
+    A missing file yields an empty list.
+    """
+    if not path.exists():
+        return []
+    # Iterate the file object rather than `str.splitlines()`: splitlines also breaks
+    # on U+0085, U+2028, and U+2029, which may appear unescaped inside JSON strings
+    # and would cut one record into several invalid fragments.
+    with path.open("r", encoding="utf-8") as handle:
+        return [json.loads(line) for line in handle if line.strip()]
diff --git a/src/github_datapipe/core/runtime.py b/src/github_datapipe/core/runtime.py
new file mode 100644
index 0000000..4a33100
--- /dev/null
+++ b/src/github_datapipe/core/runtime.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+import os
+from datetime import UTC, datetime
+from uuid import uuid4
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+def require_github_token() -> str:
+    github_token = os.getenv("github_token")
+    if not github_token:
+        raise ValueError("GitHub token is not available in .env as `github_token`.")
+    return github_token
+
+
+def build_run_id() -> str:
+    return f"run-{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}-{uuid4().hex[:8]}"
+
+
+def utc_now() -> str:
+    return datetime.now(tz=UTC).isoformat()
diff --git a/src/github_datapipe/extract_repos.py b/src/github_datapipe/extract_repos.py
deleted file mode 100644
index 09929ae..0000000
--- 
a/src/github_datapipe/extract_repos.py +++ /dev/null @@ -1,286 +0,0 @@ -from __future__ import annotations - -import json -import os -from dataclasses import dataclass -from datetime import UTC, datetime -from pathlib import Path -from typing import Any -from uuid import uuid4 - -import requests -from dotenv import load_dotenv - -from github_datapipe.config import GithubConfig - -load_dotenv() - - -class GithubApiError(RuntimeError): - """Raised when GitHub responds with an error payload.""" - - -@dataclass(frozen=True) -class SampleReposOptions: - """ - To set configuration options for the GitHub repository sampling run. - Attributes: - 1. count (int): The total number of unique repositories to collect. Must be > 0. Defaults to GithubConfig.default_repo_count - 2. output_root: The base directory where the run folder will be created to store the search repository data. - 3. query: The github search string using github offered query params. - 4. per_page: The number of results to fetch per API call. Max is 100. - 5. mode: 'fresh' (starting a fresh run) or "append-deduped" (if you want to resume half-done previous run and avoid duplicates). - 6. run_id: To keep a trace of current run by giving it a unique id. - """ - count: int = GithubConfig.default_repo_count - output_root: Path = Path(GithubConfig.default_output_root) - query: str = GithubConfig.default_query - per_page: int = GithubConfig.default_per_page - mode: str = "append-deduped" - run_id: str | None = None - - -def resolve_query(query_override: str | None) -> str: - """ To override default query params with user inserted query params. """ - return query_override.strip() if query_override else GithubConfig.default_query - - -class GithubRepoSampler: - def __init__(self, token: str, session: requests.Session | None = None) -> None: - """ - Everytime an instance of GithubRepoSampler is created, this method will be called first to open a session and set headers. 
- Args: - token (str): A valid GitHub Personal Access Token (PAT). - session (requests.Session | None, optional): An existing session to use. If None, a new session is created. - """ - self._session = session or requests.Session() - self._session.headers.update( - { - "Accept": "application/vnd.github+json", - "Authorization": f"Bearer {token}", - "X-GitHub-Api-Version": GithubConfig.api_version, - "User-Agent": GithubConfig.user_agent, - } - ) - - def search_repositories(self, query: str, page: int, per_page: int) -> dict[str, Any]: - """ - Calls github's /search/repositories endpoint passing query params and other. - - Args: - query (str): The GitHub-formatted search query string. - page (int): The specific page of results to fetch. - per_page (int): The number of items to return per page (max 100). - - Returns: - dict[str, Any]: The raw JSON payload returned by the GitHub API. - - Raises: - GithubApiError: If the API responds with an HTTP status code >= 400 - """ - response = self._session.get( - f"{GithubConfig.base_url}{GithubConfig.search_repositories_endpoint}", - params={"q": query, "page": page, "per_page": per_page}, - timeout=30, - ) - if response.status_code >= 400: - try: - payload = response.json() - except ValueError: - payload = {"message": response.text} - message = payload.get("message", "GitHub API request failed") - raise GithubApiError(f"GitHub search failed ({response.status_code}): {message}") - return response.json() - - -def sample_repositories(options: SampleReposOptions) -> dict[str, Any]: - """ - Orchestrates the end-to-end execution of a GitHub repository data pull. - - This function handles the complete lifecycle of a sampling run: validating credentials, - setting up the local file system for the run, managing state to prevent duplicate - downloads, paginating through the GitHub Search API, and saving the normalized data. - - Effects: - - Creates a timestamped run directory inside `options.output_root`. 
- - Writes newly fetched repository data to `repos.jsonl`. - - Writes or updates a global `seen_repo_ids.json` tracker file. - - Writes a `manifest.json` logging the run's parameters and results. - - Args: - options (SampleReposOptions): The configuration parameters for the run. - - Returns: - dict[str, Any]: A summary dictionary containing the `run_id`, the total - `count_collected`, and the absolute paths to the generated output files. - """ - # 1. SETUP: Validate the token and creds - github_token = os.getenv("github_token") - if not github_token: - raise ValueError("GitHub token is not available in .env as `github_token`.") - - if options.count <= 0: - raise ValueError("`count` must be greater than 0.") - - # 2. FILE SYSTEM: Create isolated folders for this specific run - run_id = options.run_id or build_run_id() - output_root = options.output_root.resolve() # returns an absolute path - run_root = output_root / run_id # isolated folder created for one single execution (or "run") of this script. - phase1_root = run_root / "phase1" # the search phase is phase 1 - phase1_root.mkdir(parents=True, exist_ok=True) - - # Define exact file paths for where data will live - seen_index_path = output_root / "seen_repo_ids.json" # Notes the last seen repo's id - repos_path = phase1_root / "repos.jsonl" # Actual repo metadata - manifest_path = phase1_root / "manifest.json" # log entry - - # 3. STATE MGMT: seen_repo_ids will make sure that duplicated data doesn't end up in the result - seen_repo_ids = set() if options.mode == "fresh" else load_seen_repo_ids(seen_index_path) - current_run_seen_repo_ids: set[int] = set() - - # 4. NETWORK SETUP: Initialize the session wrapper - sampler = GithubRepoSampler(token=github_token) - accepted_repos: list[dict[str, Any]] = [] - page = 1 - total_count: int | None = None - - # 5. 
THE MAIN LOOP: Keep fetching until the requested target count is hit - while len(accepted_repos) < options.count: - payload = sampler.search_repositories( - query=options.query, - page=page, - per_page=options.per_page, - ) - # Github Search API response = JSON Object containing { search metadata, items: [{ repo obj1 }, { repo obj 2} ...] } - items = payload.get("items", []) - total_count = payload.get("total_count", total_count) # the number of the repos matching your query at exact moment in time - - # Break safety: If GitHub returns an empty list, we've exhausted all results of current api call - if not items: - break - - sampled_at = utc_now() # to keep note of current time incase we need to revise it - - # Process each repository in the current result present in items - for repo in items: - repo_id = int(repo["id"]) - if repo_id in seen_repo_ids or repo_id in current_run_seen_repo_ids: - continue - - normalized_repo = normalize_repo_record( - repo=repo, - run_id=run_id, - sample_query=options.query, - sample_page=page, - sampled_at=sampled_at, - ) - accepted_repos.append(normalized_repo) - current_run_seen_repo_ids.add(repo_id) - - if len(accepted_repos) >= options.count: - break - - page += 1 - - write_jsonl(repos_path, accepted_repos) - if options.mode != "fresh": - seen_repo_ids.update(current_run_seen_repo_ids) - write_json(seen_index_path, sorted(seen_repo_ids)) - - # manifest is a log object that denotes what kind of input was given to achieve certain output. 
- manifest = { - "run_id": run_id, - "command": "sample-repos", - "resolved_query": options.query, - "count_requested": options.count, - "count_collected": len(accepted_repos), - "per_page": options.per_page, - "mode": options.mode, - "page_count_scanned": max(page - 1, 0), - "github_total_count": total_count, - "output_files": { - "repos_jsonl": str(repos_path), - "seen_repo_ids_json": str(seen_index_path) if options.mode != "fresh" else None, - }, - "sampled_at": utc_now(), - } - write_json(manifest_path, manifest) - - return { - "run_id": run_id, - "count_collected": len(accepted_repos), - "repos_path": repos_path, - "manifest_path": manifest_path, - "seen_index_path": seen_index_path if options.mode != "fresh" else None, - } - - -def normalize_repo_record( - repo: dict[str, Any], - run_id: str, - sample_query: str, - sample_page: int, - sampled_at: str, -) -> dict[str, Any]: - - """ - Extracts and flattens relevant fields from a raw GitHub repository payload. - - Args: - repo (dict[str, Any]): The raw JSON dictionary for a single repository - as returned by the GitHub API. - run_id (str): The unique identifier for the current sampling run. - sample_query (str): The search string used to find this repository. - sample_page (int): The pagination page number this repository was found on. - sampled_at (str): ISO-8601 formatted timestamp of when the record was fetched. - - Returns: - dict[str, Any]: A flat dictionary containing the filtered repository metrics - (e.g., repo_id, language, stargazers_count, size_kb) and traceability data. 
- """ - return { - "run_id": run_id, - "repo_id": repo["id"], - "full_name": repo["full_name"], - "html_url": repo["html_url"], - "api_url": repo["url"], - "default_branch": repo.get("default_branch"), - "language": repo.get("language"), - "description": repo.get("description"), - "stargazers_count": repo.get("stargazers_count"), - "size_kb": repo.get("size"), - "fork": repo.get("fork"), - "archived": repo.get("archived"), - "visibility": repo.get("visibility"), - "sample_query": sample_query, - "sample_page": sample_page, - "sampled_at": sampled_at, - } - - -def load_seen_repo_ids(path: Path) -> set[int]: - if not path.exists(): - return set() - payload = json.loads(path.read_text(encoding="utf-8")) - return {int(repo_id) for repo_id in payload} - - -def write_json(path: Path, payload: Any) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(json.dumps(payload, indent=2), encoding="utf-8") - - -def write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - with path.open("w", encoding="utf-8") as handle: - for row in rows: - handle.write(json.dumps(row)) - handle.write("\n") - - -def build_run_id() -> str: - return f"run-{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}-{uuid4().hex[:8]}" - - -def utc_now() -> str: - return datetime.now(tz=UTC).isoformat() \ No newline at end of file diff --git a/src/github_datapipe/main.py b/src/github_datapipe/main.py deleted file mode 100644 index 602832c..0000000 --- a/src/github_datapipe/main.py +++ /dev/null @@ -1,86 +0,0 @@ -from __future__ import annotations - -import argparse -from pathlib import Path - -from github_datapipe.config import GithubConfig -from github_datapipe.extract_repos import SampleReposOptions, resolve_query, sample_repositories - -### parser that unpacks the command line args passed as queries and -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="GitHub data pipeline CLI") - 
subparsers = parser.add_subparsers(dest="command", required=True) - - sample_parser = subparsers.add_parser( - "sample-repos", - help="Collect repositories from GitHub Search and save phase 1 outputs.", - ) - sample_parser.add_argument( - "--count", - type=int, - default=GithubConfig.default_repo_count, - help=f"Number of repositories to sample. Defaults to {GithubConfig.default_repo_count}.", - ) - sample_parser.add_argument( - "--query", - type=str, - default=None, - help="Optional raw GitHub repository search query. Replaces config defaults when provided.", - ) - sample_parser.add_argument( - "--output-root", - type=Path, - default=Path(GithubConfig.default_output_root), - help=f"Directory where run outputs are stored. Defaults to `{GithubConfig.default_output_root}`.", - ) - sample_parser.add_argument( - "--mode", - choices=("append-deduped", "fresh"), - default="append-deduped", - help="Whether to dedupe against the persisted seen-repo index or start a fresh phase-1 run.", - ) - sample_parser.add_argument( - "--per-page", - type=int, - default=GithubConfig.default_per_page, - help=f"GitHub Search page size. Defaults to {GithubConfig.default_per_page}.", - ) - sample_parser.add_argument( - "--run-id", - type=str, - default=None, - help="Optional run identifier. 
If omitted, a run id is generated automatically.",
-    )
-
-    return parser
-
-
-def main() -> int:
-    parser = build_parser()
-    args = parser.parse_args()
-
-    if args.command == "sample-repos":
-        options = SampleReposOptions(
-            count=args.count,
-            output_root=args.output_root,
-            query=resolve_query(args.query),
-            per_page=args.per_page,
-            mode=args.mode,
-            run_id=args.run_id,
-        )
-        # result will return output from the extract repo's sample_repositories
-        result = sample_repositories(options)
-        print(f"Run ID: {result['run_id']}")
-        print(f"Collected repositories: {result['count_collected']}")
-        print(f"Repositories file: {result['repos_path']}")
-        print(f"Manifest file: {result['manifest_path']}")
-        if result["seen_index_path"] is not None:
-            print(f"Seen repo index: {result['seen_index_path']}")
-        return 0
-
-    parser.error(f"Unsupported command: {args.command}")
-    return 1
-
-
-if __name__ == "__main__":
-    raise SystemExit(main())
diff --git a/src/github_datapipe/phases/__init__.py b/src/github_datapipe/phases/__init__.py
new file mode 100644
index 0000000..fe0f601
--- /dev/null
+++ b/src/github_datapipe/phases/__init__.py
@@ -0,0 +1 @@
+"""Pipeline phases for repository sampling and commit ingestion."""
diff --git a/src/github_datapipe/phases/phase1_repository_sampling/__init__.py b/src/github_datapipe/phases/phase1_repository_sampling/__init__.py
new file mode 100644
index 0000000..2cb99da
--- /dev/null
+++ b/src/github_datapipe/phases/phase1_repository_sampling/__init__.py
@@ -0,0 +1 @@
+"""Phase 1: repository discovery and persistence."""
diff --git a/src/github_datapipe/phases/phase1_repository_sampling/service.py b/src/github_datapipe/phases/phase1_repository_sampling/service.py
new file mode 100644
index 0000000..8538ad2
--- /dev/null
+++ b/src/github_datapipe/phases/phase1_repository_sampling/service.py
@@ -0,0 +1,174 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+from github_datapipe.core.config import GithubConfig
+from github_datapipe.core.github_api import GithubApiClient
+from github_datapipe.core.io import read_json, write_json, write_jsonl
+from github_datapipe.core.runtime import build_run_id, require_github_token, utc_now
+
+
+@dataclass(frozen=True)
+class SampleReposOptions:
+    """Options for one phase 1 repository sampling run."""
+
+    count: int = GithubConfig.default_repo_count
+    output_root: Path = Path(GithubConfig.default_output_root)
+    query: str = GithubConfig.default_query
+    per_page: int = GithubConfig.default_per_page
+    mode: str = "append-deduped"
+    run_id: str | None = None
+
+
+def resolve_query(query_override: str | None) -> str:
+    """Return the stripped override, or `GithubConfig.default_query` when it is empty or None."""
+    return query_override.strip() if query_override else GithubConfig.default_query
+
+
+def sample_repositories(options: SampleReposOptions) -> dict[str, Any]:
+    """Collect up to `options.count` unseen repositories from GitHub Search.
+
+    Writes `repos.jsonl` and `manifest.json` under
+    `{output_root}/{run_id}/phase1_repository_sampling/`; unless `mode` is "fresh",
+    also merges the collected ids into `{output_root}/seen_repo_ids.json`.
+
+    Returns:
+        A summary with `run_id`, `count_collected`, `repos_path`, `manifest_path`,
+        and `seen_index_path` (None in "fresh" mode).
+
+    Raises:
+        ValueError: If `count` or `per_page` is not greater than 0.
+    """
+    token = require_github_token()
+    if options.count <= 0:
+        raise ValueError("`count` must be greater than 0.")
+    if options.per_page <= 0:
+        raise ValueError("`per_page` must be greater than 0.")
+
+    run_id = options.run_id or build_run_id()
+    output_root = options.output_root.resolve()
+    run_root = output_root / run_id
+    phase_root = run_root / "phase1_repository_sampling"
+    phase_root.mkdir(parents=True, exist_ok=True)
+
+    seen_index_path = output_root / "seen_repo_ids.json"
+    repos_path = phase_root / "repos.jsonl"
+    manifest_path = phase_root / "manifest.json"
+
+    seen_repo_ids = set() if options.mode == "fresh" else load_seen_repo_ids(seen_index_path)
+    current_run_seen_repo_ids: set[int] = set()
+
+    client = GithubApiClient(token=token)
+    accepted_repos: list[dict[str, Any]] = []
+    page = 1
+    total_count: int | None = None
+    # GitHub Search serves only the first 1000 results of a query. Requesting a page
+    # beyond that window fails with HTTP 422, which would raise GithubApiError here
+    # before anything collected so far has been written to disk.
+    search_result_cap = 1000
+
+    while len(accepted_repos) < options.count:
+        if page > 1 and page * options.per_page > search_result_cap:
+            break
+
+        payload = client.search_repositories(
+            query=options.query,
+            page=page,
+            per_page=options.per_page,
+        )
+        items = payload.get("items", [])
+        total_count = payload.get("total_count", total_count)
+
+        if not items:
+            break
+
+        sampled_at = utc_now()
+        for repo in items:
+            repo_id = int(repo["id"])
+            if repo_id in seen_repo_ids or repo_id in current_run_seen_repo_ids:
+                continue
+
+            accepted_repos.append(
+                normalize_repo_record(
+                    repo=repo,
+                    run_id=run_id,
+                    sample_query=options.query,
+                    sample_page=page,
+                    sampled_at=sampled_at,
+                )
+            )
+            current_run_seen_repo_ids.add(repo_id)
+
+            if len(accepted_repos) >= options.count:
+                break
+
+        page += 1
+
+    write_jsonl(repos_path, accepted_repos)
+    if options.mode != "fresh":
+        seen_repo_ids.update(current_run_seen_repo_ids)
+        write_json(seen_index_path, sorted(seen_repo_ids))
+
+    manifest = {
+        "run_id": run_id,
+        "phase_name": "phase1_repository_sampling",
+        "command": "sample-repos",
+        "resolved_query": options.query,
+        "count_requested": options.count,
+        "count_collected": len(accepted_repos),
+        "per_page": options.per_page,
+        "mode": options.mode,
+        "page_count_scanned": max(page - 1, 0),
+        "github_total_count": total_count,
+        "output_files": {
+            "repos_jsonl": str(repos_path),
+            "seen_repo_ids_json": str(seen_index_path) if options.mode != "fresh" else None,
+        },
+        "sampled_at": utc_now(),
+    }
+    write_json(manifest_path, manifest)
+
+    return {
+        "run_id": run_id,
+        "count_collected": len(accepted_repos),
+        "repos_path": repos_path,
+        "manifest_path": manifest_path,
+        "seen_index_path": seen_index_path if options.mode != "fresh" else None,
+    }
+
+
+def normalize_repo_record(
+    repo: dict[str, Any],
+    run_id: str,
+    sample_query: str,
+    sample_page: int,
+    sampled_at: str,
+) -> dict[str, Any]:
+    """Flatten a raw GitHub repository payload into the phase 1 record shape."""
+    return {
+        "run_id": run_id,
+        "repo_id": repo["id"],
+        "full_name": repo["full_name"],
+        "html_url": repo["html_url"],
+        "api_url": repo["url"],
+        "default_branch": repo.get("default_branch"),
+        "language": repo.get("language"),
+        "description": repo.get("description"),
+        "stargazers_count": repo.get("stargazers_count"),
+        "size_kb": repo.get("size"),
+        "fork": repo.get("fork"),
+        "archived": repo.get("archived"),
+        "visibility": repo.get("visibility"),
+        "sample_query": sample_query,
+        "sample_page": sample_page,
+        "sampled_at": sampled_at,
+    }
+
+
+def load_seen_repo_ids(path: Path) -> set[int]:
+    if 
@dataclass(frozen=True)
class FetchCommitsOptions:
    """Immutable settings for one phase 2 (`fetch-commits`) run.

    The repositories to process come either from `run_id` (the phase 1
    `repos.jsonl` of that run under `output_root`) or from `repos_file`.
    `run_id` wins when both are set.
    """

    # Root directory that contains one sub-directory per run.
    output_root: Path = Path(GithubConfig.default_output_root)
    # Phase 1 run whose saved repositories should be processed.
    run_id: str | None = None
    # Explicit repositories file, used only when `run_id` is None.
    repos_file: Path | None = None
    # "refresh" starts the commits file over; "resume" skips repositories
    # whose saved status is "complete".
    mode: str = "refresh"
    # Page size passed to the commit listing call.
    per_page: int = GithubConfig.default_per_page
    # Upper bound on the number of commit pages fetched per repository.
    max_pages_per_repo: int = GithubConfig.default_max_pages_per_repo
    # Number of extra attempts after a failed API call.
    retry_count: int = GithubConfig.default_retry_count


def fetch_commits(options: FetchCommitsOptions) -> dict[str, Any]:
    """Fetch commits for every saved repository and persist the results.

    Commit rows are appended to `commits/commits-0001.jsonl`, one status row
    per repository is kept in `repo_status.jsonl`, and a run summary is
    written to `manifest.json`, all under
    `<output_root>/<run_id>/phase2_commit_ingestion/`.

    Args:
        options: Run settings; see `FetchCommitsOptions`.

    Returns:
        A summary dict with the run id, the per-status repository counters
        for this invocation and the paths of the three output files.

    Raises:
        ValueError: If `max_pages_per_repo` or `per_page` is not positive,
            `retry_count` is negative, `mode` is not "refresh" or "resume",
            or the repositories file contains no rows.
    """
    token = require_github_token()
    if options.max_pages_per_repo <= 0:
        raise ValueError("`max_pages_per_repo` must be greater than 0.")
    if options.per_page <= 0:
        raise ValueError("`per_page` must be greater than 0.")
    if options.retry_count < 0:
        raise ValueError("`retry_count` must be 0 or greater.")
    # An unknown mode used to fall through as a hybrid that neither cleared
    # the commits file nor skipped finished repositories; reject it instead.
    if options.mode not in ("refresh", "resume"):
        raise ValueError("`mode` must be either 'refresh' or 'resume'.")

    output_root = options.output_root.resolve()
    repos_path, run_id = resolve_repo_input(options, output_root)

    run_root = output_root / run_id
    phase_root = run_root / "phase2_commit_ingestion"
    commits_root = phase_root / "commits"
    commits_path = commits_root / "commits-0001.jsonl"
    status_path = phase_root / "repo_status.jsonl"
    manifest_path = phase_root / "manifest.json"

    repos = read_jsonl(repos_path)
    if not repos:
        raise ValueError(f"No repositories found in {repos_path}.")

    resuming = options.mode == "resume"
    prior_statuses: dict[int, str] = {}
    status_rows: list[dict[str, Any]] = []
    if resuming:
        # A resume against a run that never wrote a status file simply has
        # nothing to skip; do not depend on how the reader treats a missing file.
        if status_path.exists():
            prior_statuses = load_status_index(status_path)
            status_rows.extend(read_jsonl(status_path))
        written_commit_count = count_jsonl_rows(commits_path)
    else:
        commits_path.unlink(missing_ok=True)
        written_commit_count = 0

    processed_repositories = 0
    completed_repositories = 0
    warning_repositories = 0
    failed_repositories = 0
    client: GithubApiClient | None = None

    for repo in repos:
        repo_id = int(repo["repo_id"])
        if resuming and prior_statuses.get(repo_id) == "complete":
            continue

        # Created lazily so a resume with nothing left to do builds no client.
        if client is None:
            client = GithubApiClient(token=token)

        processed_repositories += 1
        status_record, commit_rows = process_repository(
            client=client,
            repo=repo,
            run_id=run_id,
            per_page=options.per_page,
            max_pages_per_repo=options.max_pages_per_repo,
            retry_count=options.retry_count,
        )

        # Persist the commits before recording the status, then checkpoint
        # the status file, so an interrupted run never marks a repository
        # as done without its commits and `resume` can continue from here.
        # NOTE(review): a repository re-processed on resume (for example one
        # saved as `success_with_warning`) has its commits appended again;
        # confirm whether downstream consumers de-duplicate on `commit_key`.
        written_commit_count += append_jsonl(commits_path, commit_rows)
        status_rows.append(status_record)
        write_jsonl(status_path, dedupe_status_rows(status_rows))

        status = status_record["status"]
        if status == "complete":
            completed_repositories += 1
        elif status == "success_with_warning":
            warning_repositories += 1
        else:
            failed_repositories += 1

    # Still written when nothing was processed, matching the previous
    # end-of-run behavior.
    write_jsonl(status_path, dedupe_status_rows(status_rows))

    manifest = {
        "run_id": run_id,
        "phase_name": "phase2_commit_ingestion",
        "command": "fetch-commits",
        "mode": options.mode,
        "per_page": options.per_page,
        "max_pages_per_repo": options.max_pages_per_repo,
        "retry_count": options.retry_count,
        "processed_repositories": processed_repositories,
        "completed_repositories": completed_repositories,
        "warning_repositories": warning_repositories,
        "failed_repositories": failed_repositories,
        "written_commit_count": written_commit_count,
        "input_files": {"repos_jsonl": str(repos_path)},
        "output_files": {
            "commits_jsonl": str(commits_path),
            "repo_status_jsonl": str(status_path),
        },
        "fetched_at": utc_now(),
    }
    write_json(manifest_path, manifest)

    return {
        "run_id": run_id,
        "processed_repositories": processed_repositories,
        "completed_repositories": completed_repositories,
        "warning_repositories": warning_repositories,
        "failed_repositories": failed_repositories,
        "commits_path": commits_path,
        "status_path": status_path,
        "manifest_path": manifest_path,
    }


def resolve_repo_input(options: FetchCommitsOptions, output_root: Path) -> tuple[Path, str]:
    """Locate the repositories file and the run id it belongs to.

    Args:
        options: Run settings; `run_id` takes precedence over `repos_file`.
        output_root: Resolved root directory that holds the run directories.

    Returns:
        `(repos_path, run_id)`. With `run_id` set, the path points at that
        run's phase 1 `repos.jsonl` and is not checked for existence here.
        Otherwise the run id is read from the first row of `repos_file`.

    Raises:
        ValueError: If neither `run_id` nor `repos_file` is set, or if
            `repos_file` contains no rows.
    """
    if options.run_id is not None:
        return (
            output_root / options.run_id / "phase1_repository_sampling" / "repos.jsonl",
            options.run_id,
        )

    # Explicit check instead of `assert`, which disappears under `python -O`.
    if options.repos_file is None:
        raise ValueError("Either `run_id` or `repos_file` must be provided.")

    repos_path = options.repos_file.resolve()
    repos = read_jsonl(repos_path)
    if not repos:
        raise ValueError(f"No repositories found in {repos_path}.")
    return repos_path, str(repos[0]["run_id"])
def normalize_commit_record(
    repo: dict[str, Any],
    repository: dict[str, Any],
    commit_payload: dict[str, Any],
    run_id: str,
    branch: str,
    page_number: int,
    fetched_at: str,
) -> dict[str, Any]:
    """Flatten one commit payload into the phase 2 commit row format.

    Args:
        repo: Saved phase 1 repository row; supplies `repo_id`, `full_name`
            and `html_url`.
        repository: Repository metadata fetched for this run; its
            `full_name` is used to build `source_endpoint`.
        commit_payload: One item of the commit listing. `sha` and `commit`
            are required; `parents` and `html_url` are optional.
        run_id: Identifier of the run that produced the row.
        branch: Branch the commits were listed from.
        page_number: Listing page on which the commit was returned.
        fetched_at: Timestamp string recorded for the row.

    Returns:
        A new dict with one normalized commit.

    Raises:
        KeyError: If a required key is missing from `commit_payload`,
            `repo` or `repository`.
    """
    commit = commit_payload["commit"]
    # `author`, `committer` and `parents` may be absent or null in a payload.
    author = commit.get("author") or {}
    committer = commit.get("committer") or {}
    parents = commit_payload.get("parents") or []
    return {
        "run_id": run_id,
        "repo_id": repo["repo_id"],
        "repo_full_name": repo["full_name"],
        "repo_html_url": repo["html_url"],
        "default_branch_at_fetch": branch,
        "sha": commit_payload["sha"],
        "commit_key": f"{repo['repo_id']}:{commit_payload['sha']}",
        "parent_shas": [parent["sha"] for parent in parents],
        "author_name": author.get("name"),
        "author_email": author.get("email"),
        "author_date": author.get("date"),
        "committer_name": committer.get("name"),
        "committer_email": committer.get("email"),
        "committer_date": committer.get("date"),
        "message": commit.get("message"),
        "html_url": commit_payload.get("html_url"),
        "fetched_at": fetched_at,
        "source_endpoint": f"/repos/{repository['full_name']}/commits",
        "page_number": page_number,
        "truncated": False,
    }


def with_retries(operation: Any, retry_count: int) -> Any:
    """Call `operation` until it succeeds or the retries are used up.

    Args:
        operation: Zero-argument callable to invoke.
        retry_count: Number of extra attempts after the first failure. Zero
            or a negative value means a single attempt.

    Returns:
        Whatever `operation` returns on its first successful call.

    Raises:
        GithubApiError: The error of the final attempt, once every attempt
            has failed. Any other exception propagates immediately.
    """
    retries_left = retry_count
    while True:
        try:
            return operation()
        except GithubApiError:
            # Re-raise the last failure as-is instead of relying on an
            # `assert`, which also broke when `retry_count` was negative.
            if retries_left <= 0:
                raise
            retries_left -= 1


def load_status_index(path: Path) -> dict[int, str]:
    """Map each repository id in a status file to its latest status.

    Args:
        path: Status JSONL file whose rows carry `repo_id` and `status`.

    Returns:
        `{repo_id: status}`; when an id occurs more than once, the last row
        in the file wins.
    """
    return {int(row["repo_id"]): row["status"] for row in read_jsonl(path)}


def dedupe_status_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep only the latest status row per repository.

    Args:
        rows: Status rows, oldest first; each needs a `repo_id`.

    Returns:
        One row per repository id, ordered by the first appearance of the id
        and holding the last row seen for it.
    """
    # A dict keeps the position of a key's first insertion when the value is
    # overwritten, which is exactly the required ordering.
    latest_by_repo: dict[int, dict[str, Any]] = {}
    for row in rows:
        latest_by_repo[int(row["repo_id"])] = row
    return list(latest_by_repo.values())


def count_jsonl_rows(path: Path) -> int:
    """Count the non-blank lines of a JSONL file.

    Args:
        path: File to count; it may not exist.

    Returns:
        The number of non-blank lines, or 0 when the file is missing.
    """
    try:
        # Iterate the file rather than calling `str.splitlines()`, which
        # also breaks on separators such as U+2028 that can occur unescaped
        # inside a JSON string and would inflate the count.
        with path.open(encoding="utf-8") as handle:
            return sum(1 for line in handle if line.strip())
    except FileNotFoundError:
        return 0
def _seed_phase1_repos(run_root: Path) -> Path:
    """Write a one-row phase 1 `repos.jsonl` under `run_root` and return its path."""
    repos_path = run_root / "phase1_repository_sampling" / "repos.jsonl"
    repos_path.parent.mkdir(parents=True, exist_ok=True)
    repos_path.write_text(json.dumps(fake_repo_record()) + "\n", encoding="utf-8")
    return repos_path


def _read_jsonl_rows(path: Path) -> list[dict]:
    """Parse every line of a JSONL file into a dict."""
    return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]


def test_fetch_commits_writes_normalized_records_and_warning(monkeypatch, tmp_path: Path) -> None:
    """A full page at the page cap is stored and reported as truncated."""
    monkeypatch.setenv("github_token", "token")
    _seed_phase1_repos(tmp_path / "run-test")

    class FakeClient:
        """Stand-in API client that serves two commits on page 1 only."""

        def __init__(self, token: str, session: requests.Session | None = None) -> None:
            self.token = token

        def get_repository(self, full_name: str) -> dict:
            return {"full_name": full_name, "default_branch": "main"}

        def list_commits(self, full_name: str, branch: str, page: int, per_page: int) -> list[dict]:
            if page != 1:
                return []
            return [fake_commit_payload(letter * 40) for letter in "ab"]

    monkeypatch.setattr("github_datapipe.phases.phase2_commit_ingestion.service.GithubApiClient", FakeClient)

    options = FetchCommitsOptions(
        output_root=tmp_path,
        run_id="run-test",
        mode="refresh",
        per_page=2,
        max_pages_per_repo=1,
        retry_count=0,
    )
    result = fetch_commits(options)

    commit_rows = _read_jsonl_rows(Path(result["commits_path"]))
    assert len(commit_rows) == 2
    assert commit_rows[0]["commit_key"] == f"{fake_repo_record()['repo_id']}:{'a' * 40}"

    first_status = _read_jsonl_rows(Path(result["status_path"]))[0]
    assert first_status["status"] == "success_with_warning"
    assert first_status["truncated"] is True


def test_fetch_commits_resume_skips_completed_repos(monkeypatch, tmp_path: Path) -> None:
    """Resume mode must not touch a repository already saved as complete."""
    monkeypatch.setenv("github_token", "token")
    run_root = tmp_path / "run-test"
    _seed_phase1_repos(run_root)

    # Pre-existing phase 2 status marking the only repository as finished.
    phase2_root = run_root / "phase2_commit_ingestion"
    phase2_root.mkdir(parents=True, exist_ok=True)
    completed_row = {
        "repo_id": fake_repo_record()["repo_id"],
        "status": "complete",
    }
    (phase2_root / "repo_status.jsonl").write_text(
        json.dumps(completed_row) + "\n",
        encoding="utf-8",
    )

    class FakeClient:
        """Client whose construction fails the test."""

        def __init__(self, token: str, session: requests.Session | None = None) -> None:
            raise AssertionError("Client should not be initialized for already completed repos")

    monkeypatch.setattr("github_datapipe.phases.phase2_commit_ingestion.service.GithubApiClient", FakeClient)

    options = FetchCommitsOptions(
        output_root=tmp_path,
        run_id="run-test",
        mode="resume",
        retry_count=0,
    )
    result = fetch_commits(options)

    assert result["processed_repositories"] == 0


def fake_repo_record() -> dict:
    """Return a minimal phase 1 repository row."""
    full_name = "owner/repo-one"
    return {
        "run_id": "run-test",
        "repo_id": 100,
        "full_name": full_name,
        "html_url": "https://github.com/owner/repo-one",
    }


def fake_commit_payload(sha: str) -> dict:
    """Return a commit listing item for `sha` with fixed author and committer."""
    timestamp = "2024-01-01T00:00:00Z"
    author = {"name": "Alice", "email": "alice@example.com", "date": timestamp}
    committer = {"name": "Bob", "email": "bob@example.com", "date": timestamp}
    return {
        "sha": sha,
        "html_url": f"https://github.com/owner/repo-one/commit/{sha}",
        "parents": [{"sha": "parent-sha"}],
        "commit": {
            "author": author,
            "committer": committer,
            "message": "Initial commit",
        },
    }