patx/hglab
ok test
Commit 75b889fddbf7 · Harrison Erd · 2026-05-05 00:23 -0400
Comments
Diff
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -35,8 +35,17 @@
- `HG_HOST_DB`: SQLite database path. Defaults to `./data/hghost.sqlite3`.
- `HG_HOST_REPO_ROOT`: Mercurial repository root. Defaults to `./data/repos`.
- `HG_HOST_DEBUG`: set to `1` for Bottle debug/reloader.
+- `HG_HOST_MAX_FORM_BYTES`: maximum browser form POST size. Defaults to `65536`.
+- `HG_HOST_MAX_RENDER_BYTES`: maximum README/file/diff preview size. Defaults to `262144`.
+- `HG_HOST_MAX_HG_RESPONSE_BYTES`: maximum buffered Mercurial HTTP response size. Defaults to `268435456`.
+- `HG_HOST_RATE_LIMIT_ENABLED`: set to `0` to disable in-memory login/signup/hg auth throttling.
+- `HG_HOST_RATE_LIMIT_MAX_FAILURES`: failed attempts before throttling. Defaults to `5`.
+- `HG_HOST_RATE_LIMIT_WINDOW_SECONDS`: rate limit window. Defaults to `300`.
+- `HG_HOST_RATE_LIMIT_COOLDOWN_SECONDS`: throttle duration. Defaults to `300`.
- `PORT`: HTTP port. Defaults to `8080`.
+When `HG_HOST_DEBUG` is disabled, `SECRET_KEY` must be set to a non-default value before the app starts.
+
This v1 stores repositories on local disk. Do not deploy it to ephemeral filesystems unless repository storage is mounted persistently.
SQLite is configured with WAL mode and a busy timeout so a small multi-worker deployment can share one database file. Keep `HG_HOST_DB` on a local persistent filesystem used by one host; network or synced filesystems can break SQLite locking semantics and should use a server database instead.
diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -11,6 +11,7 @@
import shutil
import sqlite3
import subprocess
+import time
from pathlib import Path, PurePosixPath
from urllib.parse import parse_qsl, quote, unquote, urlencode, urlparse
@@ -31,14 +32,37 @@
from mercurial.hgweb import hgweb as make_hgweb
+def env_bool(name, default=False):
+ value = os.environ.get(name)
+ if value is None:
+ return default
+ return value.lower() in {"1", "true", "yes", "on"}
+
+
+def env_int(name, default, minimum=0):
+ try:
+ value = int(os.environ.get(name, str(default)))
+ except ValueError:
+ return default
+ return max(minimum, value)
+
+
BASE_DIR = Path(__file__).resolve().parent
DATA_DIR = BASE_DIR / "data"
DB_PATH = Path(os.environ.get("HG_HOST_DB", DATA_DIR / "hghost.sqlite3"))
REPO_ROOT = Path(os.environ.get("HG_HOST_REPO_ROOT", DATA_DIR / "repos"))
-SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-change-me")
-DEBUG = os.environ.get("HG_HOST_DEBUG", "").lower() in {"1", "true", "yes", "on"}
+DEFAULT_SECRET_KEY = "dev-secret-change-me"
+SECRET_KEY = os.environ.get("SECRET_KEY", DEFAULT_SECRET_KEY)
+DEBUG = env_bool("HG_HOST_DEBUG")
PASSWORD_ITERATIONS = 260_000
SQLITE_BUSY_TIMEOUT_MS = 30_000
+MAX_FORM_BYTES = env_int("HG_HOST_MAX_FORM_BYTES", 64 * 1024)
+MAX_RENDER_BYTES = env_int("HG_HOST_MAX_RENDER_BYTES", 256 * 1024)
+MAX_HG_RESPONSE_BYTES = env_int("HG_HOST_MAX_HG_RESPONSE_BYTES", 256 * 1024 * 1024)
+RATE_LIMIT_ENABLED = env_bool("HG_HOST_RATE_LIMIT_ENABLED", True)
+RATE_LIMIT_MAX_FAILURES = env_int("HG_HOST_RATE_LIMIT_MAX_FAILURES", 5, minimum=1)
+RATE_LIMIT_WINDOW_SECONDS = env_int("HG_HOST_RATE_LIMIT_WINDOW_SECONDS", 300, minimum=1)
+RATE_LIMIT_COOLDOWN_SECONDS = env_int("HG_HOST_RATE_LIMIT_COOLDOWN_SECONDS", 300, minimum=1)
SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9._-]{1,62}$")
REV_RE = re.compile(r"^(null|[0-9a-fA-F]{1,40})$")
REF_TYPE_BRANCH = "branch"
@@ -52,6 +76,8 @@
SCRIPT_STYLE_RE = re.compile(r"(?is)<(script|style)\b[^>]*>.*?</\1>")
RESERVED_USERNAMES = {"dashboard", "favicon.ico", "hg", "login", "logout", "new", "settings", "signup", "static", "harrisonerd"}
WRITE_HG_COMMANDS = {"pushkey", "unbundle"}
+CSRF_COOKIE_NAME = "csrf_token"
+CSRF_FORM_FIELD = "_csrf_token"
NULL_REV = "null"
NULL_NODE = "0" * 40
README_CANDIDATES = ("README.md", "README.rst", "README.txt", "README")
@@ -133,6 +159,22 @@
"dockerfile": "language-dockerfile",
"makefile": "language-makefile",
}
+SECURITY_HEADERS = {
+ "X-Content-Type-Options": "nosniff",
+ "Referrer-Policy": "same-origin",
+ "X-Frame-Options": "DENY",
+}
+CSP_HEADER = (
+ "default-src 'self'; "
+ "base-uri 'self'; "
+ "frame-ancestors 'none'; "
+ "form-action 'self'; "
+ "object-src 'none'; "
+ "img-src 'self' data: http: https:; "
+ "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; "
+ "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com"
+)
+AUTH_FAILURES = {}
TEMPLATE_PATH.insert(0, str(BASE_DIR / "templates"))
@@ -145,6 +187,15 @@
self.returncode = returncode
+class HgResponseTooLarge(RuntimeError):
+ pass
+
+
+def validate_startup_config():
+ if not DEBUG and SECRET_KEY == DEFAULT_SECRET_KEY:
+ raise RuntimeError("SECRET_KEY must be set to a non-default value when HG_HOST_DEBUG is disabled.")
+
+
def utcnow():
return dt.datetime.now(dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
@@ -405,16 +456,138 @@
return request.environ.get("hghost.user")
+def request_is_secure():
+ forwarded = request.get_header("X-Forwarded-Proto", "")
+ if forwarded:
+ return forwarded.split(",", 1)[0].strip().lower() == "https"
+ return request.urlparts.scheme == "https"
+
+
+def is_hg_request_path():
+ path = request.environ.get("PATH_INFO", request.path) or ""
+ return path == "/hg" or path.startswith("/hg/")
+
+
+def csrf_token():
+ token = request.environ.get("hghost.csrf_token")
+ if token:
+ return token
+
+ token = request.get_cookie(CSRF_COOKIE_NAME, secret=SECRET_KEY)
+ if not token:
+ token = secrets.token_urlsafe(32)
+ response.set_cookie(
+ CSRF_COOKIE_NAME,
+ token,
+ secret=SECRET_KEY,
+ httponly=True,
+ secure=request_is_secure(),
+ samesite="Lax",
+ path="/",
+ )
+ request.environ["hghost.csrf_token"] = token
+ return token
+
+
+def csrf_field():
+ return (
+ f'<input type="hidden" name="{CSRF_FORM_FIELD}" '
+ f'value="{html.escape(csrf_token(), quote=True)}">'
+ )
+
+
+def validate_csrf_token():
+ expected = request.get_cookie(CSRF_COOKIE_NAME, secret=SECRET_KEY)
+ submitted = request.forms.get(CSRF_FORM_FIELD, "")
+ if not expected or not submitted or not hmac.compare_digest(expected, submitted):
+ abort(403, "Invalid CSRF token.")
+
+
+def request_content_length():
+ try:
+ return int(request.environ.get("CONTENT_LENGTH") or 0)
+ except ValueError:
+ return 0
+
+
+def auth_rate_key(kind, identifier=""):
+ identifier = (identifier or "").strip().lower()[:100]
+ remote_addr = request.environ.get("REMOTE_ADDR", "")
+ return f"{kind}:{remote_addr}:{identifier}"
+
+
+def rate_limit_blocked(kind, identifier=""):
+ if not RATE_LIMIT_ENABLED:
+ return False
+ now = time.time()
+ prune_auth_failures(now)
+ record = AUTH_FAILURES.get(auth_rate_key(kind, identifier))
+ return bool(record and record.get("blocked_until", 0) > now)
+
+
+def prune_auth_failures(now=None):
+ now = now or time.time()
+ for key, record in list(AUTH_FAILURES.items()):
+ if record.get("reset_at", 0) <= now and record.get("blocked_until", 0) <= now:
+ AUTH_FAILURES.pop(key, None)
+
+
+def record_auth_failure(kind, identifier=""):
+ if not RATE_LIMIT_ENABLED:
+ return
+ now = time.time()
+ prune_auth_failures(now)
+ key = auth_rate_key(kind, identifier)
+ record = AUTH_FAILURES.get(key)
+ if not record or record.get("reset_at", 0) <= now:
+ record = {"count": 0, "reset_at": now + RATE_LIMIT_WINDOW_SECONDS, "blocked_until": 0}
+ record["count"] += 1
+ if record["count"] >= RATE_LIMIT_MAX_FAILURES:
+ record["blocked_until"] = now + RATE_LIMIT_COOLDOWN_SECONDS
+ AUTH_FAILURES[key] = record
+
+
+def clear_auth_failures(kind, identifier=""):
+ AUTH_FAILURES.pop(auth_rate_key(kind, identifier), None)
+
+
+def too_many_requests_response():
+ return HTTPResponse(
+ "Too many failed attempts. Try again later.\n",
+ status=429,
+ headers={"Retry-After": str(RATE_LIMIT_COOLDOWN_SECONDS)},
+ content_type="text/plain; charset=utf-8",
+ )
+
+
@app.hook("before_request")
def load_current_user():
user_id = request.get_cookie("session", secret=SECRET_KEY)
request.environ["hghost.user"] = get_user_by_id(user_id) if user_id else None
[email protected]("before_request")
+def enforce_browser_post_security():
+ if request.method != "POST" or is_hg_request_path():
+ return
+ if MAX_FORM_BYTES and request_content_length() > MAX_FORM_BYTES:
+ abort(413, "Request body too large.")
+ validate_csrf_token()
+
+
[email protected]("after_request")
+def add_security_headers():
+ for key, value in SECURITY_HEADERS.items():
+ response.headers.setdefault(key, value)
+ if not is_hg_request_path():
+ response.headers.setdefault("Content-Security-Policy", CSP_HEADER)
+
+
def render(template_name, **context):
context.setdefault("user", current_user())
context.setdefault("error", None)
context.setdefault("notice", None)
+ context.setdefault("csrf_field", csrf_field)
context.setdefault("render_markdown_links", render_markdown_links)
context.setdefault("render_repo_description", render_repo_description)
context.setdefault("format_ref_label", format_ref_label)
@@ -430,6 +603,7 @@
str(user["id"]),
secret=SECRET_KEY,
httponly=True,
+ secure=request_is_secure(),
samesite="Lax",
path="/",
)
@@ -1124,6 +1298,21 @@
return completed.stdout if text else completed.stdout
+def truncate_bytes_for_render(content):
+ if not MAX_RENDER_BYTES or len(content) <= MAX_RENDER_BYTES:
+ return content, False
+ return content[:MAX_RENDER_BYTES], True
+
+
+def truncate_text_for_render(content, label="Preview"):
+ encoded = (content or "").encode("utf-8", "replace")
+ truncated, was_truncated = truncate_bytes_for_render(encoded)
+ text = truncated.decode("utf-8", "replace")
+ if was_truncated:
+ text = text.rstrip() + f"\n\n[{label} truncated. Use the raw view or local clone for the full content.]\n"
+ return text, was_truncated
+
+
def read_file_bytes(path, file_path, revision="tip"):
if not revision:
raise HgCommandError("File not found.")
@@ -1170,6 +1359,14 @@
return None, None
+def readme_preview_for_repo(path, files, revision="tip"):
+ name, readme = readme_for_repo(path, files, revision=revision)
+ if readme is None:
+ return name, readme, False
+ readme, truncated = truncate_text_for_render(readme, label="README preview")
+ return name, readme, truncated
+
+
def is_markdown_file(file_path):
return bool(file_path and file_path.lower().endswith((".md", ".markdown", ".mdown")))
@@ -1714,7 +1911,7 @@
completed = run_hg(["diff", "--git", "-c", node], cwd=path, check=False)
if completed.returncode != 0:
raise HgCommandError(completed.stderr.strip() or "Unable to read commit diff.", completed.returncode)
- return completed.stdout
+ return truncate_text_for_render(completed.stdout, label="Diff")[0]
def diff_between_revisions(path, base_node, source_node):
@@ -1723,7 +1920,7 @@
completed = run_hg(["diff", "--git", "--from", base_node, "--to", source_node], cwd=path, check=False)
if completed.returncode != 0:
raise HgCommandError(completed.stderr.strip() or "Unable to read diff.", completed.returncode)
- return completed.stdout
+ return truncate_text_for_render(completed.stdout, label="Diff")[0]
def ensure_clean_working_copy(path):
@@ -2204,7 +2401,7 @@
return f"{scheme}://{host}/hg/{owner_username}/{repo_name}"
-def parse_basic_auth():
+def parse_basic_auth(rate_limit_kind=None, clear_on_success=True):
header = request.get_header("Authorization", "")
if not header.lower().startswith("basic "):
return None, None
@@ -2213,10 +2410,21 @@
decoded = base64.b64decode(token).decode("utf-8")
username, password = decoded.split(":", 1)
except (ValueError, UnicodeDecodeError, base64.binascii.Error):
+ if rate_limit_kind:
+ if rate_limit_blocked(rate_limit_kind, ""):
+ return None, "rate_limited"
+ record_auth_failure(rate_limit_kind, "")
return None, "invalid"
- user = get_user_by_username(username.strip().lower())
+ username = username.strip().lower()
+ if rate_limit_kind and rate_limit_blocked(rate_limit_kind, username):
+ return None, "rate_limited"
+ user = get_user_by_username(username)
if user and verify_password(password, user["password_hash"]):
+ if rate_limit_kind and clear_on_success:
+ clear_auth_failures(rate_limit_kind, username)
return user, None
+ if rate_limit_kind:
+ record_auth_failure(rate_limit_kind, username)
return None, "invalid"
@@ -2259,9 +2467,15 @@
status_headers = {}
body_parts = []
+ body_size = 0
def write_body(chunk):
- body_parts.append(chunk if isinstance(chunk, bytes) else chunk.encode("utf-8"))
+ nonlocal body_size
+ chunk = chunk if isinstance(chunk, bytes) else chunk.encode("utf-8")
+ body_size += len(chunk)
+ if MAX_HG_RESPONSE_BYTES and body_size > MAX_HG_RESPONSE_BYTES:
+ raise HgResponseTooLarge()
+ body_parts.append(chunk)
def start_response(status, headers, exc_info=None):
status_headers["status"] = status
@@ -2275,14 +2489,21 @@
str(path).encode("utf-8"),
name=f"{repo['owner_username']}/{repo['name']}".encode("utf-8"),
)
- body_iter = hg_app(env, start_response)
try:
+ body_iter = hg_app(env, start_response)
for chunk in body_iter:
write_body(chunk)
+ except HgResponseTooLarge:
+ return HTTPResponse(
+ "Mercurial response too large.\n",
+ status=413,
+ content_type="text/plain; charset=utf-8",
+ )
finally:
- close = getattr(body_iter, "close", None)
- if close:
- close()
+ if "body_iter" in locals():
+ close = getattr(body_iter, "close", None)
+ if close:
+ close()
body = b"".join(body_parts)
raw_status = status_headers.get("status", "500 Internal Server Error")
@@ -2324,6 +2545,9 @@
username_raw = request.forms.get("username", "")
password = request.forms.get("password", "")
next_url = safe_next_url(request.forms.get("next"))
+ signup_identifier = username_raw.strip().lower()
+ if rate_limit_blocked("signup", signup_identifier):
+ return too_many_requests_response()
try:
username = normalize_slug(username_raw, "Username")
if len(password) < 8:
@@ -2335,8 +2559,10 @@
)
user = get_user_by_username(username)
login_user(user)
+ clear_auth_failures("signup", username)
redirect(next_url)
except (sqlite3.IntegrityError, ValueError) as exc:
+ record_auth_failure("signup", signup_identifier)
message = "Username already exists." if isinstance(exc, sqlite3.IntegrityError) else str(exc)
return render("signup.tpl", error=message, username=username_raw, next_url=next_url)
@@ -2349,9 +2575,13 @@
username = request.forms.get("username", "").strip().lower()
password = request.forms.get("password", "")
next_url = safe_next_url(request.forms.get("next"))
+ if rate_limit_blocked("login", username):
+ return too_many_requests_response()
user = get_user_by_username(username)
if not user or not verify_password(password, user["password_hash"]):
+ record_auth_failure("login", username)
return render("login.tpl", error="Invalid username or password.", username=username, next_url=next_url)
+ clear_auth_failures("login", username)
login_user(user)
redirect(next_url)
@@ -2433,7 +2663,7 @@
path = repo_path(owner, repo_name)
selected_ref = selected_repo_ref(path)
files = hg_files(path, selected_ref.get("node"))
- readme_name, readme = readme_for_repo(path, files, revision=selected_ref.get("node"))
+ readme_name, readme, readme_truncated = readme_preview_for_repo(path, files, revision=selected_ref.get("node"))
readme_is_markdown = is_markdown_file(readme_name)
context = repo_page_context(repo, path, selected_ref=selected_ref)
return render(
@@ -2444,6 +2674,7 @@
readme=readme,
readme_html=render_markdown(readme) if readme is not None and readme_is_markdown else None,
readme_is_markdown=readme_is_markdown,
+ readme_truncated=readme_truncated,
**context,
)
@@ -2582,14 +2813,16 @@
if file_path in files:
content = read_file_bytes(path, file_path, revision=selected_ref.get("node"))
is_binary = b"\0" in content[:4096]
+ preview_content, preview_truncated = truncate_bytes_for_render(content)
return render(
"file.tpl",
repo=repo,
file_path=file_path,
- content=content.decode("utf-8", "replace") if not is_binary else "",
+ content=preview_content.decode("utf-8", "replace") if not is_binary else "",
is_binary=is_binary,
language_class=highlight_language_class(file_path),
size=len(content),
+ preview_truncated=preview_truncated,
quote_path=quote_path,
**repo_page_context(repo, path, selected_ref=selected_ref),
)
@@ -3030,14 +3263,19 @@
if not repo:
abort(404, "Repository not found.")
- auth_user, auth_error = parse_basic_auth()
- if is_hg_write_request():
+ is_write = is_hg_write_request()
+ auth_user, auth_error = parse_basic_auth("hg" if is_write else None, clear_on_success=not is_write)
+ if is_write:
+ if auth_error == "rate_limited":
+ return too_many_requests_response()
if auth_error:
return basic_auth_challenge("Invalid Mercurial credentials.")
if not auth_user:
return basic_auth_challenge()
if not user_can_maintain_repo(auth_user, repo):
+ record_auth_failure("hg", auth_user["username"])
return HTTPResponse("Push not authorized for this repository.\n", status=403, content_type="text/plain")
+ clear_auth_failures("hg", auth_user["username"])
return hg_wsgi_response(repo, auth_user)
@@ -3052,11 +3290,17 @@
return render("error.tpl", title="Forbidden", message=getattr(error, "body", "Forbidden."))
[email protected](413)
+def request_too_large(error):
+ return render("error.tpl", title="Request too large", message=getattr(error, "body", "Request body too large."))
+
+
@app.error(500)
def server_error(error):
return render("error.tpl", title="Server error", message="Something went wrong.")
+validate_startup_config()
init_db()
diff --git a/readme (Case Conflict).md b/readme (Case Conflict).md
new file mode 100644
--- /dev/null
+++ b/readme (Case Conflict).md
@@ -0,0 +1,1 @@
+hello world
diff --git a/templates/base.tpl b/templates/base.tpl
--- a/templates/base.tpl
+++ b/templates/base.tpl
@@ -18,6 +18,7 @@
<a href="/{{user['username']}}">{{user['username']}}</a>
<a href="/new">New repo</a>
<form action="/logout" method="post">
+ {{!csrf_field()}}
<button class="link-button" type="submit">Log out</button>
</form>
% else:
diff --git a/templates/commit_detail.tpl b/templates/commit_detail.tpl
--- a/templates/commit_detail.tpl
+++ b/templates/commit_detail.tpl
@@ -45,6 +45,7 @@
% if user:
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="comment">
<label>
Add a comment
diff --git a/templates/edit_profile.tpl b/templates/edit_profile.tpl
--- a/templates/edit_profile.tpl
+++ b/templates/edit_profile.tpl
@@ -3,6 +3,7 @@
<section class="auth-card wide">
<h1>Edit profile</h1>
<form method="post">
+ {{!csrf_field()}}
<label>
Display name
<input name="display_name" value="{{profile['display_name']}}" maxlength="80">
diff --git a/templates/file.tpl b/templates/file.tpl
--- a/templates/file.tpl
+++ b/templates/file.tpl
@@ -29,6 +29,9 @@
% if is_binary:
<p class="empty">Binary file, {{size}} bytes. Use the raw view to download it.</p>
% else:
+ % if preview_truncated:
+ <p class="notice">File preview truncated. Use the raw view to download the full file.</p>
+ % end
<pre class="code"><code class="{{language_class}}">{{content}}</code></pre>
% end
</section>
diff --git a/templates/fork_repo.tpl b/templates/fork_repo.tpl
--- a/templates/fork_repo.tpl
+++ b/templates/fork_repo.tpl
@@ -3,6 +3,7 @@
<section class="auth-card wide">
<h1>Fork {{source_repo["owner_username"]}}/{{source_repo["name"]}}</h1>
<form method="post">
+ {{!csrf_field()}}
<label>
Repository name
<input name="name" value="{{name}}" required pattern="[a-z0-9][a-z0-9._-]{1,62}">
diff --git a/templates/issue_detail.tpl b/templates/issue_detail.tpl
--- a/templates/issue_detail.tpl
+++ b/templates/issue_detail.tpl
@@ -18,6 +18,7 @@
</div>
% if can_maintain:
<form method="post">
+ {{!csrf_field()}}
% if issue["status"] == "open":
<input type="hidden" name="action" value="close">
<button class="button secondary small" type="submit">Close issue</button>
@@ -51,6 +52,7 @@
% if user:
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="comment">
<label>
Add a comment
diff --git a/templates/login.tpl b/templates/login.tpl
--- a/templates/login.tpl
+++ b/templates/login.tpl
@@ -3,6 +3,7 @@
<section class="auth-card">
<h1>Log in</h1>
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="next" value="{{next_url}}">
<label>
Username
diff --git a/templates/new_issue.tpl b/templates/new_issue.tpl
--- a/templates/new_issue.tpl
+++ b/templates/new_issue.tpl
@@ -13,6 +13,7 @@
<section class="panel">
<h2>Open a new issue</h2>
<form method="post">
+ {{!csrf_field()}}
<label>
Title
<input name="title" value="{{title_value}}" required maxlength="200">
diff --git a/templates/new_pull_request.tpl b/templates/new_pull_request.tpl
--- a/templates/new_pull_request.tpl
+++ b/templates/new_pull_request.tpl
@@ -14,6 +14,7 @@
% if forks and source_options:
<h2>Open pull request</h2>
<form method="post">
+ {{!csrf_field()}}
<label>
Source ref
<select name="source_ref">
@@ -43,6 +44,7 @@
% else:
<p class="empty">You do not have a fork of this repository yet.</p>
<form class="inline-form" method="post" action="/{{repo['owner_username']}}/{{repo['name']}}/fork">
+ {{!csrf_field()}}
<input type="hidden" name="name" value="{{repo['name']}}">
<input type="hidden" name="description" value="{{repo['description']}}">
<button class="button" type="submit">Fork repository</button>
diff --git a/templates/new_repo.tpl b/templates/new_repo.tpl
--- a/templates/new_repo.tpl
+++ b/templates/new_repo.tpl
@@ -3,6 +3,7 @@
<section class="auth-card wide">
<h1>New repository</h1>
<form method="post">
+ {{!csrf_field()}}
<label>
Repository name
<input name="name" value="{{name if defined('name') else ''}}" required pattern="[a-z0-9][a-z0-9._-]{1,62}">
diff --git a/templates/pull_request_detail.tpl b/templates/pull_request_detail.tpl
--- a/templates/pull_request_detail.tpl
+++ b/templates/pull_request_detail.tpl
@@ -21,10 +21,12 @@
% if can_maintain and pr["status"] == "open":
<div class="filters">
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="merge">
<button class="button" type="submit">Merge</button>
</form>
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="close">
<button class="button secondary small" type="submit">Close</button>
</form>
@@ -57,6 +59,7 @@
% if user:
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="comment">
<label>
Add a comment
diff --git a/templates/repo.tpl b/templates/repo.tpl
--- a/templates/repo.tpl
+++ b/templates/repo.tpl
@@ -16,8 +16,14 @@
<section class="panel">
% if readme_html is not None:
+ % if readme_truncated:
+ <p class="notice">README preview truncated. Use the source or raw view for the full file.</p>
+ % end
<div class="readme markdown-body">{{!readme_html}}</div>
% elif readme is not None:
+ % if readme_truncated:
+ <p class="notice">README preview truncated. Use the source or raw view for the full file.</p>
+ % end
<pre class="readme">{{readme}}</pre>
% else:
<div class="empty">
diff --git a/templates/repo_nav.tpl b/templates/repo_nav.tpl
--- a/templates/repo_nav.tpl
+++ b/templates/repo_nav.tpl
@@ -8,6 +8,7 @@
<a class="repo-tab {{'active' if active_tab == 'pulls' else ''}}" href="/{{repo['owner_username']}}/{{repo['name']}}/pulls">Pull requests{{" (" + str(pr_counts["open"]) + ")" if pr_counts["open"] else ""}}</a>
% if user:
<form class="inline-form" method="post" action="/{{repo['owner_username']}}/{{repo['name']}}/star">
+ {{!csrf_field()}}
% if is_starred:
<input type="hidden" name="action" value="unstar">
<button class="button-link" type="submit">Unstar ({{star_count}})</button>
@@ -21,6 +22,7 @@
% end
% if user and not is_owner and not has_fork:
<form class="inline-form" method="post" action="/{{repo['owner_username']}}/{{repo['name']}}/fork">
+ {{!csrf_field()}}
<input type="hidden" name="name" value="{{repo['name']}}">
<input type="hidden" name="description" value="{{repo['description']}}">
<button class="button-link" type="submit">Fork</button>
diff --git a/templates/repo_settings.tpl b/templates/repo_settings.tpl
--- a/templates/repo_settings.tpl
+++ b/templates/repo_settings.tpl
@@ -13,6 +13,7 @@
<section class="panel">
<h2>Description</h2>
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="save">
<label>
Repository description
@@ -30,6 +31,7 @@
<li class="panel-heading">
<span><a href="/{{contributor['username']}}">{{contributor["username"]}}</a> <span class="muted">added {{contributor["contributor_since"]}}</span></span>
<form class="inline-form" method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="remove_contributor">
<input type="hidden" name="user_id" value="{{contributor['id']}}">
<button class="button secondary small" type="submit">Remove</button>
@@ -41,6 +43,7 @@
<p class="empty">No contributors yet.</p>
% end
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="add_contributor">
<label>
Username
@@ -54,6 +57,7 @@
<h2>Delete repository</h2>
<p>This permanently deletes the repository, its issues, pull requests, and Mercurial data.</p>
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="action" value="delete">
<label>
Type {{repo["name"]}} to confirm
diff --git a/templates/signup.tpl b/templates/signup.tpl
--- a/templates/signup.tpl
+++ b/templates/signup.tpl
@@ -3,6 +3,7 @@
<section class="auth-card">
<h1>Create account</h1>
<form method="post">
+ {{!csrf_field()}}
<input type="hidden" name="next" value="{{next_url}}">
<label>
Username
diff --git a/tests/test_app.py b/tests/test_app.py
--- a/tests/test_app.py
+++ b/tests/test_app.py
@@ -3,6 +3,7 @@
from http.cookies import SimpleCookie
from io import BytesIO, StringIO
import os
+import re
import shutil
import subprocess
import sys
@@ -57,6 +58,7 @@
def __init__(self, wsgi_app):
self.wsgi_app = wsgi_app
self.cookies = {}
+ self.csrf_token = None
def get(self, path, headers=None):
return self.request("GET", path, headers=headers)
@@ -68,7 +70,16 @@
headers = headers or {}
split = urlsplit(path)
body = b""
+ if method == "POST" and data is None and not split.path.startswith("/hg/") and self.csrf_token:
+ data = {}
if data is not None:
+ if (
+ method == "POST"
+ and not split.path.startswith("/hg/")
+ and hglab.CSRF_FORM_FIELD not in data
+ and self.csrf_token
+ ):
+ data = {**data, hglab.CSRF_FORM_FIELD: self.csrf_token}
body = urlencode(data, doseq=True).encode("utf-8")
headers = {"Content-Type": "application/x-www-form-urlencoded", **headers}
environ = {
@@ -118,6 +129,7 @@
close()
response = WsgiResponse(captured["status"], captured["headers"], response_body)
self._store_cookies(response.headers)
+ self._store_csrf_token(response.text)
return response
def _store_cookies(self, headers):
@@ -132,11 +144,22 @@
else:
self.cookies[name] = morsel.value
+ def _store_csrf_token(self, text):
+ match = re.search(r'name="_csrf_token"\s+value="([^"]+)"', text)
+ if match:
+ self.csrf_token = match.group(1)
+
+
+def login_client(client, username, password="correct horse battery staple", next_url="/"):
+ client.get("/login")
+ return client.post("/login", {"username": username, "password": password, "next": next_url})
+
@pytest.fixture()
def isolated_app(tmp_path, monkeypatch):
monkeypatch.setattr(hglab, "DB_PATH", tmp_path / "hghost.sqlite3")
monkeypatch.setattr(hglab, "REPO_ROOT", tmp_path / "repos")
+ hglab.AUTH_FAILURES.clear()
hglab.init_db()
return hglab
@@ -274,6 +297,117 @@
assert rendered == '<a href="https://example.com">site</a> bold <strong>x</strong>'
+def test_startup_config_rejects_default_secret_outside_debug(monkeypatch):
+ monkeypatch.setattr(hglab, "DEBUG", False)
+ monkeypatch.setattr(hglab, "SECRET_KEY", hglab.DEFAULT_SECRET_KEY)
+
+ with pytest.raises(RuntimeError):
+ hglab.validate_startup_config()
+
+ monkeypatch.setattr(hglab, "SECRET_KEY", "production-secret")
+ hglab.validate_startup_config()
+
+
+def test_security_headers_and_secure_cookie_flags(isolated_app):
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/login", headers={"X-Forwarded-Proto": "https"})
+ assert response.status_code == 200
+ assert response.header("X-Content-Type-Options") == "nosniff"
+ assert response.header("Referrer-Policy") == "same-origin"
+ assert response.header("X-Frame-Options") == "DENY"
+ assert "frame-ancestors 'none'" in response.header("Content-Security-Policy")
+ csrf_cookie = response.header("Set-Cookie")
+ assert "csrf_token=" in csrf_cookie
+ assert "HttpOnly" in csrf_cookie
+ assert "samesite=lax" in csrf_cookie.lower()
+ assert "Secure" in csrf_cookie
+
+ create_user("alice")
+ response = client.post(
+ "/login",
+ {"username": "alice", "password": "correct horse battery staple", "next": "/"},
+ headers={"X-Forwarded-Proto": "https"},
+ )
+ assert response.status_code == 303
+ session_cookie = response.header("Set-Cookie")
+ assert "session=" in session_cookie
+ assert "HttpOnly" in session_cookie
+ assert "samesite=lax" in session_cookie.lower()
+ assert "Secure" in session_cookie
+
+
+def test_csrf_required_for_browser_posts_and_hg_is_exempt(isolated_app):
+ owner = create_user("alice", password="owner-password")
+ isolated_app.create_repository(owner, "demo", "")
+ client = WsgiClient(isolated_app.app)
+
+ response = client.post("/login", {"username": "alice", "password": "owner-password", "next": "/"})
+ assert response.status_code == 403
+ assert "Invalid CSRF token." in response.text
+
+ client.get("/login")
+ response = client.post(
+ "/login",
+ {"username": "alice", "password": "owner-password", "next": "/", hglab.CSRF_FORM_FIELD: "bad-token"},
+ )
+ assert response.status_code == 403
+
+ response = client.post("/hg/alice/demo?cmd=unbundle")
+ assert response.status_code == 401
+ assert response.header("WWW-Authenticate") == 'Basic realm="HgHost"'
+
+
+def test_browser_form_size_limit(isolated_app, monkeypatch):
+ monkeypatch.setattr(hglab, "MAX_FORM_BYTES", 40)
+ client = WsgiClient(isolated_app.app)
+ client.get("/login")
+
+ response = client.post("/login", {"username": "a" * 100, "password": "bad", "next": "/"})
+ assert response.status_code == 413
+ assert "Request body too large." in response.text
+
+
+def test_login_and_hg_auth_failures_are_rate_limited(isolated_app, monkeypatch):
+ monkeypatch.setattr(hglab, "RATE_LIMIT_MAX_FAILURES", 2)
+ monkeypatch.setattr(hglab, "RATE_LIMIT_COOLDOWN_SECONDS", 60)
+ create_user("alice", password="owner-password")
+ client = WsgiClient(isolated_app.app)
+ client.get("/login")
+
+ for _ in range(2):
+ response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
+ assert response.status_code == 200
+ response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
+ assert response.status_code == 429
+ assert response.header("Retry-After") == "60"
+
+ hglab.AUTH_FAILURES.clear()
+ owner = hglab.get_user_by_username("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ for _ in range(2):
+ response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("alice", "wrong"))
+ assert response.status_code == 401
+ response = client.post("/hg/alice/demo?cmd=unbundle", headers=basic_auth("alice", "wrong"))
+ assert response.status_code == 429
+
+
+def test_readme_and_file_previews_are_truncated(isolated_app, monkeypatch):
+ monkeypatch.setattr(hglab, "MAX_RENDER_BYTES", 32)
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ commit_file(isolated_app.repo_path("alice", "demo"), "README.md", "A" * 200, message="large readme")
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/alice/demo")
+ assert response.status_code == 200
+ assert "README preview truncated." in response.text
+
+ response = client.get("/alice/demo/src/README.md")
+ assert response.status_code == 200
+ assert "File preview truncated." in response.text
+
+
def test_build_tree_deduplicates_entries_and_sorts_directories_first():
files = ["README.md", "src/app.py", "src/utils/helpers.py", "docs/index.md", "src/z.txt"]
@@ -620,6 +754,8 @@
assert response.status_code == 303
assert response.location_path == "/login?next=/new"
+ response = client.get("/signup?next=/new")
+ assert response.status_code == 200
response = client.post(
"/signup",
{"username": "alice", "password": "password123", "next": "/new"},
@@ -700,10 +836,7 @@
create_repo_with_refs(owner)
bob_client = WsgiClient(isolated_app.app)
- response = bob_client.post(
- "/login",
- {"username": "bob", "password": "correct horse battery staple", "next": "/"},
- )
+ response = login_client(bob_client, "bob")
assert response.status_code == 303
response = bob_client.post("/alice/demo/star", {"action": "star"})
@@ -717,10 +850,7 @@
assert "Fork of <a href=\"/alice/demo\">alice/demo</a>" in bob_client.get("/bob/demo").text
owner_client = WsgiClient(isolated_app.app)
- response = owner_client.post(
- "/login",
- {"username": "alice", "password": "correct horse battery staple", "next": "/"},
- )
+ response = login_client(owner_client, "alice")
assert response.status_code == 303
response = owner_client.post(
@@ -756,7 +886,7 @@
owner = create_user("alice")
create_repo_with_refs(owner)
client = WsgiClient(isolated_app.app)
- client.post("/login", {"username": "alice", "password": "correct horse battery staple", "next": "/"})
+ login_client(client, "alice")
response = client.get("/alice/demo/issues/new")
assert response.status_code == 200
@@ -805,7 +935,7 @@
source_node = commit_file(source_path, "feature.txt", "new feature\n", message="add feature", user="bob")
bob_client = WsgiClient(isolated_app.app)
- bob_client.post("/login", {"username": "bob", "password": "correct horse battery staple", "next": "/"})
+ login_client(bob_client, "bob")
response = bob_client.get("/alice/demo/pulls/new")
assert response.status_code == 200
@@ -841,7 +971,7 @@
assert "Only maintainers can update pull requests." in response.text
owner_client = WsgiClient(isolated_app.app)
- owner_client.post("/login", {"username": "alice", "password": "correct horse battery staple", "next": "/"})
+ login_client(owner_client, "alice")
response = owner_client.post("/alice/demo/pulls/1", {"action": "merge"})
assert response.status_code == 303
@patx: This commit hardens security for the app. 2026-05-05T04:24:05Z