"""
AQC News & External Sentiment Bridge V2.1

Stable local sentiment bridge for Adaptive Quant Core X1.

Design goals:
- The EA request path never fetches external internet sources live.
- /sentiment returns from memory/persistent cache instantly.
- Background refresh uses per-provider intervals, timeouts, circuit breakers, and atomic cache replacement.
- Last-known-good sentiment persists across bridge restarts.
- Status hysteresis prevents noisy LKG <-> DEGRADED flipping.
- Provider health is separated from the main sentiment status.

Default service:
    http://127.0.0.1:8010

Endpoints:
    /health
    /sentiment?symbol=EURUSD
    /sources
    /cache
    /refresh?symbol=XAUUSD
"""
from __future__ import annotations

import json
import os
import re
import socket
import threading
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse

# Service identity and bind defaults. The port can be overridden through the
# environment; a non-numeric value raises ValueError at import time.
APP_VERSION = "2.1"
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = int(os.getenv("AQC_SENTIMENT_PORT", "8010"))
# Cache and log files default to the directory that contains this script.
SCRIPT_DIR = Path(__file__).resolve().parent
CACHE_FILE = Path(os.getenv("AQC_SENTIMENT_CACHE_FILE", str(SCRIPT_DIR / "adaptive_regime_trader_context_cache_v2_1.json")))
LOG_FILE = Path(os.getenv("AQC_SENTIMENT_LOG_FILE", str(SCRIPT_DIR / "aqc_sentiment_bridge_v2_1.log")))

# Cache/refresh defaults tuned for M15 trading and free public sources.
# Cache age (seconds) above which a usable payload is served as LKG.
STALE_SECONDS = int(os.getenv("AQC_SENTIMENT_STALE_SECONDS", "3600"))
# Cache age (seconds) above which a payload is treated as STALE and persisted
# entries are not restored at startup.
LKG_SECONDS = int(os.getenv("AQC_SENTIMENT_LKG_SECONDS", "7200"))
# Length of the per-symbol status hold window written by mark_symbol_ok/weak.
STATUS_HOLD_SECONDS = int(os.getenv("AQC_SENTIMENT_STATUS_HOLD_SECONDS", "300"))
# Number of recorded provider failures at which the circuit breaker opens.
DEGRADED_STRIKES = int(os.getenv("AQC_SENTIMENT_DEGRADED_STRIKES", "3"))
# Initial sleep of the background worker before its first pass.
WARMUP_MIN_SECONDS = int(os.getenv("AQC_SENTIMENT_WARMUP_MIN_SECONDS", "2"))
# Upper bound on tracked symbols; the least recently seen one is evicted.
MAX_ACTIVE_SYMBOLS = int(os.getenv("AQC_SENTIMENT_MAX_ACTIVE_SYMBOLS", "32"))
# Symbols not requested for this long are skipped by the background worker.
SYMBOL_IDLE_SECONDS = int(os.getenv("AQC_SENTIMENT_SYMBOL_IDLE_SECONDS", "7200"))
# Comma-separated symbols registered as active when the worker starts.
PRELOAD_SYMBOLS = [s.strip().upper() for s in os.getenv("AQC_SENTIMENT_PRELOAD_SYMBOLS", "EURUSD,XAUUSD,BTCUSD").split(",") if s.strip()]
# User-Agent header sent with every outbound provider request.
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 AQC-Sentiment-Bridge/2.1"

# Per-provider tuning, each value overridable through the environment:
#   interval - minimum cache age (seconds) before a symbol is refreshed again
#   timeout  - per-request HTTP timeout (seconds)
#   cooldown - how long (seconds) the circuit stays open after repeated failures
PROVIDER_CONFIG = {
    "gdelt": {"interval": int(os.getenv("AQC_GDELT_INTERVAL", "600")), "timeout": float(os.getenv("AQC_GDELT_TIMEOUT", "12.0")), "cooldown": int(os.getenv("AQC_GDELT_COOLDOWN", "900"))},
    "coingecko": {"interval": int(os.getenv("AQC_COINGECKO_INTERVAL", "240")), "timeout": float(os.getenv("AQC_COINGECKO_TIMEOUT", "3.0")), "cooldown": int(os.getenv("AQC_COINGECKO_COOLDOWN", "600"))},
    "alternative_fng": {"interval": int(os.getenv("AQC_FNG_INTERVAL", "900")), "timeout": float(os.getenv("AQC_FNG_TIMEOUT", "3.0")), "cooldown": int(os.getenv("AQC_FNG_COOLDOWN", "900"))},
}

app = FastAPI(title="Adaptive Regime Trader Context Bridge", version=APP_VERSION)

# Shared mutable state. Every read/write in this module happens under
# _cache_lock, which is re-entrant so helpers may call each other while holding it.
_cache_lock = threading.RLock()
# symbol -> latest payload served by the sentiment endpoint
_payload_cache: Dict[str, Dict[str, Any]] = {}
# symbol -> epoch seconds at which that payload was stored
_payload_time: Dict[str, float] = {}
# symbol -> epoch seconds of the most recent registration/request
_active_symbols: Dict[str, float] = {}
# symbol -> True while a refresh for that symbol is in flight
_refreshing: Dict[str, bool] = {}
# provider -> last health snapshot exposed by /health
_provider_health: Dict[str, Dict[str, Any]] = {}
# provider -> circuit-breaker bookkeeping (failures, circuit_until, last_attempt, last_ok)
_provider_state: Dict[str, Dict[str, Any]] = {}
# symbol -> weak_count / status_hold_until / last_status
_symbol_state: Dict[str, Dict[str, Any]] = {}
# Set to ask the background worker loop to exit.
_stop_event = threading.Event()
# Guards against starting the background worker more than once.
_worker_started = False

# Generic headline lexicon used by lexicon_score: each positive term found adds
# one point and each negative term subtracts one. All entries are lowercase.
POSITIVE_WORDS = {
    "rise", "rises", "rising", "higher", "gain", "gains", "gained", "rally", "rallies",
    "bullish", "support", "supports", "strong", "strength", "beats", "beat", "optimism",
    "improves", "improved", "recovery", "surge", "surges", "up", "hawkish", "safe haven",
    "demand", "inflows", "buying", "accumulation", "breakout", "eases", "cooling"
}
NEGATIVE_WORDS = {
    "fall", "falls", "falling", "lower", "drop", "drops", "dropped", "decline", "declines",
    "bearish", "weak", "weakens", "weakness", "misses", "miss", "fear", "risk", "selloff",
    "sell-off", "slump", "down", "dovish", "outflows", "selling", "recession", "crisis",
    "war", "tension", "sanctions", "inflation", "yields rise", "rates rise", "hawkish fed"
}
# Directional phrase sets passed to phrase_score by fetch_gdelt_articles; the
# sets used depend on the symbol family being scored.
USD_STRONG_PHRASES = {"dollar rises", "dollar strengthens", "strong dollar", "usd rises", "usd strengthens", "fed hawkish", "hawkish fed", "yields rise", "treasury yields rise", "rates rise"}
USD_WEAK_PHRASES = {"dollar falls", "dollar weakens", "weak dollar", "usd falls", "usd weakens", "fed dovish", "dovish fed", "yields fall", "treasury yields fall", "rates fall"}
GOLD_BULLISH_PHRASES = {"gold rises", "gold gains", "bullion rises", "safe haven", "geopolitical risk", "central bank buying", "gold demand", "inflation hedge", "risk-off", "risk off"}
GOLD_BEARISH_PHRASES = {"gold falls", "gold drops", "bullion falls", "higher yields", "strong dollar", "dollar strengthens", "rates rise", "hawkish fed", "gold outflows"}
BTC_BULLISH_PHRASES = {"bitcoin rises", "bitcoin gains", "btc rises", "crypto rally", "spot etf inflows", "institutional demand", "risk-on", "risk on", "crypto inflows"}
BTC_BEARISH_PHRASES = {"bitcoin falls", "bitcoin drops", "btc falls", "crypto selloff", "risk-off", "risk off", "regulatory crackdown", "exchange hack", "outflows", "liquidations"}


def now_utc_iso() -> str:
    """Return the current UTC time as ISO-8601 with second precision and a ``Z`` suffix."""
    current = datetime.now(timezone.utc)
    stamp = current.replace(microsecond=0).isoformat()
    return stamp.replace("+00:00", "Z")


def clamp(v: float, lo: float, hi: float) -> float:
    """Limit ``v`` to the closed range [lo, hi]; the lower bound is applied last."""
    capped = min(hi, v)
    return max(lo, capped)


def sign(v: float, threshold: float = 0.08) -> int:
    """Classify ``v`` as +1, -1 or 0, treating values within ``threshold`` of zero as neutral."""
    if v > threshold:
        return 1
    return -1 if v < -threshold else 0


def log_line(message: str) -> None:
    """Append one timestamped line to LOG_FILE.

    Best-effort: any Exception raised while formatting or writing is swallowed
    so that logging can never break the caller.
    """
    try:
        entry = f"{now_utc_iso()} {message}\n"
        with LOG_FILE.open("a", encoding="utf-8") as handle:
            handle.write(entry)
    except Exception:
        # Deliberately silent: there is nowhere else to report a logging failure.
        return


def provider_cfg(provider: str) -> Dict[str, Any]:
    """Return the configuration for ``provider``, or generic defaults when it is not configured."""
    fallback = {"interval": 300, "timeout": 3.0, "cooldown": 600}
    return PROVIDER_CONFIG.get(provider, fallback)


def set_provider_health(name: str, ok: bool, reason: str, elapsed_ms: float = 0.0, skipped: bool = False) -> None:
    """Record the outcome of a provider attempt and publish its health snapshot.

    A success clears the failure counter and closes the circuit. A real failure
    (not ``skipped``) increments the counter and, once it reaches
    DEGRADED_STRIKES, opens the circuit for the provider's ``cooldown``.
    A skipped attempt only refreshes ``last_attempt`` and the snapshot.
    """
    with _cache_lock:
        fresh_state = {"failures": 0, "circuit_until": 0.0, "last_attempt": 0.0, "last_ok": 0.0}
        state = _provider_state.setdefault(name, fresh_state)
        now = time.time()
        state["last_attempt"] = now
        if ok:
            state.update(failures=0, last_ok=now, circuit_until=0.0)
        elif not skipped:
            state["failures"] = int(state.get("failures", 0)) + 1
            if state["failures"] >= DEGRADED_STRIKES:
                cooldown = float(provider_cfg(name).get("cooldown", 600))
                state["circuit_until"] = now + cooldown
        snapshot = {
            "ok": bool(ok),
            "reason": reason,
            "elapsed_ms": round(elapsed_ms, 1),
            "time_utc": now_utc_iso(),
            "failures": int(state.get("failures", 0)),
            "circuit_open": time.time() < float(state.get("circuit_until", 0.0)),
            "circuit_retry_seconds": max(0, int(float(state.get("circuit_until", 0.0)) - time.time())),
            "skipped": bool(skipped),
        }
        _provider_health[name] = snapshot


def provider_available(provider: str) -> bool:
    """Return False while the provider's circuit breaker is open, True otherwise.

    When the circuit is open the skip is also recorded in the provider health
    snapshot with reason ``circuit_open``.
    """
    # V2: provider polling frequency is controlled per symbol by the cache scheduler.
    # This function only enforces circuit-breaker state. The v1.02 global
    # per-provider interval could make EURUSD/XAUUSD/BTCUSD block each other and
    # return empty neutral payloads even though the service was reachable.
    with _cache_lock:
        fresh_state = {"failures": 0, "circuit_until": 0.0, "last_attempt": 0.0, "last_ok": 0.0}
        state = _provider_state.setdefault(provider, fresh_state)
        circuit_open = float(state.get("circuit_until", 0.0)) > time.time()
        if circuit_open:
            set_provider_health(provider, False, "circuit_open", 0.0, skipped=True)
    return not circuit_open


def _short_error(exc: Exception) -> str:
    """Return a compact but useful provider error for /health diagnostics."""
    msg = str(exc).replace("\n", " ").strip()
    if len(msg) > 180:
        msg = msg[:177] + "..."
    return f"{type(exc).__name__}: {msg}" if msg else type(exc).__name__


def _urlopen_json_no_health(url: str, timeout: float) -> Dict[str, Any]:
    """Fetch ``url`` and return the decoded JSON body without touching provider health.

    Any network, HTTP or JSON parsing error propagates to the caller.
    """
    # Keep provider fetch isolated from the EA request path. This helper is used
    # only by the background worker / manual debug endpoints.
    request = urllib.request.Request(
        url,
        headers={
            "User-Agent": USER_AGENT,
            "Accept": "application/json,text/plain,*/*",
            "Connection": "close",
            "Cache-Control": "no-cache",
        },
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    # Undecodable bytes are replaced rather than failing the whole fetch.
    return json.loads(body.decode("utf-8", errors="replace"))


def http_json(url: str, provider: str) -> Optional[Dict[str, Any]]:
    """Fetch a single URL for ``provider``, honouring its circuit breaker.

    Generic one-shot provider fetch for the faster non-GDELT providers; returns
    None when the provider is skipped or the request fails.
    """
    single_candidate = [url]
    return http_json_candidates(single_candidate, provider, force=False)


def http_json_candidates(urls: List[str], provider: str, force: bool = False) -> Optional[Dict[str, Any]]:
    """Try each candidate URL in order and return the first JSON body obtained.

    This is mainly for GDELT reliability: some VPS/Python builds can establish
    TCP but timeout during HTTPS handshake, so HTTPS and HTTP/fallback query
    variants are tried in turn. A single provider failure is recorded only
    after every candidate has failed.

    Returns None when the circuit breaker is open (unless ``force``) or when
    all candidates fail.
    """
    if not (force or provider_available(provider)):
        return None
    timeout = float(provider_cfg(provider).get("timeout", 3.0))
    started = time.time()
    failures: List[str] = []
    for attempt, candidate in enumerate(urls, start=1):
        try:
            data = _urlopen_json_no_health(candidate, timeout=timeout)
            elapsed_ms = (time.time() - started) * 1000.0
            set_provider_health(provider, True, f"OK candidate={attempt}", elapsed_ms)
            return data
        except Exception as exc:
            failures.append(f"candidate={attempt} {_short_error(exc)}")
            # Tiny backoff reduces immediate repeated TLS/handshake failures but
            # keeps this entirely off the EA hot path.
            time.sleep(0.20)
    # Only the last three errors are kept so the health reason stays compact.
    reason = " | ".join(failures[-3:]) if failures else "unknown_error"
    set_provider_health(provider, False, reason, (time.time() - started) * 1000.0)
    log_line(f"provider_failed provider={provider} err={reason}")
    return None


def lower_join(items: List[str]) -> str:
    """Join ``items`` with ``" | "`` and lowercase the result."""
    joined = " | ".join(items)
    return joined.lower()


def lexicon_score(text: str) -> float:
    """Score ``text`` against the generic positive/negative lexicon.

    Each lexicon term present in the text counts once: +1 for POSITIVE_WORDS,
    -1 for NEGATIVE_WORDS. The net count is divided by 6 and clamped to
    [-1.0, 1.0].

    Terms are matched as whole words/phrases rather than raw substrings.
    Substring matching produced false hits ("up" inside "support", "war"
    inside "toward", "gain" inside "against") and double-counted inflections
    that the lexicon already lists separately ("rise" inside "rises").
    """
    t = text.lower()

    def hits(terms) -> int:
        # (?<!\w) / (?!\w) require a non-word character (or the text edge) on
        # both sides; unlike \b this also works for terms such as "sell-off".
        return sum(
            1
            for term in terms
            if re.search(r"(?<!\w)" + re.escape(term) + r"(?!\w)", t)
        )

    score = float(hits(POSITIVE_WORDS) - hits(NEGATIVE_WORDS))
    return clamp(score / 6.0, -1.0, 1.0)


def phrase_score(text: str, positive: set[str], negative: set[str]) -> float:
    """Score ``text`` by the directional phrases it contains.

    Each phrase from ``positive`` found in the lowercased text adds one point
    and each phrase from ``negative`` subtracts one. The net count is divided
    by 4 and limited to [-1.0, 1.0].

    Phrases are matched on word boundaries rather than as raw substrings, so
    "usd rises" no longer fires on "xauusd rises" and "eur rises" no longer
    fires on "entrepreneur rises". Phrases are expected in lowercase, as all
    phrase sets in this module are.
    """
    t = text.lower()

    def hits(phrases) -> int:
        # (?<!\w) / (?!\w) require a non-word character (or the text edge) on
        # both sides; unlike \b this also works for phrases such as "risk-off".
        return sum(
            1
            for phrase in phrases
            if re.search(r"(?<!\w)" + re.escape(phrase) + r"(?!\w)", t)
        )

    score = float(hits(positive) - hits(negative))
    # Same bounds the module's clamp() helper would apply.
    return max(-1.0, min(1.0, score / 4.0))


def symbol_family(symbol: str) -> str:
    """Map a broker symbol (any case, any suffix) to the family used for scoring.

    Crypto symbols collapse to ``BTCUSD`` and gold symbols to ``XAUUSD``; the
    three supported USD pairs map to their canonical names. Anything else is
    returned uppercased and otherwise unchanged.
    """
    s = symbol.upper()

    def has_any(*tokens: str) -> bool:
        return any(token in s for token in tokens)

    if has_any("BTC", "ETH", "CRYPTO"):
        return "BTCUSD"
    if has_any("XAU", "GOLD"):
        return "XAUUSD"
    if "USD" in s:
        # Checked in this order so a symbol naming several currencies resolves
        # to the first listed pair.
        for counterpart, family in (("EUR", "EURUSD"), ("GBP", "GBPUSD"), ("JPY", "USDJPY")):
            if counterpart in s:
                return family
    return s


def relevant_providers_for(symbol: str) -> List[str]:
    """List the providers consulted for ``symbol``: GDELT always, plus the two crypto feeds for the BTCUSD family."""
    providers = ["gdelt"]
    if symbol_family(symbol) == "BTCUSD":
        providers.extend(["alternative_fng", "coingecko"])
    return providers


def gdelt_query_for(symbol: str) -> str:
    """Return the primary GDELT query expression for the symbol's family.

    Families without a dedicated query get a generic markets query that
    includes the symbol text as given.
    """
    family_queries = {
        "XAUUSD": '("spot gold" OR gold OR bullion OR "safe haven" OR "Federal Reserve" OR "US dollar" OR "Treasury yields" OR inflation OR "central bank gold" OR geopolitical OR "interest rates")',
        "BTCUSD": '(bitcoin OR BTC OR crypto OR cryptocurrency OR "spot ETF" OR blockchain OR "risk on" OR "risk off")',
        "EURUSD": '("EURUSD" OR "EUR/USD" OR euro OR "European Central Bank" OR ECB OR "US dollar" OR dollar OR "Federal Reserve" OR Fed OR inflation OR "interest rates")',
        "GBPUSD": '(GBP OR sterling OR pound OR "Bank of England" OR USD OR dollar OR "Federal Reserve" OR Fed OR inflation OR rates)',
        "USDJPY": '(USD OR dollar OR JPY OR yen OR "Bank of Japan" OR "Federal Reserve" OR yields OR rates)',
    }
    family = symbol_family(symbol)
    if family in family_queries:
        return family_queries[family]
    return f'({symbol} OR markets OR dollar OR rates OR inflation)'


def gdelt_query_tiers_for(symbol: str) -> List[str]:
    """Return three GDELT queries for ``symbol``: the primary query followed by two simpler fallbacks."""
    family = symbol_family(symbol)
    primary = gdelt_query_for(symbol)
    fallbacks_by_family = {
        "XAUUSD": [
            '(gold OR bullion OR "spot gold" OR "US dollar" OR dollar OR "Treasury yields" OR "Federal Reserve")',
            '(gold OR bullion OR inflation OR rates OR yields OR geopolitical OR "safe haven")',
        ],
        "EURUSD": [
            '(euro OR EUR OR ECB OR dollar OR USD OR Fed OR "Federal Reserve")',
            '(forex OR currency OR dollar OR euro OR rates OR inflation)',
        ],
        "BTCUSD": [
            '(bitcoin OR BTC OR crypto OR cryptocurrency)',
            '(crypto OR bitcoin OR "risk on" OR "risk off")',
        ],
    }
    fallbacks = fallbacks_by_family.get(family)
    if fallbacks is None:
        # Every other family shares the generic market fallbacks.
        fallbacks = [f'({symbol} OR markets OR dollar OR rates)', '(markets OR dollar OR inflation OR rates)']
    return [primary, *fallbacks]


def gdelt_candidate_urls(symbol: str) -> List[str]:
    """Build the ordered GDELT candidate URLs for ``symbol``.

    Each query tier contributes an HTTPS URL followed by its HTTP twin, so the
    result alternates https/http from the primary tier to the last fallback.
    """
    # HTTP fallback avoids SSL-handshake timeouts seen on some VPS/Python
    # combinations. If GDELT redirects to HTTPS, the error detail will show it.
    endpoints = (
        "https://api.gdeltproject.org/api/v2/doc/doc?",
        "http://api.gdeltproject.org/api/v2/doc/doc?",
    )
    candidates: List[str] = []
    # Smaller maxrecords and simpler fallback tiers reduce slow GDELT responses.
    for tier, query in enumerate(gdelt_query_tiers_for(symbol), start=1):
        query_string = urllib.parse.urlencode(
            {
                "query": query,
                "mode": "artlist",
                "format": "json",
                "maxrecords": "30" if tier == 1 else "20",
                "timespan": "24h",
                "sort": "hybridrel",
            }
        )
        candidates.extend(endpoint + query_string for endpoint in endpoints)
    return candidates


def fetch_gdelt_articles(symbol: str) -> Tuple[float, int, str, List[str]]:
    """Fetch GDELT headlines for ``symbol`` and score them.

    Returns ``(score, headline_count, reason, sample_headlines)``. ``score`` is
    in [-1.0, 1.0] and blends the generic lexicon score (35%) with a
    family-specific directional phrase score (65%). When GDELT is unavailable,
    skipped by the circuit breaker, or returns no ``articles`` key, the result
    is ``(0.0, 0, "GDELT unavailable/skipped", [])``.

    Changes from the previous version:
    - Headlines are scored on the title text only. The publisher domain is
      still appended to the sample headlines for display, but it no longer
      feeds the sentiment lexicon, where domain names could register as
      sentiment words.
    - A response that is not a JSON object, an ``articles`` value that is not
      a list, and article entries that are not objects are skipped instead of
      raising.
    """
    data = http_json_candidates(gdelt_candidate_urls(symbol), "gdelt", force=False)
    if not isinstance(data, dict) or "articles" not in data:
        return 0.0, 0, "GDELT unavailable/skipped", []
    articles = data.get("articles") or []
    if not isinstance(articles, list):
        articles = []
    scoring_titles: List[str] = []  # bare titles, used for sentiment scoring
    titles: List[str] = []          # "title (domain)", used for display/counting
    for a in articles[:50]:
        if not isinstance(a, dict):
            continue
        title = str(a.get("title") or "")
        if not title:
            continue
        domain = str(a.get("domain") or "")
        scoring_titles.append(title)
        titles.append(title + (f" ({domain})" if domain else ""))
    # Only the first 35 headlines contribute to the score.
    text = lower_join(scoring_titles[:35])
    fam = symbol_family(symbol)
    base = lexicon_score(text)
    if fam == "XAUUSD":
        directed = phrase_score(text, GOLD_BULLISH_PHRASES | USD_WEAK_PHRASES, GOLD_BEARISH_PHRASES | USD_STRONG_PHRASES)
    elif fam == "BTCUSD":
        directed = phrase_score(text, BTC_BULLISH_PHRASES | USD_WEAK_PHRASES, BTC_BEARISH_PHRASES | USD_STRONG_PHRASES)
    elif fam in {"EURUSD", "GBPUSD"}:
        # USD is the quote currency here, so USD weakness counts as positive.
        usd = phrase_score(text, USD_WEAK_PHRASES, USD_STRONG_PHRASES)
        local_positive = phrase_score(text, {"euro rises", "eur rises", "ecb hawkish", "sterling rises", "pound rises"}, {"euro falls", "eur falls", "ecb dovish", "sterling falls", "pound falls"})
        directed = clamp(usd + local_positive, -1.0, 1.0)
    else:
        # No directional phrase set for this family; fall back to the lexicon.
        directed = base
    score = clamp(0.35 * base + 0.65 * directed, -1.0, 1.0)
    return score, len(titles), f"GDELT {len(titles)} headlines score={score:+.2f}", titles[:5]


def debug_gdelt_fetch(symbol: str) -> Dict[str, Any]:
    """Probe every GDELT candidate URL for ``symbol`` and report per-candidate results.

    Manual diagnostic helper: it bypasses the circuit breaker, does not update
    provider health, and never stops at the first success.
    """
    timeout = float(provider_cfg("gdelt").get("timeout", 12.0))

    def probe(index: int, url: str) -> Dict[str, Any]:
        started = time.time()
        try:
            data = _urlopen_json_no_health(url, timeout=timeout)
            articles = data.get("articles", []) if isinstance(data, dict) else []
            return {"candidate": index, "ok": True, "elapsed_ms": round((time.time() - started) * 1000.0, 1), "articles": len(articles), "url": url[:220]}
        except Exception as exc:
            return {"candidate": index, "ok": False, "elapsed_ms": round((time.time() - started) * 1000.0, 1), "error": _short_error(exc), "url": url[:220]}

    results: List[Dict[str, Any]] = [
        probe(index, url)
        for index, url in enumerate(gdelt_candidate_urls(symbol), start=1)
    ]
    overall_ok = any(entry.get("ok") for entry in results)
    return {"ok": overall_ok, "symbol": symbol.upper(), "timeout": timeout, "results": results}


def fetch_fear_greed() -> Tuple[Optional[float], str]:
    """Fetch the alternative.me Fear & Greed index and map it to a score in [-1.0, 1.0].

    An index of 50 maps to 0.0. Returns ``(None, reason)`` when the provider is
    skipped, unreachable, or the response does not have the expected shape.
    """
    data = http_json("https://api.alternative.me/fng/?limit=1&format=json", "alternative_fng")
    try:
        latest = data["data"][0]
        value = float(latest["value"])
        classification = str(latest.get("value_classification", ""))
        score = clamp((value - 50.0) / 50.0, -1.0, 1.0)
    except Exception:
        # Covers data being None as well as any unexpected response shape.
        return None, "FearGreed unavailable/skipped"
    return score, f"FearGreed {value:.0f} {classification} score={score:+.2f}"


def fetch_coingecko_context() -> Tuple[Optional[float], str]:
    """Derive a crypto momentum score from CoinGecko 24h price changes.

    BTC and ETH changes are blended 70/30 and divided by 8 (so an 8% blended
    move saturates the score), then clamped to [-1.0, 1.0]. Returns
    ``(None, reason)`` when the provider is skipped, unreachable, or the
    response does not have the expected shape.
    """
    url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd&include_24hr_change=true"
    data = http_json(url, "coingecko")
    try:
        btc = float(data["bitcoin"].get("usd_24h_change", 0.0))
        eth = float(data["ethereum"].get("usd_24h_change", 0.0))
        blended = (btc * 0.7 + eth * 0.3)
        score = clamp(blended / 8.0, -1.0, 1.0)
    except Exception:
        # Covers data being None as well as any unexpected response shape.
        return None, "CoinGecko unavailable/skipped"
    return score, f"CoinGecko BTC24h={btc:+.2f}% ETH24h={eth:+.2f}% score={score:+.2f}"


def source_health_for(symbol: str) -> Tuple[str, Dict[str, Any]]:
    """Summarise the health of the providers relevant to ``symbol``.

    Returns ``(summary, detail)`` where ``detail`` maps each relevant provider
    to a copy of its last health snapshot (or a ``not_attempted`` placeholder)
    and ``summary`` is ``OK`` (all healthy), ``PARTIAL`` (some healthy),
    ``DEGRADED`` (none healthy) or ``UNKNOWN`` (no relevant providers).
    """
    relevant = relevant_providers_for(symbol)
    snapshot: Dict[str, Any] = {}
    with _cache_lock:
        for provider in relevant:
            placeholder = {"ok": False, "reason": "not_attempted", "failures": 0, "circuit_open": False}
            snapshot[provider] = dict(_provider_health.get(provider, placeholder))
    if not snapshot:
        return "UNKNOWN", snapshot
    healthy = sum(1 for entry in snapshot.values() if entry.get("ok"))
    if healthy == len(snapshot):
        summary = "OK"
    elif healthy > 0:
        summary = "PARTIAL"
    else:
        summary = "DEGRADED"
    return summary, snapshot


def combine_sources(symbol: str) -> Dict[str, Any]:
    """Query every relevant provider for ``symbol`` and merge the results.

    GDELT is weighted 0.85 (0.50 for the BTCUSD family); the BTCUSD family also
    adds Fear & Greed (0.30) and CoinGecko (0.20). The combined score is the
    weight-normalised average of the sources that produced data. Returns a
    ``NO_SOURCE`` result with ``ok=False`` when no provider produced data.
    """
    is_crypto = symbol_family(symbol) == "BTCUSD"
    # Each entry: (provider name, score, weight, human-readable reason)
    sources: List[Tuple[str, float, float, str]] = []

    gdelt_score, count, gdelt_reason, headlines = fetch_gdelt_articles(symbol)
    if count > 0:
        gdelt_weight = 0.50 if is_crypto else 0.85
        sources.append(("gdelt", gdelt_score, gdelt_weight, gdelt_reason))
    if is_crypto:
        fg, fg_reason = fetch_fear_greed()
        if fg is not None:
            sources.append(("alternative_fng", fg, 0.30, fg_reason))
        cg, cg_reason = fetch_coingecko_context()
        if cg is not None:
            sources.append(("coingecko", cg, 0.20, cg_reason))

    # Health is sampled after the fetches so it reflects this refresh attempt.
    provider_health, provider_detail = source_health_for(symbol)
    if not sources:
        return {
            "ok": False,
            "status": "NO_SOURCE",
            "provider_health": provider_health,
            "provider_detail": provider_detail,
            "score": 0.0,
            "confidence": 0.0,
            "source_agreement": 0.0,
            "reason": "No relevant free sentiment sources produced usable data; keep prior cache or neutral technical-only",
            "headlines": [],
            "source_details": [],
        }

    total_w = sum(weight for _, _, weight, _ in sources)
    score = sum(value * weight for _, value, weight, _ in sources) / total_w if total_w else 0.0

    # Agreement: share of directional (non-neutral) sources on the majority side.
    directions = [sign(value) for _, value, _, _ in sources]
    directional = [d for d in directions if d != 0]
    if directional:
        majority = max(directional.count(1), directional.count(-1))
        agreement = 100.0 * majority / len(directional)
    else:
        agreement = 50.0

    strength = abs(score)
    confidence = clamp(35.0 + strength * 50.0 + min(len(sources), 3) * 5.0 + (agreement - 50.0) * 0.20, 0.0, 100.0)
    return {
        "ok": True,
        "status": "OK",
        "provider_health": provider_health,
        "provider_detail": provider_detail,
        "score": clamp(score, -1.0, 1.0),
        "confidence": round(confidence, 1),
        "source_agreement": round(agreement, 1),
        "reason": "; ".join(reason for _, _, _, reason in sources),
        "headlines": headlines,
        "source_details": [
            {"source": name, "score": round(value, 3), "weight": weight, "reason": reason}
            for name, value, weight, reason in sources
        ],
    }


def base_payload(symbol: str, status: str, reason: str) -> Dict[str, Any]:
    """Build a neutral, non-usable payload for ``symbol`` with the given status and reason.

    ``DEGRADED``, ``NO_SOURCE`` and ``PARTIAL`` are reported as
    ``TECHNICAL_ONLY``; every other status is passed through unchanged.
    """
    # V2 rule: a zero-confidence neutral fallback is technical-only, not LKG.
    if status in {"DEGRADED", "NO_SOURCE", "PARTIAL"}:
        status = "TECHNICAL_ONLY"
    return {
        "ok": True,
        "status": status,
        "bridge_status": "ONLINE",
        "sentiment_status": status,
        "provider_health": "UNKNOWN",
        "ea_action": "IGNORE_SENTIMENT",
        "usable_sentiment": False,
        "server": "Adaptive Regime Trader Context Bridge",
        "version": APP_VERSION,
        "symbol": symbol.upper(),
        "timestamp_utc": now_utc_iso(),
        "freshness_seconds": 0,
        "cached": True,
        "direction_bias": "NEUTRAL",
        "raw_score": 0.0,
        "confidence": 0.0,
        "source_agreement": 0.0,
        "risk_mode": "TECHNICAL_ONLY",
        "action": "TECHNICAL_ONLY",
        "score_adjust_buy": 0.0,
        "score_adjust_sell": 0.0,
        "veto_buy": False,
        "veto_sell": False,
        "shadow_only": False,
        "reason": reason,
        "sources": [],
        "headlines": [],
        "provider_detail": {},
    }


def is_usable_payload(payload: Dict[str, Any]) -> bool:
    """Return True only for payloads that carry real external sentiment.

    A payload qualifies when it is ``ok``, flagged ``usable_sentiment``, has a
    confidence that is not <= 0 and lists at least one source. Malformed
    payloads (wrong type, non-numeric confidence) yield False instead of raising.
    """
    try:
        return (
            bool(payload.get("ok", False))
            and bool(payload.get("usable_sentiment", False))
            and not (float(payload.get("confidence", 0.0)) <= 0.0)
            and bool(payload.get("sources"))
        )
    except Exception:
        return False


def payload_from_combined(symbol: str, combined: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a ``combine_sources`` result into the payload served to the EA.

    The payload is usable when at least one source contributed and confidence
    is positive. A usable score of at least +/-0.12 yields a BUY/SELL bias with
    a boost for the favoured side and a penalty for the other; a veto on the
    opposite side is raised only for strong, high-confidence, high-agreement
    readings. Non-usable results are zeroed out and marked TECHNICAL_ONLY.
    """
    score = float(combined.get("score", 0.0))
    confidence = float(combined.get("confidence", 0.0))
    agreement = float(combined.get("source_agreement", 0.0))
    source_details = combined.get("source_details", []) or []
    usable = bool(source_details) and confidence > 0.0

    if usable and score >= 0.12:
        bias = "BUY"
    elif usable and score <= -0.12:
        bias = "SELL"
    else:
        bias = "NEUTRAL"

    magnitude = abs(score)
    boost = round(clamp(magnitude * 6.0, 0.0, 5.0), 2) if usable else 0.0
    penalty = round(clamp(magnitude * 9.0, 0.0, 10.0), 2) if usable else 0.0
    # (buy adjustment, sell adjustment) per bias; neutral leaves both at zero.
    adjustments = {"BUY": (boost, -penalty), "SELL": (-penalty, boost)}
    buy_adj, sell_adj = adjustments.get(bias, (0.0, 0.0))

    strong_conflict = usable and confidence >= 82.0 and agreement >= 70.0 and magnitude >= 0.55
    provider_health = str(combined.get("provider_health", "UNKNOWN"))
    status = "OK" if usable else "TECHNICAL_ONLY"

    if bias != "NEUTRAL":
        ea_action = "USE"
        opposite = "SELL" if bias == "BUY" else "BUY"
        action = f"CONFIRM_{bias}_REDUCE_{opposite}"
    else:
        ea_action = "IGNORE" if usable else "IGNORE_SENTIMENT"
        action = "TECHNICAL_ONLY"

    return {
        "ok": True,
        "status": status,
        "bridge_status": "ONLINE",
        "sentiment_status": status,
        "provider_health": provider_health,
        "ea_action": ea_action,
        "usable_sentiment": usable,
        "server": "Adaptive Regime Trader Context Bridge",
        "version": APP_VERSION,
        "symbol": symbol.upper(),
        "timestamp_utc": now_utc_iso(),
        "freshness_seconds": 0,
        "cached": False,
        "direction_bias": bias,
        "raw_score": round(score, 4) if usable else 0.0,
        "confidence": round(confidence, 1) if usable else 0.0,
        "source_agreement": round(agreement, 1) if usable else 0.0,
        "risk_mode": "EXTERNAL_CONTEXT" if usable else "TECHNICAL_ONLY",
        "action": action,
        "score_adjust_buy": buy_adj,
        "score_adjust_sell": sell_adj,
        "veto_buy": bool(strong_conflict and bias == "SELL"),
        "veto_sell": bool(strong_conflict and bias == "BUY"),
        "shadow_only": False,
        "reason": combined.get("reason", ""),
        "sources": source_details,
        "headlines": combined.get("headlines", []),
        "provider_detail": combined.get("provider_detail", {}),
    }


def atomic_write_json(path: Path, payload: Dict[str, Any]) -> None:
    """Serialise ``payload`` as JSON and atomically replace ``path`` with it.

    The data is written to a temporary sibling file and then moved over
    ``path`` with ``os.replace``, so readers see either the old or the new
    file. Best-effort: failures are logged, never raised.

    The temporary file name includes the process id and thread id. Refreshes
    run on several threads, and with a single shared ``.tmp`` name two
    concurrent saves could interleave their writes and publish a truncated
    file. A temporary file left behind by a failed write is removed.
    """
    tmp: Optional[Path] = None
    try:
        # Serialise first so an unserialisable payload never creates a file.
        text = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)
        tmp = path.with_name(f"{path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
        tmp.write_text(text, encoding="utf-8")
        os.replace(tmp, path)
    except Exception as exc:
        if tmp is not None:
            try:
                tmp.unlink()
            except OSError:
                # Nothing to clean up, or cleanup itself failed; keep best-effort.
                pass
        log_line(f"cache_save_failed err={type(exc).__name__}")


def save_persistent_cache() -> None:
    """Write every usable cached payload, with its timestamp, to CACHE_FILE.

    The snapshot is taken under the cache lock; the file write happens after
    the lock is released.
    """
    # Persist only true usable sentiment, never neutral technical-only fallback.
    with _cache_lock:
        usable_payloads: Dict[str, Any] = {}
        usable_times: Dict[str, float] = {}
        for sym, entry in _payload_cache.items():
            if is_usable_payload(entry):
                usable_payloads[sym] = entry
        for sym in usable_payloads:
            usable_times[sym] = _payload_time.get(sym, time.time())
        snapshot = {
            "version": APP_VERSION,
            "saved_at": now_utc_iso(),
            "payload_cache": usable_payloads,
            "payload_time": usable_times,
        }
    atomic_write_json(CACHE_FILE, snapshot)


def load_persistent_cache() -> None:
    """Restore usable payloads from CACHE_FILE into the in-memory cache.

    Only entries that are still usable and no older than LKG_SECONDS are
    restored; each is re-labelled as ``LKG`` and keeps its original timestamp.
    A missing file is ignored and any read/parse error is logged, not raised.
    """
    if not CACHE_FILE.exists():
        return
    try:
        stored = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
        saved_payloads = stored.get("payload_cache") or {}
        saved_times = stored.get("payload_time") or {}
        now = time.time()
        restored_count = 0
        with _cache_lock:
            for sym, payload in saved_payloads.items():
                age = now - float(saved_times.get(sym, 0.0))
                if not (age <= LKG_SECONDS and is_usable_payload(payload)):
                    continue
                restored = dict(payload)
                restored.update(
                    {
                        "status": "LKG",
                        "sentiment_status": "LKG",
                        "cached": True,
                        "reason": "Loaded persistent last-known-good cache; " + str(payload.get("reason", "")),
                    }
                )
                _payload_cache[sym] = restored
                _payload_time[sym] = float(saved_times.get(sym, now))
                restored_count += 1
        if restored_count:
            log_line(f"persistent_cache_loaded symbols={restored_count}")
    except Exception as exc:
        log_line(f"cache_load_failed err={type(exc).__name__}")


def register_symbol(symbol: str) -> None:
    """Mark ``symbol`` as active now, evicting the least recently seen symbol when the table is full."""
    key = symbol.upper()
    with _cache_lock:
        at_capacity = len(_active_symbols) >= MAX_ACTIVE_SYMBOLS
        if at_capacity and key not in _active_symbols:
            by_last_seen = sorted(_active_symbols.items(), key=lambda item: item[1])
            stalest_symbol = by_last_seen[0][0]
            _active_symbols.pop(stalest_symbol, None)
        _active_symbols[key] = time.time()


def mark_symbol_weak(symbol: str) -> int:
    """Record one more failed refresh for ``symbol`` and return the updated weak count.

    Also restarts the symbol's status hold window.
    """
    key = symbol.upper()
    with _cache_lock:
        initial = {"weak_count": 0, "status_hold_until": 0.0, "last_status": "WARMING_UP"}
        state = _symbol_state.setdefault(key, initial)
        weak_count = int(state.get("weak_count", 0)) + 1
        state["weak_count"] = weak_count
        state["status_hold_until"] = time.time() + STATUS_HOLD_SECONDS
        return int(weak_count)


def mark_symbol_ok(symbol: str) -> None:
    """Record a successful refresh for ``symbol``: reset its weak count and restart the status hold window."""
    key = symbol.upper()
    with _cache_lock:
        initial = {"weak_count": 0, "status_hold_until": 0.0, "last_status": "OK"}
        state = _symbol_state.setdefault(key, initial)
        state["weak_count"] = 0
        state["last_status"] = "OK"
        hold_until = time.time() + STATUS_HOLD_SECONDS
        state["status_hold_until"] = hold_until


def refresh_symbol(symbol: str) -> None:
    """Fetch fresh sentiment for ``symbol`` and update the cache.

    Outcomes, in order of preference:
    1. Providers produced data: store the new payload, timestamp it and persist.
    2. Providers failed but a usable payload no older than LKG_SECONDS exists:
       keep it, re-labelled as ``LKG`` with ``USE_REDUCED``. Its timestamp is
       left untouched so it keeps ageing.
    3. Otherwise: store a neutral technical-only payload.

    Only one refresh per symbol runs at a time; a concurrent call returns
    immediately. Exceptions are logged and never propagate.
    """
    key = symbol.upper()
    # Atomically claim the refresh slot for this symbol.
    with _cache_lock:
        if _refreshing.get(key, False):
            return
        _refreshing[key] = True
    try:
        # Network I/O happens here, outside the lock.
        combined = combine_sources(key)
        now = time.time()
        if combined.get("ok", False):
            payload = payload_from_combined(key, combined)
            mark_symbol_ok(key)
            with _cache_lock:
                _payload_cache[key] = payload
                _payload_time[key] = now
            save_persistent_cache()
            log_line(f"refresh_ok symbol={key} status=OK provider={payload.get('provider_health')} score={payload.get('raw_score')}")
            return
        weak = mark_symbol_weak(key)
        provider_health = str(combined.get("provider_health", "DEGRADED"))
        # Work on a copy so the cached payload is only replaced as a whole.
        with _cache_lock:
            old = dict(_payload_cache.get(key) or {})
            old_time = float(_payload_time.get(key, 0.0))
        # A missing timestamp is treated as far older than any LKG window.
        age = int(now - old_time) if old_time > 0 else 999999
        if old and is_usable_payload(old) and age <= LKG_SECONDS:
            old["ok"] = True
            old["status"] = "LKG"
            old["sentiment_status"] = "LKG"
            old["bridge_status"] = "ONLINE"
            old["provider_health"] = provider_health
            old["ea_action"] = "USE_REDUCED"
            old["cached"] = True
            old["freshness_seconds"] = age
            old["reason"] = f"Using true last-known-good; provider weak_count={weak}; " + str(combined.get("reason", ""))
            # _payload_time is intentionally not updated: the LKG payload keeps
            # its original age.
            with _cache_lock:
                _payload_cache[key] = old
            save_persistent_cache()
            log_line(f"refresh_lkg symbol={key} weak={weak} age={age} provider={provider_health}")
            return
        # No fresh data and no valid LKG: fall back to a neutral payload. It is
        # timestamped so the next refresh waits for the provider interval.
        neutral = base_payload(key, "TECHNICAL_ONLY", str(combined.get("reason", "No usable source and no valid LKG")))
        neutral["provider_health"] = provider_health
        neutral["ea_action"] = "IGNORE_SENTIMENT"
        neutral["provider_detail"] = combined.get("provider_detail", {})
        with _cache_lock:
            _payload_cache[key] = neutral
            _payload_time[key] = now
        log_line(f"refresh_degraded_neutral symbol={key} weak={weak} provider={provider_health}")
    except Exception as exc:
        log_line(f"refresh_exception symbol={key} err={type(exc).__name__}")
    finally:
        # Always release the refresh slot, including on early return or error.
        with _cache_lock:
            _refreshing[key] = False


def schedule_refresh_if_needed(symbol: str) -> None:
    """Start a background refresh for ``symbol`` when its cache is missing or due.

    A refresh is due once the cache is at least as old as the shortest
    interval among the symbol's relevant providers. Nothing is started while a
    refresh for the symbol is already running.
    """
    key = symbol.upper()
    now = time.time()
    with _cache_lock:
        last = _payload_time.get(key, 0.0)
        refreshing = _refreshing.get(key, False)
    due = last <= 0.0 or now - last >= min(
        provider_cfg(p).get("interval", 300) for p in relevant_providers_for(key)
    )
    # The endpoint never waits for refresh. It just schedules it.
    if due and not refreshing:
        worker = threading.Thread(target=refresh_symbol, args=(key,), daemon=True)
        worker.start()


def get_cached_payload(symbol: str) -> Dict[str, Any]:
    """Return the payload to serve for ``symbol`` without waiting on any provider.

    Registers the symbol as active, schedules a background refresh when one is
    due, then returns a copy of the cached payload annotated with its age:
    - nothing cached yet        -> ``WARMING_UP``
    - cached but not usable     -> ``TECHNICAL_ONLY`` with all influence zeroed
    - older than LKG_SECONDS    -> ``STALE`` with all influence zeroed
    - older than STALE_SECONDS  -> ``LKG`` with ``USE_REDUCED``
    - otherwise                 -> the cached payload as stored
    """
    key = symbol.upper()
    now = time.time()
    register_symbol(key)
    schedule_refresh_if_needed(key)
    with _cache_lock:
        payload = dict(_payload_cache.get(key) or base_payload(key, "WARMING_UP", "No cached sentiment yet; background refresh scheduled; neutral technical-only fallback"))
        last = _payload_time.get(key, 0.0)
        refreshing = _refreshing.get(key, False)
        # Symbol state snapshot; not consulted below.
        st = dict(_symbol_state.get(key, {}))

    age = int(now - last) if last > 0 else 0
    payload.update(
        {
            "freshness_seconds": age,
            "cached": True,
            "refreshing": refreshing,
            "bridge_status": "ONLINE",
        }
    )

    if last <= 0:
        payload.update(
            {
                "status": "WARMING_UP",
                "sentiment_status": "WARMING_UP",
                "provider_health": "UNKNOWN",
                "ea_action": "IGNORE",
                "reason": "Background refresh scheduled; neutral technical-only fallback",
            }
        )
    elif not is_usable_payload(payload):
        # Neutral fallback is a valid bridge response but not usable external sentiment.
        payload.update(
            {
                "status": "TECHNICAL_ONLY",
                "sentiment_status": "TECHNICAL_ONLY",
                "risk_mode": "TECHNICAL_ONLY",
                "action": "TECHNICAL_ONLY",
                "score_adjust_buy": 0.0,
                "score_adjust_sell": 0.0,
                "veto_buy": False,
                "veto_sell": False,
                "shadow_only": False,
                "usable_sentiment": False,
                "ea_action": "IGNORE_SENTIMENT",
            }
        )
    elif age > LKG_SECONDS:
        payload.update(
            {
                "status": "STALE",
                "sentiment_status": "STALE",
                "usable_sentiment": False,
                "risk_mode": "TECHNICAL_ONLY",
                "action": "TECHNICAL_ONLY",
                "score_adjust_buy": 0.0,
                "score_adjust_sell": 0.0,
                "veto_buy": False,
                "veto_sell": False,
                "shadow_only": False,
                "ea_action": "IGNORE",
                "reason": "Cache beyond LKG window; technical-only fallback. " + str(payload.get("reason", "")),
            }
        )
    elif age > STALE_SECONDS:
        payload.update(
            {
                "status": "LKG",
                "sentiment_status": "LKG",
                "ea_action": "USE_REDUCED",
                "reason": "Cache stale but within true LKG window; reduced influence. " + str(payload.get("reason", "")),
            }
        )
    return payload


def background_worker() -> None:
    """Refresh active symbols in a loop until ``_stop_event`` is set.

    The thread first sleeps ``WARMUP_MIN_SECONDS``. Each pass then snapshots
    the symbols in ``_active_symbols`` that were seen less than
    ``SYMBOL_IDLE_SECONDS`` ago and, for each one, calls ``refresh_symbol``
    when no payload time is recorded or the payload is at least as old as the
    shortest ``interval`` among the symbol's relevant providers (default 300),
    unless ``_refreshing`` already flags the symbol.

    Symbols are spaced by a 0.2 second wait and passes by a 2 second wait;
    both waits end early once ``_stop_event`` is set.
    """
    time.sleep(WARMUP_MIN_SECONDS)
    while not _stop_event.is_set():
        try:
            now = time.time()
            with _cache_lock:
                symbols = [s for s, seen in _active_symbols.items() if now - seen < SYMBOL_IDLE_SECONDS]
            for sym in symbols:
                # Isolate failures per symbol so one bad symbol cannot abort
                # the refresh of every symbol that follows it in this pass.
                try:
                    with _cache_lock:
                        last = _payload_time.get(sym, 0.0)
                        refreshing = _refreshing.get(sym, False)
                    min_interval = min(provider_cfg(p).get("interval", 300) for p in relevant_providers_for(sym))
                    # Re-read the clock: refreshes earlier in this pass may
                    # have taken long enough to make the pass-start time stale.
                    elapsed = time.time() - last
                    if (last <= 0 or elapsed >= min_interval) and not refreshing:
                        refresh_symbol(sym)
                except Exception as exc:
                    log_line(f"worker_exception symbol={sym} err={type(exc).__name__}")
                if _stop_event.wait(0.2):
                    break
        except Exception as exc:
            # Top-level boundary of the worker thread: log and keep looping.
            log_line(f"worker_exception err={type(exc).__name__}")
        _stop_event.wait(2.0)


def ensure_worker() -> None:
    """Start the background worker thread the first time this is called.

    On the first call the ``_worker_started`` flag is set, every symbol in
    ``PRELOAD_SYMBOLS`` is passed to ``register_symbol`` and a daemon thread
    running ``background_worker`` is started. Later calls return immediately.
    """
    global _worker_started
    if _worker_started:
        return
    # Flip the flag before doing any work, matching the original ordering.
    _worker_started = True
    for preload in PRELOAD_SYMBOLS:
        register_symbol(preload)
    refresher = threading.Thread(target=background_worker, daemon=True)
    refresher.start()


@app.on_event("startup")
def startup_event() -> None:
    """Run the startup steps in order: load the persistent cache, then ensure the worker."""
    # Order matters: the cache is loaded before the worker thread is started.
    for step in (load_persistent_cache, ensure_worker):
        step()


@app.get("/")
def root() -> Dict[str, Any]:
    """Return a static descriptor of the service (name, version, main endpoint, mode)."""
    descriptor: Dict[str, Any] = {
        "ok": True,
        "service": "Adaptive Regime Trader Context Bridge",
        "version": APP_VERSION,
        "endpoint": "/sentiment?symbol=EURUSD",
        "mode": "cache-first-background-refresh-circuit-breaker",
        "note": "Runs separately from the existing quant server. Default port 8010 avoids 127.0.0.1:8000 conflicts.",
    }
    return descriptor


@app.get("/health")
def health() -> Dict[str, Any]:
    """Report bridge configuration together with snapshots of the shared state.

    The active symbols, cached symbols, provider health, provider state and
    symbol state are copied (top level only) while holding ``_cache_lock``;
    the response itself is assembled after the lock is released.
    """
    with _cache_lock:
        active_symbols = list(_active_symbols)
        cached_symbols = list(_payload_cache)
        provider_health_snapshot = dict(_provider_health)
        provider_state_snapshot = dict(_provider_state)
        symbol_state_snapshot = dict(_symbol_state)

    report: Dict[str, Any] = {
        "ok": True,
        "bridge_status": "ONLINE",
        "service": "Adaptive Regime Trader Context Bridge",
        "version": APP_VERSION,
        "time_utc": now_utc_iso(),
        "mode": "V2.1 cache-first technical-only-safe sentiment with GDELT HTTP fallback",
        "active_symbols": active_symbols,
        "cache_symbols": cached_symbols,
        "stale_seconds": STALE_SECONDS,
        "lkg_seconds": LKG_SECONDS,
        "status_hold_seconds": STATUS_HOLD_SECONDS,
        "degraded_strikes": DEGRADED_STRIKES,
        "provider_config": PROVIDER_CONFIG,
        "provider_health": provider_health_snapshot,
        "provider_state": provider_state_snapshot,
        "symbol_state": symbol_state_snapshot,
        "cache_file": str(CACHE_FILE),
    }
    return report


@app.get("/debug/gdelt")
def debug_gdelt(symbol: str = Query("XAUUSD", min_length=3)) -> Dict[str, Any]:
    """Return whatever ``debug_gdelt_fetch`` produces for the upper-cased symbol."""
    normalized = symbol.upper()
    return debug_gdelt_fetch(normalized)


@app.get("/sources")
def sources() -> Dict[str, Any]:
    """Return a static description of the free data sources and stability features."""
    free_sources = [
        "GDELT DOC 2.0 news/headline context",
        "Alternative.me Fear & Greed Index for BTC/crypto",
        "CoinGecko public simple price for BTC/ETH 24h context",
    ]
    stability_features = [
        "per-provider circuit breakers",
        "persistent last-known-good cache",
        "status hysteresis",
        "atomic cache replacement",
        "per-symbol cache",
        "source-specific intervals/timeouts",
    ]
    return {
        "ok": True,
        "mode": "background cached; /sentiment returns instantly",
        "free_sources": free_sources,
        "stability_features": stability_features,
    }


@app.get("/cache")
def cache() -> Dict[str, Any]:
    """Summarise every cached payload, keyed by symbol.

    Each entry reports the payload age in whole seconds (0 when no payload
    time is recorded), a few status fields copied from the payload, whether a
    refresh is flagged in ``_refreshing``, and the first 180 characters of the
    payload's reason. The summary is built while holding ``_cache_lock``.
    """
    now = time.time()
    items: Dict[str, Dict[str, Any]] = {}
    with _cache_lock:
        for key, entry in _payload_cache.items():
            items[key] = {
                "age_seconds": int(now - _payload_time.get(key, now)),
                "status": entry.get("status"),
                "sentiment_status": entry.get("sentiment_status"),
                "provider_health": entry.get("provider_health"),
                "ea_action": entry.get("ea_action"),
                "bias": entry.get("direction_bias"),
                "confidence": entry.get("confidence"),
                "refreshing": _refreshing.get(key, False),
                # Coerce before slicing: a reason of None or a non-string
                # value would otherwise raise TypeError and fail the endpoint.
                "reason": str(entry.get("reason") or "")[:180],
            }
    return {"ok": True, "items": items}


@app.get("/sentiment")
def sentiment(symbol: str = Query("EURUSD", min_length=3), tf: Optional[str] = None, account_id: Optional[str] = None, magic: Optional[str] = None) -> JSONResponse:
    """Serve the cached payload for ``symbol``, echoing the caller's request tags.

    Ensures the background worker has been started, fetches the payload via
    ``get_cached_payload`` and sets ``tf``, ``account_id`` and ``magic`` on it
    from the query parameters before returning it as JSON.
    """
    ensure_worker()
    response_body = get_cached_payload(symbol)
    response_body.update({"tf": tf, "account_id": account_id, "magic": magic})
    return JSONResponse(response_body)


@app.get("/refresh")
def manual_refresh_get(symbol: str = Query("EURUSD", min_length=3), force: bool = False) -> Dict[str, Any]:
    """Schedule a refresh of ``symbol`` on a daemon thread and return at once.

    The upper-cased symbol is registered first. With ``force`` set, the
    failure count and circuit deadline of every provider relevant to the
    symbol are zeroed and its health record is replaced with a
    ``force_refresh_reset`` entry, all under ``_cache_lock``.
    """
    key = symbol.upper()
    register_symbol(key)
    if force:
        # Only providers relevant to this symbol get their breaker cleared.
        with _cache_lock:
            for name in relevant_providers_for(key):
                breaker = _provider_state.setdefault(
                    name,
                    {"failures": 0, "circuit_until": 0.0, "last_attempt": 0.0, "last_ok": 0.0},
                )
                breaker.update({"failures": 0, "circuit_until": 0.0})
                _provider_health[name] = {
                    "ok": False,
                    "reason": "force_refresh_reset",
                    "elapsed_ms": 0.0,
                    "time_utc": now_utc_iso(),
                    "failures": 0,
                    "circuit_open": False,
                    "circuit_retry_seconds": 0,
                    "skipped": False,
                }
    refresher = threading.Thread(target=refresh_symbol, args=(key,), daemon=True)
    refresher.start()
    return {
        "ok": True,
        "symbol": key,
        "force": bool(force),
        "message": "refresh scheduled",
    }


@app.post("/refresh")
def manual_refresh_post(symbol: str = Query("EURUSD", min_length=3), force: bool = False) -> Dict[str, Any]:
    """POST variant of the refresh endpoint; delegates to ``manual_refresh_get``."""
    scheduled = manual_refresh_get(symbol=symbol, force=force)
    return scheduled


def is_port_free(host: str, port: int) -> bool:
    """Return True when nothing accepts a TCP connection on ``host``:``port``.

    The check is a connect probe with a 0.25 second timeout: any non-zero
    ``connect_ex`` result (refused, timed out, ...) is treated as "free".

    Args:
        host: IPv4 address or hostname the server will bind to.
        port: TCP port to probe.

    Returns:
        True if the probe connection failed, False if something answered.
    """
    # A wildcard bind address is not a valid connect target on every platform
    # (Windows rejects it), which would make every port look free. A listener
    # on the wildcard address is reachable through loopback, so probe that.
    probe_host = "127.0.0.1" if host in ("", "0.0.0.0") else host
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.25)
        return sock.connect_ex((probe_host, port)) != 0


def choose_port(host: str, preferred: int) -> int:
    """Return the first free port in ``preferred`` .. ``preferred + 10``.

    Ports are probed in ascending order with ``is_port_free``. The scan never
    goes beyond 65535, the highest valid TCP port.

    Args:
        host: Host passed through to ``is_port_free``.
        preferred: First port to try.

    Returns:
        The lowest probed port that is free.

    Raises:
        RuntimeError: If none of the probed ports is free.
    """
    # Cap the scan: socket calls reject ports above 65535 with OverflowError,
    # which would escape instead of the RuntimeError raised below.
    last = min(preferred + 10, 65535)
    for port in range(preferred, last + 1):
        if is_port_free(host, port):
            return port
    raise RuntimeError(f"No free port found from {preferred} to {last}")


if __name__ == "__main__":
    # Script entry point: pick a port starting at the preferred one, tell the
    # operator which URL to use, then serve the app with uvicorn.
    import uvicorn

    host = os.getenv("AQC_SENTIMENT_HOST", DEFAULT_HOST)
    preferred_port = int(os.getenv("AQC_SENTIMENT_PORT", str(DEFAULT_PORT)))
    port = choose_port(host, preferred_port)

    if port == preferred_port:
        print(f"[AQC Sentiment Bridge] Running on http://{host}:{port} (no conflict with quant server on :8000)")
    else:
        # The preferred port was taken, so the EA must be pointed at the new one.
        print(f"[AQC Sentiment Bridge] Preferred port {preferred_port} is busy; using {port} instead.")
        print(f"[AQC Sentiment Bridge] Update EA InpSentimentBridgeUrl to http://{host}:{port}/sentiment")

    print("[AQC Sentiment Bridge] V2.1: cache-first + true-LKG + GDELT HTTP fallback + detailed provider diagnostics")
    uvicorn.run(app, host=host, port=port, log_level="info")
