"""Small aiohttp service that converts Clash-style proxy YAML into line format.

A GET request to ``/?url=<config-url>`` downloads the document at that URL,
extracts its ``ss`` and ``trojan`` proxy entries and returns them as one
``name = type, server, port, key=value, ...`` line per proxy.
"""

import asyncio
import datetime
import re
import sys
from urllib.parse import parse_qs

import aiohttp
import yaml
from aiohttp import web

# Text returned when the service is called without a ``url`` parameter.
USAGE_GUIDE = """

代理配置转换工具

使用方法:在URL参数中提供包含代理配置的网址。

示例:http://localhost:8080/?url=https://example.com/path-to-proxy-config

"""


async def fetch_url(url, session):
    """Download *url* with the given aiohttp client session and return its text."""
    async with session.get(url) as response:
        return await response.text()


def _clean_field(value):
    """Return *value* as a stripped string; ``None`` becomes an empty string.

    YAML scalars are not always strings (an unquoted ``password: 123456`` is
    loaded as an int), so the value is converted before stripping.
    """
    if value is None:
        return ''
    return str(value).strip()


def _bool_flag(value):
    """Return ``'true'`` for ``True``/``'true'``/``'True'``, otherwise ``'false'``."""
    return 'true' if value in (True, 'true', 'True') else 'false'


def _proxy_head(proxy, kind):
    """Build the leading ``name = kind, server, port`` part of an output line."""
    name = _clean_field(proxy.get('name', ''))
    server = _clean_field(proxy.get('server', ''))
    port = _clean_field(proxy.get('port', ''))
    return f"{name} = {kind}, {server}, {port}"


def _format_ss(proxy):
    """Render one shadowsocks proxy mapping as an output line."""
    parts = [_proxy_head(proxy, 'ss')]
    if 'cipher' in proxy:
        parts.append(f"encrypt-method={_clean_field(proxy['cipher'])}")
    if 'password' in proxy:
        parts.append(f"password={_clean_field(proxy['password'])}")
    if 'udp' in proxy:
        parts.append(f"udp-relay={_bool_flag(proxy['udp'])}")
    return ", ".join(parts)


def _format_trojan(proxy):
    """Render one trojan proxy mapping as an output line."""
    parts = [_proxy_head(proxy, 'trojan')]
    if 'password' in proxy:
        parts.append(f"password={_clean_field(proxy['password'])}")
    if 'sni' in proxy:
        parts.append(f"sni={_clean_field(proxy['sni'])}")
    if 'skip-cert-verify' in proxy:
        parts.append(f"skip-cert-verify={str(proxy['skip-cert-verify']).lower()}")
    if 'udp' in proxy:
        # NOTE(review): trojan emits ``udp=`` while ss emits ``udp-relay=``;
        # kept as in the original - confirm against the target client format.
        parts.append(f"udp={_bool_flag(proxy['udp'])}")
    return ", ".join(parts)


async def extract_and_transform_proxies(input_text):
    """Convert the proxies found in *input_text* into line-format text.

    The text is parsed as YAML. Proxies are taken from the top-level
    ``proxies`` key, or from the document itself when it is a list. If neither
    applies, a regex looks for a ``proxies:`` section made of flow-style
    (``{...}``) entries and parses only that section.

    Args:
        input_text: The raw configuration document.

    Returns:
        One line per ``ss``/``trojan`` proxy joined by newlines, or a
        human-readable message when the document cannot be parsed or contains
        no usable proxies. Entries of other types, and entries that are not
        mappings, are skipped.
    """
    try:
        data = yaml.safe_load(input_text)
        if isinstance(data, dict) and 'proxies' in data:
            proxies_list = data['proxies']
        elif isinstance(data, list):
            proxies_list = data
        else:
            proxies_match = re.search(
                r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE
            )
            if not proxies_match:
                return "未找到有效的代理配置"
            proxies_list = yaml.safe_load(proxies_match.group(1))
    except yaml.YAMLError:
        return "YAML解析错误"

    # A scalar or mapping under ``proxies`` cannot be iterated as proxy
    # entries; treat it the same as a missing list.
    if not proxies_list or not isinstance(proxies_list, list):
        return "未找到有效的代理配置"

    transformed_proxies = []
    for proxy in proxies_list:
        if not isinstance(proxy, dict):
            continue
        proxy_type = proxy.get('type')
        if proxy_type == 'ss':
            transformed_proxies.append(_format_ss(proxy))
        elif proxy_type == 'trojan':
            transformed_proxies.append(_format_trojan(proxy))

    if not transformed_proxies:
        return "未找到有效的SS或Trojan代理配置"
    return "\n".join(transformed_proxies)


async def log_request(request, response):
    """Print one access-log line for a handled request.

    The line holds the local timestamp, client address, method and path
    (with query string), response status and response content length.
    """
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    client_ip = request.remote
    request_line = f"{request.method} {request.path}"
    if request.query_string:
        request_line += f"?{request.query_string}"
    status_code = response.status
    content_length = response.content_length
    log_message = (
        f"{timestamp} - {client_ip} - \"{request_line}\" "
        f"{status_code} {content_length}"
    )
    print(log_message, flush=True)


@web.middleware
async def logging_middleware(request, handler):
    """Run the handler, log the request/response pair, and return the response."""
    response = await handler(request)
    await log_request(request, response)
    return response


async def handle_request(request):
    """Serve ``/``: convert the config at ``?url=...`` or show the usage guide.

    Returns the converted proxies as plain text, a 500 response carrying the
    error text if fetching or converting fails, the usage guide when no
    ``url`` parameter is given, and 404 for any other path.
    """
    if request.path != '/':
        return web.Response(text="Not Found", status=404)

    query_params = parse_qs(request.query_string)
    if 'url' not in query_params:
        # NOTE(review): the guide is plain text but is served as text/html.
        return web.Response(text=USAGE_GUIDE, content_type='text/html')

    url = query_params['url'][0]
    try:
        # NOTE(review): the URL comes straight from the caller, so the server
        # will fetch whatever address it is given - restrict it if this
        # service is reachable from untrusted networks.
        async with aiohttp.ClientSession() as session:
            input_text = await fetch_url(url, session)
        result = await extract_and_transform_proxies(input_text)
        return web.Response(text=result, content_type='text/plain')
    except Exception as e:
        # Request boundary: report the failure to the client instead of
        # letting it propagate.
        return web.Response(text=f"Error: {str(e)}", status=500)


async def init_app():
    """Create the application with the logging middleware and the ``/`` route."""
    app = web.Application(middlewares=[logging_middleware])
    app.router.add_get('/', handle_request)
    return app


def main():
    """Print the startup banner and run the server on port 8080."""
    started_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"===== Application Startup at {started_at} =====")
    print("Server running on port 8080")
    # Disable aiohttp's default startup message; the banner above replaces it.
    web.run_app(init_app(), port=8080, print=lambda _: None)


if __name__ == "__main__":
    main()