Fork of patx/hglab.
tokenmess/hglab
add tests and fix null node error for empty repos
Commit 8970194fd8ed · Harrison Erd · 2026-05-04 19:30 -0400
Comments
No comments yet.
Diff
diff --git a/.hgignore b/.hgignore
--- a/.hgignore
+++ b/.hgignore
@@ -1,3 +1,4 @@
__pycache__/
.env
data/
+.pytest_cache
diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -52,6 +52,7 @@
RESERVED_USERNAMES = {"dashboard", "favicon.ico", "hg", "login", "logout", "new", "settings", "signup", "static", "harrisonerd"}
WRITE_HG_COMMANDS = {"pushkey", "unbundle"}
NULL_REV = "null"
+NULL_NODE = "0" * 40
README_CANDIDATES = ("README.md", "README.rst", "README.txt", "README")
MARKDOWN_EXTENSIONS = ("extra", "sane_lists")
MARKDOWN_TAGS = {
@@ -341,6 +342,9 @@
raw = (path or "").strip("/")
if not raw:
return ""
+ raw_parts = raw.split("/")
+ if any(part in {"", ".", ".."} for part in raw_parts):
+ abort(400, "Invalid repository path.")
parts = PurePosixPath(raw).parts
if any(part in {"", ".", ".."} for part in parts):
abort(400, "Invalid repository path.")
@@ -897,6 +901,8 @@
website = (value or "").strip()
if not website:
return ""
+ if any(char.isspace() for char in website):
+ raise ValueError("Website must be a valid http(s) URL.")
if "://" not in website:
website = "https://" + website
parsed = urlparse(website)
@@ -1201,6 +1207,10 @@
return render_markdown_links(text)
+def is_null_revision(value):
+ return (value or "").strip().lower() in {NULL_REV, NULL_NODE}
+
+
def ancestors_revset(node):
revision = validate_revision_id(node, allow_null=False)
return f"sort(ancestors({revision}), -rev)"
@@ -1215,7 +1225,9 @@
def commit_log(path, limit=50, revision=None):
- if revision == NULL_REV:
+ if is_null_revision(revision):
+ return []
+ if revision is None and repo_tip_node(path) is None:
return []
template_arg = "{rev}\\x1f{node}\\x1f{node|short}\\x1f{author|person}\\x1f{date|isodate}\\x1f{desc|firstline}\\x1e"
args = ["log", "-l", str(limit), "--template", template_arg]
@@ -1296,6 +1308,8 @@
parts = completed.stdout.rstrip("\x1e").split("\x1f")
if len(parts) != 6:
return None
+ if parts[0] == "-1" or is_null_revision(parts[1]):
+ return None
return {
"rev": parts[0],
"node": parts[1],
@@ -1609,7 +1623,9 @@
def commit_count(path, revision=None):
- if revision == NULL_REV:
+ if is_null_revision(revision):
+ return 0
+ if revision is None and repo_tip_node(path) is None:
return 0
args = ["log", "--template", "."]
if revision:
@@ -1625,7 +1641,7 @@
def validate_revision_id(value, allow_null=True):
revision = (value or "").strip()
- if not REV_RE.match(revision) or (revision == NULL_REV and not allow_null):
+ if not REV_RE.match(revision) or (is_null_revision(revision) and not allow_null):
abort(404, "Revision not found.")
return revision
@@ -1638,12 +1654,14 @@
return None
raise HgCommandError(completed.stderr.strip() or "Unable to read repository tip.", completed.returncode)
node = completed.stdout.strip()
+ if is_null_revision(node):
+ return None
return node or None
def repo_has_revision(path, revision):
revision = validate_revision_id(revision)
- if revision == NULL_REV:
+ if is_null_revision(revision):
return True
completed = run_hg(["log", "-r", revision, "--template", "{node}"], cwd=path, check=False)
return completed.returncode == 0 and bool(completed.stdout.strip())
@@ -1652,7 +1670,7 @@
def is_ancestor(path, ancestor_node, descendant_node):
ancestor_node = validate_revision_id(ancestor_node)
descendant_node = validate_revision_id(descendant_node, allow_null=False)
- if ancestor_node == NULL_REV or ancestor_node == descendant_node:
+ if is_null_revision(ancestor_node) or ancestor_node == descendant_node:
return True
completed = run_hg(
["log", "-r", f"descendants({ancestor_node}) and {descendant_node}", "--template", "{node}"],
diff --git a/tests/test_app.py b/tests/test_app.py
new file mode 100644
--- /dev/null
+++ b/tests/test_app.py
@@ -0,0 +1,795 @@
+import atexit
+import base64
+from http.cookies import SimpleCookie
+from io import BytesIO, StringIO
+import os
+import shutil
+import tempfile
+from pathlib import Path
+from urllib.parse import urlencode, urlsplit, unquote
+
+import pytest
+from bottle import HTTPError
+
+
+BOOTSTRAP_DIR = Path(tempfile.mkdtemp(prefix="hglab-test-bootstrap-"))
+atexit.register(shutil.rmtree, BOOTSTRAP_DIR, ignore_errors=True)
+os.environ["HG_HOST_DB"] = str(BOOTSTRAP_DIR / "hghost.sqlite3")
+os.environ["HG_HOST_REPO_ROOT"] = str(BOOTSTRAP_DIR / "repos")
+os.environ["SECRET_KEY"] = "test-secret"
+
+import app as hglab # noqa: E402
+
+
+class WsgiResponse:
+ def __init__(self, status_line, headers, body):
+ self.status_line = status_line
+ self.status_code = int(status_line.split(" ", 1)[0])
+ self.headers = headers
+ self.body = body
+ self.text = body.decode("utf-8", "replace")
+
+ def header(self, name, default=None):
+ name = name.lower()
+ for key, value in self.headers:
+ if key.lower() == name:
+ return value
+ return default
+
+ @property
+ def location(self):
+ return self.header("Location")
+
+ @property
+ def location_path(self):
+ location = self.location
+ if not location:
+ return None
+ split = urlsplit(location)
+ if not split.scheme and not split.netloc:
+ return location
+ return split.path + (f"?{split.query}" if split.query else "")
+
+
+class WsgiClient:
+ def __init__(self, wsgi_app):
+ self.wsgi_app = wsgi_app
+ self.cookies = {}
+
+ def get(self, path, headers=None):
+ return self.request("GET", path, headers=headers)
+
+ def post(self, path, data=None, headers=None):
+ return self.request("POST", path, data=data, headers=headers)
+
+ def request(self, method, path, data=None, headers=None):
+ headers = headers or {}
+ split = urlsplit(path)
+ body = b""
+ if data is not None:
+ body = urlencode(data, doseq=True).encode("utf-8")
+ headers = {"Content-Type": "application/x-www-form-urlencoded", **headers}
+ environ = {
+ "REQUEST_METHOD": method,
+ "SCRIPT_NAME": "",
+ "PATH_INFO": unquote(split.path),
+ "QUERY_STRING": split.query,
+ "SERVER_NAME": "example.test",
+ "SERVER_PORT": "80",
+ "SERVER_PROTOCOL": "HTTP/1.1",
+ "HTTP_HOST": "example.test",
+ "wsgi.version": (1, 0),
+ "wsgi.url_scheme": "http",
+ "wsgi.input": BytesIO(body),
+ "wsgi.errors": StringIO(),
+ "wsgi.multithread": False,
+ "wsgi.multiprocess": False,
+ "wsgi.run_once": False,
+ "CONTENT_LENGTH": str(len(body)),
+ "REMOTE_ADDR": "127.0.0.1",
+ }
+ if self.cookies:
+ environ["HTTP_COOKIE"] = "; ".join(f"{key}={value}" for key, value in self.cookies.items())
+ for key, value in headers.items():
+ env_key = key.upper().replace("-", "_")
+ if env_key == "CONTENT_TYPE":
+ environ["CONTENT_TYPE"] = value
+ elif env_key == "CONTENT_LENGTH":
+ environ["CONTENT_LENGTH"] = value
+ else:
+ environ[f"HTTP_{env_key}"] = value
+
+ captured = {}
+
+ def start_response(status, response_headers, exc_info=None):
+ captured["status"] = status
+ captured["headers"] = response_headers
+
+ body_iter = self.wsgi_app(environ, start_response)
+ try:
+ response_body = b"".join(
+ chunk if isinstance(chunk, bytes) else chunk.encode("utf-8") for chunk in body_iter
+ )
+ finally:
+ close = getattr(body_iter, "close", None)
+ if close:
+ close()
+ response = WsgiResponse(captured["status"], captured["headers"], response_body)
+ self._store_cookies(response.headers)
+ return response
+
+ def _store_cookies(self, headers):
+ for key, value in headers:
+ if key.lower() != "set-cookie":
+ continue
+ cookie = SimpleCookie()
+ cookie.load(value)
+ for name, morsel in cookie.items():
+ if morsel.value == "" and (morsel["expires"] or morsel["max-age"] == "0"):
+ self.cookies.pop(name, None)
+ else:
+ self.cookies[name] = morsel.value
+
+
[email protected]()
+def isolated_app(tmp_path, monkeypatch):
+ monkeypatch.setattr(hglab, "DB_PATH", tmp_path / "hghost.sqlite3")
+ monkeypatch.setattr(hglab, "REPO_ROOT", tmp_path / "repos")
+ hglab.init_db()
+ return hglab
+
+
+def create_user(username, password="correct horse battery staple"):
+ with hglab.db_connect() as conn:
+ conn.execute(
+ "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
+ (username, hglab.hash_password(password), hglab.utcnow()),
+ )
+ return hglab.get_user_by_username(username)
+
+
+def commit_file(repo_path, relative_path, content, message="initial commit", user="alice"):
+ target = repo_path / relative_path
+ target.parent.mkdir(parents=True, exist_ok=True)
+ target.write_text(content, encoding="utf-8")
+ hglab.run_hg(["add", relative_path], cwd=repo_path)
+ hglab.run_hg(["commit", "-u", user, "-m", message], cwd=repo_path)
+ return hglab.repo_tip_node(repo_path)
+
+
+def basic_auth(username, password):
+ token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
+ return {"Authorization": f"Basic {token}"}
+
+
+def create_repo_with_refs(owner):
+ hglab.create_repository(owner, "demo", "Demo repository")
+ path = hglab.repo_path(owner["username"], "demo")
+ default_node = commit_file(path, "README.md", "# Demo\n", message="initial", user=owner["username"])
+
+ hglab.run_hg(["branch", "feature"], cwd=path)
+ feature_node = commit_file(path, "feature.txt", "feature work\n", message="add feature", user=owner["username"])
+ hglab.run_hg(["tag", "v1.0", "-u", owner["username"], "-m", "tag v1.0"], cwd=path)
+ hglab.run_hg(["bookmark", "-r", feature_node, "feature-bm"], cwd=path)
+
+ hglab.run_hg(["update", "default"], cwd=path)
+ hglab.run_hg(["branch", "old"], cwd=path)
+ old_node = commit_file(path, "old.txt", "old branch\n", message="old branch work", user=owner["username"])
+ hglab.run_hg(["commit", "--close-branch", "-u", owner["username"], "-m", "close old"], cwd=path)
+ hglab.run_hg(["update", "default"], cwd=path)
+
+ return {
+ "path": path,
+ "default_node": default_node,
+ "feature_node": feature_node,
+ "old_node": old_node,
+ }
+
+
+def test_normalize_slug_accepts_trimmed_lowercase_values():
+ assert hglab.normalize_slug(" Demo_Repo-1 ", "Repository name") == "demo_repo-1"
+ assert hglab.normalize_slug("My.Name", "Repository name") == "my.name"
+
+
[email protected](
+ ("value", "label"),
+ [
+ ("x", "Repository name"),
+ ("bad/name", "Repository name"),
+ ("-bad", "Repository name"),
+ ("demo.hg", "Repository name"),
+ ("login", "Username"),
+ ("harrisonerd", "Username"),
+ ],
+)
+def test_normalize_slug_rejects_invalid_or_reserved_values(value, label):
+ with pytest.raises(ValueError):
+ hglab.normalize_slug(value, label)
+
+
+def test_clean_repo_path_normalizes_and_rejects_traversal():
+ assert hglab.clean_repo_path("/docs/readme.md/") == "docs/readme.md"
+ assert hglab.clean_repo_path("") == ""
+
+ for value in ("../secret", "docs/../secret", "./file"):
+ with pytest.raises(HTTPError) as exc_info:
+ hglab.clean_repo_path(value)
+ assert exc_info.value.status_code == 400
+
+
+def test_password_hashes_verify_and_reject_bad_inputs():
+ stored = hglab.hash_password("correct-password")
+
+ assert stored.startswith("pbkdf2_sha256$")
+ assert hglab.verify_password("correct-password", stored)
+ assert not hglab.verify_password("wrong-password", stored)
+ assert not hglab.verify_password("correct-password", "not-a-valid-hash")
+
+
[email protected](
+ ("value", "expected"),
+ [
+ ("example.com", "https://example.com"),
+ (" http://example.com/path ", "http://example.com/path"),
+ ("https://example.com", "https://example.com"),
+ ],
+)
+def test_normalize_website_accepts_http_urls_and_adds_scheme(value, expected):
+ assert hglab.normalize_website(value) == expected
+
+
[email protected]("value", ["ftp://example.com", "https://", "not a url"])
+def test_normalize_website_rejects_invalid_urls(value):
+ with pytest.raises(ValueError):
+ hglab.normalize_website(value)
+
+
+def test_render_markdown_strips_scripts_and_unsafe_links():
+ rendered = hglab.render_markdown(
+ """
+# Title
+
+<script>alert("x")</script>
+
+[safe](https://example.com) [unsafe](javascript:alert(1))
+
+| A |
+| - |
+| B |
+"""
+ )
+
+ assert "<h1>Title</h1>" in rendered
+ assert "script" not in rendered.lower()
+ assert "javascript:" not in rendered.lower()
+ assert 'href="https://example.com"' in rendered
+ assert "<table>" in rendered
+
+
+def test_render_markdown_links_allows_only_links():
+ rendered = hglab.render_markdown_links("[site](https://example.com) **bold** <strong>x</strong>")
+
+ assert rendered == '<a href="https://example.com">site</a> bold <strong>x</strong>'
+
+
+def test_build_tree_deduplicates_entries_and_sorts_directories_first():
+ files = ["README.md", "src/app.py", "src/utils/helpers.py", "docs/index.md", "src/z.txt"]
+
+ assert hglab.build_tree(files, "") == [
+ {"name": "docs", "path": "docs", "type": "dir"},
+ {"name": "src", "path": "src", "type": "dir"},
+ {"name": "README.md", "path": "README.md", "type": "file"},
+ ]
+ assert hglab.build_tree(files, "src") == [
+ {"name": "utils", "path": "src/utils", "type": "dir"},
+ {"name": "app.py", "path": "src/app.py", "type": "file"},
+ {"name": "z.txt", "path": "src/z.txt", "type": "file"},
+ ]
+
+
+def test_ref_option_values_round_trip_quoted_names():
+ branch_name = "feature/a|b c"
+ value = hglab.ref_option_value(hglab.REF_TYPE_BRANCH, branch_name)
+
+ assert value == "branch|feature%2Fa%7Cb%20c"
+ assert hglab.parse_ref_option_value(value) == (hglab.REF_TYPE_BRANCH, branch_name)
+
+
+def test_source_ref_option_values_round_trip_and_validate_source_repo():
+ branch_name = "feature/a|b"
+ value = hglab.source_ref_option_value(42, hglab.REF_TYPE_BRANCH, branch_name)
+
+ assert hglab.parse_source_ref_option_value(value) == (42, hglab.REF_TYPE_BRANCH, branch_name)
+
+ with pytest.raises(ValueError, match="Invalid source repository"):
+ hglab.parse_source_ref_option_value(f"not-int|{hglab.REF_TYPE_BRANCH}|default")
+
+ with pytest.raises(ValueError, match="Invalid source ref"):
+ hglab.parse_source_ref_option_value(f"42|{hglab.REF_TYPE_COMMIT}|abc123")
+
+
+def test_ref_query_and_url_helpers_skip_default_refs_unless_forced():
+ ref = {"type": hglab.REF_TYPE_BRANCH, "name": "default", "is_default": True}
+
+ assert hglab.ref_query_string(ref) == ""
+ assert hglab.ref_query_string(ref, force=True) == "ref_type=branch&ref=default"
+ assert hglab.url_with_ref("/alice/demo", ref, force=True) == "/alice/demo?ref_type=branch&ref=default"
+ assert hglab.url_with_ref("/alice/demo?tab=files", ref, force=True) == (
+ "/alice/demo?tab=files&ref_type=branch&ref=default"
+ )
+
+
+def test_format_ref_label_and_closed_branch_option_label():
+ assert hglab.format_ref_label(hglab.REF_TYPE_TIP) == "tip"
+ assert hglab.format_ref_label(hglab.REF_TYPE_COMMIT, "abcdef1234567890") == "commit abcdef123456"
+ assert hglab.ref_option_label({"type": hglab.REF_TYPE_BRANCH, "name": "old", "closed": True}) == (
+ "branch old (closed)"
+ )
+
+
+def test_init_db_creates_expected_tables_and_is_idempotent(isolated_app):
+ isolated_app.init_db()
+
+ with isolated_app.db_connect() as conn:
+ tables = {
+ row["name"]
+ for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
+ }
+ user_columns = {row["name"] for row in conn.execute("PRAGMA table_info(users)")}
+ repo_columns = {row["name"] for row in conn.execute("PRAGMA table_info(repositories)")}
+ pr_columns = {row["name"] for row in conn.execute("PRAGMA table_info(pull_requests)")}
+
+ assert {"users", "repositories", "issues", "pull_requests", "repo_stars"}.issubset(tables)
+ assert {"display_name", "bio", "website"}.issubset(user_columns)
+ assert {"forked_from_repo_id", "forked_at", "forked_from_node"}.issubset(repo_columns)
+ assert {"target_ref_type", "target_ref_name", "source_ref_type", "source_ref_name"}.issubset(pr_columns)
+
+
+def test_create_repository_persists_metadata_and_writes_hgrc(isolated_app):
+ owner = create_user("alice")
+
+ isolated_app.create_repository(owner, "demo", "line one\nline two")
+
+ repo = isolated_app.get_repo("alice", "demo")
+ path = isolated_app.repo_path("alice", "demo")
+ hgrc = (path / ".hg" / "hgrc").read_text(encoding="utf-8")
+
+ assert repo["owner_username"] == "alice"
+ assert repo["name"] == "demo"
+ assert path.joinpath(".hg").is_dir()
+ assert "name = alice/demo" in hgrc
+ assert "description = line one line two" in hgrc
+ assert "allow-push = alice" in hgrc
+
+
+def test_create_repository_rolls_back_database_and_files_when_hg_init_fails(isolated_app, monkeypatch):
+ owner = create_user("alice")
+
+ def fail_hg(*args, **kwargs):
+ raise hglab.HgCommandError("init failed")
+
+ monkeypatch.setattr(isolated_app, "run_hg", fail_hg)
+
+ with pytest.raises(hglab.HgCommandError, match="init failed"):
+ isolated_app.create_repository(owner, "broken", "")
+
+ assert isolated_app.get_repo("alice", "broken") is None
+ assert not isolated_app.repo_path("alice", "broken").exists()
+
+
+def test_star_and_contributor_helpers_update_database_and_hgrc(isolated_app):
+ owner = create_user("alice")
+ contributor = create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ repo = isolated_app.get_repo("alice", "demo")
+
+ isolated_app.star_repo(contributor, repo)
+ assert isolated_app.repo_star_count(repo["id"]) == 1
+ assert isolated_app.user_starred_repo(contributor, repo)
+
+ isolated_app.add_repo_contributor(repo, owner, "bob")
+ assert isolated_app.user_can_maintain_repo(contributor, repo)
+ hgrc = isolated_app.repo_path("alice", "demo").joinpath(".hg", "hgrc").read_text(encoding="utf-8")
+ assert "allow-push = alice bob" in hgrc
+
+ isolated_app.unstar_repo(contributor, repo)
+ isolated_app.remove_repo_contributor(repo, contributor["id"])
+ assert isolated_app.repo_star_count(repo["id"]) == 0
+ assert not isolated_app.user_starred_repo(contributor, repo)
+ assert not isolated_app.user_can_maintain_repo(contributor, repo)
+
+
+def test_issue_queries_count_filter_and_order_comments(isolated_app):
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ repo = isolated_app.get_repo("alice", "demo")
+ now = isolated_app.utcnow()
+
+ with isolated_app.db_connect() as conn:
+ conn.execute(
+ """
+ INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at)
+ VALUES (?, ?, 1, 'open issue', 'body', 'open', ?, ?)
+ """,
+ (repo["id"], owner["id"], now, now),
+ )
+ conn.execute(
+ """
+ INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at, closed_at)
+ VALUES (?, ?, 2, 'closed issue', '', 'closed', ?, ?, ?)
+ """,
+ (repo["id"], owner["id"], now, now, now),
+ )
+ issue_id = conn.execute("SELECT id FROM issues WHERE number = 1").fetchone()["id"]
+ conn.execute(
+ """
+ INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
+ VALUES (?, ?, 'second', '2026-01-01T00:00:02Z', '2026-01-01T00:00:02Z')
+ """,
+ (issue_id, owner["id"]),
+ )
+ conn.execute(
+ """
+ INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
+ VALUES (?, ?, 'first', '2026-01-01T00:00:01Z', '2026-01-01T00:00:01Z')
+ """,
+ (issue_id, owner["id"]),
+ )
+
+ assert isolated_app.issue_counts(repo["id"]) == {"open": 1, "closed": 1}
+ assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "all")] == [2, 1]
+ assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "invalid")] == [1]
+ assert [comment["body"] for comment in isolated_app.list_issue_comments(issue_id)] == ["first", "second"]
+
+
+def test_mercurial_read_helpers_return_files_readme_commits_and_default_ref(isolated_app):
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ path = isolated_app.repo_path("alice", "demo")
+ node = commit_file(path, "README.md", "# Demo\n", message="add readme", user="Alice <[email protected]>")
+
+ files = isolated_app.hg_files(path)
+ readme_name, readme = isolated_app.readme_for_repo(path, files)
+ commits = isolated_app.commit_log(path)
+ ref = isolated_app.default_code_ref(path)
+
+ assert files == ["README.md"]
+ assert (readme_name, readme) == ("README.md", "# Demo\n")
+ assert commits[0]["summary"] == "add readme"
+ assert isolated_app.commit_count(path) == 1
+ assert ref["type"] == isolated_app.REF_TYPE_BRANCH
+ assert ref["name"] == "default"
+ assert ref["node"] == node
+ assert isolated_app.repo_has_revision(path, node)
+ assert isolated_app.is_ancestor(path, isolated_app.NULL_REV, node)
+
+
+def test_create_pull_request_between_fork_repositories(isolated_app):
+ owner = create_user("alice")
+ author = create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ target_path = isolated_app.repo_path("alice", "demo")
+ base_node = commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
+ target_repo = isolated_app.get_repo("alice", "demo")
+
+ isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
+ source_repo = isolated_app.get_repo("bob", "demo-fork")
+ source_path = isolated_app.repo_path("bob", "demo-fork")
+ source_node = commit_file(source_path, "feature.txt", "new feature\n", message="add feature", user="bob")
+
+ number = isolated_app.create_pull_request(
+ target_repo,
+ source_repo,
+ author,
+ "Add feature",
+ "Please merge this",
+ isolated_app.REF_TYPE_TIP,
+ "",
+ isolated_app.REF_TYPE_TIP,
+ "",
+ )
+ pr = isolated_app.get_pull_request(target_repo["id"], number)
+ diff, current_source_node, source_ref = isolated_app.pull_request_diff(pr)
+
+ assert number == 1
+ assert pr["base_node"] == base_node
+ assert pr["source_node"] == source_node
+ assert pr["source_owner_username"] == "bob"
+ assert pr["target_owner_username"] == "alice"
+ assert current_source_node == source_node
+ assert source_ref["type"] == isolated_app.REF_TYPE_TIP
+ assert "feature.txt" in diff
+ assert "new feature" in diff
+
+
+def test_branch_tag_bookmark_helpers_resolve_and_filter_refs(isolated_app):
+ owner = create_user("alice")
+ nodes = create_repo_with_refs(owner)
+ path = nodes["path"]
+
+ branches = {branch["name"]: branch for branch in isolated_app.list_repo_branches(path)}
+ tags = isolated_app.list_repo_tags(path)
+ bookmarks = {bookmark["name"]: bookmark for bookmark in isolated_app.list_repo_bookmarks(path)}
+ target_labels = [option["label"] for option in isolated_app.target_repo_ref_options(path)]
+ all_labels = [option["label"] for option in isolated_app.repo_ref_options(path)]
+
+ assert {"default", "feature", "old"}.issubset(branches)
+ assert branches["default"]["node"] == nodes["default_node"]
+ assert branches["feature"]["closed"] is False
+ assert branches["old"]["closed"] is True
+ assert tags[0]["name"] == "v1.0"
+ assert tags[0]["node"] == nodes["feature_node"]
+ assert bookmarks["feature-bm"]["node"] == nodes["feature_node"]
+ assert isolated_app.default_code_ref(path)["node"] == nodes["default_node"]
+ assert isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_BRANCH, "feature")["name"] == "feature"
+ assert isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_BOOKMARK, "feature-bm")["node"] == (
+ nodes["feature_node"]
+ )
+ assert isolated_app.commit_ref(path, nodes["feature_node"])["type"] == isolated_app.REF_TYPE_COMMIT
+ assert "branch old (closed)" not in target_labels
+ assert "branch old (closed)" in all_labels
+
+
+def test_bottle_signup_login_logout_and_new_repo_flow(isolated_app):
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/new")
+ assert response.status_code == 303
+ assert response.location_path == "/login?next=/new"
+
+ response = client.post(
+ "/signup",
+ {"username": "alice", "password": "password123", "next": "/new"},
+ )
+ assert response.status_code == 303
+ assert response.location_path == "/new"
+ assert "session" in client.cookies
+
+ response = client.get("/new")
+ assert response.status_code == 200
+ assert "Create repository" in response.text
+
+ response = client.post("/new", {"name": "demo", "description": "A test repository"})
+ assert response.status_code == 303
+ assert response.location_path == "/alice/demo"
+ assert isolated_app.get_repo("alice", "demo") is not None
+
+ response = client.get("/alice/demo")
+ assert response.status_code == 200
+ assert "This repository is empty." in response.text
+ assert "http://example.test/hg/alice/demo" in response.text
+
+ response = client.post("/logout")
+ assert response.status_code == 303
+ assert response.location_path == "/"
+ assert "session" not in client.cookies
+
+ response = client.get("/new")
+ assert response.status_code == 303
+ assert response.location_path == "/login?next=/new"
+
+
+def test_bottle_repository_pages_render_refs_files_raw_and_errors(isolated_app):
+ owner = create_user("alice")
+ nodes = create_repo_with_refs(owner)
+ client = WsgiClient(isolated_app.app)
+ commit_short = nodes["default_node"][:12]
+
+ checks = [
+ ("/", 200, "Recent Activity"),
+ ("/alice", 200, "alice/demo"),
+ ("/alice?tab=stars", 200, "No starred repositories yet."),
+ ("/alice/demo", 200, "<h1>Demo</h1>"),
+ ("/alice/demo/src", 200, "README.md"),
+ ("/alice/demo/src/README.md", 200, "# Demo"),
+ (f"/alice/demo/commits/{commit_short}", 200, "initial"),
+ ("/alice/demo/commits", 200, "initial"),
+ ("/alice/demo/branches", 200, "feature"),
+ ("/alice/demo/branches", 200, "closed"),
+ ("/alice/demo/tags", 200, "v1.0"),
+ ("/alice/demo/bookmarks", 200, "feature-bm"),
+ ("/alice/demo/src?ref_type=branch&ref=feature", 200, "feature.txt"),
+ ("/alice/demo/src/feature.txt?ref_type=branch&ref=feature", 200, "feature work"),
+ ("/alice/demo/issues", 200, "No open issues."),
+ ("/alice/demo/pulls", 200, "No open pull requests."),
+ ("/static/icon.png", 200, ""),
+ ("/favicon.ico", 204, ""),
+ ("/alice/missing", 404, "Repository not found."),
+ ("/alice/demo/src/missing.txt", 404, "Path not found."),
+ ("/alice/demo/src/docs/../secret", 400, "Invalid repository path."),
+ ("/alice/demo/src?ref_type=branch&ref=missing", 404, "Branch not found."),
+ ]
+
+ for path, status_code, expected_text in checks:
+ response = client.get(path)
+ assert response.status_code == status_code, path
+ assert expected_text in response.text, path
+
+ response = client.get("/alice/demo/raw/feature.txt?ref_type=branch&ref=feature")
+ assert response.status_code == 200
+ assert response.body == b"feature work\n"
+ assert response.header("Content-Type").startswith("text/plain")
+
+
+def test_bottle_profile_star_fork_and_repo_settings_flows(isolated_app):
+ owner = create_user("alice")
+ bob = create_user("bob")
+ create_repo_with_refs(owner)
+
+ bob_client = WsgiClient(isolated_app.app)
+ response = bob_client.post(
+ "/login",
+ {"username": "bob", "password": "correct horse battery staple", "next": "/"},
+ )
+ assert response.status_code == 303
+
+ response = bob_client.post("/alice/demo/star", {"action": "star"})
+ assert response.status_code == 303
+ assert isolated_app.repo_star_count(isolated_app.get_repo("alice", "demo")["id"]) == 1
+
+ response = bob_client.post("/alice/demo/fork", {"name": "demo", "description": "Forked"})
+ assert response.status_code == 303
+ assert response.location_path == "/bob/demo"
+ assert isolated_app.get_repo("bob", "demo") is not None
+ assert "Fork of <a href=\"/alice/demo\">alice/demo</a>" in bob_client.get("/bob/demo").text
+
+ owner_client = WsgiClient(isolated_app.app)
+ response = owner_client.post(
+ "/login",
+ {"username": "alice", "password": "correct horse battery staple", "next": "/"},
+ )
+ assert response.status_code == 303
+
+ response = owner_client.post(
+ "/settings/profile",
+ {"display_name": "Alice A.", "bio": "Mercurial maintainer", "website": "example.com"},
+ )
+ assert response.status_code == 200
+ assert "Profile updated." in response.text
+ profile = owner_client.get("/alice")
+ assert "Alice A." in profile.text
+ assert "https://example.com" in profile.text
+
+ response = owner_client.post("/alice/demo/settings", {"action": "save", "description": "Updated"})
+ assert response.status_code == 200
+ assert "Repository settings updated." in response.text
+ assert isolated_app.get_repo("alice", "demo")["description"] == "Updated"
+
+ response = owner_client.post("/alice/demo/settings", {"action": "add_contributor", "username": "bob"})
+ assert response.status_code == 303
+ repo = isolated_app.get_repo("alice", "demo")
+ assert isolated_app.user_can_maintain_repo(bob, repo)
+
+ response = owner_client.post("/alice/demo/settings", {"action": "remove_contributor", "user_id": str(bob["id"])})
+ assert response.status_code == 303
+ assert not isolated_app.user_can_maintain_repo(bob, repo)
+
+ response = owner_client.post("/alice/demo/settings", {"action": "delete", "confirm_name": "wrong"})
+ assert response.status_code == 200
+ assert "Type "demo" to confirm deletion." in response.text
+
+
+def test_bottle_issue_routes_create_comment_close_and_reopen(isolated_app):
+ owner = create_user("alice")
+ create_repo_with_refs(owner)
+ client = WsgiClient(isolated_app.app)
+ client.post("/login", {"username": "alice", "password": "correct horse battery staple", "next": "/"})
+
+ response = client.get("/alice/demo/issues/new")
+ assert response.status_code == 200
+ assert "Open issue" in response.text
+
+ response = client.post("/alice/demo/issues/new", {"title": "", "body": ""})
+ assert response.status_code == 200
+ assert "Issue title is required." in response.text
+
+ response = client.post("/alice/demo/issues/new", {"title": "Bug report", "body": "It fails"})
+ assert response.status_code == 303
+ assert response.location_path == "/alice/demo/issues/1"
+
+ response = client.get("/alice/demo/issues/1")
+ assert response.status_code == 200
+ assert "Bug report" in response.text
+ assert "It fails" in response.text
+
+ response = client.post("/alice/demo/issues/1", {"action": "comment", "body": ""})
+ assert response.status_code == 200
+ assert "Comment body is required." in response.text
+
+ response = client.post("/alice/demo/issues/1", {"action": "comment", "body": "I can reproduce this"})
+ assert response.status_code == 303
+ assert "I can reproduce this" in client.get("/alice/demo/issues/1").text
+
+ response = client.post("/alice/demo/issues/1", {"action": "close"})
+ assert response.status_code == 303
+ assert isolated_app.get_issue(isolated_app.get_repo("alice", "demo")["id"], 1)["status"] == "closed"
+
+ response = client.post("/alice/demo/issues/1", {"action": "reopen"})
+ assert response.status_code == 303
+ assert isolated_app.get_issue(isolated_app.get_repo("alice", "demo")["id"], 1)["status"] == "open"
+
+
+def test_bottle_pull_request_routes_create_comment_forbid_and_merge(isolated_app):
+ owner = create_user("alice")
+ author = create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ target_path = isolated_app.repo_path("alice", "demo")
+ base_node = commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
+ target_repo = isolated_app.get_repo("alice", "demo")
+ isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
+ source_repo = isolated_app.get_repo("bob", "demo-fork")
+ source_path = isolated_app.repo_path("bob", "demo-fork")
+ source_node = commit_file(source_path, "feature.txt", "new feature\n", message="add feature", user="bob")
+
+ bob_client = WsgiClient(isolated_app.app)
+ bob_client.post("/login", {"username": "bob", "password": "correct horse battery staple", "next": "/"})
+
+ response = bob_client.get("/alice/demo/pulls/new")
+ assert response.status_code == 200
+ assert "bob/demo-fork tip" in response.text
+
+ response = bob_client.post(
+ "/alice/demo/pulls/new",
+ {
+ "source_ref": isolated_app.source_ref_option_value(source_repo["id"], isolated_app.REF_TYPE_TIP, ""),
+ "target_ref": isolated_app.ref_option_value(isolated_app.REF_TYPE_TIP, ""),
+ "title": "Add feature",
+ "body": "Please merge this",
+ },
+ )
+ assert response.status_code == 303
+ assert response.location_path == "/alice/demo/pulls/1"
+
+ pr = isolated_app.get_pull_request(target_repo["id"], 1)
+ assert pr["base_node"] == base_node
+ assert pr["source_node"] == source_node
+
+ response = bob_client.get("/alice/demo/pulls/1")
+ assert response.status_code == 200
+ assert "feature.txt" in response.text
+ assert "new feature" in response.text
+
+ response = bob_client.post("/alice/demo/pulls/1", {"action": "comment", "body": "Looks ready"})
+ assert response.status_code == 303
+ assert "Looks ready" in bob_client.get("/alice/demo/pulls/1").text
+
+ response = bob_client.post("/alice/demo/pulls/1", {"action": "close"})
+ assert response.status_code == 403
+ assert "Only maintainers can update pull requests." in response.text
+
+ owner_client = WsgiClient(isolated_app.app)
+ owner_client.post("/login", {"username": "alice", "password": "correct horse battery staple", "next": "/"})
+ response = owner_client.post("/alice/demo/pulls/1", {"action": "merge"})
+ assert response.status_code == 303
+
+ merged = isolated_app.get_pull_request(target_repo["id"], 1)
+ assert merged["status"] == "merged"
+ assert merged["merge_node"] == source_node
+ response = owner_client.get("/alice/demo/pulls/1")
+ assert response.status_code == 200
+ assert "Merged by alice" in response.text
+
+
+def test_mercurial_http_routes_are_public_for_reads_and_protect_writes(isolated_app):
+ owner = create_user("alice", password="owner-password")
+ create_user("bob", password="bob-password")
+ isolated_app.create_repository(owner, "demo", "")
+ commit_file(isolated_app.repo_path("alice", "demo"), "README.md", "# Demo\n", message="initial", user="alice")
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/hg/alice/demo?cmd=capabilities")
+ assert response.status_code == 200
+ assert b"lookup" in response.body
+
+ response = client.post("/hg/alice/demo?cmd=unbundle")
+ assert response.status_code == 401
+ assert response.header("WWW-Authenticate") == 'Basic realm="HgHost"'
+ assert "Authentication required." in response.text
+
+ response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("alice", "wrong"))
+ assert response.status_code == 401
+ assert "Invalid Mercurial credentials." in response.text
+
+ response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("bob", "bob-password"))
+ assert response.status_code == 403
+ assert "Push not authorized" in response.text