sub / app.py
sfun's picture
Update app.py
78baf01 verified
raw
history blame
No virus
4.1 kB
import re
import yaml
import requests
import datetime
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
def extract_and_transform_proxies(input_text):
proxies_match = re.search(r'proxies:\s*\n((?:[-\s]*{.*\n?)*)', input_text, re.MULTILINE)
if not proxies_match:
return "未找到proxies部分"
proxies_text = proxies_match.group(1)
try:
proxies_list = yaml.safe_load(proxies_text)
except yaml.YAMLError:
proxies_list = []
for proxy_str in re.findall(r'{(.*?)}', proxies_text, re.DOTALL):
proxy_dict = {}
for item in proxy_str.split(','):
key, value = item.split(':', 1)
proxy_dict[key.strip()] = value.strip()
proxies_list.append(proxy_dict)
if not proxies_list:
return "未找到有效的代理配置"
transformed_proxies = []
for proxy in proxies_list:
if proxy.get('type') == 'ss':
name = proxy.get('name', '').strip()
server = proxy.get('server', '').strip()
port = str(proxy.get('port', '')).strip()
cipher = proxy.get('cipher', '').strip()
password = proxy.get('password', '').strip()
udp = 'true' if proxy.get('udp') in [True, 'true', 'True'] else 'false'
transformed = f"{name} = ss, {server}, {port}, encrypt-method={cipher}, password={password}, udp-relay={udp}"
transformed_proxies.append(transformed)
return "\n".join(transformed_proxies) if transformed_proxies else "未找到有效的SS代理配置"
class RequestHandler(BaseHTTPRequestHandler):
def log_request(self, code='-', size='-'):
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sys.stdout.write(f"{timestamp} - {self.client_address[0]} - \"{self.requestline}\" {code} {size}\n")
sys.stdout.flush()
def do_GET(self):
parsed_path = urlparse(self.path)
query_params = parse_qs(parsed_path.query)
if parsed_path.path == '/':
if 'url' in query_params:
url = query_params['url'][0]
sys.stdout.write(f"处理URL: {url}\n")
sys.stdout.flush()
try:
response = requests.get(url)
response.raise_for_status()
input_text = response.text
result = extract_and_transform_proxies(input_text)
self.send_response(200)
self.send_header('Content-type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(result.encode('utf-8'))
except requests.RequestException as e:
self.send_error(500, f"Error fetching data: {str(e)}")
else:
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
usage_guide = """
<html>
<body>
<h1>代理配置转换工具</h1>
<p>使用方法:在URL参数中提供包含代理配置的网址。</p>
<p>示例:<code>http://localhost:8080/?url=https://example.com/path-to-proxy-config</code></p>
</body>
</html>
"""
self.wfile.write(usage_guide.encode('utf-8'))
else:
self.send_error(404, "Not Found")
def log_message(self, format, *args):
# 重写此方法以禁用默认的服务器日志
pass
def run_server(port=8080):
server_address = ('0.0.0.0', port) # 使用0.0.0.0而不是空字符串
httpd = HTTPServer(server_address, RequestHandler)
sys.stdout.write(f"===== Application Startup at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====\n")
sys.stdout.write(f"Server running on port {port}\n")
sys.stdout.flush()
httpd.serve_forever()
if __name__ == "__main__":
run_server()