|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
|
from fastapi.responses import HTMLResponse
|
|
import json
|
|
import hashlib
|
|
import sqlite3
|
|
from github import Github
|
|
from time import time as t
|
|
import os
|
|
|
|
# ASGI application object; the HTTP route, WebSocket route and startup hook
# defined later in this file are registered on it through its decorators.
app = FastAPI()
|
|
|
|
# Set of accepted auth tokens: the WebSocket handler rejects any first message
# that is not in this set, and the startup hook iterates it to restore each
# token's database file.
# NOTE(review): these look like credentials hard-coded in source — consider
# loading them from configuration/environment instead; confirm with the owner.
tokens = {"divydon", "yatteshdon", "subhdon"}
|
|
|
|
def sha256(string: str) -> str:
    """Return the hexadecimal SHA-256 digest of *string*.

    The text is encoded with ``str.encode()``'s default encoding before
    hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(string.encode())
    return hasher.hexdigest()
|
|
|
|
@app.get("/")
async def get():
    """Answer requests for the site root with a fixed HTML greeting."""
    greeting = HTMLResponse(content="Hello, world!")
    return greeting
|
|
|
|
# Connection/cursor methods that must never be reachable from a client request:
# closing would break the session, and the extension hooks load native code.
_BLOCKED_DB_METHODS = frozenset({"close", "enable_load_extension", "load_extension"})


def _resolve_db_callable(function_name, targets):
    """Translate a client-supplied ``"<target>.<method>"`` name into an attribute.

    Args:
        function_name: Dotted name sent by the client, e.g. ``"cursor.execute"``.
        targets: Mapping of the allowed target names to the live objects.

    Returns:
        The attribute ``<method>`` looked up on the selected target object.

    Raises:
        ValueError: If the name is not exactly ``<known target>.<public method>``
            or the method is in ``_BLOCKED_DB_METHODS``.
        AttributeError: If the target has no attribute with that name.
    """
    target_name, _, method_name = function_name.partition(".")
    if (
        target_name not in targets
        or not method_name.isidentifier()
        or method_name.startswith("_")
        or method_name in _BLOCKED_DB_METHODS
    ):
        raise ValueError(f"unsupported function: {function_name}")
    return getattr(targets[target_name], method_name)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Run a remote SQLite session over a WebSocket.

    Protocol, as implemented below:

    1. The first text message must be one of the entries in ``tokens``;
       otherwise the socket is closed.
    2. Every later text message is a JSON array of objects of the form
       ``{"funcname": "conn.<method>" | "cursor.<method>", "args": [...],
       "kwargs": {...}}``. Each accepted call is executed against the token's
       SQLite database (``<sha256(token)>.db``) and echoed back with ``args``
       and ``kwargs`` removed and an ``"eval"`` key holding ``str(result)``,
       or the error message if the call raised.
    3. Entries whose ``funcname`` is missing, is ``conn.close``/``cursor.close``,
       or does not start with ``conn``/``cursor`` are skipped and produce no
       reply entry.

    When the session ends, the connection is closed and the database file is
    handed to ``pushToDb``.
    """
    await websocket.accept()
    # Bound before the try block so the cleanup in ``finally`` is safe even
    # when authentication fails or the client disconnects immediately.
    conn = None
    file_name = None
    try:
        auth_token = await websocket.receive_text()
        if auth_token not in tokens:
            await websocket.close()
            return

        file_name = f"{sha256(auth_token)}.db"
        conn = sqlite3.connect(file_name)
        targets = {"conn": conn, "cursor": conn.cursor()}
        prefixes = tuple(targets)
        not_allowed = ("conn.close", "cursor.close")

        while True:
            calls = json.loads(await websocket.receive_text())
            results = []
            for call in calls:
                function_name = call.get("funcname") if isinstance(call, dict) else None
                if not isinstance(function_name, str):
                    continue
                if function_name in not_allowed:
                    continue
                if not function_name.startswith(prefixes):
                    continue

                args = call.get("args", [])
                kwargs = call.get("kwargs", {})
                try:
                    # SECURITY: this used to be eval(funcname + "(*args, **kwargs)"),
                    # which executed arbitrary expressions sent by the client.
                    # Only a direct method lookup on conn/cursor is allowed now.
                    # The SQL passed to those methods is still client-controlled.
                    out = _resolve_db_callable(function_name, targets)(*args, **kwargs)
                except Exception as e:
                    # Per-call failures are reported to the client, not fatal.
                    out = str(e)
                call["eval"] = f"{out}"
                call.pop("args", None)
                call.pop("kwargs", None)
                results.append(call)
            await websocket.send_text(json.dumps(results))
    except WebSocketDisconnect:
        # The peer is already gone; there is nothing left to close.
        pass
    except Exception as e:
        # E.g. a message that is not valid JSON: end the session, but say why.
        print(f"WebSocket session error: {e}")
    finally:
        if conn is not None:
            # Close first so the file on disk is in its final state when uploaded.
            conn.close()
            pushToDb(file_name)
|
|
|
|
def pushToDb(token: str):
    """Upload the local ``<token>.db`` file to the GitHub repository ``<token>``.

    The repository is looked up under the account authenticated by the
    ``GITHUB_TOKEN`` environment variable and created if GitHub answers 404.
    The file is updated in place when it already exists in the repository and
    created otherwise. The upload is best-effort: any failure is printed and
    never raised to the caller.

    Args:
        token: Repository / database base name; a trailing ``.db`` is stripped.
    """
    # Local import: only a 404 means "missing"; other API errors are reported.
    from github import GithubException

    token = token.removesuffix(".db")
    db_name = f"{token}.db"
    try:
        # Read first so no repository is created when there is nothing to upload.
        with open(db_name, "rb") as f:
            contents = f.read()

        user = Github(os.getenv("GITHUB_TOKEN")).get_user()
        try:
            repo = user.get_repo(token)
        except GithubException as err:
            if err.status != 404:
                raise
            # NOTE(review): no ``private=`` is passed, so GitHub's default
            # repository visibility applies — confirm that is intended.
            repo = user.create_repo(token)

        try:
            repo_file = repo.get_contents(db_name)
        except GithubException as err:
            if err.status != 404:
                raise
            repo.create_file(db_name, f"Created at {t()}", contents)
        else:
            repo.update_file(repo_file.path, f"Updated at {t()}", contents, repo_file.sha)
    except Exception as e:
        print(f"Error pushing to GitHub: {e}")
|
|
|
|
|
|
def downloadDbIfExist(token: str):
    """Restore the local ``<token>.db`` file from the GitHub repository ``<token>``.

    The repository is looked up under the account authenticated by the
    ``GITHUB_TOKEN`` environment variable. If GitHub answers 404 for the
    repository or the file, there is nothing to restore and the function
    returns without touching the local file or creating a repository. Any
    other failure is printed and never raised to the caller.

    Args:
        token: Repository / database base name; a trailing ``.db`` is stripped.
    """
    # Local import: only a 404 means "missing"; other API errors are reported.
    from github import GithubException

    token = token.removesuffix(".db")
    db_name = f"{token}.db"
    try:
        user = Github(os.getenv("GITHUB_TOKEN")).get_user()
        try:
            repo_file = user.get_repo(token).get_contents(db_name)
        except GithubException as err:
            if err.status == 404:
                return
            raise

        # Fetch before opening the file: "wb" truncates, so a failed fetch
        # must not wipe an existing local database.
        content = repo_file.decoded_content
        with open(db_name, "wb") as f:
            f.write(content)
    except Exception as e:
        print(f"Error downloading from GitHub: {e}")
|
|
|
|
|
|
@app.on_event("startup")
def on_startup():
    """On application startup, fetch the stored database for every known token.

    Each token is hashed with ``sha256`` and the digest is passed to
    ``downloadDbIfExist``.
    """
    for db_name in map(sha256, tokens):
        downloadDbIfExist(db_name)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on the loopback interface.
    import uvicorn

    server_options = {"app": app, "host": "127.0.0.1"}
    uvicorn.run(**server_options)
|
|
|
|
|