getapi commited on
Commit
4597eac
1 Parent(s): 9688baa

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +156 -0
app.py ADDED
@@ -0,0 +1,156 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from asyncio import create_subprocess_shell, gather, sleep
2
+ from pathlib import Path
3
+ from random import choice
4
+ from subprocess import CalledProcessError, PIPE
5
+ from typing import Any
6
+ from uuid import uuid4
7
+
8
+ from httpx import AsyncClient, HTTPStatusError, RequestError
9
+
10
+ oxipng_bin = Path(__file__).parent / 'oxipng'
11
+ oxipng_bin.chmod(0o755)
12
+
13
+ tokens = [
14
+ '7e0ea3da6a73d77003c1abba7f0ea13c',
15
+ 'bc2e68b5918e5bb59ebca6c05d73daf9',
16
+ 'fecbfbe0938bcd1df27b7a9be1702cc9',
17
+ '04e9981d4d0981964cb4c9753173244d',
18
+ 'dee75b07981c7aa211628ea7c7cbc03d',
19
+ ]
20
+
21
+
22
+ async def download_png(url: str, client: AsyncClient, retries: int = 5) -> Path:
23
+ for attempt in range(retries):
24
+ try:
25
+ response = await client.get(url, timeout=30.0)
26
+ response.raise_for_status()
27
+ file_path = Path(f'{uuid4()}.png')
28
+ file_path.write_bytes(response.content)
29
+ return file_path
30
+ except (HTTPStatusError, RequestError) as e:
31
+ if attempt < retries - 1:
32
+ await sleep(2 ** attempt)
33
+ else:
34
+ raise e
35
+
36
+
37
+ async def download_pngs(urls: str | list[str]) -> list[Any]:
38
+ urls = [urls] if isinstance(urls, str) else urls
39
+ async with AsyncClient() as client:
40
+ tasks = [download_png(url, client) for url in urls]
41
+ return list(await gather(*tasks))
42
+
43
+
44
+ async def optimize_png(image_path: Path, retries: int = 3) -> None:
45
+ command = f'{oxipng_bin.resolve()} --opt 2 --strip safe --out {image_path} {image_path}'
46
+ for attempt in range(retries):
47
+ try:
48
+ process = await create_subprocess_shell(command, stdout=PIPE, stderr=PIPE)
49
+ stdout, stderr = await process.communicate()
50
+ if process.returncode == 0:
51
+ return
52
+ else:
53
+ raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
54
+ except CalledProcessError as e:
55
+ if attempt < retries - 1:
56
+ await sleep(2 ** attempt)
57
+ else:
58
+ raise e
59
+
60
+
61
+ async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
62
+ image_paths = [Path(image_file) for image_file in ([image_paths] if not isinstance(image_paths, list) else image_paths)]
63
+ tasks = [optimize_png(image_path) for image_path in image_paths]
64
+ await gather(*tasks)
65
+
66
+
67
+ async def telegraph_upload_png(file_path: str | Path) -> str | None:
68
+ file_path = Path(file_path)
69
+ if not file_path.is_file() or file_path.stat().st_size > 5 * 1024 * 1024:
70
+ return None
71
+ url = 'https://telegra.ph/upload'
72
+ headers = {
73
+ 'authority': url.rsplit('/')[2],
74
+ 'accept': 'application/json, text/javascript, */*; q=0.01',
75
+ 'origin': url.rsplit('/', 1)[0],
76
+ 'referer': url.rsplit('/', 1)[0],
77
+ 'x-requested-with': 'XMLHttpRequest',
78
+ }
79
+ async with AsyncClient() as client:
80
+ try:
81
+ response = await client.post(url, headers=headers, files={'file': ('blob', file_path.read_bytes(), 'image/png')})
82
+ response.raise_for_status()
83
+ result = response.json()
84
+ except:
85
+ return None
86
+ if response.is_success and 'error' not in result:
87
+ link = result[0]['src']
88
+ return url.rsplit('/', 1)[0] + link
89
+ else:
90
+ return None
91
+
92
+
93
+ async def upload_image_to_imgbb(file_path: Path) -> str | None:
94
+ url = f'https://api.imgbb.com/1/upload?key={choice(tokens)}'
95
+ try:
96
+ with file_path.open('rb') as file:
97
+ files = {'image': (file_path.name, file, 'image/png')}
98
+ data = {}
99
+ async with AsyncClient() as client:
100
+ response = await client.post(url, files=files, data=data, timeout=30)
101
+ response.raise_for_status()
102
+ json = response.json()
103
+ if json.get('success'):
104
+ return json['data']['url']
105
+ except:
106
+ return None
107
+
108
+
109
+ async def upload_image(file_path: Path | str) -> str | None:
110
+ file_path = Path(file_path)
111
+ return await telegraph_upload_png(file_path) or await upload_image_to_imgbb(file_path)
112
+
113
+
114
+ async def optimize_and_upload(images_urls: list[str] | str) -> list[str]:
115
+ images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
116
+ images_paths = await download_pngs(images_urls)
117
+ await optimize_pngs(images_paths)
118
+ new_images_urls = []
119
+ for image_path in images_paths:
120
+ new_url = await upload_image(image_path)
121
+ if new_url:
122
+ new_images_urls.append(new_url)
123
+ image_path.unlink(missing_ok=True)
124
+ return new_images_urls
125
+
126
+
127
+ from fastapi import FastAPI, HTTPException
128
+ from fastapi.responses import PlainTextResponse
129
+ from pydantic import BaseModel, HttpUrl
130
+ from typing import List
131
+
132
+
133
+ app = FastAPI()
134
+
135
+
136
+ class ImageURLs(BaseModel):
137
+ urls: List[HttpUrl]
138
+
139
+
140
+ @app.get('/')
141
+ async def read_root():
142
+ return PlainTextResponse('ну пролапс, ну и что', status_code=200)
143
+
144
+
145
+ @app.post('/pngopt_by_urls/')
146
+ async def optimize_images_endpoint(image_urls: ImageURLs):
147
+ try:
148
+ optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
149
+ return {"optimized_urls": optimized_urls}
150
+ except Exception as e:
151
+ raise HTTPException(status_code=500, detail=str(e))
152
+
153
+
154
+ if __name__ == "__main__":
155
+ from uvicorn import run as uvicorn_run
156
+ uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90)