#!/usr/bin/env python3
"""Portable state-path helpers for agent-learning-compounder.

Also home to the durable-write primitives (``atomic_write_text``,
``atomic_rewrite``) used by every script that mutates persistent state.
They live here so the three pre-fix truncate-in-place sites in
refresh_learning_state and causal_probe share one implementation.
"""

from __future__ import annotations

import contextlib
import fcntl
import hashlib
import os
import pathlib
import warnings
import re
import stat

from state_handle import StateHandle

# Import-time deprecation notice: this statement runs when the module is
# imported, so merely importing state_paths emits a DeprecationWarning.
# stacklevel=2 is presumably meant to attribute the warning to the importing
# code rather than to this file -- NOTE(review): confirm the reported location
# under the project's import setup.
warnings.warn(
    "state_paths is deprecated; use bin.state_handle.StateHandle for canonical state resolution",
    DeprecationWarning,
    stacklevel=2,
)


def slugify(value: str, fallback: str = "repo") -> str:
    """Return a lowercase, filesystem-friendly slug derived from ``value``.

    Surrounding whitespace is stripped, every run of characters outside
    ``A-Z a-z 0-9 . _ -`` collapses to a single ``-``, leading/trailing
    dashes are trimmed, and the result is lowercased. When nothing is left
    after that, ``fallback`` is returned unchanged.
    """
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "-", value.strip())
    trimmed = collapsed.strip("-").lower()
    if trimmed:
        return trimmed
    return fallback


def repo_id(repo: pathlib.Path) -> str:
    """Build an identifier for ``repo`` from its resolved absolute path.

    The identifier is ``<slug>-<hash>``: the slugified final path component
    followed by the first 12 hex characters of the SHA-256 of the fully
    resolved path (``~`` expanded, symlinks resolved), encoded as UTF-8.
    """
    absolute = str(repo.expanduser().resolve())
    name_slug = slugify(pathlib.Path(absolute).name)
    short_hash = hashlib.sha256(absolute.encode("utf-8")).hexdigest()[:12]
    return f"{name_slug}-{short_hash}"


def resolve_state_dir(
    state_dir: str | pathlib.Path | None = None,
    personal: str | pathlib.Path | None = None,
    repo: str | pathlib.Path | None = None,
) -> pathlib.Path:
    """Return the agent-learning state root chosen by the legacy precedence chain.

    The work is delegated to ``StateHandle.resolve_state_root``; this wrapper
    returns only the first element of its result.

    Documented precedence, highest first:
      1. the ``state_dir`` argument (typically ``--state-dir``)
      2. the ``AGENT_LEARNING_STATE_DIR`` environment variable
      3. the ``personal`` argument (typically ``--personal``), resolved under
         ``<personal>/reports/agent-learning``
      4. ``<repo>/.agent-learning`` when ``repo`` is supplied
      5. ``$XDG_STATE_HOME/agent-learning`` when set
      6. ``~/.local/state/agent-learning`` as the last resort

    This ordering is public contract, locked in by
    ``fixtures/tests/test_state_paths_precedence.py``. To get the XDG/home
    fallback even though a repo path is known, callers must pass
    ``repo=None``.
    """
    resolution = StateHandle.resolve_state_root(state_dir, personal, repo)
    return resolution[0]


def repo_state_dir(
    repo: str | pathlib.Path,
    state_dir: str | pathlib.Path | None = None,
    personal: str | pathlib.Path | None = None,
) -> pathlib.Path:
    """Return the per-repo state directory for ``repo``.

    Delegates to ``StateHandle.resolve_repo_state_dir`` and returns the first
    element of the two-item result, discarding the second.
    """
    pair = StateHandle.resolve_repo_state_dir(repo, state_dir, personal)
    directory, _unused = pair
    return directory


# --- durable-write primitives ------------------------------------------------
#
# Why a sidecar lockfile, not flock on the data file itself: the data file's
# inode lock does not survive its own os.replace. After one writer's atomic
# swap, a concurrent writer that opened the original inode before the swap
# still holds a lock on that orphaned inode and its own os.replace silently
# clobbers the first writer's change. The sidecar `<path>.lock` is never
# renamed, so its lock remains a valid mutex across replaces.
#
# Why pid-tagged tmp filenames: a shared `<path>.tmp` would be truncated by a
# concurrent writer's open("w"), and the loser's os.replace would then raise
# FileNotFoundError on a tmp the winner had already consumed.


def _assert_regular_or_absent(path: pathlib.Path) -> None:
    """Raise ``ValueError`` unless ``path`` is a regular file or does not exist.

    The check uses ``lstat`` so a symlink is inspected itself rather than
    followed. It is kept in this module (instead of being imported from
    collect_hook_event) to avoid an import cycle, and so each atomic_*
    primitive can run it while holding its own lock rather than relying on
    a check the caller made before the lock was taken.
    """
    try:
        info = path.lstat()
    except FileNotFoundError:
        # Nothing at the destination yet: creating it is fine.
        return
    file_mode = info.st_mode
    if stat.S_ISREG(file_mode):
        return
    if stat.S_ISLNK(file_mode):
        raise ValueError(f"refusing to write through symlink: {path}")
    raise ValueError(f"refusing to write to non-regular file: {path}")


def _write_tmp_then_replace(path: pathlib.Path, text: str, mode: int) -> None:
    """Write ``text`` to a pid-tagged sibling tmp file, then rename it onto ``path``.

    Args:
        path: Final destination. The tmp file is created next to it as
            ``<name>.<pid>.tmp`` so the rename stays within one directory.
        text: Content to write, encoded as UTF-8.
        mode: Permission bits passed to ``os.open`` when the tmp file is
            created (still subject to the process umask).

    Raises:
        OSError: If the tmp file cannot be created or written, or if the
            final ``os.replace`` fails.

    The data is flushed and fsync'd before the rename, so ``path`` is only
    ever swapped to fully written content. Callers are expected to hold the
    sidecar lock for ``path``.
    """
    tmp = path.parent / f"{path.name}.{os.getpid()}.tmp"
    # A leftover tmp (stale file from a crashed process that had this pid, or
    # a symlink planted at this predictable name) must never be reused:
    # opening it with O_TRUNC would write *through* a symlink and os.replace
    # would then install that symlink as ``path``. Remove it and create anew.
    with contextlib.suppress(FileNotFoundError):
        tmp.unlink()
    # O_EXCL makes creation fail rather than follow a symlink or adopt an
    # existing file, and guarantees ``mode`` applies to a freshly made inode.
    fd = os.open(str(tmp), os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(text)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp, path)
    finally:
        # Best-effort cleanup: after a successful replace the tmp is already
        # gone (FileNotFoundError, an OSError); after a failure we drop the
        # partial file without masking the original exception.
        with contextlib.suppress(OSError):
            tmp.unlink()


def atomic_write_text(path: pathlib.Path, text: str, *, mode: int = 0o644) -> None:
    """Atomically replace the content of ``path`` with ``text``.

    Writers are serialized through an exclusive ``flock`` on the sidecar
    ``<path>.lock``. The new content goes to a tmp file that is renamed over
    ``path``, so a process killed part-way leaves the previous content in
    place instead of a truncated file. The parent directory is created if
    it is missing.

    Raises ``ValueError`` when ``path`` already exists as a symlink or as
    anything other than a regular file. That check runs while the lock is
    held, so it does not depend on an earlier caller-side check that a
    symlink swap could have invalidated.
    """
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)
    sidecar = parent / (path.name + ".lock")
    lock_fd = os.open(str(sidecar), os.O_RDWR | os.O_CREAT, 0o644)
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        _assert_regular_or_absent(path)
        _write_tmp_then_replace(path, text, mode)
    finally:
        # Unlock failures are ignored; closing the descriptor follows anyway.
        with contextlib.suppress(OSError):
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
        os.close(lock_fd)


@contextlib.contextmanager
def atomic_rewrite(path: pathlib.Path, *, mode: int = 0o644):
    """Context manager for a locked read-modify-write of ``path``.

    Takes an exclusive ``flock`` on the sidecar ``<path>.lock`` and yields a
    pair ``(current_text, commit)``. ``current_text`` is the file's UTF-8
    content, or ``""`` when the file does not exist. Calling
    ``commit(new_text)`` atomically replaces the file; leaving the block
    without calling it leaves the file untouched. The lock is held for the
    whole ``with`` body, so read, compute and commit are serialized against
    other writers using the same sidecar.

    This stands in for the seek+truncate+write pattern, which could leave an
    empty file if the process died mid-write.

    Raises ``ValueError`` when ``path`` exists as a symlink or as a
    non-regular file. The check is made under the lock, before the read, so
    it does not rely on validation the caller did earlier.
    """
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)
    sidecar = parent / (path.name + ".lock")
    lock_fd = os.open(str(sidecar), os.O_RDWR | os.O_CREAT, 0o644)

    def commit(new_text: str) -> None:
        _write_tmp_then_replace(path, new_text, mode)

    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        _assert_regular_or_absent(path)
        if path.exists():
            current = path.read_text(encoding="utf-8")
        else:
            current = ""
        yield current, commit
    finally:
        # Unlock failures are ignored; closing the descriptor follows anyway.
        with contextlib.suppress(OSError):
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
        os.close(lock_fd)
