import atexit
import base64
import os
import re
import shutil
import subprocess
import sys
import tempfile
from http.cookies import SimpleCookie
from io import BytesIO, StringIO
from pathlib import Path
from urllib.parse import unquote, unquote_to_bytes, urlencode, urlsplit

import pytest
from bottle import HTTPError


# Bootstrap environment for importing the application under test.
# These variables are set before the ``import app`` statement further down so
# that the import sees a throwaway database path, repository root and secret.
# NOTE(review): presumably ``app`` reads them at import time -- confirm.
BOOTSTRAP_DIR = Path(tempfile.mkdtemp(prefix="hglab-test-bootstrap-"))
# Remove the temporary directory when the interpreter exits; errors are ignored.
atexit.register(shutil.rmtree, BOOTSTRAP_DIR, ignore_errors=True)
os.environ["HG_HOST_DB"] = str(BOOTSTRAP_DIR / "hghost.sqlite3")
os.environ["HG_HOST_REPO_ROOT"] = str(BOOTSTRAP_DIR / "repos")
os.environ["SECRET_KEY"] = "test-secret"

import app as hglab  # noqa: E402


class WsgiResponse:
    """Captured outcome of one WSGI call: status, headers and body.

    ``headers`` is the list of ``(name, value)`` pairs handed to
    ``start_response``; ``body`` is the raw response bytes and ``text`` is the
    same body decoded as UTF-8 with undecodable bytes replaced.
    """

    def __init__(self, status_line, headers, body):
        code, _, _ = status_line.partition(" ")
        self.status_line = status_line
        self.status_code = int(code)
        self.headers = headers
        self.body = body
        self.text = body.decode("utf-8", "replace")

    def header(self, name, default=None):
        """Return the first header matching ``name`` case-insensitively, else ``default``."""
        wanted = name.lower()
        matches = (value for key, value in self.headers if key.lower() == wanted)
        return next(matches, default)

    @property
    def location(self):
        """Value of the ``Location`` header, or ``None`` when absent."""
        return self.header("Location")

    @property
    def location_path(self):
        """Redirect target reduced to ``path[?query]``.

        Relative locations are returned untouched; absolute ones lose their
        scheme and host. ``None`` is returned when there is no location.
        """
        target = self.location
        if not target:
            return None
        parts = urlsplit(target)
        if parts.scheme or parts.netloc:
            query_suffix = f"?{parts.query}" if parts.query else ""
            return parts.path + query_suffix
        return target


class WsgiClient:
    """Minimal in-process WSGI test client.

    Calls ``wsgi_app`` directly with a hand-built environ, replays cookies set
    by earlier responses, and remembers the most recent CSRF token found in a
    response body so that later browser-style POSTs include it automatically.
    """

    def __init__(self, wsgi_app):
        self.wsgi_app = wsgi_app
        # Cookie jar: name -> value, sent back on every request.
        self.cookies = {}
        # Last CSRF token scraped from a response body; None until one is seen.
        self.csrf_token = None

    def get(self, path, headers=None):
        """Issue a GET request; see :meth:`request`."""
        return self.request("GET", path, headers=headers)

    def post(self, path, data=None, headers=None):
        """Issue a form-encoded POST request; see :meth:`request`."""
        return self.request("POST", path, data=data, headers=headers)

    def request(self, method, path, data=None, headers=None):
        """Run one request through the WSGI app and return a ``WsgiResponse``.

        ``path`` may carry a query string. ``data`` (a mapping) is sent as an
        ``application/x-www-form-urlencoded`` body. For POSTs to anything
        outside ``/hg/`` the stored CSRF token is added to the form unless the
        caller supplied the CSRF field itself. Cookies and any CSRF token in
        the response are stored for subsequent requests.
        """
        headers = headers or {}
        split = urlsplit(path)
        body = b""
        # Only browser-style POSTs get the token; /hg/ endpoints are left alone.
        inject_csrf = (
            method == "POST"
            and not split.path.startswith("/hg/")
            and bool(self.csrf_token)
        )
        if data is None and inject_csrf:
            # A body-less POST still needs a form to carry the token.
            data = {}
        if data is not None:
            if inject_csrf and hglab.CSRF_FORM_FIELD not in data:
                data = {**data, hglab.CSRF_FORM_FIELD: self.csrf_token}
            body = urlencode(data, doseq=True).encode("utf-8")
            # Caller-supplied headers win over the default content type.
            headers = {"Content-Type": "application/x-www-form-urlencoded", **headers}
        environ = self._build_environ(method, split, body, headers)

        captured = {}

        def start_response(status, response_headers, exc_info=None):
            captured["status"] = status
            captured["headers"] = response_headers

        body_iter = self.wsgi_app(environ, start_response)
        try:
            response_body = b"".join(
                chunk if isinstance(chunk, bytes) else chunk.encode("utf-8") for chunk in body_iter
            )
        finally:
            # WSGI requires close() to be called on the iterable when present.
            close = getattr(body_iter, "close", None)
            if close:
                close()
        response = WsgiResponse(captured["status"], captured["headers"], response_body)
        self._store_cookies(response.headers)
        self._store_csrf_token(response.text)
        return response

    def _build_environ(self, method, split, body, headers):
        """Build the WSGI environ for one request.

        ``split`` is the ``urlsplit`` result of the requested path, ``body`` the
        encoded request body and ``headers`` a mapping of request headers.
        """
        environ = {
            "REQUEST_METHOD": method,
            "SCRIPT_NAME": "",
            # PEP 3333: environ strings are bytes decoded as ISO-8859-1, so the
            # percent-decoded path bytes must not be decoded as UTF-8 here.
            "PATH_INFO": unquote_to_bytes(split.path).decode("latin-1"),
            "QUERY_STRING": split.query,
            "SERVER_NAME": "example.test",
            "SERVER_PORT": "80",
            "SERVER_PROTOCOL": "HTTP/1.1",
            "HTTP_HOST": "example.test",
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": "http",
            "wsgi.input": BytesIO(body),
            "wsgi.errors": StringIO(),
            "wsgi.multithread": False,
            "wsgi.multiprocess": False,
            "wsgi.run_once": False,
            "CONTENT_LENGTH": str(len(body)),
            "REMOTE_ADDR": "127.0.0.1",
        }
        if self.cookies:
            environ["HTTP_COOKIE"] = "; ".join(f"{key}={value}" for key, value in self.cookies.items())
        for key, value in headers.items():
            env_key = key.upper().replace("-", "_")
            # Content-Type and Content-Length are the two headers that WSGI
            # stores without the HTTP_ prefix.
            if env_key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                environ[env_key] = value
            else:
                environ[f"HTTP_{env_key}"] = value
        return environ

    def _store_cookies(self, headers):
        """Update the cookie jar from every ``Set-Cookie`` response header."""
        for key, value in headers:
            if key.lower() != "set-cookie":
                continue
            cookie = SimpleCookie()
            cookie.load(value)
            for name, morsel in cookie.items():
                # An empty value with an expiry or Max-Age=0 is a deletion.
                if morsel.value == "" and (morsel["expires"] or morsel["max-age"] == "0"):
                    self.cookies.pop(name, None)
                else:
                    self.cookies[name] = morsel.value

    def _store_csrf_token(self, text):
        """Remember the CSRF token of the first matching form field in ``text``."""
        match = re.search(r'name="_csrf_token"\s+value="([^"]+)"', text)
        if match:
            self.csrf_token = match.group(1)


def login_client(client, username, password="correct horse battery staple", next_url="/"):
    """Log ``username`` in through the login form and return the POST response.

    The login page is fetched first and its response discarded; the form is
    then posted with ``username``, ``password`` and ``next`` fields.
    """
    client.get("/login")
    form = {"username": username, "password": password, "next": next_url}
    return client.post("/login", form)


@pytest.fixture()
def isolated_app(tmp_path, monkeypatch):
    """Yield the app module bound to a per-test database and repository root.

    ``DB_PATH`` and ``REPO_ROOT`` are patched to locations under ``tmp_path``,
    the recorded auth failures are cleared, and ``init_db`` is run before the
    module is returned.
    """
    redirected = {
        "DB_PATH": tmp_path / "hghost.sqlite3",
        "REPO_ROOT": tmp_path / "repos",
    }
    for attribute, location in redirected.items():
        monkeypatch.setattr(hglab, attribute, location)
    hglab.AUTH_FAILURES.clear()
    hglab.init_db()
    return hglab


def create_user(username, password="correct horse battery staple"):
    """Insert a user row directly into the database and return the stored user.

    The password is hashed with ``hglab.hash_password``; the row is then read
    back through ``hglab.get_user_by_username``.
    """
    insert_sql = "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)"
    with hglab.db_connect() as connection:
        values = (username, hglab.hash_password(password), hglab.utcnow())
        connection.execute(insert_sql, values)
    return hglab.get_user_by_username(username)


def commit_file(repo_path, relative_path, content, message="initial commit", user="alice"):
    """Write ``content`` to a file in the repository, commit it, and return the tip node.

    Missing parent directories are created. The file is added and committed
    with ``hglab.run_hg`` using ``user`` and ``message``.
    """
    destination = repo_path / relative_path
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(content, encoding="utf-8")
    hg_steps = (
        ["add", relative_path],
        ["commit", "-u", user, "-m", message],
    )
    for arguments in hg_steps:
        hglab.run_hg(arguments, cwd=repo_path)
    return hglab.repo_tip_node(repo_path)


def basic_auth(username, password):
    """Return a headers dict carrying HTTP Basic credentials for the given user."""
    credentials = f"{username}:{password}".encode("utf-8")
    encoded = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic " + encoded}


def create_repo_with_refs(owner):
    """Create ``<owner>/demo`` with several branches, a tag and a bookmark.

    The repository gets one commit on ``default``, one on a ``feature`` branch
    (followed by tag ``v1.0`` and bookmark ``feature-bm`` at the feature
    commit), and one on an ``old`` branch that is then closed. The working
    directory is left updated to ``default``.

    Returns a dict with the repository ``path`` and the ``default_node``,
    ``feature_node`` and ``old_node`` commit ids.
    """
    username = owner["username"]
    hglab.create_repository(owner, "demo", "Demo repository")
    path = hglab.repo_path(username, "demo")

    def hg(*arguments):
        hglab.run_hg(list(arguments), cwd=path)

    def commit(filename, text, summary):
        return commit_file(path, filename, text, message=summary, user=username)

    created = {"path": path}
    created["default_node"] = commit("README.md", "# Demo\n", "initial")

    hg("branch", "feature")
    created["feature_node"] = commit("feature.txt", "feature work\n", "add feature")
    hg("tag", "v1.0", "-u", username, "-m", "tag v1.0")
    hg("bookmark", "-r", created["feature_node"], "feature-bm")

    hg("update", "default")
    hg("branch", "old")
    created["old_node"] = commit("old.txt", "old branch\n", "old branch work")
    hg("commit", "--close-branch", "-u", username, "-m", "close old")
    hg("update", "default")

    return created


def test_normalize_slug_accepts_trimmed_lowercase_values():
    """Valid slugs come back trimmed and lower-cased."""
    accepted = [
        (" Demo_Repo-1 ", "demo_repo-1"),
        ("My.Name", "my.name"),
    ]
    for raw, expected in accepted:
        assert hglab.normalize_slug(raw, "Repository name") == expected


@pytest.mark.parametrize(
    ("value", "label"),
    [
        ("x", "Repository name"),
        ("bad/name", "Repository name"),
        ("-bad", "Repository name"),
        ("demo.hg", "Repository name"),
        ("login", "Username"),
        ("harrisonerd", "Username"),
    ],
)
def test_normalize_slug_rejects_invalid_or_reserved_values(value, label):
    """Each parametrized slug must be refused with ``ValueError``."""
    expect_rejection = pytest.raises(ValueError)
    with expect_rejection:
        hglab.normalize_slug(value, label)


def test_clean_repo_path_normalizes_and_rejects_traversal():
    """Surrounding slashes are stripped; ``.``/``..`` segments raise HTTP 400."""
    normalized = [
        ("/docs/readme.md/", "docs/readme.md"),
        ("", ""),
    ]
    for raw, expected in normalized:
        assert hglab.clean_repo_path(raw) == expected

    for unsafe in ("../secret", "docs/../secret", "./file"):
        with pytest.raises(HTTPError) as caught:
            hglab.clean_repo_path(unsafe)
        assert caught.value.status_code == 400


def test_password_hashes_verify_and_reject_bad_inputs():
    """Hashes use the pbkdf2_sha256 prefix and verify only the right password."""
    password = "correct-password"
    digest = hglab.hash_password(password)

    assert digest.startswith("pbkdf2_sha256$")
    assert hglab.verify_password(password, digest)
    assert not hglab.verify_password("wrong-password", digest)
    # A malformed stored value must fail verification rather than match.
    assert not hglab.verify_password(password, "not-a-valid-hash")


@pytest.mark.parametrize(
    ("value", "expected"),
    [
        ("example.com", "https://example.com"),
        (" http://example.com/path ", "http://example.com/path"),
        ("https://example.com", "https://example.com"),
    ],
)
def test_normalize_website_accepts_http_urls_and_adds_scheme(value, expected):
    """Bare hosts gain ``https://``; http(s) URLs are kept, minus outer whitespace."""
    normalized = hglab.normalize_website(value)
    assert normalized == expected


@pytest.mark.parametrize("value", ["ftp://example.com", "https://", "not a url"])
def test_normalize_website_rejects_invalid_urls(value):
    """Each parametrized website value must be refused with ``ValueError``."""
    expect_rejection = pytest.raises(ValueError)
    with expect_rejection:
        hglab.normalize_website(value)


def test_render_markdown_strips_scripts_and_unsafe_links():
    """Rendered markdown keeps headings, safe links and tables but no script or javascript: URL."""
    markdown_source = """
# Title

<script>alert("x")</script>

[safe](https://example.com) [unsafe](javascript:alert(1))

| A |
| - |
| B |
"""
    html = hglab.render_markdown(markdown_source)

    assert "<h1>Title</h1>" in html
    for forbidden in ("script", "javascript:"):
        assert forbidden not in html.lower()
    assert 'href="https://example.com"' in html
    assert "<table>" in html


def test_render_markdown_links_allows_only_links():
    """Only links become HTML; emphasis markers are dropped and raw tags escaped."""
    source = "[site](https://example.com) **bold** <strong>x</strong>"
    expected = '<a href="https://example.com">site</a> bold &lt;strong&gt;x&lt;/strong&gt;'

    assert hglab.render_markdown_links(source) == expected


def test_startup_config_rejects_default_secret_outside_debug(monkeypatch):
    """With DEBUG off, the default secret key raises; a custom one is accepted."""
    insecure_settings = (
        ("DEBUG", False),
        ("SECRET_KEY", hglab.DEFAULT_SECRET_KEY),
    )
    for setting, value in insecure_settings:
        monkeypatch.setattr(hglab, setting, value)

    with pytest.raises(RuntimeError):
        hglab.validate_startup_config()

    # Same non-debug mode, but with a non-default secret: must not raise.
    monkeypatch.setattr(hglab, "SECRET_KEY", "production-secret")
    hglab.validate_startup_config()


def test_security_headers_and_secure_cookie_flags(isolated_app):
    """Responses carry the security headers and cookies are HttpOnly/Lax/Secure.

    Both requests send ``X-Forwarded-Proto: https``; the first ``Set-Cookie``
    header of each response is checked for the expected flags.
    """
    forwarded_https = {"X-Forwarded-Proto": "https"}

    def assert_hardened(cookie, marker):
        assert marker in cookie
        assert "HttpOnly" in cookie
        assert "samesite=lax" in cookie.lower()
        assert "Secure" in cookie

    client = WsgiClient(isolated_app.app)

    login_page = client.get("/login", headers=forwarded_https)
    assert login_page.status_code == 200
    exact_headers = (
        ("X-Content-Type-Options", "nosniff"),
        ("Referrer-Policy", "same-origin"),
        ("X-Frame-Options", "DENY"),
    )
    for header_name, expected in exact_headers:
        assert login_page.header(header_name) == expected
    assert "frame-ancestors 'none'" in login_page.header("Content-Security-Policy")
    assert_hardened(login_page.header("Set-Cookie"), "csrf_token=")

    create_user("alice")
    credentials = {"username": "alice", "password": "correct horse battery staple", "next": "/"}
    redirect = client.post("/login", credentials, headers=forwarded_https)
    assert redirect.status_code == 303
    assert_hardened(redirect.header("Set-Cookie"), "session=")


def test_csrf_required_for_browser_posts_and_hg_is_exempt(isolated_app):
    """Browser POSTs without a valid CSRF token get 403; ``/hg/`` POSTs do not.

    The order of requests matters: the client only starts holding a CSRF
    cookie/token after its first GET of the login page.
    """
    owner = create_user("alice", password="owner-password")
    isolated_app.create_repository(owner, "demo", "")
    client = WsgiClient(isolated_app.app)

    # No prior GET: the client has no token to attach, so the POST carries none.
    response = client.post("/login", {"username": "alice", "password": "owner-password", "next": "/"})
    assert response.status_code == 403
    assert "Invalid CSRF token." in response.text

    # After loading the form, an explicitly wrong token is still rejected
    # (the client does not overwrite a CSRF field supplied by the caller).
    client.get("/login")
    response = client.post(
        "/login",
        {"username": "alice", "password": "owner-password", "next": "/", hglab.CSRF_FORM_FIELD: "bad-token"},
    )
    assert response.status_code == 403

    # The Mercurial endpoint answers with a Basic-auth challenge, not a CSRF error.
    response = client.post("/hg/alice/demo?cmd=unbundle")
    assert response.status_code == 401
    assert response.header("WWW-Authenticate") == 'Basic realm="HgHost"'


def test_browser_form_size_limit(isolated_app, monkeypatch):
    """A form body larger than ``MAX_FORM_BYTES`` is answered with HTTP 413."""
    monkeypatch.setattr(hglab, "MAX_FORM_BYTES", 40)
    browser = WsgiClient(isolated_app.app)
    browser.get("/login")

    oversized_form = {"username": "a" * 100, "password": "bad", "next": "/"}
    rejected = browser.post("/login", oversized_form)

    assert rejected.status_code == 413
    assert "Request body too large." in rejected.text


def test_login_and_hg_auth_failures_are_rate_limited(isolated_app, monkeypatch):
    """After the allowed number of failed attempts, login and hg auth return 429.

    The limit is patched to 2 failures with a 60 second cooldown, so the third
    bad attempt on each endpoint must be throttled.
    """
    monkeypatch.setattr(hglab, "RATE_LIMIT_MAX_FAILURES", 2)
    monkeypatch.setattr(hglab, "RATE_LIMIT_COOLDOWN_SECONDS", 60)
    create_user("alice", password="owner-password")
    client = WsgiClient(isolated_app.app)
    client.get("/login")

    # Failed form logins re-render the page (200) until the limit is reached.
    for _ in range(2):
        response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
        assert response.status_code == 200
    response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
    assert response.status_code == 429
    assert response.header("Retry-After") == "60"

    # Reset the recorded failures so the hg endpoint is measured from zero.
    hglab.AUTH_FAILURES.clear()
    owner = hglab.get_user_by_username("alice")
    isolated_app.create_repository(owner, "demo", "")
    for _ in range(2):
        response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("alice", "wrong"))
        assert response.status_code == 401
    response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("alice", "wrong"))
    assert response.status_code == 429


def test_readme_and_file_previews_are_truncated(isolated_app, monkeypatch):
    """Content longer than ``MAX_RENDER_BYTES`` shows a truncation notice on both pages."""
    monkeypatch.setattr(hglab, "MAX_RENDER_BYTES", 32)
    alice = create_user("alice")
    isolated_app.create_repository(alice, "demo", "")
    commit_file(isolated_app.repo_path("alice", "demo"), "README.md", "A" * 200, message="large readme")
    browser = WsgiClient(isolated_app.app)

    pages = (
        ("/alice/demo", "README preview truncated."),
        ("/alice/demo/src/README.md", "File preview truncated."),
    )
    for url, notice in pages:
        page = browser.get(url)
        assert page.status_code == 200
        assert notice in page.text


def test_build_tree_deduplicates_entries_and_sorts_directories_first():
    """Each directory listing has one entry per child, directories before files."""
    tracked = ["README.md", "src/app.py", "src/utils/helpers.py", "docs/index.md", "src/z.txt"]
    expected_listings = (
        (
            "",
            [
                {"name": "docs", "path": "docs", "type": "dir"},
                {"name": "src", "path": "src", "type": "dir"},
                {"name": "README.md", "path": "README.md", "type": "file"},
            ],
        ),
        (
            "src",
            [
                {"name": "utils", "path": "src/utils", "type": "dir"},
                {"name": "app.py", "path": "src/app.py", "type": "file"},
                {"name": "z.txt", "path": "src/z.txt", "type": "file"},
            ],
        ),
    )

    for directory, expected in expected_listings:
        assert hglab.build_tree(tracked, directory) == expected


def test_ref_option_values_round_trip_quoted_names():
    """Ref names with ``/``, ``|`` and spaces are percent-quoted and parse back intact."""
    name = "feature/a|b c"
    kind = hglab.REF_TYPE_BRANCH

    encoded = hglab.ref_option_value(kind, name)
    assert encoded == "branch|feature%2Fa%7Cb%20c"

    decoded = hglab.parse_ref_option_value(encoded)
    assert decoded == (kind, name)


def test_source_ref_option_values_round_trip_and_validate_source_repo():
    """Source ref values round-trip; a bad repo id or a commit ref type is rejected."""
    kind = hglab.REF_TYPE_BRANCH
    name = "feature/a|b"

    encoded = hglab.source_ref_option_value(42, kind, name)
    assert hglab.parse_source_ref_option_value(encoded) == (42, kind, name)

    rejected = (
        (f"not-int|{hglab.REF_TYPE_BRANCH}|default", "Invalid source repository"),
        (f"42|{hglab.REF_TYPE_COMMIT}|abc123", "Invalid source ref"),
    )
    for candidate, message in rejected:
        with pytest.raises(ValueError, match=message):
            hglab.parse_source_ref_option_value(candidate)


def test_ref_query_and_url_helpers_skip_default_refs_unless_forced():
    """A default ref adds no query string unless ``force=True`` is passed."""
    default_ref = {"type": hglab.REF_TYPE_BRANCH, "name": "default", "is_default": True}

    assert hglab.ref_query_string(default_ref) == ""
    assert hglab.ref_query_string(default_ref, force=True) == "ref_type=branch&ref=default"

    forced_urls = (
        ("/alice/demo", "/alice/demo?ref_type=branch&ref=default"),
        # An existing query string is extended with ``&`` rather than replaced.
        ("/alice/demo?tab=files", "/alice/demo?tab=files&ref_type=branch&ref=default"),
    )
    for base_url, expected in forced_urls:
        assert hglab.url_with_ref(base_url, default_ref, force=True) == expected


def test_format_ref_label_and_closed_branch_option_label():
    """Ref labels: tip, 12-character commit prefix, and a ``(closed)`` suffix for closed branches."""
    tip_label = hglab.format_ref_label(hglab.REF_TYPE_TIP)
    assert tip_label == "tip"

    commit_label = hglab.format_ref_label(hglab.REF_TYPE_COMMIT, "abcdef1234567890")
    assert commit_label == "commit abcdef123456"

    closed_branch = {"type": hglab.REF_TYPE_BRANCH, "name": "old", "closed": True}
    assert hglab.ref_option_label(closed_branch) == "branch old (closed)"


def test_init_db_creates_expected_tables_and_is_idempotent(isolated_app):
    """Running ``init_db`` again succeeds and the expected tables/columns exist."""
    # The fixture has already initialised the schema once; this is the repeat.
    isolated_app.init_db()

    with isolated_app.db_connect() as conn:

        def names(sql):
            return {row["name"] for row in conn.execute(sql)}

        tables = names("SELECT name FROM sqlite_master WHERE type = 'table'")
        user_columns = names("PRAGMA table_info(users)")
        repo_columns = names("PRAGMA table_info(repositories)")
        pr_columns = names("PRAGMA table_info(pull_requests)")

    assert {"users", "repositories", "issues", "pull_requests", "repo_stars"} <= tables
    assert {"display_name", "bio", "website"} <= user_columns
    assert {"forked_from_repo_id", "forked_at", "forked_from_node"} <= repo_columns
    assert {"target_ref_type", "target_ref_name", "source_ref_type", "source_ref_name"} <= pr_columns


def test_db_connect_configures_sqlite_for_worker_contention(isolated_app):
    """Connections enable foreign keys, the busy timeout, synchronous=1 and WAL."""
    isolated_app.init_db()

    observed = {}
    with isolated_app.db_connect() as conn:
        for statement in (
            "PRAGMA foreign_keys",
            "PRAGMA busy_timeout",
            "PRAGMA synchronous",
            "PRAGMA journal_mode",
        ):
            observed[statement] = conn.execute(statement).fetchone()[0]

    assert observed["PRAGMA foreign_keys"] == 1
    assert observed["PRAGMA busy_timeout"] == hglab.SQLITE_BUSY_TIMEOUT_MS
    assert observed["PRAGMA synchronous"] == 1
    assert observed["PRAGMA journal_mode"].lower() == "wal"


def test_sqlite_accepts_concurrent_worker_writes(isolated_app):
    """Three separate processes can each insert a user into the same database.

    Every worker imports the app against the test database, opens a
    ``BEGIN IMMEDIATE`` transaction, inserts one row and holds the transaction
    for 0.2 seconds so the workers overlap. All workers must exit with status
    0 and all three rows must be present afterwards.
    """
    isolated_app.init_db()
    repo_root = Path(__file__).resolve().parents[1]
    env = os.environ.copy()
    # Make ``import app`` resolvable in the children regardless of their cwd handling.
    existing_pythonpath = env.get("PYTHONPATH")
    env["PYTHONPATH"] = str(repo_root) if not existing_pythonpath else f"{repo_root}{os.pathsep}{existing_pythonpath}"

    worker_script = """
import os
import sys
import time

os.environ["HG_HOST_DB"] = sys.argv[1]
os.environ["HG_HOST_REPO_ROOT"] = sys.argv[2]
os.environ["SECRET_KEY"] = "test-secret"

import app as hglab

with hglab.db_connect() as conn:
    conn.execute("BEGIN IMMEDIATE")
    conn.execute(
        "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
        (sys.argv[3], "hash", hglab.utcnow()),
    )
    time.sleep(0.2)
"""

    processes = [
        subprocess.Popen(
            [
                sys.executable,
                "-c",
                worker_script,
                str(isolated_app.DB_PATH),
                str(isolated_app.REPO_ROOT),
                f"worker-{index}",
            ],
            cwd=repo_root,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        for index in range(3)
    ]

    results = []
    try:
        for process in processes:
            stdout, stderr = process.communicate(timeout=15)
            results.append((process.returncode, stdout, stderr))
    finally:
        # communicate(timeout=...) does not kill the child when it times out.
        # Kill stragglers, then reap every child and close its pipes so a
        # failure here leaves no zombie processes or open descriptors behind.
        for process in processes:
            if process.poll() is None:
                process.kill()
            process.wait()
            for stream in (process.stdout, process.stderr):
                if stream is not None:
                    stream.close()

    assert results
    for returncode, stdout, stderr in results:
        assert returncode == 0, stdout + stderr

    with isolated_app.db_connect() as conn:
        count = conn.execute("SELECT COUNT(*) FROM users WHERE username LIKE 'worker-%'").fetchone()[0]

    assert count == 3


def test_create_repository_persists_metadata_and_writes_hgrc(isolated_app):
    """Creating a repository stores its row, initialises ``.hg`` and writes the hgrc.

    The multi-line description is expected on a single line in the hgrc.
    """
    alice = create_user("alice")

    isolated_app.create_repository(alice, "demo", "line one\nline two")

    record = isolated_app.get_repo("alice", "demo")
    repo_dir = isolated_app.repo_path("alice", "demo")
    hgrc_text = (repo_dir / ".hg" / "hgrc").read_text(encoding="utf-8")

    assert record["owner_username"] == "alice"
    assert record["name"] == "demo"
    assert repo_dir.joinpath(".hg").is_dir()
    for expected_line in (
        "name = alice/demo",
        "description = line one line two",
        "allow-push = alice",
    ):
        assert expected_line in hgrc_text


def test_create_repository_rolls_back_database_and_files_when_hg_init_fails(isolated_app, monkeypatch):
    """If ``run_hg`` raises, no repository row and no repository directory remain."""
    alice = create_user("alice")

    def exploding_run_hg(*_args, **_kwargs):
        raise hglab.HgCommandError("init failed")

    monkeypatch.setattr(isolated_app, "run_hg", exploding_run_hg)

    with pytest.raises(hglab.HgCommandError, match="init failed"):
        isolated_app.create_repository(alice, "broken", "")

    leftover_row = isolated_app.get_repo("alice", "broken")
    assert leftover_row is None
    leftover_dir = isolated_app.repo_path("alice", "broken")
    assert not leftover_dir.exists()


def test_star_and_contributor_helpers_update_database_and_hgrc(isolated_app):
    """Starring and contributor changes are reflected in queries and in the hgrc."""
    alice = create_user("alice")
    bob = create_user("bob")
    isolated_app.create_repository(alice, "demo", "")
    demo = isolated_app.get_repo("alice", "demo")
    demo_id = demo["id"]

    isolated_app.star_repo(bob, demo)
    assert isolated_app.repo_star_count(demo_id) == 1
    assert isolated_app.user_starred_repo(bob, demo)

    isolated_app.add_repo_contributor(demo, alice, "bob")
    assert isolated_app.user_can_maintain_repo(bob, demo)
    hgrc_file = isolated_app.repo_path("alice", "demo").joinpath(".hg", "hgrc")
    assert "allow-push = alice bob" in hgrc_file.read_text(encoding="utf-8")

    # Undo both changes and check that every query reports the original state.
    isolated_app.unstar_repo(bob, demo)
    isolated_app.remove_repo_contributor(demo, bob["id"])
    assert isolated_app.repo_star_count(demo_id) == 0
    assert not isolated_app.user_starred_repo(bob, demo)
    assert not isolated_app.user_can_maintain_repo(bob, demo)


def test_issue_queries_count_filter_and_order_comments(isolated_app):
    """Issue counts, status filtering and comment ordering match the inserted rows.

    One open issue (#1) and one closed issue (#2) are inserted directly, plus
    two comments on #1 whose timestamps are in the reverse of insertion order.
    """
    owner = create_user("alice")
    isolated_app.create_repository(owner, "demo", "")
    repo = isolated_app.get_repo("alice", "demo")
    now = isolated_app.utcnow()

    with isolated_app.db_connect() as conn:
        conn.execute(
            """
            INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at)
            VALUES (?, ?, 1, 'open issue', 'body', 'open', ?, ?)
            """,
            (repo["id"], owner["id"], now, now),
        )
        conn.execute(
            """
            INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at, closed_at)
            VALUES (?, ?, 2, 'closed issue', '', 'closed', ?, ?, ?)
            """,
            (repo["id"], owner["id"], now, now, now),
        )
        issue_id = conn.execute("SELECT id FROM issues WHERE number = 1").fetchone()["id"]
        # The later-dated comment is inserted first so that the ordering check
        # below cannot pass on insertion order alone.
        conn.execute(
            """
            INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
            VALUES (?, ?, 'second', '2026-01-01T00:00:02Z', '2026-01-01T00:00:02Z')
            """,
            (issue_id, owner["id"]),
        )
        conn.execute(
            """
            INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
            VALUES (?, ?, 'first', '2026-01-01T00:00:01Z', '2026-01-01T00:00:01Z')
            """,
            (issue_id, owner["id"]),
        )

    assert isolated_app.issue_counts(repo["id"]) == {"open": 1, "closed": 1}
    assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "all")] == [2, 1]
    # An unrecognised status filter is expected to return only the open issue.
    assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "invalid")] == [1]
    assert [comment["body"] for comment in isolated_app.list_issue_comments(issue_id)] == ["first", "second"]


def test_mercurial_read_helpers_return_files_readme_commits_and_default_ref(isolated_app):
    """After one commit, the read helpers report the file, README, log and default ref."""
    alice = create_user("alice")
    isolated_app.create_repository(alice, "demo", "")
    repo_dir = isolated_app.repo_path("alice", "demo")
    tip = commit_file(repo_dir, "README.md", "# Demo\n", message="add readme", user="Alice <alice@example.com>")

    tracked = isolated_app.hg_files(repo_dir)
    readme = isolated_app.readme_for_repo(repo_dir, tracked)
    history = isolated_app.commit_log(repo_dir)
    default_ref = isolated_app.default_code_ref(repo_dir)

    assert tracked == ["README.md"]
    assert tuple(readme) == ("README.md", "# Demo\n")
    assert history[0]["summary"] == "add readme"
    assert isolated_app.commit_count(repo_dir) == 1
    for field, expected in (
        ("type", isolated_app.REF_TYPE_BRANCH),
        ("name", "default"),
        ("node", tip),
    ):
        assert default_ref[field] == expected
    assert isolated_app.repo_has_revision(repo_dir, tip)
    assert isolated_app.is_ancestor(repo_dir, isolated_app.NULL_REV, tip)


def test_create_pull_request_between_fork_repositories(isolated_app):
    """A pull request from a fork records both nodes and diffs the fork's new file."""
    alice = create_user("alice")
    bob = create_user("bob")
    isolated_app.create_repository(alice, "demo", "")
    upstream_dir = isolated_app.repo_path("alice", "demo")
    upstream_tip = commit_file(upstream_dir, "README.md", "# Demo\n", message="initial", user="alice")
    upstream = isolated_app.get_repo("alice", "demo")

    isolated_app.fork_repository(bob, upstream, "demo-fork", "forked copy")
    fork = isolated_app.get_repo("bob", "demo-fork")
    fork_dir = isolated_app.repo_path("bob", "demo-fork")
    fork_tip = commit_file(fork_dir, "feature.txt", "new feature\n", message="add feature", user="bob")

    # Both the target and the source ref are the repository tip (empty ref name).
    tip_type = isolated_app.REF_TYPE_TIP
    pr_number = isolated_app.create_pull_request(
        upstream,
        fork,
        bob,
        "Add feature",
        "Please merge this",
        tip_type,
        "",
        tip_type,
        "",
    )
    pull = isolated_app.get_pull_request(upstream["id"], pr_number)
    diff_text, live_source_node, live_source_ref = isolated_app.pull_request_diff(pull)

    assert pr_number == 1
    assert pull["base_node"] == upstream_tip
    assert pull["source_node"] == fork_tip
    assert pull["source_owner_username"] == "bob"
    assert pull["target_owner_username"] == "alice"
    assert live_source_node == fork_tip
    assert live_source_ref["type"] == isolated_app.REF_TYPE_TIP
    for fragment in ("feature.txt", "new feature"):
        assert fragment in diff_text


def test_branch_tag_bookmark_helpers_resolve_and_filter_refs(isolated_app):
    """Branch, tag and bookmark helpers agree with the refs built by ``create_repo_with_refs``.

    Closed branches must appear in the full ref options but not in the options
    offered as pull-request targets.
    """
    alice = create_user("alice")
    refs = create_repo_with_refs(alice)
    repo_dir = refs["path"]
    feature_node = refs["feature_node"]
    default_node = refs["default_node"]

    branches_by_name = {entry["name"]: entry for entry in isolated_app.list_repo_branches(repo_dir)}
    tag_list = isolated_app.list_repo_tags(repo_dir)
    bookmarks_by_name = {entry["name"]: entry for entry in isolated_app.list_repo_bookmarks(repo_dir)}
    target_labels = [option["label"] for option in isolated_app.target_repo_ref_options(repo_dir)]
    all_labels = [option["label"] for option in isolated_app.repo_ref_options(repo_dir)]

    assert {"default", "feature", "old"} <= set(branches_by_name)
    assert branches_by_name["default"]["node"] == default_node
    assert branches_by_name["feature"]["closed"] is False
    assert branches_by_name["old"]["closed"] is True

    newest_tag = tag_list[0]
    assert newest_tag["name"] == "v1.0"
    assert newest_tag["node"] == feature_node
    assert bookmarks_by_name["feature-bm"]["node"] == feature_node

    assert isolated_app.default_code_ref(repo_dir)["node"] == default_node
    resolved_branch = isolated_app.resolve_repo_ref(repo_dir, isolated_app.REF_TYPE_BRANCH, "feature")
    assert resolved_branch["name"] == "feature"
    resolved_bookmark = isolated_app.resolve_repo_ref(repo_dir, isolated_app.REF_TYPE_BOOKMARK, "feature-bm")
    assert resolved_bookmark["node"] == feature_node
    assert isolated_app.commit_ref(repo_dir, feature_node)["type"] == isolated_app.REF_TYPE_COMMIT

    closed_label = "branch old (closed)"
    assert closed_label not in target_labels
    assert closed_label in all_labels


def test_bottle_signup_login_logout_and_new_repo_flow(isolated_app):
    """Walk one browser session through signup, repository creation and logout.

    The steps share one client, so each request depends on the cookies and
    CSRF token left behind by the previous ones.
    """
    client = WsgiClient(isolated_app.app)

    # Anonymous access to /new is redirected to the login page.
    response = client.get("/new")
    assert response.status_code == 303
    assert response.location_path == "/login?next=/new"

    # Signing up logs the user in and honours the ``next`` target.
    response = client.get("/signup?next=/new")
    assert response.status_code == 200
    response = client.post(
        "/signup",
        {"username": "alice", "password": "password123", "next": "/new"},
    )
    assert response.status_code == 303
    assert response.location_path == "/new"
    assert "session" in client.cookies

    response = client.get("/new")
    assert response.status_code == 200
    assert "Create repository" in response.text

    response = client.post("/new", {"name": "demo", "description": "A test repository"})
    assert response.status_code == 303
    assert response.location_path == "/alice/demo"
    assert isolated_app.get_repo("alice", "demo") is not None

    # The empty repository page shows the clone URL built from the request host.
    response = client.get("/alice/demo")
    assert response.status_code == 200
    assert "This repository is empty." in response.text
    assert "http://example.test/hg/alice/demo" in response.text

    # Logging out removes the session cookie from the client's jar.
    response = client.post("/logout")
    assert response.status_code == 303
    assert response.location_path == "/"
    assert "session" not in client.cookies

    # Back to being anonymous: /new redirects to login again.
    response = client.get("/new")
    assert response.status_code == 303
    assert response.location_path == "/login?next=/new"


def test_bottle_repository_pages_render_refs_files_raw_and_errors(isolated_app):
    """Anonymous GETs of the listed pages return the expected status and text.

    Each table row is ``(url, expected status, text that must appear)``; the
    raw file endpoint is checked separately for exact bytes and content type.
    """
    alice = create_user("alice")
    refs = create_repo_with_refs(alice)
    browser = WsgiClient(isolated_app.app)
    short_node = refs["default_node"][:12]

    expectations = (
        ("/", 200, "Recent Activity"),
        ("/alice", 200, "alice/demo"),
        ("/alice?tab=stars", 200, "No starred repositories yet."),
        ("/alice/demo", 200, "<h1>Demo</h1>"),
        ("/alice/demo/src", 200, "README.md"),
        ("/alice/demo/src/README.md", 200, "# Demo"),
        (f"/alice/demo/commits/{short_node}", 200, "initial"),
        ("/alice/demo/commits", 200, "initial"),
        ("/alice/demo/branches", 200, "feature"),
        ("/alice/demo/branches", 200, "closed"),
        ("/alice/demo/tags", 200, "v1.0"),
        ("/alice/demo/bookmarks", 200, "feature-bm"),
        ("/alice/demo/src?ref_type=branch&ref=feature", 200, "feature.txt"),
        ("/alice/demo/src/feature.txt?ref_type=branch&ref=feature", 200, "feature work"),
        ("/alice/demo/issues", 200, "No open issues."),
        ("/alice/demo/pulls", 200, "No open pull requests."),
        ("/static/icon.png", 200, ""),
        ("/favicon.ico", 204, ""),
        ("/alice/missing", 404, "Repository not found."),
        ("/alice/demo/src/missing.txt", 404, "Path not found."),
        ("/alice/demo/src/docs/../secret", 400, "Invalid repository path."),
        ("/alice/demo/src?ref_type=branch&ref=missing", 404, "Branch not found."),
    )

    for url, wanted_status, wanted_text in expectations:
        page = browser.get(url)
        # The URL is the assertion message so a failure names the offending row.
        assert page.status_code == wanted_status, url
        assert wanted_text in page.text, url

    raw = browser.get("/alice/demo/raw/feature.txt?ref_type=branch&ref=feature")
    assert raw.status_code == 200
    assert raw.body == b"feature work\n"
    assert raw.header("Content-Type").startswith("text/plain")


def test_bottle_profile_star_fork_and_repo_settings_flows(isolated_app):
    """Drive starring, forking, profile editing and repository settings via HTTP.

    Bob stars and forks alice's repository; alice then edits her profile,
    updates the repository description, adds and removes bob as a
    contributor, and finally attempts a delete with the wrong confirmation.
    """
    alice = create_user("alice")
    bob_user = create_user("bob")
    create_repo_with_refs(alice)

    # --- bob: star and fork -------------------------------------------------
    bob_session = WsgiClient(isolated_app.app)
    assert login_client(bob_session, "bob").status_code == 303

    starred = bob_session.post("/alice/demo/star", {"action": "star"})
    assert starred.status_code == 303
    upstream_id = isolated_app.get_repo("alice", "demo")["id"]
    assert isolated_app.repo_star_count(upstream_id) == 1

    fork_form = {"name": "demo", "description": "Forked"}
    forked = bob_session.post("/alice/demo/fork", fork_form)
    assert forked.status_code == 303
    assert forked.location_path == "/bob/demo"
    assert isolated_app.get_repo("bob", "demo") is not None
    fork_page = bob_session.get("/bob/demo")
    assert "Fork of <a href=\"/alice/demo\">alice/demo</a>" in fork_page.text

    # --- alice: profile -----------------------------------------------------
    alice_session = WsgiClient(isolated_app.app)
    assert login_client(alice_session, "alice").status_code == 303

    profile_form = {
        "display_name": "Alice A.",
        "bio": "Mercurial maintainer",
        "website": "example.com",
    }
    saved_profile = alice_session.post("/settings/profile", profile_form)
    assert saved_profile.status_code == 200
    assert "Profile updated." in saved_profile.text
    profile_page = alice_session.get("/alice")
    assert "Alice A." in profile_page.text
    # The page is expected to show the scheme-less website with an https:// prefix.
    assert "https://example.com" in profile_page.text

    # --- alice: repository settings -----------------------------------------
    settings_url = "/alice/demo/settings"

    saved_settings = alice_session.post(settings_url, {"action": "save", "description": "Updated"})
    assert saved_settings.status_code == 200
    assert "Repository settings updated." in saved_settings.text
    assert isolated_app.get_repo("alice", "demo")["description"] == "Updated"

    added = alice_session.post(settings_url, {"action": "add_contributor", "username": "bob"})
    assert added.status_code == 303
    upstream = isolated_app.get_repo("alice", "demo")
    assert isolated_app.user_can_maintain_repo(bob_user, upstream)

    removed = alice_session.post(
        settings_url,
        {"action": "remove_contributor", "user_id": str(bob_user["id"])},
    )
    assert removed.status_code == 303
    assert not isolated_app.user_can_maintain_repo(bob_user, upstream)

    # A wrong confirmation name must re-render the form instead of deleting.
    refused = alice_session.post(settings_url, {"action": "delete", "confirm_name": "wrong"})
    assert refused.status_code == 200
    assert "Type &quot;demo&quot; to confirm deletion." in refused.text


def test_bottle_issue_routes_create_comment_close_and_reopen(isolated_app):
    """Walk one issue through creation, commenting, closing and reopening.

    Also checks that an empty title and an empty comment body are answered
    with a 200 page carrying the validation message rather than a redirect.
    """
    alice = create_user("alice")
    create_repo_with_refs(alice)
    session = WsgiClient(isolated_app.app)
    login_client(session, "alice")

    new_issue_url = "/alice/demo/issues/new"
    issue_url = "/alice/demo/issues/1"

    def stored_status():
        # Re-read the repository and the issue on every call to see the stored state.
        repo_id = isolated_app.get_repo("alice", "demo")["id"]
        return isolated_app.get_issue(repo_id, 1)["status"]

    form_page = session.get(new_issue_url)
    assert form_page.status_code == 200
    assert "Open issue" in form_page.text

    blank = session.post(new_issue_url, {"title": "", "body": ""})
    assert blank.status_code == 200
    assert "Issue title is required." in blank.text

    created = session.post(new_issue_url, {"title": "Bug report", "body": "It fails"})
    assert created.status_code == 303
    assert created.location_path == "/alice/demo/issues/1"

    detail = session.get(issue_url)
    assert detail.status_code == 200
    assert "Bug report" in detail.text
    assert "It fails" in detail.text

    empty_comment = session.post(issue_url, {"action": "comment", "body": ""})
    assert empty_comment.status_code == 200
    assert "Comment body is required." in empty_comment.text

    commented = session.post(issue_url, {"action": "comment", "body": "I can reproduce this"})
    assert commented.status_code == 303
    assert "I can reproduce this" in session.get(issue_url).text

    closed = session.post(issue_url, {"action": "close"})
    assert closed.status_code == 303
    assert stored_status() == "closed"

    reopened = session.post(issue_url, {"action": "reopen"})
    assert reopened.status_code == 303
    assert stored_status() == "open"


def test_bottle_pull_request_routes_create_comment_forbid_and_merge(isolated_app):
    """Open a pull request from a fork, comment on it, and merge it as the owner.

    Bob forks alice's repository, commits a new file and opens a pull
    request tip-to-tip.  Bob may comment but gets a 403 when trying to
    close it; alice merges it, and the stored merge node is expected to
    equal the source commit.
    """
    alice = create_user("alice")
    bob = create_user("bob")

    # Target repository with a single commit.
    isolated_app.create_repository(alice, "demo", "")
    upstream_path = isolated_app.repo_path("alice", "demo")
    base_node = commit_file(upstream_path, "README.md", "# Demo\n", message="initial", user="alice")
    upstream = isolated_app.get_repo("alice", "demo")

    # Bob's fork with one extra commit on top.
    isolated_app.fork_repository(bob, upstream, "demo-fork", "forked copy")
    fork = isolated_app.get_repo("bob", "demo-fork")
    fork_path = isolated_app.repo_path("bob", "demo-fork")
    source_node = commit_file(fork_path, "feature.txt", "new feature\n", message="add feature", user="bob")

    new_pr_url = "/alice/demo/pulls/new"
    pr_url = "/alice/demo/pulls/1"

    bob_session = WsgiClient(isolated_app.app)
    login_client(bob_session, "bob")

    form_page = bob_session.get(new_pr_url)
    assert form_page.status_code == 200
    assert "bob/demo-fork tip" in form_page.text

    pr_form = {
        "source_ref": isolated_app.source_ref_option_value(fork["id"], isolated_app.REF_TYPE_TIP, ""),
        "target_ref": isolated_app.ref_option_value(isolated_app.REF_TYPE_TIP, ""),
        "title": "Add feature",
        "body": "Please merge this",
    }
    opened = bob_session.post(new_pr_url, pr_form)
    assert opened.status_code == 303
    assert opened.location_path == "/alice/demo/pulls/1"

    stored = isolated_app.get_pull_request(upstream["id"], 1)
    assert stored["base_node"] == base_node
    assert stored["source_node"] == source_node

    detail = bob_session.get(pr_url)
    assert detail.status_code == 200
    assert "feature.txt" in detail.text
    assert "new feature" in detail.text

    commented = bob_session.post(pr_url, {"action": "comment", "body": "Looks ready"})
    assert commented.status_code == 303
    assert "Looks ready" in bob_session.get(pr_url).text

    # The author may comment, but closing the pull request is refused.
    denied = bob_session.post(pr_url, {"action": "close"})
    assert denied.status_code == 403
    assert "Only maintainers can update pull requests." in denied.text

    alice_session = WsgiClient(isolated_app.app)
    login_client(alice_session, "alice")
    merge_response = alice_session.post(pr_url, {"action": "merge"})
    assert merge_response.status_code == 303

    after_merge = isolated_app.get_pull_request(upstream["id"], 1)
    assert after_merge["status"] == "merged"
    assert after_merge["merge_node"] == source_node
    merged_page = alice_session.get(pr_url)
    assert merged_page.status_code == 200
    assert "Merged by alice" in merged_page.text


def test_mercurial_http_routes_are_public_for_reads_and_protect_writes(isolated_app):
    """Check the ``/hg/<owner>/<repo>`` endpoint's access rules for reads and pushes.

    A ``capabilities`` GET without credentials must succeed, while an
    ``unbundle`` POST must be refused with 401 when credentials are missing
    or wrong, and with 403 when a valid user other than the owner pushes.
    """
    owner = create_user("alice", password="owner-password")
    create_user("bob", password="bob-password")
    isolated_app.create_repository(owner, "demo", "")
    commit_file(isolated_app.repo_path("alice", "demo"), "README.md", "# Demo\n", message="initial", user="alice")
    client = WsgiClient(isolated_app.app)

    # Read command without any credentials: must be served.
    response = client.get("/hg/alice/demo?cmd=capabilities")
    assert response.status_code == 200
    assert b"lookup" in response.body

    # Write command without credentials: challenged with HTTP Basic auth.
    response = client.post("/hg/alice/demo?cmd=unbundle")
    assert response.status_code == 401
    assert response.header("WWW-Authenticate") == 'Basic realm="HgHost"'
    assert "Authentication required." in response.text

    # Known user, wrong password.
    # NOTE(review): basic_auth is defined elsewhere in this file; presumably it
    # returns a headers dict carrying an Authorization header. Confirm there.
    response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("alice", "wrong"))
    assert response.status_code == 401
    assert "Invalid Mercurial credentials." in response.text

    # Valid credentials for bob, who does not own the repository: forbidden.
    response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("bob", "bob-password"))
    assert response.status_code == 403
    assert "Push not authorized" in response.text
