{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import threading\n",
    "import queue\n",
    "import secrets\n",
    "import concurrent.futures\n",
    "from typing import Callable, Any, Dict"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "class BackgroundTaskService:\n",
    "    \"\"\"Run registered callables on a thread pool and look up results by task id.\n",
    "\n",
    "    ``launch_task`` places work on a bounded queue; a daemon dispatcher thread\n",
    "    hands it to the pool, and the outcome is stored in ``results`` for\n",
    "    ``get_result``. At most ``max_tasks`` tasks run at once and at most\n",
    "    ``max_tasks`` more wait in ``task_queue``.\n",
    "\n",
    "    Statuses are plain strings (\"In Progress\", \"No such task\", \"Queue Full\"),\n",
    "    so a task that returns one of those strings is indistinguishable from the\n",
    "    status itself. Results are kept until the service is discarded.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, max_tasks: int):\n",
    "        \"\"\"Create the pool and the bounded queue, then start the dispatcher.\n",
    "\n",
    "        Args:\n",
    "            max_tasks: Number of pool threads and capacity of the waiting\n",
    "                queue. ``ThreadPoolExecutor`` raises ``ValueError`` when it is\n",
    "                not greater than 0.\n",
    "        \"\"\"\n",
    "        self.max_tasks = max_tasks\n",
    "        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_tasks)\n",
    "        # Bounded, so the \"Queue Full\" decision is a single atomic put_nowait().\n",
    "        self.task_queue = queue.Queue(maxsize=max_tasks)\n",
    "        self.results: Dict[str, Any] = {}\n",
    "        self.registry: Dict[str, Callable[..., Any]] = {}\n",
    "        self.lock = threading.Lock()  # Guards the results dictionary.\n",
    "        # One slot per pool thread: the dispatcher waits for a free slot, so\n",
    "        # surplus tasks stay in the bounded queue instead of the pool's own\n",
    "        # unbounded backlog.\n",
    "        self._slots = threading.BoundedSemaphore(max_tasks)\n",
    "        # Started last so that every attribute exists before the thread runs.\n",
    "        threading.Thread(target=self._worker, daemon=True).start()\n",
    "\n",
    "    def register(self, func: Callable[..., Any]) -> Callable[..., Any]:\n",
    "        \"\"\"Register ``func`` under ``func.__name__`` so it can be launched by name.\n",
    "\n",
    "        A later registration with the same name replaces the earlier one.\n",
    "\n",
    "        Returns:\n",
    "            ``func`` unchanged, which also makes this usable as a decorator.\n",
    "        \"\"\"\n",
    "        self.registry[func.__name__] = func\n",
    "        return func\n",
    "\n",
    "    def _worker(self) -> None:\n",
    "        \"\"\"Dispatcher loop: move queued tasks onto the thread pool.\"\"\"\n",
    "        while True:\n",
    "            task_id, func, args = self.task_queue.get()\n",
    "            self._slots.acquire()\n",
    "            try:\n",
    "                # Do not wait for the future here; blocking on it would run\n",
    "                # the tasks one at a time and leave the pool idle.\n",
    "                self.executor.submit(self._run_task, task_id, func, args)\n",
    "            except Exception as exc:\n",
    "                # For example the executor was shut down. Record the failure\n",
    "                # and keep dispatching instead of letting the thread die.\n",
    "                self._finish_task(task_id, exc)\n",
    "\n",
    "    def _run_task(self, task_id: str, func: Callable[..., Any], args: tuple) -> None:\n",
    "        \"\"\"Execute one task on a pool thread and record its outcome.\"\"\"\n",
    "        try:\n",
    "            outcome = func(*args)\n",
    "        except BaseException as exc:\n",
    "            # A failing task must not stall the service: the exception\n",
    "            # instance becomes the task's result.\n",
    "            outcome = exc\n",
    "        self._finish_task(task_id, outcome)\n",
    "\n",
    "    def _finish_task(self, task_id: str, outcome: Any) -> None:\n",
    "        \"\"\"Store ``outcome`` for ``task_id`` and free the task's pool slot.\"\"\"\n",
    "        try:\n",
    "            with self.lock:\n",
    "                self.results[task_id] = outcome\n",
    "        finally:\n",
    "            self._slots.release()\n",
    "            # Lets callers wait for outstanding work with task_queue.join().\n",
    "            self.task_queue.task_done()\n",
    "\n",
    "    def launch_task(self, func_name: str, *args) -> Any:\n",
    "        \"\"\"Queue the function registered as ``func_name`` to run with ``args``.\n",
    "\n",
    "        Returns:\n",
    "            The new task id (32 hexadecimal characters), or the string\n",
    "            \"Queue Full\" when ``max_tasks`` tasks are already waiting.\n",
    "\n",
    "        Raises:\n",
    "            KeyError: If no function is registered under ``func_name``.\n",
    "        \"\"\"\n",
    "        func = self.registry[func_name]\n",
    "        task_id = secrets.token_hex(16)\n",
    "        # Enqueue and mark under the lock. A pool thread needs the same lock\n",
    "        # to store the result, so the status can never overwrite a result\n",
    "        # that finished early.\n",
    "        with self.lock:\n",
    "            try:\n",
    "                self.task_queue.put_nowait((task_id, func, args))\n",
    "            except queue.Full:\n",
    "                return \"Queue Full\"\n",
    "            self.results[task_id] = \"In Progress\"\n",
    "        return task_id\n",
    "\n",
    "    def get_result(self, task_id: str) -> Any:\n",
    "        \"\"\"Return the stored outcome for ``task_id``.\n",
    "\n",
    "        Returns:\n",
    "            \"In Progress\" while the task has not finished, the task's return\n",
    "            value (or the exception instance it raised) once it has, and\n",
    "            \"No such task\" for an unknown id.\n",
    "        \"\"\"\n",
    "        with self.lock:\n",
    "            return self.results.get(task_id, \"No such task\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "py310all",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}