# Fork of patx/hglab (tokenmess/hglab).
import atexit
import base64
from http.cookies import SimpleCookie
from io import BytesIO, StringIO
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
from urllib.parse import urlencode, urlsplit, unquote
import pytest
from bottle import HTTPError
# Bootstrap storage for the application import below: a throwaway directory
# that is removed at interpreter exit (errors during removal are ignored).
BOOTSTRAP_DIR = Path(tempfile.mkdtemp(prefix="hglab-test-bootstrap-"))
atexit.register(shutil.rmtree, BOOTSTRAP_DIR, ignore_errors=True)
# These variables are set before `app` is imported; presumably `app` reads
# them at import time — confirm against app.py before reordering.
os.environ["HG_HOST_DB"] = str(BOOTSTRAP_DIR / "hghost.sqlite3")
os.environ["HG_HOST_REPO_ROOT"] = str(BOOTSTRAP_DIR / "repos")
os.environ["SECRET_KEY"] = "test-secret"
# Imported late on purpose (hence the E402 suppression): the environment
# above has to be in place first.
import app as hglab # noqa: E402
class WsgiResponse:
    """Result of one in-process WSGI call.

    Holds the raw status line, its integer status code, the list of
    ``(name, value)`` header pairs, the raw body bytes and the body decoded
    as UTF-8 (undecodable bytes are replaced).
    """

    def __init__(self, status_line, headers, body):
        code, _, _reason = status_line.partition(" ")
        self.status_line = status_line
        self.status_code = int(code)
        self.headers = headers
        self.body = body
        self.text = body.decode("utf-8", "replace")

    def header(self, name, default=None):
        """Return the first header value matching *name* case-insensitively."""
        wanted = name.lower()
        matches = (value for key, value in self.headers if key.lower() == wanted)
        return next(matches, default)

    @property
    def location(self):
        """Value of the ``Location`` header, or ``None`` when absent."""
        return self.header("Location")

    @property
    def location_path(self):
        """Redirect target reduced to path plus query string.

        Returns ``None`` without a ``Location`` header. A location without
        scheme and host is returned unchanged; an absolute URL is reduced to
        its path, followed by ``?query`` when a query string is present.
        """
        target = self.location
        if not target:
            return None
        parts = urlsplit(target)
        if parts.scheme or parts.netloc:
            return f"{parts.path}?{parts.query}" if parts.query else parts.path
        return target
class WsgiClient:
    """Minimal in-process WSGI test client with cookie and CSRF bookkeeping.

    Cookies set by responses are replayed on later requests, and the most
    recent ``_csrf_token`` hidden-field value found in a response body is
    added automatically to form POSTs whose path does not start with
    ``/hg/``.
    """

    def __init__(self, wsgi_app):
        self.wsgi_app = wsgi_app
        self.cookies = {}
        self.csrf_token = None

    def get(self, path, headers=None):
        """Issue a GET request and return the ``WsgiResponse``."""
        return self.request("GET", path, headers=headers)

    def post(self, path, data=None, headers=None):
        """Issue a form-encoded POST request and return the ``WsgiResponse``."""
        return self.request("POST", path, data=data, headers=headers)

    def request(self, method, path, data=None, headers=None):
        """Call the WSGI app once and return the captured response.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Request path, optionally with a query string.
            data: Optional mapping that is URL-encoded into the request body.
            headers: Optional mapping of extra request headers.
        """
        headers = headers or {}
        split = urlsplit(path)
        body = b""
        # Only non-/hg/ POSTs get the remembered CSRF token attached.
        wants_csrf = bool(
            method == "POST" and not split.path.startswith("/hg/") and self.csrf_token
        )
        if wants_csrf and data is None:
            data = {}
        if data is not None:
            if wants_csrf and hglab.CSRF_FORM_FIELD not in data:
                data = {**data, hglab.CSRF_FORM_FIELD: self.csrf_token}
            body = urlencode(data, doseq=True).encode("utf-8")
            # Caller-supplied headers win over the default content type.
            headers = {"Content-Type": "application/x-www-form-urlencoded", **headers}
        environ = {
            "REQUEST_METHOD": method,
            "SCRIPT_NAME": "",
            "PATH_INFO": unquote(split.path),
            "QUERY_STRING": split.query,
            "SERVER_NAME": "example.test",
            "SERVER_PORT": "80",
            "SERVER_PROTOCOL": "HTTP/1.1",
            "HTTP_HOST": "example.test",
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": "http",
            "wsgi.input": BytesIO(body),
            "wsgi.errors": StringIO(),
            "wsgi.multithread": False,
            "wsgi.multiprocess": False,
            "wsgi.run_once": False,
            "CONTENT_LENGTH": str(len(body)),
            "REMOTE_ADDR": "127.0.0.1",
        }
        if self.cookies:
            environ["HTTP_COOKIE"] = "; ".join(
                f"{key}={value}" for key, value in self.cookies.items()
            )
        for key, value in headers.items():
            env_key = key.upper().replace("-", "_")
            # CGI convention: these two headers carry no HTTP_ prefix.
            if env_key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                environ[env_key] = value
            else:
                environ[f"HTTP_{env_key}"] = value

        captured = {}
        chunks = []

        def start_response(status, response_headers, exc_info=None):
            captured["status"] = status
            captured["headers"] = response_headers
            # PEP 3333 requires start_response to return a write() callable;
            # written data lands in the same buffer as iterated chunks.
            return chunks.append

        body_iter = self.wsgi_app(environ, start_response)
        try:
            for chunk in body_iter:
                chunks.append(chunk if isinstance(chunk, bytes) else chunk.encode("utf-8"))
        finally:
            close = getattr(body_iter, "close", None)
            if close:
                close()
        response = WsgiResponse(captured["status"], captured["headers"], b"".join(chunks))
        self._store_cookies(response.headers)
        self._store_csrf_token(response.text)
        return response

    @staticmethod
    def _is_cookie_deletion(morsel):
        """Return True when a parsed Set-Cookie morsel removes the cookie.

        A non-positive ``Max-Age`` expires the cookie immediately whatever
        its value. Otherwise an empty value together with any ``Expires``
        attribute is treated as a deletion (the date itself is not parsed).
        """
        max_age = morsel["max-age"]
        if max_age:
            try:
                if int(max_age) <= 0:
                    return True
            except ValueError:
                pass  # Malformed Max-Age: fall back to the Expires heuristic.
        return morsel.value == "" and bool(morsel["expires"])

    def _store_cookies(self, headers):
        """Update the cookie jar from every ``Set-Cookie`` response header."""
        for key, value in headers:
            if key.lower() != "set-cookie":
                continue
            cookie = SimpleCookie()
            cookie.load(value)
            for name, morsel in cookie.items():
                if self._is_cookie_deletion(morsel):
                    self.cookies.pop(name, None)
                else:
                    self.cookies[name] = morsel.value

    def _store_csrf_token(self, text):
        """Remember the CSRF token from a hidden form field, if one is present."""
        match = re.search(r'name="_csrf_token"\s+value="([^"]+)"', text)
        if match:
            self.csrf_token = match.group(1)
def login_client(client, username, password="correct horse battery staple", next_url="/"):
    """Load the login page, then submit the login form; return the POST response."""
    # The GET lets the client pick up cookies and the CSRF token first.
    client.get("/login")
    credentials = {"username": username, "password": password, "next": next_url}
    return client.post("/login", credentials)
@pytest.fixture()
def isolated_app(tmp_path, monkeypatch):
    """Yield the app module pointed at a per-test database and repo root."""
    overrides = {
        "DB_PATH": tmp_path / "hghost.sqlite3",
        "REPO_ROOT": tmp_path / "repos",
    }
    for attribute, location in overrides.items():
        monkeypatch.setattr(hglab, attribute, location)
    # Drop auth-failure state left behind by earlier tests.
    hglab.AUTH_FAILURES.clear()
    hglab.init_db()
    return hglab
def create_user(username, password="correct horse battery staple"):
    """Insert a user row directly and return it as loaded by the app."""
    insert_sql = "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)"
    row = (username, hglab.hash_password(password), hglab.utcnow())
    with hglab.db_connect() as conn:
        conn.execute(insert_sql, row)
    return hglab.get_user_by_username(username)
def commit_file(repo_path, relative_path, content, message="initial commit", user="alice"):
    """Write *content* to a file in the repo, commit it, and return the tip node."""
    destination = repo_path / relative_path
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
    hg_commands = (
        ["add", relative_path],
        ["commit", "-u", user, "-m", message],
    )
    for arguments in hg_commands:
        hglab.run_hg(arguments, cwd=repo_path)
    return hglab.repo_tip_node(repo_path)
def basic_auth(username, password):
    """Build an ``Authorization`` header dict for HTTP Basic credentials."""
    credentials = f"{username}:{password}".encode("utf-8")
    encoded = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic " + encoded}
def create_repo_with_refs(owner):
    """Create ``<owner>/demo`` with several branches, a tag and a bookmark.

    Commits land on ``default``, then ``feature`` (tagged ``v1.0`` and
    bookmarked ``feature-bm``), then ``old``, which is closed afterwards.
    The working copy ends on ``default``. Returns the repository path and
    the node captured after each of the three file commits.
    """
    username = owner["username"]
    hglab.create_repository(owner, "demo", "Demo repository")
    path = hglab.repo_path(username, "demo")

    def hg(*arguments):
        hglab.run_hg(list(arguments), cwd=path)

    default_node = commit_file(path, "README.md", "# Demo\n", message="initial", user=username)

    hg("branch", "feature")
    feature_node = commit_file(path, "feature.txt", "feature work\n", message="add feature", user=username)
    hg("tag", "v1.0", "-u", username, "-m", "tag v1.0")
    hg("bookmark", "-r", feature_node, "feature-bm")

    hg("update", "default")
    hg("branch", "old")
    old_node = commit_file(path, "old.txt", "old branch\n", message="old branch work", user=username)
    hg("commit", "--close-branch", "-u", username, "-m", "close old")

    hg("update", "default")
    return {
        "path": path,
        "default_node": default_node,
        "feature_node": feature_node,
        "old_node": old_node,
    }
def test_normalize_slug_accepts_trimmed_lowercase_values():
    """normalize_slug trims surrounding whitespace and lowercases the value."""
    expectations = {
        " Demo_Repo-1 ": "demo_repo-1",
        "My.Name": "my.name",
    }
    for raw, normalized in expectations.items():
        assert hglab.normalize_slug(raw, "Repository name") == normalized
@pytest.mark.parametrize(
    "value, label",
    [
        *((name, "Repository name") for name in ("x", "bad/name", "-bad", "demo.hg")),
        *((name, "Username") for name in ("login", "harrisonerd")),
    ],
)
def test_normalize_slug_rejects_invalid_or_reserved_values(value, label):
    """Each listed value/label pair must make normalize_slug raise ValueError."""
    with pytest.raises(ValueError):
        hglab.normalize_slug(value, label)
def test_clean_repo_path_normalizes_and_rejects_traversal():
    """clean_repo_path strips outer slashes and answers 400 to dot segments."""
    accepted = {"/docs/readme.md/": "docs/readme.md", "": ""}
    for raw, cleaned in accepted.items():
        assert hglab.clean_repo_path(raw) == cleaned

    for rejected in ("../secret", "docs/../secret", "./file"):
        with pytest.raises(HTTPError) as raised:
            hglab.clean_repo_path(rejected)
        # Checked after the block: statements following the raise never run.
        assert raised.value.status_code == 400
def test_password_hashes_verify_and_reject_bad_inputs():
    """Hashes carry the pbkdf2_sha256 prefix and verify only the right password."""
    password = "correct-password"
    stored = hglab.hash_password(password)

    assert stored.startswith("pbkdf2_sha256$")
    assert hglab.verify_password(password, stored)
    assert not hglab.verify_password("wrong-password", stored)
    # A malformed stored hash is rejected rather than raising.
    assert not hglab.verify_password(password, "not-a-valid-hash")
@pytest.mark.parametrize(
    "value, expected",
    [
        ("example.com", "https://example.com"),
        (" http://example.com/path ", "http://example.com/path"),
        ("https://example.com", "https://example.com"),
    ],
)
def test_normalize_website_accepts_http_urls_and_adds_scheme(value, expected):
    """Bare hosts gain https://; explicit http(s) URLs are trimmed and kept."""
    normalized = hglab.normalize_website(value)
    assert normalized == expected
@pytest.mark.parametrize(
    "value",
    [
        "ftp://example.com",
        "https://",
        "not a url",
    ],
)
def test_normalize_website_rejects_invalid_urls(value):
    """Each listed input must make normalize_website raise ValueError."""
    with pytest.raises(ValueError):
        hglab.normalize_website(value)
def test_render_markdown_strips_scripts_and_unsafe_links():
    """Rendered markdown keeps headings, safe links and tables but no script/javascript."""
    document = """
# Title
<script>alert("x")</script>
[safe](https://example.com) [unsafe](javascript:alert(1))
| A |
| - |
| B |
"""
    rendered = hglab.render_markdown(document)
    lowered = rendered.lower()

    for required in ("<h1>Title</h1>", 'href="https://example.com"', "<table>"):
        assert required in rendered
    for forbidden in ("script", "javascript:"):
        assert forbidden not in lowered
def test_render_markdown_links_allows_only_links():
    """render_markdown_links output is compared against one exact expected string."""
    source = "[site](https://example.com) **bold** <strong>x</strong>"
    # NOTE(review): if this file passed through HTML-entity decoding, the
    # trailing tag below may originally have been entity-escaped — verify
    # against the renderer's actual output.
    expected = '<a href="https://example.com">site</a> bold <strong>x</strong>'
    assert hglab.render_markdown_links(source) == expected
def test_startup_config_rejects_default_secret_outside_debug(monkeypatch):
    """With DEBUG off, the default secret is refused and a custom one accepted."""

    def use_secret(secret):
        monkeypatch.setattr(hglab, "SECRET_KEY", secret)

    monkeypatch.setattr(hglab, "DEBUG", False)

    use_secret(hglab.DEFAULT_SECRET_KEY)
    with pytest.raises(RuntimeError):
        hglab.validate_startup_config()

    use_secret("production-secret")
    # Must return without raising.
    hglab.validate_startup_config()
def test_security_headers_and_secure_cookie_flags(isolated_app):
    """Behind an https proxy header, pages send security headers and hardened cookies."""
    via_https = {"X-Forwarded-Proto": "https"}
    client = WsgiClient(isolated_app.app)

    def assert_hardened_cookie(cookie_header, cookie_name):
        assert f"{cookie_name}=" in cookie_header
        assert "HttpOnly" in cookie_header
        assert "samesite=lax" in cookie_header.lower()
        assert "Secure" in cookie_header

    login_page = client.get("/login", headers=via_https)
    assert login_page.status_code == 200
    exact_headers = {
        "X-Content-Type-Options": "nosniff",
        "Referrer-Policy": "same-origin",
        "X-Frame-Options": "DENY",
    }
    for header_name, header_value in exact_headers.items():
        assert login_page.header(header_name) == header_value
    assert "frame-ancestors 'none'" in login_page.header("Content-Security-Policy")
    assert_hardened_cookie(login_page.header("Set-Cookie"), "csrf_token")

    create_user("alice")
    logged_in = client.post(
        "/login",
        {"username": "alice", "password": "correct horse battery staple", "next": "/"},
        headers=via_https,
    )
    assert logged_in.status_code == 303
    assert_hardened_cookie(logged_in.header("Set-Cookie"), "session")
def test_csrf_required_for_browser_posts_and_hg_is_exempt(isolated_app):
    """Form POSTs need a valid CSRF token; /hg/ POSTs reach auth checks instead."""
    owner = create_user("alice", password="owner-password")
    isolated_app.create_repository(owner, "demo", "")
    client = WsgiClient(isolated_app.app)
    login_form = {"username": "alice", "password": "owner-password", "next": "/"}

    # No prior GET, so the client has no token to attach.
    without_token = client.post("/login", login_form)
    assert without_token.status_code == 403
    assert "Invalid CSRF token." in without_token.text

    # An explicit token in the form suppresses the client's automatic one.
    client.get("/login")
    with_bad_token = client.post("/login", {**login_form, hglab.CSRF_FORM_FIELD: "bad-token"})
    assert with_bad_token.status_code == 403

    hg_push = client.post("/hg/alice/demo?cmd=unbundle")
    assert hg_push.status_code == 401
    assert hg_push.header("WWW-Authenticate") == 'Basic realm="HgHost"'
def test_browser_form_size_limit(isolated_app, monkeypatch):
    """A form body larger than MAX_FORM_BYTES is answered with 413."""
    monkeypatch.setattr(hglab, "MAX_FORM_BYTES", 40)
    client = WsgiClient(isolated_app.app)
    client.get("/login")

    oversized_form = {"username": "a" * 100, "password": "bad", "next": "/"}
    rejected = client.post("/login", oversized_form)

    assert rejected.status_code == 413
    assert "Request body too large." in rejected.text
def test_login_and_hg_auth_failures_are_rate_limited(isolated_app, monkeypatch):
    """After the allowed number of failures, login and hg auth both answer 429."""
    allowed_failures = 2
    monkeypatch.setattr(hglab, "RATE_LIMIT_MAX_FAILURES", allowed_failures)
    monkeypatch.setattr(hglab, "RATE_LIMIT_COOLDOWN_SECONDS", 60)
    create_user("alice", password="owner-password")
    client = WsgiClient(isolated_app.app)
    client.get("/login")

    # Browser login path.
    bad_login = {"username": "alice", "password": "wrong", "next": "/"}
    for _attempt in range(allowed_failures):
        assert client.post("/login", bad_login).status_code == 200
    throttled = client.post("/login", bad_login)
    assert throttled.status_code == 429
    assert throttled.header("Retry-After") == "60"

    # Mercurial HTTP path, starting from a clean failure record.
    hglab.AUTH_FAILURES.clear()
    owner = hglab.get_user_by_username("alice")
    isolated_app.create_repository(owner, "demo", "")
    push_url = "/hg/alice/demo?cmd=unbundle"
    for _attempt in range(allowed_failures):
        denied = client.post(push_url, headers=basic_auth("alice", "wrong"))
        assert denied.status_code == 401
    throttled = client.post(push_url, headers=basic_auth("alice", "wrong"))
    assert throttled.status_code == 429
def test_readme_and_file_previews_are_truncated(isolated_app, monkeypatch):
    """Content beyond MAX_RENDER_BYTES yields truncation notices on both pages."""
    monkeypatch.setattr(hglab, "MAX_RENDER_BYTES", 32)
    owner = create_user("alice")
    isolated_app.create_repository(owner, "demo", "")
    repo_dir = isolated_app.repo_path("alice", "demo")
    commit_file(repo_dir, "README.md", "A" * 200, message="large readme")
    client = WsgiClient(isolated_app.app)

    notices = (
        ("/alice/demo", "README preview truncated."),
        ("/alice/demo/src/README.md", "File preview truncated."),
    )
    for url, notice in notices:
        page = client.get(url)
        assert page.status_code == 200
        assert notice in page.text
def test_build_tree_deduplicates_entries_and_sorts_directories_first():
    """build_tree lists each child once, directories ahead of files."""

    def entry(name, path, kind):
        return {"name": name, "path": path, "type": kind}

    files = ["README.md", "src/app.py", "src/utils/helpers.py", "docs/index.md", "src/z.txt"]

    root_listing = hglab.build_tree(files, "")
    assert root_listing == [
        entry("docs", "docs", "dir"),
        entry("src", "src", "dir"),
        entry("README.md", "README.md", "file"),
    ]

    src_listing = hglab.build_tree(files, "src")
    assert src_listing == [
        entry("utils", "src/utils", "dir"),
        entry("app.py", "src/app.py", "file"),
        entry("z.txt", "src/z.txt", "file"),
    ]
def test_ref_option_values_round_trip_quoted_names():
    """Names with '/', '|' and spaces are percent-quoted and parse back intact."""
    ref_type = hglab.REF_TYPE_BRANCH
    branch_name = "feature/a|b c"

    encoded = hglab.ref_option_value(ref_type, branch_name)

    assert encoded == "branch|feature%2Fa%7Cb%20c"
    assert hglab.parse_ref_option_value(encoded) == (ref_type, branch_name)
def test_source_ref_option_values_round_trip_and_validate_source_repo():
    """Source ref values round-trip; a bad repo id or ref type raises ValueError."""
    parse = hglab.parse_source_ref_option_value
    branch_name = "feature/a|b"

    encoded = hglab.source_ref_option_value(42, hglab.REF_TYPE_BRANCH, branch_name)
    assert parse(encoded) == (42, hglab.REF_TYPE_BRANCH, branch_name)

    invalid_inputs = (
        (f"not-int|{hglab.REF_TYPE_BRANCH}|default", "Invalid source repository"),
        (f"42|{hglab.REF_TYPE_COMMIT}|abc123", "Invalid source ref"),
    )
    for bad_value, message in invalid_inputs:
        with pytest.raises(ValueError, match=message):
            parse(bad_value)
def test_ref_query_and_url_helpers_skip_default_refs_unless_forced():
    """A default ref adds no query string unless force=True is passed."""
    default_ref = {"type": hglab.REF_TYPE_BRANCH, "name": "default", "is_default": True}
    query = "ref_type=branch&ref=default"

    assert hglab.ref_query_string(default_ref) == ""
    assert hglab.ref_query_string(default_ref, force=True) == query

    # '?' starts the query on a bare URL; '&' extends an existing one.
    bare_url = hglab.url_with_ref("/alice/demo", default_ref, force=True)
    assert bare_url == "/alice/demo?" + query
    url_with_query = hglab.url_with_ref("/alice/demo?tab=files", default_ref, force=True)
    assert url_with_query == "/alice/demo?tab=files&" + query
def test_format_ref_label_and_closed_branch_option_label():
    """Labels: plain tip, 12-character commit prefix, closed-branch marker."""
    tip_label = hglab.format_ref_label(hglab.REF_TYPE_TIP)
    assert tip_label == "tip"

    commit_label = hglab.format_ref_label(hglab.REF_TYPE_COMMIT, "abcdef1234567890")
    assert commit_label == "commit abcdef123456"

    closed_branch = {"type": hglab.REF_TYPE_BRANCH, "name": "old", "closed": True}
    assert hglab.ref_option_label(closed_branch) == "branch old (closed)"
def test_init_db_creates_expected_tables_and_is_idempotent(isolated_app):
    """A second init_db call succeeds and the expected schema is present."""
    # The fixture already ran init_db once; this is the repeat call.
    isolated_app.init_db()

    with isolated_app.db_connect() as conn:

        def column_names(pragma_sql):
            return {row["name"] for row in conn.execute(pragma_sql)}

        tables = column_names("SELECT name FROM sqlite_master WHERE type = 'table'")
        user_columns = column_names("PRAGMA table_info(users)")
        repo_columns = column_names("PRAGMA table_info(repositories)")
        pr_columns = column_names("PRAGMA table_info(pull_requests)")

    assert tables >= {"users", "repositories", "issues", "pull_requests", "repo_stars"}
    assert user_columns >= {"display_name", "bio", "website"}
    assert repo_columns >= {"forked_from_repo_id", "forked_at", "forked_from_node"}
    assert pr_columns >= {"target_ref_type", "target_ref_name", "source_ref_type", "source_ref_name"}
def test_db_connect_configures_sqlite_for_worker_contention(isolated_app):
    """Connections report the expected foreign key, timeout, sync and journal pragmas."""
    isolated_app.init_db()
    pragma_names = ("foreign_keys", "busy_timeout", "synchronous", "journal_mode")

    with isolated_app.db_connect() as conn:
        pragma = {
            name: conn.execute(f"PRAGMA {name}").fetchone()[0]
            for name in pragma_names
        }

    assert pragma["foreign_keys"] == 1
    assert pragma["busy_timeout"] == hglab.SQLITE_BUSY_TIMEOUT_MS
    assert pragma["synchronous"] == 1
    assert pragma["journal_mode"].lower() == "wal"
def test_sqlite_accepts_concurrent_worker_writes(isolated_app):
    """Three separate processes each insert a user while holding a write lock.

    Every worker opens an immediate transaction, inserts one row and sleeps
    briefly before leaving the connection block, so the writers overlap.
    All workers must exit with status 0 and all three rows must be present.
    """
    isolated_app.init_db()
    repo_root = Path(__file__).resolve().parents[1]
    env = os.environ.copy()
    existing_pythonpath = env.get("PYTHONPATH")
    # Put the project root first so the workers can `import app`.
    env["PYTHONPATH"] = str(repo_root) if not existing_pythonpath else f"{repo_root}{os.pathsep}{existing_pythonpath}"
    worker_script = """
import os
import sys
import time
os.environ["HG_HOST_DB"] = sys.argv[1]
os.environ["HG_HOST_REPO_ROOT"] = sys.argv[2]
os.environ["SECRET_KEY"] = "test-secret"
import app as hglab
with hglab.db_connect() as conn:
    conn.execute("BEGIN IMMEDIATE")
    conn.execute(
        "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
        (sys.argv[3], "hash", hglab.utcnow()),
    )
    time.sleep(0.2)
"""
    processes = [
        subprocess.Popen(
            [
                sys.executable,
                "-c",
                worker_script,
                str(isolated_app.DB_PATH),
                str(isolated_app.REPO_ROOT),
                f"worker-{index}",
            ],
            cwd=repo_root,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        for index in range(3)
    ]
    results = []
    try:
        for process in processes:
            stdout, stderr = process.communicate(timeout=15)
            results.append((process.returncode, stdout, stderr))
    finally:
        # On a timeout or failure some workers may still be running, or may
        # have finished without being drained. Kill stragglers, close the
        # pipes and reap every child so no process or descriptor leaks.
        for process in processes:
            if process.poll() is None:
                process.kill()
            for stream in (process.stdout, process.stderr):
                if stream is not None:
                    stream.close()
            process.wait()
    assert results
    for returncode, stdout, stderr in results:
        assert returncode == 0, stdout + stderr
    with isolated_app.db_connect() as conn:
        count = conn.execute("SELECT COUNT(*) FROM users WHERE username LIKE 'worker-%'").fetchone()[0]
    assert count == 3
def test_create_repository_persists_metadata_and_writes_hgrc(isolated_app):
    """create_repository stores the row, creates .hg and writes the hgrc lines."""
    owner = create_user("alice")
    isolated_app.create_repository(owner, "demo", "line one\nline two")

    repo = isolated_app.get_repo("alice", "demo")
    path = isolated_app.repo_path("alice", "demo")
    hg_dir = path / ".hg"
    hgrc = (hg_dir / "hgrc").read_text(encoding="utf-8")

    assert (repo["owner_username"], repo["name"]) == ("alice", "demo")
    assert hg_dir.is_dir()
    expected_lines = (
        "name = alice/demo",
        # The newline in the description appears as a single space.
        "description = line one line two",
        "allow-push = alice",
    )
    for line in expected_lines:
        assert line in hgrc
def test_create_repository_rolls_back_database_and_files_when_hg_init_fails(isolated_app, monkeypatch):
    """A failing run_hg leaves neither a repository row nor a directory behind."""
    owner = create_user("alice")

    def always_failing_hg(*_args, **_kwargs):
        raise hglab.HgCommandError("init failed")

    monkeypatch.setattr(isolated_app, "run_hg", always_failing_hg)

    with pytest.raises(hglab.HgCommandError, match="init failed"):
        isolated_app.create_repository(owner, "broken", "")

    leftover_row = isolated_app.get_repo("alice", "broken")
    leftover_dir = isolated_app.repo_path("alice", "broken")
    assert leftover_row is None
    assert not leftover_dir.exists()
def test_star_and_contributor_helpers_update_database_and_hgrc(isolated_app):
    """Starring and contributor changes show up in the helpers and in hgrc."""
    owner = create_user("alice")
    contributor = create_user("bob")
    isolated_app.create_repository(owner, "demo", "")
    repo = isolated_app.get_repo("alice", "demo")
    repo_id = repo["id"]

    # Grant: star first, then contributor rights.
    isolated_app.star_repo(contributor, repo)
    assert isolated_app.repo_star_count(repo_id) == 1
    assert isolated_app.user_starred_repo(contributor, repo)

    isolated_app.add_repo_contributor(repo, owner, "bob")
    assert isolated_app.user_can_maintain_repo(contributor, repo)
    hgrc_file = isolated_app.repo_path("alice", "demo").joinpath(".hg", "hgrc")
    assert "allow-push = alice bob" in hgrc_file.read_text(encoding="utf-8")

    # Revoke both and confirm the helpers reflect it.
    isolated_app.unstar_repo(contributor, repo)
    isolated_app.remove_repo_contributor(repo, contributor["id"])
    assert isolated_app.repo_star_count(repo_id) == 0
    assert not isolated_app.user_starred_repo(contributor, repo)
    assert not isolated_app.user_can_maintain_repo(contributor, repo)
def test_issue_queries_count_filter_and_order_comments(isolated_app):
    """Issue counts, status filtering and comment ordering match inserted rows."""
    owner = create_user("alice")
    isolated_app.create_repository(owner, "demo", "")
    repo = isolated_app.get_repo("alice", "demo")
    repo_id, owner_id = repo["id"], owner["id"]
    now = isolated_app.utcnow()

    with isolated_app.db_connect() as conn:
        # One open and one closed issue.
        conn.execute(
            """
            INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at)
            VALUES (?, ?, 1, 'open issue', 'body', 'open', ?, ?)
            """,
            (repo_id, owner_id, now, now),
        )
        conn.execute(
            """
            INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at, closed_at)
            VALUES (?, ?, 2, 'closed issue', '', 'closed', ?, ?, ?)
            """,
            (repo_id, owner_id, now, now, now),
        )
        issue_id = conn.execute("SELECT id FROM issues WHERE number = 1").fetchone()["id"]
        # Comments go in newest-first to exercise ordering on read.
        conn.execute(
            """
            INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
            VALUES (?, ?, 'second', '2026-01-01T00:00:02Z', '2026-01-01T00:00:02Z')
            """,
            (issue_id, owner_id),
        )
        conn.execute(
            """
            INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
            VALUES (?, ?, 'first', '2026-01-01T00:00:01Z', '2026-01-01T00:00:01Z')
            """,
            (issue_id, owner_id),
        )

    def issue_numbers(status_filter):
        return [issue["number"] for issue in isolated_app.list_issues(repo_id, status_filter)]

    assert isolated_app.issue_counts(repo_id) == {"open": 1, "closed": 1}
    assert issue_numbers("all") == [2, 1]
    # An unrecognised filter returns only issue 1, the open one.
    assert issue_numbers("invalid") == [1]
    comment_bodies = [comment["body"] for comment in isolated_app.list_issue_comments(issue_id)]
    assert comment_bodies == ["first", "second"]
def test_mercurial_read_helpers_return_files_readme_commits_and_default_ref(isolated_app):
    """Read helpers report files, README, commit log and default ref after one commit."""
    owner = create_user("alice")
    isolated_app.create_repository(owner, "demo", "")
    path = isolated_app.repo_path("alice", "demo")
    # Author in "Name <email>" form; the address had been replaced by an
    # e-mail obfuscation placeholder and is restored to a well-formed one.
    node = commit_file(path, "README.md", "# Demo\n", message="add readme", user="Alice <alice@example.com>")

    files = isolated_app.hg_files(path)
    readme_name, readme = isolated_app.readme_for_repo(path, files)
    commits = isolated_app.commit_log(path)
    ref = isolated_app.default_code_ref(path)

    assert files == ["README.md"]
    assert (readme_name, readme) == ("README.md", "# Demo\n")
    assert commits[0]["summary"] == "add readme"
    assert isolated_app.commit_count(path) == 1
    assert ref["type"] == isolated_app.REF_TYPE_BRANCH
    assert ref["name"] == "default"
    assert ref["node"] == node
    assert isolated_app.repo_has_revision(path, node)
    assert isolated_app.is_ancestor(path, isolated_app.NULL_REV, node)
def test_create_pull_request_between_fork_repositories(isolated_app):
    """A tip-to-tip pull request from a fork records both nodes and diffs the new file."""
    owner = create_user("alice")
    author = create_user("bob")

    # Target repository with one commit.
    isolated_app.create_repository(owner, "demo", "")
    base_node = commit_file(
        isolated_app.repo_path("alice", "demo"), "README.md", "# Demo\n", message="initial", user="alice"
    )
    target_repo = isolated_app.get_repo("alice", "demo")

    # Fork with one extra commit.
    isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
    source_repo = isolated_app.get_repo("bob", "demo-fork")
    source_node = commit_file(
        isolated_app.repo_path("bob", "demo-fork"), "feature.txt", "new feature\n", message="add feature", user="bob"
    )

    tip = isolated_app.REF_TYPE_TIP
    number = isolated_app.create_pull_request(
        target_repo, source_repo, author, "Add feature", "Please merge this", tip, "", tip, ""
    )
    pr = isolated_app.get_pull_request(target_repo["id"], number)
    diff, current_source_node, source_ref = isolated_app.pull_request_diff(pr)

    assert number == 1
    expected_fields = {
        "base_node": base_node,
        "source_node": source_node,
        "source_owner_username": "bob",
        "target_owner_username": "alice",
    }
    for field, expected in expected_fields.items():
        assert pr[field] == expected
    assert current_source_node == source_node
    assert source_ref["type"] == tip
    for fragment in ("feature.txt", "new feature"):
        assert fragment in diff
def test_branch_tag_bookmark_helpers_resolve_and_filter_refs(isolated_app):
    """Ref helpers list and resolve refs; target options omit the closed branch."""
    owner = create_user("alice")
    nodes = create_repo_with_refs(owner)
    path = nodes["path"]
    feature_node = nodes["feature_node"]

    def by_name(items):
        return {item["name"]: item for item in items}

    def labels(options):
        return [option["label"] for option in options]

    branches = by_name(isolated_app.list_repo_branches(path))
    tags = isolated_app.list_repo_tags(path)
    bookmarks = by_name(isolated_app.list_repo_bookmarks(path))
    target_labels = labels(isolated_app.target_repo_ref_options(path))
    all_labels = labels(isolated_app.repo_ref_options(path))

    # Listings.
    assert {"default", "feature", "old"} <= set(branches)
    assert branches["default"]["node"] == nodes["default_node"]
    assert branches["feature"]["closed"] is False
    assert branches["old"]["closed"] is True
    first_tag = tags[0]
    assert first_tag["name"] == "v1.0"
    assert first_tag["node"] == feature_node
    assert bookmarks["feature-bm"]["node"] == feature_node

    # Resolution.
    assert isolated_app.default_code_ref(path)["node"] == nodes["default_node"]
    resolved_branch = isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_BRANCH, "feature")
    assert resolved_branch["name"] == "feature"
    resolved_bookmark = isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_BOOKMARK, "feature-bm")
    assert resolved_bookmark["node"] == feature_node
    assert isolated_app.commit_ref(path, feature_node)["type"] == isolated_app.REF_TYPE_COMMIT

    # The closed branch is offered generally but not as a target.
    closed_label = "branch old (closed)"
    assert closed_label not in target_labels
    assert closed_label in all_labels
def test_bottle_signup_login_logout_and_new_repo_flow(isolated_app):
    """Sign up, create a repository, log out, and get redirected to login again."""
    client = WsgiClient(isolated_app.app)

    def assert_redirect(result, target):
        assert result.status_code == 303
        assert result.location_path == target

    # Without a session, /new redirects to the login page.
    assert_redirect(client.get("/new"), "/login?next=/new")

    signup_page = client.get("/signup?next=/new")
    assert signup_page.status_code == 200
    signed_up = client.post(
        "/signup",
        {"username": "alice", "password": "password123", "next": "/new"},
    )
    assert_redirect(signed_up, "/new")
    assert "session" in client.cookies

    new_repo_page = client.get("/new")
    assert new_repo_page.status_code == 200
    assert "Create repository" in new_repo_page.text

    created = client.post("/new", {"name": "demo", "description": "A test repository"})
    assert_redirect(created, "/alice/demo")
    assert isolated_app.get_repo("alice", "demo") is not None

    repo_page = client.get("/alice/demo")
    assert repo_page.status_code == 200
    assert "This repository is empty." in repo_page.text
    assert "http://example.test/hg/alice/demo" in repo_page.text

    # Logging out drops the session cookie and restores the redirect.
    assert_redirect(client.post("/logout"), "/")
    assert "session" not in client.cookies
    assert_redirect(client.get("/new"), "/login?next=/new")
def test_bottle_repository_pages_render_refs_files_raw_and_errors(isolated_app):
    """Each listed URL answers with the expected status and page fragment."""
    owner = create_user("alice")
    nodes = create_repo_with_refs(owner)
    client = WsgiClient(isolated_app.app)
    commit_short = nodes["default_node"][:12]

    successful_pages = [
        ("/", 200, "Recent Activity"),
        ("/alice", 200, "alice/demo"),
        ("/alice?tab=stars", 200, "No starred repositories yet."),
        ("/alice/demo", 200, "<h1>Demo</h1>"),
        ("/alice/demo/src", 200, "README.md"),
        ("/alice/demo/src/README.md", 200, "# Demo"),
        (f"/alice/demo/commits/{commit_short}", 200, "initial"),
        ("/alice/demo/commits", 200, "initial"),
        ("/alice/demo/branches", 200, "feature"),
        ("/alice/demo/branches", 200, "closed"),
        ("/alice/demo/tags", 200, "v1.0"),
        ("/alice/demo/bookmarks", 200, "feature-bm"),
        ("/alice/demo/src?ref_type=branch&ref=feature", 200, "feature.txt"),
        ("/alice/demo/src/feature.txt?ref_type=branch&ref=feature", 200, "feature work"),
        ("/alice/demo/issues", 200, "No open issues."),
        ("/alice/demo/pulls", 200, "No open pull requests."),
        # The empty fragment means only the status code is checked.
        ("/static/icon.png", 200, ""),
        ("/favicon.ico", 204, ""),
    ]
    error_pages = [
        ("/alice/missing", 404, "Repository not found."),
        ("/alice/demo/src/missing.txt", 404, "Path not found."),
        ("/alice/demo/src/docs/../secret", 400, "Invalid repository path."),
        ("/alice/demo/src?ref_type=branch&ref=missing", 404, "Branch not found."),
    ]
    for url, expected_status, expected_fragment in successful_pages + error_pages:
        page = client.get(url)
        # The URL is the assertion message so a failure names the page.
        assert page.status_code == expected_status, url
        assert expected_fragment in page.text, url

    raw_file = client.get("/alice/demo/raw/feature.txt?ref_type=branch&ref=feature")
    assert raw_file.status_code == 200
    assert raw_file.body == b"feature work\n"
    assert raw_file.header("Content-Type").startswith("text/plain")
def test_bottle_profile_star_fork_and_repo_settings_flows(isolated_app):
    """Bob stars and forks alice/demo; Alice edits her profile and repo settings."""
    # Local import: only the final assertion needs it.
    from html import unescape

    owner = create_user("alice")
    bob = create_user("bob")
    create_repo_with_refs(owner)

    # Bob: star and fork.
    bob_client = WsgiClient(isolated_app.app)
    response = login_client(bob_client, "bob")
    assert response.status_code == 303
    response = bob_client.post("/alice/demo/star", {"action": "star"})
    assert response.status_code == 303
    assert isolated_app.repo_star_count(isolated_app.get_repo("alice", "demo")["id"]) == 1
    response = bob_client.post("/alice/demo/fork", {"name": "demo", "description": "Forked"})
    assert response.status_code == 303
    assert response.location_path == "/bob/demo"
    assert isolated_app.get_repo("bob", "demo") is not None
    assert "Fork of <a href=\"/alice/demo\">alice/demo</a>" in bob_client.get("/bob/demo").text

    # Alice: profile settings.
    owner_client = WsgiClient(isolated_app.app)
    response = login_client(owner_client, "alice")
    assert response.status_code == 303
    response = owner_client.post(
        "/settings/profile",
        {"display_name": "Alice A.", "bio": "Mercurial maintainer", "website": "example.com"},
    )
    assert response.status_code == 200
    assert "Profile updated." in response.text
    profile = owner_client.get("/alice")
    assert "Alice A." in profile.text
    assert "https://example.com" in profile.text

    # Alice: repository settings.
    response = owner_client.post("/alice/demo/settings", {"action": "save", "description": "Updated"})
    assert response.status_code == 200
    assert "Repository settings updated." in response.text
    assert isolated_app.get_repo("alice", "demo")["description"] == "Updated"
    response = owner_client.post("/alice/demo/settings", {"action": "add_contributor", "username": "bob"})
    assert response.status_code == 303
    repo = isolated_app.get_repo("alice", "demo")
    assert isolated_app.user_can_maintain_repo(bob, repo)
    response = owner_client.post("/alice/demo/settings", {"action": "remove_contributor", "user_id": str(bob["id"])})
    assert response.status_code == 303
    assert not isolated_app.user_can_maintain_repo(bob, repo)

    # A wrong confirmation name re-renders the page with an error.
    response = owner_client.post("/alice/demo/settings", {"action": "delete", "confirm_name": "wrong"})
    assert response.status_code == 200
    # The message contains double quotes, which the page may emit as HTML
    # entities; compare against unescaped text so either spelling matches.
    assert 'Type "demo" to confirm deletion.' in unescape(response.text)
def test_bottle_issue_routes_create_comment_close_and_reopen(isolated_app):
    """Issue routes validate input, then create, comment, close and reopen."""
    owner = create_user("alice")
    create_repo_with_refs(owner)
    client = WsgiClient(isolated_app.app)
    login_client(client, "alice")
    new_issue_url = "/alice/demo/issues/new"
    issue_url = "/alice/demo/issues/1"

    def issue_status():
        repo_id = isolated_app.get_repo("alice", "demo")["id"]
        return isolated_app.get_issue(repo_id, 1)["status"]

    # Creation form, including the empty-title validation error.
    form_page = client.get(new_issue_url)
    assert form_page.status_code == 200
    assert "Open issue" in form_page.text
    invalid = client.post(new_issue_url, {"title": "", "body": ""})
    assert invalid.status_code == 200
    assert "Issue title is required." in invalid.text
    created = client.post(new_issue_url, {"title": "Bug report", "body": "It fails"})
    assert created.status_code == 303
    assert created.location_path == issue_url

    issue_page = client.get(issue_url)
    assert issue_page.status_code == 200
    for fragment in ("Bug report", "It fails"):
        assert fragment in issue_page.text

    # Comments: an empty body is rejected, a real one is shown afterwards.
    empty_comment = client.post(issue_url, {"action": "comment", "body": ""})
    assert empty_comment.status_code == 200
    assert "Comment body is required." in empty_comment.text
    commented = client.post(issue_url, {"action": "comment", "body": "I can reproduce this"})
    assert commented.status_code == 303
    assert "I can reproduce this" in client.get(issue_url).text

    # Status transitions.
    for action, resulting_status in (("close", "closed"), ("reopen", "open")):
        changed = client.post(issue_url, {"action": action})
        assert changed.status_code == 303
        assert issue_status() == resulting_status
def test_bottle_pull_request_routes_create_comment_forbid_and_merge(isolated_app):
    """A fork author opens and comments on a pull request; only the owner can merge."""
    owner = create_user("alice")
    author = create_user("bob")
    pulls_new_url = "/alice/demo/pulls/new"
    pull_url = "/alice/demo/pulls/1"

    # Target repository and a fork carrying one extra commit.
    isolated_app.create_repository(owner, "demo", "")
    base_node = commit_file(
        isolated_app.repo_path("alice", "demo"), "README.md", "# Demo\n", message="initial", user="alice"
    )
    target_repo = isolated_app.get_repo("alice", "demo")
    isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
    source_repo = isolated_app.get_repo("bob", "demo-fork")
    source_node = commit_file(
        isolated_app.repo_path("bob", "demo-fork"), "feature.txt", "new feature\n", message="add feature", user="bob"
    )

    # Bob opens the pull request.
    bob_client = WsgiClient(isolated_app.app)
    login_client(bob_client, "bob")
    form_page = bob_client.get(pulls_new_url)
    assert form_page.status_code == 200
    assert "bob/demo-fork tip" in form_page.text

    tip = isolated_app.REF_TYPE_TIP
    opened = bob_client.post(
        pulls_new_url,
        {
            "source_ref": isolated_app.source_ref_option_value(source_repo["id"], tip, ""),
            "target_ref": isolated_app.ref_option_value(tip, ""),
            "title": "Add feature",
            "body": "Please merge this",
        },
    )
    assert opened.status_code == 303
    assert opened.location_path == pull_url
    pr = isolated_app.get_pull_request(target_repo["id"], 1)
    assert pr["base_node"] == base_node
    assert pr["source_node"] == source_node

    pull_page = bob_client.get(pull_url)
    assert pull_page.status_code == 200
    for fragment in ("feature.txt", "new feature"):
        assert fragment in pull_page.text

    # Bob may comment but may not change the pull request state.
    commented = bob_client.post(pull_url, {"action": "comment", "body": "Looks ready"})
    assert commented.status_code == 303
    assert "Looks ready" in bob_client.get(pull_url).text
    forbidden = bob_client.post(pull_url, {"action": "close"})
    assert forbidden.status_code == 403
    assert "Only maintainers can update pull requests." in forbidden.text

    # Alice, the owner, merges.
    owner_client = WsgiClient(isolated_app.app)
    login_client(owner_client, "alice")
    merge_result = owner_client.post(pull_url, {"action": "merge"})
    assert merge_result.status_code == 303
    merged = isolated_app.get_pull_request(target_repo["id"], 1)
    assert merged["status"] == "merged"
    assert merged["merge_node"] == source_node
    merged_page = owner_client.get(pull_url)
    assert merged_page.status_code == 200
    assert "Merged by alice" in merged_page.text
def test_mercurial_http_routes_are_public_for_reads_and_protect_writes(isolated_app):
    """Anonymous capability reads work; pushes need valid, authorized credentials."""
    owner = create_user("alice", password="owner-password")
    create_user("bob", password="bob-password")
    isolated_app.create_repository(owner, "demo", "")
    commit_file(isolated_app.repo_path("alice", "demo"), "README.md", "# Demo\n", message="initial", user="alice")
    client = WsgiClient(isolated_app.app)
    push_url = "/hg/alice/demo?cmd=unbundle"

    # Anonymous read.
    capabilities = client.get("/hg/alice/demo?cmd=capabilities")
    assert capabilities.status_code == 200
    assert b"lookup" in capabilities.body

    # Push without credentials.
    anonymous_push = client.post(push_url)
    assert anonymous_push.status_code == 401
    assert anonymous_push.header("WWW-Authenticate") == 'Basic realm="HgHost"'
    assert "Authentication required." in anonymous_push.text

    # Push with a wrong password.
    wrong_password = client.post(push_url, headers=basic_auth("alice", "wrong"))
    assert wrong_password.status_code == 401
    assert "Invalid Mercurial credentials." in wrong_password.text

    # Push with valid credentials for a user without push rights.
    unauthorized = client.post(push_url, headers=basic_auth("bob", "bob-password"))
    assert unauthorized.status_code == 403
    assert "Push not authorized" in unauthorized.text