# pplx2api / app.py
# smgc's picture
# Create app.py
# e3596d7 verified
# raw
# history blame
# No virus
# 6.45 kB
import hmac
import json
import os
import queue
import uuid

import requests
import socketio
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from requests.exceptions import RequestException
# Flask application object that all routes below are registered on.
app = Flask(__name__)

# Configuration is read from environment variables; each value is None
# when the corresponding variable is not set.
API_KEY = os.getenv('PPLX_KEY')
PPLX_COOKIE = os.getenv('PPLX_COOKIE')
USER_AGENT = os.getenv('USER_AGENT')
PROXY = os.getenv('PROXY')

# Rate limiting, keyed on the caller's remote address and kept in memory.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["100 per 5 minutes"],
    storage_uri="memory://",
)

# Socket.IO client.
sio = socketio.Client()

# Proxy settings: only an HTTPS proxy is configured, and only when PROXY is set.
if PROXY:
    proxies = {'https': PROXY}
else:
    proxies = None
@app.route('/')
def home():
    """Return a JSON document describing the API and its message endpoint."""
    request_headers = {
        "x-api-key": "Your API key (required)",
        "Content-Type": "application/json",
    }
    request_body = {
        "messages": "Array of message objects",
        "stream": "Boolean (true for streaming response)",
    }
    messages_endpoint = {
        "method": "POST",
        "description": "Send a message to the AI",
        "headers": request_headers,
        "body": request_body,
    }
    return jsonify(
        message="Welcome to the Perplexity AI Proxy API",
        endpoints={"/ai/v1/messages": messages_endpoint},
    )
@app.route('/health')
def health_check():
    """Liveness probe: always answers ``{"status": "OK"}`` with HTTP 200."""
    return jsonify(status="OK"), 200
def validate_api_key():
    """Check the ``x-api-key`` request header against the configured API key.

    Returns:
        ``None`` when the header matches ``API_KEY``; otherwise a
        ``(response, 401)`` tuple that the caller should return as-is.

    The check fails closed: when ``API_KEY`` is not configured (``None`` or
    empty) every request is rejected. Previously an unset key compared equal
    to a missing header (``None != None`` is false), which let
    unauthenticated requests through.
    """
    supplied = request.headers.get('x-api-key')
    authorized = (
        bool(API_KEY)
        and supplied is not None
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        and hmac.compare_digest(supplied.encode('utf-8'), API_KEY.encode('utf-8'))
    )
    if not authorized:
        return jsonify({"error": "Invalid API key"}), 401
    return None
# Seconds to wait for the next upstream event before the stream is closed.
# Bounds the request when the upstream neither finishes nor disconnects.
_STREAM_IDLE_TIMEOUT = 60


def _message_text(content):
    """Return the plain text carried by one message's ``content`` value."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # NOTE(review): assumes list content is a sequence of blocks shaped
        # like {"type": "text", "text": "..."}; blocks without a string
        # "text" are skipped. Confirm against the clients that call this API.
        return "".join(
            block['text']
            for block in content
            if isinstance(block, dict) and isinstance(block.get('text'), str)
        )
    raise ValueError("message 'content' must be a string or an array of blocks")


def _build_prompt(payload):
    """Join the text of every entry in ``payload['messages']`` with blank lines.

    Raises:
        ValueError: if ``messages`` is missing, is not a list, or contains an
            entry without a usable ``content`` field.
    """
    messages = payload.get('messages')
    if not isinstance(messages, list):
        raise ValueError("'messages' must be an array of message objects")
    parts = []
    for message in messages:
        if not isinstance(message, dict) or 'content' not in message:
            raise ValueError("each message must be an object with a 'content' field")
        parts.append(_message_text(message['content']))
    return "\n\n".join(parts)


def _new_upstream_client():
    """Create a Socket.IO client for one request, routed via PROXY if set."""
    if not PROXY:
        return socketio.Client()
    # The proxy is configured on the HTTP session handed to the client
    # constructor; ``connect()`` itself takes no session/proxy arguments.
    session = requests.Session()
    session.proxies.update(proxies)
    # NOTE(review): the websocket transport may look the proxy up under the
    # "wss" scheme rather than "https" -- set both; confirm against the
    # installed python-engineio version.
    session.proxies.setdefault('wss', PROXY)
    return socketio.Client(http_session=session)


def _stream_answer(prompt):
    """Yield the server-sent events for one streamed answer to ``prompt``.

    Emits the opening events, then one ``content_block_delta`` per chunk
    received from the upstream ``query_progress`` event, then the closing
    events. Socket.IO invokes handlers outside this generator, so handlers
    push chunks onto a queue that the generator drains; a handler cannot
    ``yield`` into the HTTP response itself.
    """
    events = queue.Queue()
    finished = object()  # sentinel: upstream is done, stop streaming

    def on_query_progress(data):
        if not isinstance(data, dict):
            return
        raw = data.get('text')
        if raw:
            try:
                parsed = json.loads(raw)
            except (TypeError, ValueError):
                parsed = None
            chunks = parsed.get('chunks') if isinstance(parsed, dict) else None
            # Only the newest chunk is forwarded; empty chunks are skipped.
            if chunks and chunks[-1]:
                events.put(chunks[-1])
        # NOTE(review): presumably the upstream flags its last progress event
        # with a truthy "final" field -- confirm. When the field is absent the
        # stream still ends on disconnect or on the idle timeout.
        if data.get('final'):
            events.put(finished)

    def on_disconnect(*_args):
        # Accepts any arguments: newer python-socketio versions pass a reason.
        events.put(finished)

    msg_id = str(uuid.uuid4())
    yield create_event("message_start", {
        "type": "message_start",
        "message": {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": "claude-3-opus-20240229",
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 8, "output_tokens": 1},
        },
    })
    yield create_event("content_block_start", {
        "type": "content_block_start",
        "index": 0,
        "content_block": {"type": "text", "text": ""},
    })
    yield create_event("ping", {"type": "ping"})

    client = None
    try:
        # One client per request: a shared client cannot be connected twice
        # and would mix up events of concurrent requests.
        client = _new_upstream_client()
        # Handlers are registered before connecting so no event is missed.
        client.on('query_progress', on_query_progress)
        client.on('disconnect', on_disconnect)

        candidate_headers = {'Cookie': PPLX_COOKIE, 'User-Agent': USER_AGENT}
        # Unset configuration values are dropped instead of being sent as None.
        headers = {name: value for name, value in candidate_headers.items() if value}
        client.connect('wss://www.perplexity.ai', transports=['websocket'],
                       headers=headers)
        # Several event arguments must be passed as one tuple; a third
        # positional argument to emit() would be taken as the namespace.
        client.emit('perplexity_ask', (prompt, {
            "version": "2.9",
            "source": "default",
            "attachments": [],
            "language": "en-GB",
            "timezone": "Europe/London",
            "search_focus": "writing",
            "frontend_uuid": str(uuid.uuid4()),
            "mode": "concise",
            "is_related_query": False,
            "is_default_related_query": False,
            "visitor_id": str(uuid.uuid4()),
            "frontend_context_uuid": str(uuid.uuid4()),
            "prompt_source": "user",
            "query_source": "home",
        }))

        while True:
            try:
                item = events.get(timeout=_STREAM_IDLE_TIMEOUT)
            except queue.Empty:
                break  # upstream went silent; close the stream normally
            if item is finished:
                break
            yield create_event("content_block_delta", {
                "type": "content_block_delta",
                "index": 0,
                "delta": {"type": "text_delta", "text": item},
            })
    except Exception as e:
        # Boundary handler: the HTTP response has already started, so the
        # failure is reported in-band and the stream is still closed cleanly.
        app.logger.exception("Socket error: %s", e)
        yield create_event("content_block_delta", {
            "type": "content_block_delta",
            "index": 0,
            "delta": {"type": "text_delta", "text": "An error occurred while processing your request"},
        })
    finally:
        # Also runs when the HTTP client goes away and the generator is closed.
        if client is not None:
            client.disconnect()

    yield create_event("content_block_stop", {"type": "content_block_stop", "index": 0})
    yield create_event("message_delta", {
        "type": "message_delta",
        "delta": {"stop_reason": "end_turn", "stop_sequence": None},
        "usage": {"output_tokens": 12},
    })
    yield create_event("message_stop", {"type": "message_stop"})


@app.route('/ai/v1/messages', methods=['POST'])
@limiter.limit("100 per 15 minutes")
def ai_messages():
    """Handle ``POST /ai/v1/messages``.

    Requires a valid ``x-api-key`` header. The JSON body must be an object
    with a ``messages`` array; with ``"stream": true`` the answer is returned
    as a ``text/event-stream`` response, otherwise a fixed placeholder
    message asking the caller to enable streaming is returned.

    Returns:
        A 401 response for a bad API key, a 400 JSON error for a malformed
        body, the placeholder JSON message, or the streaming response.
    """
    auth_result = validate_api_key()
    if auth_result is not None:
        return auth_result

    try:
        # silent=True yields None for a missing or invalid JSON body instead
        # of raising, so it can be reported through the same 400 path.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            raise ValueError("request body must be a JSON object")
        if not data.get('stream', False):
            return jsonify({
                "id": str(uuid.uuid4()),
                "content": [
                    {"text": "Please turn on streaming."},
                    {"id": "string", "name": "string", "input": {}}
                ],
                "model": "string",
                "stop_reason": "end_turn",
                "stop_sequence": "string",
                "usage": {"input_tokens": 0, "output_tokens": 0}
            })
        # Validate before streaming starts: once the response is under way
        # an error can no longer change the HTTP status.
        prompt = _build_prompt(data)
    except ValueError as e:
        app.logger.warning("Request error: %s", e)
        return jsonify({"error": str(e)}), 400

    return Response(stream_with_context(_stream_answer(prompt)),
                    content_type='text/event-stream')
def create_event(event, data):
    """Format one Server-Sent Events frame.

    Args:
        event: Event name written on the ``event:`` line.
        data: Payload. A ``str`` is sent as-is; any other value is
            JSON-encoded (previously only ``dict`` was, so lists, ``None``
            and booleans leaked out as Python reprs).

    Returns:
        The frame text, terminated by the blank line that ends an event.

    Raises:
        TypeError: if a non-string payload is not JSON-serializable.
    """
    if not isinstance(data, str):
        data = json.dumps(data)
    # Every payload line needs its own "data:" prefix; a raw newline inside
    # the payload would otherwise cut the frame short.
    normalized = data.replace("\r\n", "\n").replace("\r", "\n")
    payload = "\n".join(f"data: {line}" for line in normalized.split("\n"))
    return f"event: {event}\n{payload}\n\n"
@app.errorhandler(404)
def not_found(error):
    """Answer unknown routes with a JSON 404 body."""
    return jsonify(error="Not Found"), 404
@app.errorhandler(500)
def server_error(error):
    """Answer unhandled server failures with a JSON 500 body."""
    return jsonify(error="Internal Server Error"), 500
if __name__ == '__main__':
    # Listen on all interfaces; the port comes from PORT, defaulting to 8081.
    listen_port = int(os.getenv('PORT', 8081))
    app.run(host='0.0.0.0', port=listen_port)