# In[1]:
import threading
import queue
import secrets
import concurrent.futures
from typing import Callable, Any, Dict

# In[2]:
class BackgroundTaskService:
    """Run registered callables on a thread pool and look results up by id.

    Callables are registered under their ``__name__`` and started with
    ``launch_task``, which hands back an opaque task id. ``get_result``
    returns, for that id:

    * ``"In Progress"`` while the task is queued or running,
    * the callable's return value once it has finished,
    * the exception instance if the callable raised,
    * ``"No such task"`` for an unknown id.

    A single daemon dispatcher thread moves tasks from ``task_queue`` to the
    executor. At most ``max_tasks`` tasks run at once; further submissions
    wait in ``task_queue`` and, once ``max_tasks`` of them are waiting,
    ``launch_task`` answers ``"Queue Full"``.
    """

    def __init__(self, max_tasks: int):
        """Create the pool and the pending queue, then start the dispatcher.

        Args:
            max_tasks: Thread-pool size, and also the number of submissions
                allowed to wait in the queue before new ones are refused.

        Raises:
            ValueError: If ``max_tasks`` is not positive (raised by
                ``ThreadPoolExecutor``).
        """
        self.max_tasks = max_tasks
        self.task_queue = queue.Queue()
        self.results: Dict[str, Any] = {}
        # Every attribute is initialised before the dispatcher thread starts
        # so the thread never observes a half-built instance.
        self.registry: Dict[str, Callable[..., Any]] = {}
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_tasks)
        self.lock = threading.Lock()  # guards `results` and queue admission
        # One permit per pool thread: the dispatcher only pulls a task off the
        # queue when a thread is free, so `task_queue` holds genuinely
        # waiting work and the "Queue Full" check stays meaningful.
        self._free_slots = threading.BoundedSemaphore(max_tasks)
        threading.Thread(target=self._worker, daemon=True).start()

    def register(self, func: Callable[..., Any]) -> Callable[..., Any]:
        """Make ``func`` launchable under ``func.__name__``.

        Returns ``func`` unchanged so the method can also be used as a
        decorator without rebinding the decorated name to ``None``.
        """
        self.registry[func.__name__] = func
        return func

    def _worker(self) -> None:
        """Dispatcher loop: feed queued tasks to the executor as slots free up."""
        while True:
            self._free_slots.acquire()
            task_id, func, args = self.task_queue.get()
            try:
                future = self.executor.submit(func, *args)
            except RuntimeError as exc:
                # The executor refuses work after shutdown; record the failure
                # instead of letting the dispatcher thread die.
                with self.lock:
                    self.results[task_id] = exc
                self._free_slots.release()
                continue
            # Record the outcome from a callback rather than blocking on
            # `.result()` here: blocking would serialise the pool, and an
            # exception from the task would kill this thread.
            future.add_done_callback(
                lambda done, task_id=task_id: self._finish(task_id, done)
            )

    def _finish(self, task_id: str, future: "concurrent.futures.Future") -> None:
        """Store a finished task's value (or exception) and free its slot."""
        try:
            try:
                outcome = future.result()
            except BaseException as exc:  # keep the failure visible to callers
                outcome = exc
            with self.lock:
                self.results[task_id] = outcome
        finally:
            self._free_slots.release()

    def launch_task(self, func_name, *args) -> Any:
        """Queue the callable registered as ``func_name`` with ``args``.

        Returns:
            A 32-character hex task id, or the string ``"Queue Full"`` when
            ``max_tasks`` submissions are already waiting.

        Raises:
            KeyError: If nothing is registered under ``func_name``.
        """
        func = self.registry[func_name]
        task_id = secrets.token_hex(16)
        # Check, mark and enqueue under one lock: concurrent launchers cannot
        # overshoot the limit, and the placeholder is written before the task
        # can possibly finish, so it never overwrites a real result.
        with self.lock:
            if self.task_queue.qsize() >= self.max_tasks:
                return "Queue Full"
            self.results[task_id] = "In Progress"
            self.task_queue.put((task_id, func, args))
        return task_id

    def get_result(self, task_id) -> Any:
        """Return the stored state for ``task_id``, or ``"No such task"``."""
        with self.lock:
            return self.results.get(task_id, "No such task")