const express = require('express'); const fetch = require('node-fetch'); const zlib = require('zlib'); const { promisify } = require('util'); const stream = require('stream'); const bodyParser = require('body-parser'); const gunzip = promisify(zlib.gunzip); const pipeline = promisify(stream.pipeline); const PROJECT_ID = process.env.PROJECT_ID; const CLIENT_ID = process.env.CLIENT_ID; const CLIENT_SECRET = process.env.CLIENT_SECRET; const REFRESH_TOKEN = process.env.REFRESH_TOKEN; const API_KEY = process.env.API_KEY; const TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'; let tokenCache = { accessToken: '', expiry: 0, refreshPromise: null }; function logRequest(req, status, message) { const timestamp = new Date().toISOString(); const method = req.method; const url = req.originalUrl; const ip = req.ip; console.log(`[${timestamp}] ${method} ${url} - Status: ${status}, IP: ${ip}, Message: ${message}`); } async function getAccessToken() { const now = Date.now() / 1000; if (tokenCache.accessToken && now < tokenCache.expiry - 120) { return tokenCache.accessToken; } if (tokenCache.refreshPromise) { await tokenCache.refreshPromise; return tokenCache.accessToken; } tokenCache.refreshPromise = (async () => { try { const response = await fetch(TOKEN_URL, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ client_id: CLIENT_ID, client_secret: CLIENT_SECRET, refresh_token: REFRESH_TOKEN, grant_type: 'refresh_token' }) }); const data = await response.json(); tokenCache.accessToken = data.access_token; tokenCache.expiry = now + data.expires_in; } finally { tokenCache.refreshPromise = null; } })(); await tokenCache.refreshPromise; return tokenCache.accessToken; } function getLocation() { const currentSeconds = new Date().getSeconds(); return currentSeconds < 30 ? 'europe-west1' : 'us-east5'; } function constructApiUrl(location, model) { return `https://${location}-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/${location}/publishers/anthropic/models/${model}:streamRawPredict`; } function formatModelName(model) { if (model === 'claude-3-5-sonnet-20240620') { return 'claude-3-5-sonnet@20240620'; } return model; } async function handleRequest(req, res) { if (req.method === 'OPTIONS') { handleOptions(res); logRequest(req, 204, 'CORS preflight request'); return; } const apiKey = req.headers['x-api-key']; if (apiKey !== API_KEY) { res.status(403).json({ type: "error", error: { type: "permission_error", message: "Your API key does not have permission to use the specified resource." } }); logRequest(req, 403, 'Invalid API key'); return; } const accessToken = await getAccessToken(); const location = getLocation(); let requestBody = req.body; let model = requestBody.model || 'claude-3-5-sonnet@20240620'; model = formatModelName(model); const apiUrl = constructApiUrl(location, model); if (requestBody.anthropic_version) { delete requestBody.anthropic_version; } if (requestBody.model) { delete requestBody.model; } requestBody.anthropic_version = "vertex-2023-10-16"; try { const response = await fetch(apiUrl, { method: 'POST', headers: { 'Authorization': `Bearer ${accessToken}`, 'Content-Type': 'application/json; charset=utf-8', 'Accept-Encoding': 'gzip, deflate' }, body: JSON.stringify(requestBody), compress: false }); res.status(response.status); for (const [key, value] of response.headers.entries()) { if (key.toLowerCase() !== 'content-encoding' && key.toLowerCase() !== 'content-length') { res.setHeader(key, value); } } res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'POST, GET, OPTIONS'); res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, x-api-key, anthropic-version, model'); const contentType = response.headers.get('content-type'); const contentEncoding = response.headers.get('content-encoding'); if (contentType && contentType.includes('text/event-stream')) { // 处理 SSE res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); const gunzipStream = zlib.createGunzip(); const transformStream = new stream.Transform({ transform(chunk, encoding, callback) { this.push(chunk); callback(); } }); pipeline(response.body, gunzipStream, transformStream) .then(() => { console.log('Stream processing completed'); }) .catch((err) => { console.error('Stream processing error:', err); }); transformStream.pipe(res); } else { // 非流式响应的处理 const buffer = await response.buffer(); let data; if (contentEncoding === 'gzip') { try { data = await gunzip(buffer); } catch (error) { console.error('Gunzip error:', error); throw new Error('Failed to decompress the response'); } } else { data = buffer; } res.send(data); } logRequest(req, response.status, `Request forwarded successfully for model: ${model}`); } catch (error) { console.error('Request error:', error); res.status(500).json({ type: "error", error: { type: "internal_server_error", message: "An unexpected error occurred while processing your request." } }); logRequest(req, 500, `Error: ${error.message}`); } } function handleOptions(res) { res.status(204); res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Methods', 'POST, GET, OPTIONS'); res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, x-api-key, anthropic-version, model'); res.end(); } const app = express(); // 增加 body-parser 的限制 app.use(bodyParser.json({ limit: '50mb' })); app.use(bodyParser.urlencoded({ limit: '50mb', extended: true })); // 根路由处理 app.get('/', (req, res) => { res.status(200).send('GCP VertexAI For Claude Proxy'); }); app.all('/ai/v1/messages', handleRequest); const PORT = 8080; app.listen(PORT, () => { console.log(`Server is running on port ${PORT}`); });