"""HTTP service that downloads PNG images, optimizes them with oxipng and re-uploads them.

Pipeline for every request: download each URL into its own temporary folder,
run the bundled ``oxipng`` binary over the files in place, upload each result
to telegra.ph (falling back to imgbb) and return the new public URLs.
"""
# NOTE: create_subprocess_shell is no longer used by this module; the import is
# kept so that anything importing it from here keeps working.
from asyncio import create_subprocess_exec, create_subprocess_shell, gather, sleep
from logging import ERROR, INFO, basicConfig, getLogger
from pathlib import Path
from random import choice
from shutil import rmtree
from subprocess import CalledProcessError, PIPE
from typing import Any, List
from uuid import uuid4

from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from pydantic import BaseModel, HttpUrl
from uvicorn import run as uvicorn_run

# Flip to True to see INFO-level progress messages; otherwise only errors are shown.
need_logging = False
basicConfig(level=INFO if need_logging else ERROR)
logger = getLogger(__name__)

# The oxipng executable is expected to sit next to this file; make sure it is runnable.
oxipng_bin = Path(__file__).parent / 'oxipng'
if not oxipng_bin.stat().st_mode & 0o111:
    oxipng_bin.chmod(0o755)

# Original author's remark: "I don't really care about these tokens".
# NOTE(review): these are API keys committed to source; consider loading them
# from the environment instead.
tokens = [
    '7e0ea3da6a73d77003c1abba7f0ea13c',
    'bc2e68b5918e5bb59ebca6c05d73daf9',
    'fecbfbe0938bcd1df27b7a9be1702cc9',
    '04e9981d4d0981964cb4c9753173244d',
    'dee75b07981c7aa211628ea7c7cbc03d',
]


async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
    """Download ``url`` into ``<module dir>/<folder>/<uuid>.png`` and return that path.

    The request is attempted up to ``retries`` times with exponential backoff
    (1s, 2s, 4s, ...) between attempts.

    Raises:
        HTTPStatusError, RequestError: the error of the final failed attempt.

    Note: with ``retries <= 0`` no request is made and ``None`` is returned.
    """
    logger.info(f'загрузка изображения: {url}')
    for attempt in range(retries):
        try:
            response = await client.get(url, timeout=30.0)
            response.raise_for_status()
        except (HTTPStatusError, RequestError):
            if attempt >= retries - 1:
                raise  # out of attempts: surface the last error unchanged
            await sleep(2 ** attempt)
            continue

        file_path = Path(__file__).parent / folder / f'{uuid4()}.png'
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_bytes(response.content)
        return file_path


async def download_pngs(urls: str | list[str]) -> list[Path]:
    """Download one or many URLs concurrently; each file lands in its own fresh folder.

    URLs that are empty, blank, or contain a newline (real or the two-character
    sequence ``\\n``) are dropped before downloading.

    Returns:
        Paths of the downloaded files, in the order of the accepted URLs.

    Raises:
        Whatever ``download_png`` raises for any of the URLs.
    """
    urls = [urls] if isinstance(urls, str) else urls
    logger.info(f'скачивается список список из {len(urls)}: {urls}')

    # The coze bot has a bug: when chat context is enabled, the workflow
    # appends the previous answer to the links, so such entries are filtered out.
    valid_urls = [
        url for url in urls
        if url and url.strip() and '\n' not in url and '\\n' not in url
    ]
    if len(valid_urls) != len(urls):
        logger.warning(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')

    async with AsyncClient() as client:
        tasks = [download_png(url, str(uuid4()), client) for url in valid_urls]
        return list(await gather(*tasks))


async def optimize_png(image_path: Path, retries: int = 3) -> None:
    """Optimize ``image_path`` in place with oxipng (``--opt 2 --strip safe``).

    The binary is invoked with an argument list rather than through a shell, so
    paths containing spaces or shell metacharacters are passed through verbatim.
    A non-zero exit status is retried up to ``retries`` times with exponential
    backoff.

    Raises:
        CalledProcessError: oxipng still exited non-zero on the final attempt.
    """
    command = [
        str(oxipng_bin.resolve()),
        '--opt', '2',
        '--strip', 'safe',
        '--out', str(image_path),
        str(image_path),
    ]
    logger.info(f'оптимизация картинки {image_path}')
    for attempt in range(retries):
        process = await create_subprocess_exec(*command, stdout=PIPE, stderr=PIPE)
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            return

        logger.error(f'ошибка при оптимизации {image_path}')
        if attempt >= retries - 1:
            raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
        await sleep(2 ** attempt)


async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
    """Optimize one path or a list of paths concurrently via ``optimize_png``."""
    if not isinstance(image_paths, list):
        image_paths = [image_paths]
    image_paths = [Path(image_file) for image_file in image_paths]
    logger.info(f'оптимизируется список список из {len(image_paths)}: {image_paths}')
    await gather(*(optimize_png(image_path) for image_path in image_paths))


async def telegraph_upload_png(file_path: str | Path) -> str | None:
    """Upload a PNG to telegra.ph and return its public URL, or ``None`` on any failure.

    Files that do not exist or are larger than 5 MiB are skipped without a request.
    This is a best-effort step: errors are logged and reported as ``None`` so the
    caller can fall back to another host.
    """
    file_path = Path(file_path)
    if not file_path.is_file() or file_path.stat().st_size > 5 * 1024 * 1024:
        return None

    url = 'https://telegra.ph/upload'
    base_url = url.rsplit('/', 1)[0]
    headers = {
        'authority': url.rsplit('/')[2],
        'accept': 'application/json, text/javascript, */*; q=0.01',
        'origin': base_url,
        'referer': base_url,
        'x-requested-with': 'XMLHttpRequest',
    }
    try:
        async with AsyncClient() as client:
            response = await client.post(
                url,
                headers=headers,
                files={'file': ('blob', file_path.read_bytes(), 'image/png')},
            )
        response.raise_for_status()
        result = response.json()
        # Parsed inside the try block so an unexpected payload shape also
        # results in None instead of escaping to the caller.
        if response.is_success and 'error' not in result:
            return base_url + result[0]['src']
    except Exception as e:
        # `Exception`, not a bare except: task cancellation and
        # KeyboardInterrupt must keep propagating.
        logger.warning(f'не удалось загрузить {file_path} в telegra.ph: {e}')
    return None


async def upload_image_to_imgbb(file_path: Path) -> str | None:
    """Upload a PNG to imgbb using a randomly chosen API key.

    Returns the hosted image URL, or ``None`` if the upload failed or the API
    did not report success.
    """
    url = f'https://api.imgbb.com/1/upload?key={choice(tokens)}'
    try:
        with file_path.open('rb') as file:
            files = {'image': (file_path.name, file, 'image/png')}
            async with AsyncClient() as client:
                response = await client.post(url, files=files, timeout=30)
        response.raise_for_status()
        payload = response.json()
        if payload.get('success'):
            return payload['data']['url']
    except Exception as e:
        # Best-effort upload: log and report failure, but let cancellation
        # and interpreter-exit signals through.
        logger.warning(f'не удалось загрузить {file_path} в imgbb: {e}')
    return None


async def upload_image(file_path: Path | str) -> str | None:
    """Upload to telegra.ph first and fall back to imgbb; ``None`` if both fail."""
    file_path = Path(file_path)
    return await telegraph_upload_png(file_path) or await upload_image_to_imgbb(file_path)


async def optimize_and_upload(images_urls: list[str] | str) -> list[str]:
    """Download, optimize and re-upload images, returning the new URLs.

    Images whose upload fails are omitted from the result. Every temporary
    download folder is removed afterwards, including when optimization or
    upload raises, and an empty input yields an empty list.

    Raises:
        Whatever ``download_pngs`` or ``optimize_pngs`` raise.
    """
    images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
    logger.info(f'принятые ссылки в обработку ({len(images_urls)}): {images_urls}')

    images_paths = await download_pngs(images_urls)
    new_images_urls = []
    try:
        await optimize_pngs(images_paths)
        for image_path in images_paths:
            new_url = await upload_image(image_path)
            if new_url:
                new_images_urls.append(new_url)
                logger.info(f'загружено изображение {image_path} в {new_url}')
        logger.info(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')
    finally:
        # Each download lives in its own uuid-named folder, so remove all of
        # them (deduplicated, order preserved), not just the first one.
        for folder in dict.fromkeys(image_path.parent for image_path in images_paths):
            try:
                rmtree(folder)
            except Exception as e:
                logger.error(f'не удалось удалить файл {folder}: {e}')
    return new_images_urls


app = FastAPI()


class ImageURLs(BaseModel):
    """Request body for the optimization endpoint: a list of image URLs."""

    urls: List[HttpUrl]


@app.get('/')
async def read_root():
    """Return a fixed plain-text response with status 200."""
    return PlainTextResponse('ну пролапс, ну и что', status_code=200)


@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
    """Optimize and re-host the submitted images; any failure becomes an HTTP 500."""
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
        return {"optimized_urls": optimized_urls}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90)