File size: 3,371 Bytes
d8e86bd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import json
import hashlib
import sqlite3
from github import Github
from time import time as t
import os
# ASGI application object; the HTTP route, the WebSocket endpoint and the
# startup hook in this module are all registered on it via decorators.
app = FastAPI()
# Values a client must send as its first WebSocket message to be accepted.
# The SHA-256 hex digest of a token is also used as the name of that token's
# SQLite file and GitHub repository.
# NOTE(review): these credentials are hard-coded in source — consider loading
# them from the environment or a secret store.
tokens = {"divydon", "yatteshdon", "subhdon"}
def sha256(string: str) -> str:
    """Return the SHA-256 digest of *string* as a lowercase hex string.

    The text is encoded with ``str.encode()``'s default codec (UTF-8)
    before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(string.encode())
    return hasher.hexdigest()
@app.get("/")
async def get():
    """Answer requests for the root path with a fixed HTML greeting."""
    greeting = "Hello, world!"
    return HTMLResponse(greeting)
def _resolve_db_method(function_name, targets):
    """Map a ``"<object>.<method>"`` request string to a bound attribute.

    Args:
        function_name: Client-supplied name such as ``"cursor.execute"``.
        targets: Mapping of the permitted root names to the live objects.

    Returns:
        The attribute obtained with ``getattr`` from the selected object.

    Raises:
        ValueError: If the name is not exactly one permitted root followed
            by a single public identifier, or if it names ``close``.
        AttributeError: If the selected object has no such attribute.
    """
    root, dot, method = function_name.partition(".")
    if not dot or root not in targets:
        raise ValueError(f"unsupported function: {function_name!r}")
    if not method.isidentifier() or method.startswith("_") or method == "close":
        raise ValueError(f"unsupported function: {function_name!r}")
    return getattr(targets[root], method)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one authenticated SQLite session over a WebSocket.

    Protocol, as implemented below:

    1. The first text message must be one of ``tokens``; otherwise the
       socket is closed and the handler returns.
    2. Every later text message is a JSON array of objects with a
       ``"funcname"`` key (``"conn.<method>"`` or ``"cursor.<method>"``)
       and optional ``"args"`` (list) and ``"kwargs"`` (object).
    3. For each accepted object the method is called, its result (or the
       text of the exception it raised) is stored as a string under
       ``"eval"``, ``"args"``/``"kwargs"`` are removed, and the array of
       processed objects is sent back as JSON.

    When the session ends, the connection is closed and the database file
    is uploaded with ``pushToDb``.

    NOTE(security): ``funcname`` comes from the client, so it is resolved
    with ``getattr`` against the two database objects rather than evaluated
    as Python source. A prefix check alone cannot stop an expression such as
    ``"conn.execute if 0 else __import__('os').system"``. The SQL passed to
    ``execute`` is still entirely client-controlled.

    NOTE(review): ``pushToDb`` performs synchronous file and GitHub calls
    from inside this coroutine.
    """
    await websocket.accept()
    # Bound before the ``try`` so the ``finally`` clause is safe on every
    # exit path, including a rejected token or an early disconnect.
    fileName = None
    conn = None
    try:
        authuri = await websocket.receive_text()
        if authuri not in tokens:
            await websocket.close()
            return
        fileName = f"{sha256(authuri)}.db"
        conn = sqlite3.connect(fileName)
        targets = {"conn": conn, "cursor": conn.cursor()}
        notallowed = ("conn.close", "cursor.close")
        while True:
            jsonScript = json.loads(await websocket.receive_text())
            jsonOutput = []
            for i in jsonScript:
                functionName = i["funcname"]
                # Requests for the blocked names, or for anything that does
                # not start with an allowed root, get no reply entry at all.
                if functionName in notallowed:
                    continue
                if not functionName.startswith(tuple(targets)):
                    continue
                args = i.get("args", [])
                kwargs = i.get("kwargs", {})
                try:
                    method = _resolve_db_method(functionName, targets)
                    out = method(*args, **kwargs)
                except Exception as e:
                    # Failures are reported to the client as the result text.
                    out = str(e)
                i["eval"] = f"{out}"
                i.pop("args", None)
                i.pop("kwargs", None)
                jsonOutput.append(i)
            await websocket.send_text(json.dumps(jsonOutput))
    except WebSocketDisconnect:
        # The peer has already gone away; there is nothing left to close,
        # and sending a close frame now can itself raise.
        pass
    except Exception as e:
        # Best-effort session: report the failure instead of hiding it.
        print(f"WebSocket session error: {e}")
    finally:
        if conn is not None:
            try:
                # Close first so the file on disk is no longer held open by
                # this connection when it is read for upload.
                conn.close()
            finally:
                pushToDb(fileName)
def pushToDb(token: str):
    """Upload the local ``<token>.db`` file to the user's GitHub repository.

    Args:
        token: Database key, with or without a trailing ``.db`` suffix. The
            same value is used as the repository name.

    The repository is looked up on the account authenticated by the
    ``GITHUB_TOKEN`` environment variable and created when GitHub reports it
    as not found. The file is then updated in place if it already exists in
    the repository, or created otherwise.

    Errors while reading the local file or uploading it are printed, not
    raised. Errors from authentication or the repository lookup/creation
    propagate to the caller.

    NOTE(review): ``create_repo`` is called without a visibility argument —
    confirm the resulting repository is not public.
    """
    # Imported locally: only "not found" should trigger the create fallbacks.
    from github import UnknownObjectException

    token = token.removesuffix(".db")
    db_name = f"{token}.db"
    g = Github(os.getenv("GITHUB_TOKEN"))
    user = g.get_user()
    try:
        repo = user.get_repo(token)
    except UnknownObjectException:
        repo = user.create_repo(token)
    try:
        # Read fully and release the handle before any network round trip.
        with open(db_name, "rb") as f:
            contents = f.read()
        try:
            repo_file = repo.get_contents(db_name)
        except UnknownObjectException:
            repo.create_file(db_name, f"Created at {t()}", contents)
        else:
            # Kept out of the lookup ``try`` so a failed update is reported
            # as such instead of falling through to ``create_file``.
            repo.update_file(repo_file.path, f"Updated at {t()}", contents, repo_file.sha)
    except Exception as e:
        print(f"Error pushing to GitHub: {e}")
def downloadDbIfExist(token: str):
    """Fetch ``<token>.db`` from the user's GitHub repository, if present.

    Args:
        token: Database key, with or without a trailing ``.db`` suffix. The
            same value is used as the repository name.

    The repository is looked up on the account authenticated by the
    ``GITHUB_TOKEN`` environment variable and created when GitHub reports it
    as not found. If the repository holds ``<token>.db``, its content is
    written to a local file of the same name.

    Errors while fetching or writing the file (including the file simply not
    existing yet) are printed, not raised. Errors from authentication or the
    repository lookup/creation propagate to the caller.
    """
    # Imported locally: only "not found" should trigger repository creation.
    from github import UnknownObjectException

    token = token.removesuffix(".db")
    db_name = f"{token}.db"
    g = Github(os.getenv("GITHUB_TOKEN"))
    user = g.get_user()
    try:
        repo = user.get_repo(token)
    except UnknownObjectException:
        repo = user.create_repo(token)
    try:
        repo_file = repo.get_contents(db_name)
        # Decode before opening the local file: opening with "wb" truncates
        # it, so a failed decode must not wipe an existing database.
        payload = repo_file.decoded_content
        with open(db_name, "wb") as f:
            f.write(payload)
    except Exception as e:
        print(f"Error downloading from GitHub: {e}")
@app.on_event("startup")
def on_startup():
    """Startup hook: try to download the database of every known token.

    Each token is hashed with ``sha256`` and the digest is handed to
    ``downloadDbIfExist``.
    """
    for db_key in map(sha256, tokens):
        downloadDbIfExist(db_key)
if __name__ == "__main__":
    # Executed as a script: serve the application on the loopback interface
    # with uvicorn's default port.
    import uvicorn

    uvicorn.run(host="127.0.0.1", app=app)
|