from datetime import datetime from contextlib import closing from excel import Driver, Vehicle, Route, Branch, RoutePlan import psycopg2 import streamlit as st class Data: _DEFAULT_ESTIMATE_CONTAINERS = [{ "amount" : 1, "weightKg" : None, "materialTypeId" : 17, # Basura "containerTypeId": 8, # Tambo 200 L "charge_type" : 2, # Por contenedor (?) "unitary_price" : 0, "visit_price" : 0 }] @staticmethod def connect(): return psycopg2.connect(**st.secrets["postgres"]) def __init__(self, recycler_id: int): self.recycler_id = recycler_id def get_drivers(self) -> list[Driver]: # id, name data: list[tuple[int, str]] = [] with closing(Data.connect()) as conn: with conn.cursor() as cur: cur.execute("""SELECT id, name from drivers WHERE id_cliente_reciclador = %s ORDER BY name ASC""", (self.recycler_id, )) data = cur.fetchall() return [Driver(id=id, name=name) for id, name in data] def get_vehicles(self) -> list[Driver]: # id, name data: list[tuple[int, str]] = [] with closing(Data.connect()) as conn: with conn.cursor() as cur: cur.execute("""SELECT id, name from trucks WHERE id_cliente_reciclador = %s ORDER BY name ASC""", (self.recycler_id, )) data = cur.fetchall() return [Vehicle(id=id, name=name) for id, name in data] def get_routes(self, date: datetime) -> list[RoutePlan]: # id, name routes: list[tuple[int, str]] = [] # id, route_id, driver_id, driver_name, vehicle_id, vehicle_name, index, branch_id, branch_name, customer_id, customer_name raw_collections: list[tuple[int, int, int, str, int, str, int, int, str, int, str]] = [] with closing(Data.connect()) as conn: with conn.cursor() as cur: cur.execute("""SELECT id, name FROM cat_route WHERE id_cliente_reciclador = %s AND status = TRUE ORDER BY name ASC""", (self.recycler_id, )) routes = cur.fetchall() with conn.cursor() as cur: cur.execute( """ SELECT r.id, r.id_cat_route route_id, r.id_driver driver_id, d.name driver_name, r.id_truck vehicle_id, t.name vehicle_name, r."order" "index", s.id branch_id, s.nombre_sucursal branch_name, eu.id customer_id, CASE WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name WHEN eu.razon_social IS NOT NULL AND TRIM(eu.razon_social ) != '' THEN eu.razon_social ELSE eu.id::text END customer_name FROM recollections r LEFT JOIN drivers d ON r.id_driver = d.id LEFT JOIN trucks t ON r.id_truck = t.id LEFT JOIN sucursales s ON r.id_sucursal = s.id LEFT JOIN end_users eu ON s.id_end_user = eu.id WHERE r.id_cliente_reciclador = %s AND r.enabled = TRUE AND r.date = %s AND r.id_sucursal IS NOT NULL AND s.id_end_user IS NOT NULL ORDER BY r."order" ASC """, (self.recycler_id, f"{date.year}-{date.month}-{date.day}")) raw_collections = cur.fetchall() if len(raw_collections) == 0: return [ RoutePlan( route = Route (id=route_id, name=route_name), driver = Driver (id=-1, name="Sin asignar"), vehicle = Vehicle(id=-1, name="Sin asignar"), collection_points = [] ) for route_id, route_name in routes ] collections_by_route_id = dict() for id, route_id, driver_id, driver_name, vehicle_id, vehicle_name, index, branch_id, branch_name, customer_id, customer_name in raw_collections: if collections_by_route_id.get(route_id, None) is None: collections_by_route_id[route_id] = [] collections_by_route_id[route_id].append(( index, Driver(id=driver_id, name=driver_name), Vehicle(id=vehicle_id, name=vehicle_name), Branch( branch_id=branch_id, branch_name=branch_name, customer_id=customer_id, customer_name=customer_name ), )) output: list[RoutePlan] = [] for (route_id, route_name) in routes: collections: list[tuple[int, Driver, Vehicle, Branch]] = collections_by_route_id.get(route_id, []) collections = sorted(collections, key=lambda x: x[0]) if len(collections) == 0: output.append(RoutePlan( route = Route (id=route_id, name=route_name), driver = Driver (id=-1, name="Sin asignar"), vehicle = Vehicle(id=-1, name="Sin asignar"), collection_points = [] )) continue index, driver, vehicle, b = collections[0] output.append(RoutePlan( route = Route(id=route_id, name=route_name), driver = driver, vehicle = vehicle, collection_points = [branch for __, ___, ____, branch in collections]) ) return output def get_branches(self) -> list[Branch]: # id, name data: list[tuple[int, str]] = [] with closing(Data.connect()) as conn: with conn.cursor() as cur: cur.execute( """SELECT s.id branch_id, s.nombre_sucursal branch_name, eu.id customer_id, CASE WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name WHEN eu.razon_social IS NOT NULL AND TRIM(eu.razon_social) != '' THEN eu.razon_social ELSE eu.id::text END customer_name FROM sucursales s LEFT JOIN end_users eu ON s.id_end_user = eu.id WHERE s.id_cliente_reciclador = %s AND s.id_end_user IS NOT NULL ORDER BY customer_name ASC""", (self.recycler_id, )) data = cur.fetchall() return [Branch( branch_id=branch_id, branch_name=branch_name, customer_id=customer_id, customer_name=customer_name ) for branch_id, branch_name, customer_id, customer_name in data] def get_estimated_containers(self, branch_ids: list[int]) -> list[list[dict]]: """ Returns list of dictionaries with shape: { amount: int, materialTypeId: int, containerTypeId: int, unitary_price: float, charge_type: int, visit_price: float } """ if len(branch_ids) == 0: return [] BASE_QUERY = """(SELECT id, estimate_containers FROM recollections WHERE id_cliente_reciclador = %s AND id_sucursal = %s ORDER BY created_at DESC LIMIT 1)""" args = [] full_query = [] for branch_id in branch_ids: args.append(self.recycler_id) args.append(branch_id) full_query.append(BASE_QUERY) full_query = " UNION ALL ".join(full_query) output: list[list[dict]] = [] with closing(Data.connect()) as conn: with conn.cursor() as cur: # Tomar la última recolección programada y usar sus contenedores cur.execute(full_query, args) data: list[tuple[int, list[dict]]] = cur.fetchall() if len(data) == 0: for _ in branch_ids: output.append(Data._DEFAULT_ESTIMATE_CONTAINERS.copy()) for row in data: estimate_containers: list[dict] = [] if row is None or row[1] is None or len(row[1]) == 0: output.append(Data._DEFAULT_ESTIMATE_CONTAINERS.copy()) continue collection_id, containers = row for c in containers: if c.get("materialTypeId", False) and c.get("containerTypeId", False) and c.get("amount", False): estimate_containers.append({ "amount" : c["amount"], "weightKg" : c.get("weightKg", None), "materialTypeId" : c["materialTypeId"], "containerTypeId": c["containerTypeId"], "unitary_price" : c.get("unitary_price", Data._DEFAULT_ESTIMATE_CONTAINERS[0]["unitary_price"]), "charge_type" : c.get("charge_type", Data._DEFAULT_ESTIMATE_CONTAINERS[0]["charge_type"]), "visit_price" : c.get("visit_price", Data._DEFAULT_ESTIMATE_CONTAINERS[0]["visit_price"]) }) output.append(estimate_containers if len(estimate_containers) > 0 else Data._DEFAULT_ESTIMATE_CONTAINERS.copy()) if len(output) == 0: print("[OUTPUT WAS EMPTY]", branch_ids) return output