import os
import sys
import time
import uuid
import json
import base64
import random
import logging
import asyncio
import argparse
from quart import Quart, request, jsonify, make_response
from camoufox.async_api import AsyncCamoufox

# ANSI escape sequences used to colorize the level tag of each log line.
# Every entry has the form ESC[<code>m; 'RESET' (code 0) ends the colored span.
COLORS = {
    name: f'\033[{code}m'
    for name, code in (
        ('MAGENTA', 35),
        ('BLUE', 34),
        ('GREEN', 32),
        ('YELLOW', 33),
        ('RED', 31),
        ('RESET', 0),
    )
}


class CustomLogger(logging.Logger):
    """Logger whose level methods prepend a timestamp and a colored level tag.

    Each override decorates the message with ``format_message`` and then
    delegates to the corresponding ``logging.Logger`` method. ``success`` is an
    extra level name that is emitted through ``Logger.info``.
    """

    @staticmethod
    def format_message(level, color, message):
        """Return ``message`` rendered as ``[HH:MM:SS] [<colored level>] -> message``.

        ``color`` is looked up in ``COLORS``; the tag is terminated with the
        ``RESET`` sequence from the same mapping.
        """
        clock = time.strftime('%H:%M:%S')
        tag = f"{COLORS.get(color)}{level}{COLORS.get('RESET')}"
        return f"[{clock}] [{tag}] -> {message}"

    def debug(self, message, *args, **kwargs):
        """Log ``message`` at DEBUG level with a magenta tag."""
        decorated = self.format_message('DEBUG', 'MAGENTA', message)
        super().debug(decorated, *args, **kwargs)

    def info(self, message, *args, **kwargs):
        """Log ``message`` at INFO level with a blue tag."""
        decorated = self.format_message('INFO', 'BLUE', message)
        super().info(decorated, *args, **kwargs)

    def success(self, message, *args, **kwargs):
        """Log ``message`` at INFO level with a green ``SUCCESS`` tag."""
        decorated = self.format_message('SUCCESS', 'GREEN', message)
        super().info(decorated, *args, **kwargs)

    def warning(self, message, *args, **kwargs):
        """Log ``message`` at WARNING level with a yellow tag."""
        decorated = self.format_message('WARNING', 'YELLOW', message)
        super().warning(decorated, *args, **kwargs)

    def error(self, message, *args, **kwargs):
        """Log ``message`` at ERROR level with a red tag."""
        decorated = self.format_message('ERROR', 'RED', message)
        super().error(decorated, *args, **kwargs)


# Register CustomLogger as the logger class before getLogger() is called, so a
# logger newly created by the call below is a CustomLogger instance.
logging.setLoggerClass(CustomLogger)
# noinspection PyTypeChecker
logger: CustomLogger = logging.getLogger("TurnstileAPIServer")
logger.setLevel(logging.DEBUG)
# Send this logger's records to stdout.
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)

# NOTE: default interval (in seconds) between scans that evict expired tasks
# from the result cache.
CLEANUP_INTERVAL_SECONDS = 60


class TurnstileAPIServer:

    def __init__(self, thread: int, proxy_support: bool, max_cache_age: int, debug: bool,
                 cors_mode: str = 'none', cors_origins: list = None,
                 auth_token: str = None, basic_auth: str = None):
        """Create the Quart app, load persisted results and register routes.

        Args:
            thread: Number of browsers to keep in the pool.
            proxy_support: Whether solver contexts pick a proxy from ``proxies.txt``.
            max_cache_age: Seconds after which a stored task is evicted.
            debug: Enables the verbose debug log lines.
            cors_mode: One of ``'none'``, ``'whitelist'`` or ``'all'``.
            cors_origins: Allowed origins for whitelist mode; ``None`` means empty.
            auth_token: Expected Bearer token, or ``None`` to disable Bearer auth.
            basic_auth: Expected ``user:password`` string, or ``None`` to disable
                Basic auth.
        """
        # Web application and task state restored from disk.
        self.app = Quart(__name__)
        self.debug = debug
        self.results = self._load_results()

        # Solver configuration.
        self.thread_count = thread
        self.proxy_support = proxy_support
        self.max_cache_age = max_cache_age

        # CORS and authentication settings read by the middleware.
        self.cors_mode = cors_mode
        self.cors_origins = cors_origins if cors_origins else []
        self.auth_token = auth_token
        self.basic_auth = basic_auth

        # Browser pool state; populated by the startup hook.
        self.browser_pool = asyncio.Queue()
        self.camoufox = None
        self.browser_type = 'camoufox'  # overwritten once a backend is selected
        self._playwright = None
        self._chromium_args = []

        # Routes and middleware depend on the settings assigned above.
        self._setup_routes()

    @staticmethod
    def _load_results():
        """从 results.json 加载历史结果。"""
        try:
            if os.path.exists("results.json"):
                with open("results.json", "r") as f:
                    return json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            logger.warning(f"Error loading results: {str(e)}. Starting with an empty results dictionary.")
        return {}

    def _save_results(self):
        """Persist ``self.results`` to ``results.json`` as 4-space indented JSON.

        An ``IOError`` raised while opening or writing the file is logged and
        not re-raised.
        """
        target = "results.json"
        try:
            with open(target, "w") as out:
                json.dump(self.results, out, indent=4)
        except IOError as err:
            logger.error(f"Error saving results to file: {str(err)}")

    def _cleanup_expired_tasks(self):
        """Evict tasks older than ``self.max_cache_age`` seconds.

        Only dict entries are considered; an entry without ``created_at`` is
        treated as having age zero. When at least one task is removed the
        results are saved and a log line is written.

        Returns:
            int: Number of tasks removed.
        """
        now = time.time()

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        stale = []
        for task_id, entry in self.results.items():
            if not isinstance(entry, dict):
                continue
            age = now - entry.get("created_at", now)
            if age > self.max_cache_age:
                stale.append(task_id)

        for task_id in stale:
            del self.results[task_id]

        removed = len(stale)
        if removed:
            self._save_results()
            logger.info(f"Cleaned up {removed} expired task(s)")
        return removed

    async def _periodic_cleanup(self):
        """Background coroutine: evict expired tasks on a fixed interval, forever.

        Sleeps ``CLEANUP_INTERVAL_SECONDS`` between passes. An exception raised
        by a pass is logged and the loop continues with the next pass.
        """
        while True:
            # Sleep first: the caller is expected to have run an initial pass.
            await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)
            try:
                self._cleanup_expired_tasks()
            except Exception as err:
                logger.error(f"Error during periodic cleanup: {str(err)}")

    def _setup_routes(self) -> None:
        """Register the startup hook, the routes, and the CORS / auth middleware.

        Routes:
            ``POST /turnstile`` -> ``process_turnstile``
            ``GET /result``     -> ``get_result``
            ``GET /``           -> ``index``

        CORS middleware is installed only when ``self.cors_mode`` is not
        ``'none'``: an ``after_request`` hook adds the CORS headers and a
        ``before_request`` hook answers every ``OPTIONS`` request with 204.

        Auth middleware is installed when ``self.auth_token`` or
        ``self.basic_auth`` is set. A request passes when it carries either a
        matching ``Authorization: Bearer <token>`` or a matching
        ``Authorization: Basic <base64(user:password)>`` header; otherwise a 401
        JSON response with a ``WWW-Authenticate`` header is returned.
        """
        # Function-scope import: only the auth middleware below needs it.
        import hmac

        self.app.before_serving(self._startup)
        self.app.route('/turnstile', methods=['POST', 'OPTIONS'])(self.process_turnstile)
        self.app.route('/result', methods=['GET', 'OPTIONS'])(self.get_result)
        self.app.route('/')(self.index)

        cors_enabled = self.cors_mode != 'none'

        # ── CORS middleware ─────────────────────────────────────────────────────
        if cors_enabled:
            @self.app.after_request
            async def add_cors_headers(response):
                origin = request.headers.get('Origin', '')
                if self.cors_mode == 'all':
                    response.headers['Access-Control-Allow-Origin'] = '*'
                elif self.cors_mode == 'whitelist':
                    # Whether CORS headers are emitted depends on the request's
                    # Origin, so caches must key on it even when the origin is
                    # not whitelisted.
                    response.headers['Vary'] = 'Origin'
                    if origin in self.cors_origins:
                        response.headers['Access-Control-Allow-Origin'] = origin

                if response.headers.get('Access-Control-Allow-Origin'):
                    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
                    response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'

                return response

            @self.app.before_request
            async def handle_preflight():
                # Answer preflight requests here so they never reach a view.
                if request.method == 'OPTIONS':
                    response = await make_response('', 204)
                    return response
                return None

        # ── Authentication middleware ───────────────────────────────────────────
        if self.auth_token or self.basic_auth:
            def _credentials_equal(candidate: str, expected: str) -> bool:
                # Constant-time comparison; bytes are used because
                # compare_digest rejects non-ASCII str arguments.
                return hmac.compare_digest(candidate.encode('utf-8'), expected.encode('utf-8'))

            @self.app.before_request
            async def check_auth():
                # The documentation page is public.
                if request.path == '/' and request.method == 'GET':
                    return None
                # Preflight requests carry no credentials. They may only skip
                # authentication when the CORS preflight handler is installed;
                # otherwise an OPTIONS request would reach the view functions
                # unauthenticated.
                if request.method == 'OPTIONS' and cors_enabled:
                    return None

                auth_header = request.headers.get('Authorization', '')
                # Bearer token authentication
                if self.auth_token and auth_header.startswith('Bearer '):
                    token = auth_header[7:]
                    if _credentials_equal(token, self.auth_token):
                        return None
                # Basic authentication
                if self.basic_auth and auth_header.startswith('Basic '):
                    try:
                        decoded = base64.b64decode(auth_header[6:]).decode('utf-8')
                    except ValueError:
                        # Malformed base64 or non-UTF-8 payload: treat as a
                        # failed login (binascii.Error and UnicodeDecodeError
                        # are both ValueError subclasses).
                        decoded = None
                    if decoded is not None and _credentials_equal(decoded, self.basic_auth):
                        return None

                # Authentication failed: advertise every enabled scheme.
                www_auth_parts = []
                if self.basic_auth:
                    www_auth_parts.append('Basic realm="Turnstile API"')
                if self.auth_token:
                    www_auth_parts.append('Bearer')
                return jsonify({
                    "status": "error",
                    "error": "Unauthorized"
                }), 401, {'WWW-Authenticate': ', '.join(www_auth_parts)}

    async def _startup(self) -> None:
        """Startup hook: initialize the browser pool and start cache cleanup.

        Runs one cleanup pass over the loaded results, then starts the periodic
        cleanup coroutine as a background task, and finally logs the active
        CORS and authentication configuration.

        Raises:
            Exception: Whatever ``_initialize_browser`` raised, after logging it.
        """
        logger.info("Starting browser initialization")
        try:
            await self._initialize_browser()
        except Exception as e:
            logger.error(f"Failed to initialize browser: {str(e)}")
            raise

        # NOTE: evict stale history once at startup, then start the periodic pass.
        self._cleanup_expired_tasks()
        # Keep a strong reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
        logger.info(f"Periodic cache cleanup enabled (max_cache_age={self.max_cache_age}s)")

        # Log the CORS configuration.
        if self.cors_mode == 'none':
            logger.info("CORS: disabled (no cross-origin requests allowed)")
        elif self.cors_mode == 'all':
            logger.info("CORS: allow all origins (*)")
        elif self.cors_mode == 'whitelist':
            logger.info(f"CORS: whitelist mode, allowed origins: {self.cors_origins}")

        # Log the authentication configuration.
        if self.auth_token and self.basic_auth:
            logger.info("Auth: Bearer token + Basic Auth enabled (either accepted)")
        elif self.auth_token:
            logger.info("Auth: Bearer token enabled")
        elif self.basic_auth:
            logger.info("Auth: Basic Auth enabled")
        else:
            logger.warning("Auth: disabled (API is open, no authentication required)")

    async def _initialize_browser(self) -> None:
        """Fill ``self.browser_pool`` with ``(slot, browser)`` pairs.

        Backends are tried in order: CloakBrowser (headed, humanize), then
        Camoufox (headless), then Playwright Chromium (headless). For the first
        two, the first launched browser is verified by opening a context and a
        page before the remaining browsers are launched.

        Browsers are staged in a local list and only published to the pool once
        a backend has launched all of them. If a backend fails part-way, the
        browsers it already launched are closed, so the pool never contains a
        mix of backends or duplicate slot numbers.

        Raises:
            Exception: If the final Chromium fallback also fails.
        """

        async def _close_quietly(browsers):
            # Best-effort teardown of a partially built pool.
            for stale in browsers:
                try:
                    await stale.close()
                except Exception as close_err:
                    logger.warning(f"Failed to close partially initialized browser: {close_err}")

        async def _launch_verified(launch):
            # Launch one browser, verify it can open a page, then launch the rest.
            launched = []
            try:
                first = await launch()
                launched.append(first)
                probe_ctx = await first.new_context()
                await probe_ctx.new_page()  # verifies that a page can be created
                await probe_ctx.close()
                for _ in range(1, self.thread_count):
                    launched.append(await launch())
            except Exception:
                await _close_quietly(launched)
                raise
            return launched

        async def _publish(browsers):
            # Slots are numbered from 1, matching the "Browser N" log lines.
            for slot, ready in enumerate(browsers, start=1):
                await self.browser_pool.put((slot, ready))

        # Preferred: CloakBrowser, which needs headed mode (e.g. under Xvfb).
        try:
            from cloakbrowser import launch_async as cloak_launch_async

            browsers = await _launch_verified(
                lambda: cloak_launch_async(headless=False, humanize=True))
            await _publish(browsers)
            self.browser_type = 'cloakbrowser'
            logger.success(f"Browser pool initialized with {self.thread_count} CloakBrowser (headed+humanize) browsers")
            return
        except Exception as e:
            logger.warning(f"CloakBrowser init failed: {e}. Falling back to Camoufox.")

        # Second choice: Camoufox.
        try:
            self.camoufox = AsyncCamoufox(headless=True)
            browsers = await _launch_verified(self.camoufox.start)
            await _publish(browsers)
            self.browser_type = 'camoufox'
            logger.success(f"Browser pool initialized with {self.thread_count} Camoufox browsers")
            return
        except Exception as e:
            logger.warning(f"Camoufox init failed: {e}. Falling back to Chromium.")

        # Fallback: Chromium
        from playwright.async_api import async_playwright

        self._playwright = await async_playwright().start()
        self._chromium_args = ['--no-sandbox', '--disable-gpu', '--disable-dev-shm-usage']
        self.browser_type = 'chromium'

        browsers = []
        try:
            for i in range(self.thread_count):
                browser = await self._playwright.chromium.launch(
                    executable_path='/usr/bin/chromium',
                    headless=True,
                    args=self._chromium_args,
                )
                browsers.append(browser)
                if self.debug:
                    logger.success(f"Chromium browser {i + 1} initialized successfully")
        except Exception:
            await _close_quietly(browsers)
            raise
        await _publish(browsers)

        logger.success(f"Browser pool initialized with {self.thread_count} Chromium browsers")

    async def _solve_turnstile(self, cf_selector: str, task_id: str, url: str, sitekey: str, action: str = None,
                               cdata: str = None):
        """Solve one Turnstile challenge and store the outcome in ``self.results[task_id]``.

        A browser is borrowed from ``self.browser_pool``. Navigation to ``url``
        is intercepted so that the first document response is replaced by a
        minimal page containing only the Turnstile widget; every other request
        is passed through. The ``cf-turnstile-response`` input is then polled
        until it holds a token or the attempt budget is exhausted.

        On success the entry becomes ``{"status": "success", "token", ...}``;
        on any failure (including proxy/context setup errors) it becomes
        ``{"status": "failed", "error": "CAPTCHA_FAIL", ...}``. The original
        ``created_at`` is kept when the entry still exists. The browser is
        always returned to the pool.

        Args:
            cf_selector: CSS selector of the widget container to resize/click.
            task_id: Key in ``self.results`` for this task.
            url: URL to navigate to (its document body is replaced).
            sitekey: Turnstile site key for ``data-sitekey``.
            action: Optional value for ``data-action``.
            cdata: Optional value for ``data-cdata``.
        """
        # Function-scope import: only this method needs HTML escaping.
        import html

        # ── Build the injected page ─────────────────────────────────────────────
        # sitekey/action/cdata come from the API request body, so they are
        # escaped before being placed inside HTML attribute values.
        # data-* attributes are appended only when a value was supplied.
        extra_attrs = ""
        if action:
            extra_attrs += f' data-action="{html.escape(str(action), quote=True)}"'
        if cdata:
            extra_attrs += f' data-cdata="{html.escape(str(cdata), quote=True)}"'
        safe_sitekey = html.escape(str(sitekey), quote=True)

        injected_html = f"""<!DOCTYPE html>
        <html>
        <head>
          <meta charset="utf-8">
          <!--
            仅保留 Turnstile api.js。
            浏览器仍处于目标 URL 的 Origin，满足 Cloudflare 域名校验。
          -->
          <script src="https://challenges.cloudflare.com/turnstile/v0/api.js"
                  async defer></script>
        </head>
        <body>
          <div class="cf-turnstile"
               data-sitekey="{safe_sitekey}"{extra_attrs}
               style="width:70px">
          </div>
        </body>
        </html>"""

        index, browser = await self.browser_pool.get()

        # Provisional start; reset once the page exists so the reported time of
        # a normal run is measured from page creation.
        start_time = time.time()
        context = None
        page = None
        route_installed = False

        # ── Route interception: replace only the first main document ───────────
        intercepted = {"done": False}

        async def _intercept(route, req):
            # NOTE: fulfil the first document request with the injected page;
            # every other request (Cloudflare JS, fonts, ...) is passed through.
            if not intercepted["done"] and req.resource_type == "document":
                intercepted["done"] = True
                if self.debug:
                    logger.debug(
                        f"Browser {index}: Intercepted document request → "
                        f"injecting Turnstile-only page | url={req.url}"
                    )
                await route.fulfill(
                    status=200,
                    content_type="text/html; charset=utf-8",
                    body=injected_html,
                )
            else:
                await route.continue_()

        def _original_created_at():
            # The task may have been evicted by the cache cleanup while it was
            # being solved, so never index self.results[task_id] directly.
            existing = self.results.get(task_id)
            if isinstance(existing, dict):
                return existing.get("created_at", time.time())
            return time.time()

        def _record_failure():
            # Store a failed outcome and return the elapsed time that was recorded.
            elapsed = round(time.time() - start_time, 3)
            self.results[task_id] = {
                "status": "failed",
                "error": "CAPTCHA_FAIL",
                "elapsed_time": elapsed,
                "created_at": _original_created_at(),
            }
            self._save_results()
            return elapsed

        try:
            # NOTE: a proxy is used only when proxy_support is enabled.
            proxy_settings = None
            if self.proxy_support:
                proxy_file_path = os.path.join(os.getcwd(), "proxies.txt")

                with open(proxy_file_path) as proxy_file:
                    proxies = [line.strip() for line in proxy_file if line.strip()]

                proxy = random.choice(proxies) if proxies else None

                if proxy:
                    parts = proxy.split(':')
                    if len(parts) == 3:
                        # Three ':'-separated parts: used verbatim as the server.
                        proxy_settings = {"server": f"{proxy}"}
                    elif len(parts) == 5:
                        # scheme:ip:port:user:password
                        proxy_scheme, proxy_ip, proxy_port, proxy_user, proxy_pass = parts
                        proxy_settings = {
                            "server": f"{proxy_scheme}://{proxy_ip}:{proxy_port}",
                            "username": proxy_user,
                            "password": proxy_pass,
                        }
                    else:
                        raise ValueError("Invalid proxy format")

            if proxy_settings:
                context = await browser.new_context(proxy=proxy_settings)
            else:
                context = await browser.new_context()

            page = await context.new_page()
            start_time = time.time()

            await page.route("**/*", _intercept)
            route_installed = True

            if self.debug:
                logger.debug(
                    f"Browser {index}: Navigating to {url} "
                    f"(response will be replaced by injected HTML)"
                )

            # "commit" returns once the response is committed, without waiting for
            # sub-resources; with the interception above that is the injected HTML.
            # CloakBrowser waits for "domcontentloaded" instead.
            wait_mode = "domcontentloaded" if self.browser_type == 'cloakbrowser' else "commit"
            await page.goto(url, wait_until=wait_mode)

            if self.debug:
                logger.debug(f"Browser {index}: Setting up Turnstile widget dimensions")

            await page.wait_for_selector('[name=cf-turnstile-response]', state="attached", timeout=45000)
            await page.eval_on_selector(f"{cf_selector}", "el => el.style.width = '70px'")

            if self.debug:
                logger.debug(f"Browser {index}: Starting Turnstile response retrieval loop")

            # CloakBrowser gets a larger attempt budget.
            max_attempts = 40 if self.browser_type == 'cloakbrowser' else 30
            solved = False

            for attempt in range(max_attempts):
                try:
                    turnstile_check = await page.input_value("[name=cf-turnstile-response]", timeout=3000)
                except Exception as poll_err:
                    logger.info(f"Exception occurred while trying to solve {poll_err}")
                    continue

                if turnstile_check == "":
                    if self.debug:
                        logger.debug(f"Browser {index}: Attempt {attempt} - No Turnstile response yet")
                    # Click the caller's selector, or the injected page's fixed
                    # .cf-turnstile container when none was given.
                    target = cf_selector if cf_selector else ".cf-turnstile"
                    try:
                        await page.locator(target).click(timeout=3000)
                    except Exception:
                        # Best-effort: the click only nudges the widget.
                        pass
                    await asyncio.sleep(1.5)
                    continue

                elapsed_time = round(time.time() - start_time, 3)

                logger.success(
                    f"Browser {index}: Successfully solved captcha - {COLORS.get('MAGENTA')}{turnstile_check[:10]}{COLORS.get('RESET')} in {COLORS.get('GREEN')}{elapsed_time}{COLORS.get('RESET')} Seconds")

                self.results[task_id] = {
                    "status": "success",
                    "token": turnstile_check,
                    "elapsed_time": elapsed_time,
                    "created_at": _original_created_at(),
                }
                solved = True
                self._save_results()
                break

            # Attempt budget exhausted without a token -> mark the task failed.
            if not solved:
                elapsed_time = _record_failure()
                if self.debug:
                    logger.error(
                        f"Browser {index}: Error solving Turnstile in {COLORS.get('RED')}{elapsed_time}{COLORS.get('RESET')} Seconds")

        except Exception as e:
            _record_failure()
            logger.error(f"Browser {index}: Error solving Turnstile: {str(e)}")

        finally:
            # ── Common cleanup ──────────────────────────────────────────────────
            if self.debug:
                logger.debug(f"Browser {index}: Clearing page state")

            try:
                if route_installed:
                    try:
                        await page.unroute("**/*", _intercept)
                    except Exception:
                        # Best-effort: the context is closed right below anyway.
                        pass
                if context is not None:
                    try:
                        await context.close()
                    except Exception as close_err:
                        logger.warning(f"Browser {index}: Failed to close context: {close_err}")
            finally:
                # Always hand the browser back, otherwise the pool shrinks and
                # later tasks wait forever.
                await self.browser_pool.put((index, browser))

    async def process_turnstile(self):
        """Handle ``/turnstile``: validate the JSON body and start a solver task.

        Expected body keys: ``url`` and ``sitekey`` (required); ``cf_selector``
        (defaults to ``".cf-turnstile"``), ``action`` and ``cdata`` (optional).

        Returns:
            A ``(response, status)`` pair: 202 with ``task_id`` when the task
            was created, 400 for an invalid body, 500 on an unexpected error.
        """

        try:
            data = await request.get_json()
        except Exception:
            return jsonify({
                "status": "error",
                "error": "Invalid JSON body or missing Content-Type: application/json"
            }), 400

        if not data or not isinstance(data, dict):
            return jsonify({
                "status": "error",
                "error": "Request body must be a JSON object"
            }), 400

        url = data.get("url")
        cf_selector = data.get("cf_selector") or ".cf-turnstile"
        sitekey = data.get("sitekey")
        action = data.get("action")
        cdata = data.get("cdata")

        if not url or not sitekey:
            return jsonify({
                "status": "error",
                "error": "Both 'url' and 'sitekey' are required"
            }), 400

        task_id = str(uuid.uuid4())
        # NOTE: created_at is recorded up front so the cache cleanup can age the task.
        self.results[task_id] = {"status": "pending", "created_at": time.time()}

        try:
            solver_task = asyncio.create_task(
                self._solve_turnstile(cf_selector=cf_selector, task_id=task_id, url=url, sitekey=sitekey, action=action,
                                      cdata=cdata))

            # Keep a strong reference until the task finishes: the event loop
            # only holds weak references to tasks, so a fire-and-forget task may
            # be garbage-collected before it completes.
            background_tasks = getattr(self, "_background_tasks", None)
            if background_tasks is None:
                background_tasks = self._background_tasks = set()
            background_tasks.add(solver_task)
            solver_task.add_done_callback(background_tasks.discard)

            if self.debug:
                logger.debug(f"Request completed with taskid {task_id}.")
            return jsonify({"status": "created", "task_id": task_id}), 202
        except Exception as e:
            logger.error(f"Unexpected error processing request: {str(e)}")
            return jsonify({
                "status": "error",
                "error": str(e)
            }), 500

    async def get_result(self):
        """Handle ``/result``: report the state of the task named by ``?id=``.

        Returns:
            A ``(response, status)`` pair in a uniform JSON shape: 404 for a
            missing or unknown id, 202 while pending, 200 with ``token`` and
            ``elapsed_time`` on success, and 500 with the stored error for any
            other status.
        """
        task_id = request.args.get('id')
        if not task_id or task_id not in self.results:
            return jsonify({"status": "error", "error": "Invalid task ID"}), 404

        entry = self.results[task_id]

        # NOTE: legacy entries that are not dicts are reported as pending.
        status = entry.get("status", "pending") if isinstance(entry, dict) else "pending"

        if status == "pending":
            return jsonify({"status": "pending"}), 202

        if status == "success":
            solved = {
                "status": "success",
                "data": {
                    "token": entry.get("token"),
                    "elapsed_time": entry.get("elapsed_time"),
                }
            }
            return jsonify(solved), 200

        # Any other status (normally "failed") is reported as an error.
        failure = {
            "status": "error",
            "error": entry.get("error", "CAPTCHA_FAIL"),
            "elapsed_time": entry.get("elapsed_time"),
        }
        return jsonify(failure), 500

    @staticmethod
    async def index():
        """提供 API 文档首页（读取 index.html）。"""
        html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "index.html")
        with open(html_path, "r", encoding="utf-8") as f:
            return f.read()


def parse_args():
    """Parse the command-line arguments of the server.

    ``--proxy`` and ``--debug`` are boolean options. They accept an explicit
    value (``true/false``, ``1/0``, ``yes/no``, ``on/off``, case-insensitive)
    or may be given without a value to mean ``True``. ``type=bool`` is not used
    because ``bool("False")`` is ``True``: any non-empty string would enable
    the option.

    Returns:
        argparse.Namespace: The parsed options, read from ``sys.argv``.
    """

    def _str_to_bool(value):
        # argparse ``type`` converter for explicit boolean option values.
        normalized = str(value).strip().lower()
        if normalized in ('true', '1', 'yes', 'y', 'on'):
            return True
        if normalized in ('false', '0', 'no', 'n', 'off', ''):
            return False
        raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")

    parser = argparse.ArgumentParser(description="Turnstile API Server")

    parser.add_argument('--thread', type=int, default=1,
                        help='浏览器并发线程数（默认: 1）')
    parser.add_argument('--proxy', type=_str_to_bool, nargs='?', const=True, default=False,
                        help='是否启用代理支持，从 proxies.txt 随机选择代理（默认: False）')
    parser.add_argument('--host', type=str, default='0.0.0.0',
                        help='API 监听地址（默认: 0.0.0.0）')
    parser.add_argument('--port', type=str, default='5000',
                        help='API 监听端口（默认: 5000）')
    parser.add_argument('--max_cache_age', type=int, default=3600,
                        help='任务缓存最大存活时长，单位秒（默认: 3600）')
    parser.add_argument('--debug', type=_str_to_bool, nargs='?', const=True, default=False,
                        help='启用调试日志（默认: False）')

    parser.add_argument('--cors', type=str, default='none', choices=['none', 'whitelist', 'all'],
                        help='CORS 模式：none=禁止跨域, whitelist=白名单, all=允许所有（默认: none）')
    parser.add_argument('--cors-origins', type=str, default='',
                        help='CORS 白名单，逗号分隔（仅在 --cors=whitelist 时生效），例如: https://example.com,https://app.test')

    parser.add_argument('--auth-token', type=str, default=None,
                        help='API 安全密钥，客户端需通过 Authorization: Bearer <token> 请求头传入')
    parser.add_argument('--basic-auth', type=str, default=None,
                        help='Basic Auth 凭据，格式为 user:password，客户端需通过 Authorization: Basic <base64> 请求头传入')
    return parser.parse_args()


def create_app(thread: int, proxy_support: bool, max_cache_age: int, debug: bool,
               cors_mode: str = 'none', cors_origins: list = None,
               auth_token: str = None, basic_auth: str = None) -> Quart:
    """Build a ``TurnstileAPIServer`` from the given settings and return its Quart app.

    All arguments are forwarded unchanged to ``TurnstileAPIServer``.
    """
    settings = dict(
        thread=thread,
        proxy_support=proxy_support,
        max_cache_age=max_cache_age,
        debug=debug,
        cors_mode=cors_mode,
        cors_origins=cors_origins,
        auth_token=auth_token,
        basic_auth=basic_auth,
    )
    return TurnstileAPIServer(**settings).app


if __name__ == '__main__':
    args = parse_args()

    # Validate option combinations; the first violated rule aborts with exit code 1.
    startup_error = None
    if args.cors == 'whitelist' and not args.cors_origins:
        startup_error = "Error: --cors-origins is required when --cors=whitelist"
    elif args.basic_auth and ':' not in args.basic_auth:
        startup_error = "Error: --basic-auth must be in 'user:password' format"
    if startup_error is not None:
        print(startup_error)
        sys.exit(1)

    # Split the comma-separated whitelist, dropping blanks and surrounding spaces.
    if args.cors_origins:
        cors_origins_list = list(filter(None, map(str.strip, args.cors_origins.split(','))))
    else:
        cors_origins_list = []

    app = create_app(
        thread=args.thread,
        proxy_support=args.proxy,
        max_cache_age=args.max_cache_age,
        debug=args.debug,
        cors_mode=args.cors,
        cors_origins=cors_origins_list,
        auth_token=args.auth_token,
        basic_auth=args.basic_auth,
    )
    app.run(host=args.host, port=int(args.port))
