File size: 9,403 Bytes
eb0b2fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
from datetime import datetime
from contextlib import closing
from excel import Driver, Vehicle, Route, Branch, RoutePlan
import psycopg2
import streamlit as st


class Data:
    
    _DEFAULT_ESTIMATE_CONTAINERS = [{
        "amount"         : 1,
        "weightKg"       : None,
        "materialTypeId" : 17, # Basura
        "containerTypeId": 8,  # Tambo 200 L
        "charge_type"    : 2,  # Por contenedor (?)
        "unitary_price"  : 0,
        "visit_price"    : 0
    }]

    @staticmethod
    def connect():
        return psycopg2.connect(**st.secrets["postgres"])
    
    def __init__(self, recycler_id: int):
        self.recycler_id = recycler_id

    def get_drivers(self) -> list[Driver]:
        # id, name
        data: list[tuple[int, str]] = []

        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("""SELECT id, name from drivers WHERE id_cliente_reciclador = %s ORDER BY name ASC""", (self.recycler_id, ))
                data = cur.fetchall()

        return [Driver(id=id, name=name) for id, name in data]

    def get_vehicles(self) -> list[Driver]:
        # id, name
        data: list[tuple[int, str]] = []

        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("""SELECT id, name from trucks WHERE id_cliente_reciclador = %s ORDER BY name ASC""", (self.recycler_id, ))
                data = cur.fetchall()

        return [Vehicle(id=id, name=name) for id, name in data]

    def get_routes(self, date: datetime) -> list[RoutePlan]:
        
        # id, name
        routes: list[tuple[int, str]] = [] 

        # id, route_id, driver_id, driver_name, vehicle_id, vehicle_name, index, branch_id, branch_name, customer_id, customer_name
        raw_collections: list[tuple[int, int, int, str, int, str, int, int, str, int, str]] = []

        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("""SELECT id, name FROM cat_route WHERE id_cliente_reciclador = %s AND status = TRUE ORDER BY name ASC""", (self.recycler_id, ))
                routes = cur.fetchall()
            with conn.cursor() as cur:
                cur.execute(
"""
SELECT 
	r.id, 
	r.id_cat_route route_id,  
	r.id_driver driver_id,
	d.name driver_name,
	r.id_truck vehicle_id,
	t.name vehicle_name,
	r."order" "index",
	s.id branch_id,
	s.nombre_sucursal branch_name,
	eu.id customer_id,
	CASE
		WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name
		WHEN eu.razon_social  IS NOT NULL AND TRIM(eu.razon_social ) != '' THEN eu.razon_social
		ELSE eu.id::text
	END customer_name
	
FROM 
	recollections r LEFT JOIN 
	drivers    d ON r.id_driver   = d.id LEFT JOIN
	trucks     t ON r.id_truck    = t.id LEFT JOIN
	sucursales s ON r.id_sucursal = s.id LEFT JOIN
	end_users eu ON s.id_end_user = eu.id 
WHERE 
	r.id_cliente_reciclador   = %s   AND
	r.enabled                 = TRUE AND
	r.date                    = %s   AND
	r.id_sucursal IS NOT NULL        AND
	s.id_end_user IS NOT NULL
ORDER BY
	r."order" ASC
""", (self.recycler_id, f"{date.year}-{date.month}-{date.day}"))
                raw_collections = cur.fetchall()
        
        if len(raw_collections) == 0:
            return [
                RoutePlan(
                    route             = Route  (id=route_id, name=route_name), 
                    driver            = Driver (id=-1,       name="Sin asignar"), 
                    vehicle           = Vehicle(id=-1,       name="Sin asignar"),
                    collection_points = []
                ) for route_id, route_name in routes
            ]

        collections_by_route_id = dict()
        for  id, route_id, driver_id, driver_name, vehicle_id, vehicle_name, index,  branch_id, branch_name, customer_id, customer_name in raw_collections:
            if collections_by_route_id.get(route_id, None) is None:
                collections_by_route_id[route_id] = []

            collections_by_route_id[route_id].append((
                index,
                Driver(id=driver_id, name=driver_name),
                Vehicle(id=vehicle_id, name=vehicle_name),
                Branch(
                    branch_id=branch_id, 
                    branch_name=branch_name,
                    customer_id=customer_id, 
                    customer_name=customer_name
                ),
            ))

        output: list[RoutePlan] = []
        for (route_id, route_name) in routes:
            collections: list[tuple[int, Driver, Vehicle, Branch]] = collections_by_route_id.get(route_id, [])
            collections = sorted(collections, key=lambda x: x[0])
            if len(collections) == 0:
                output.append(RoutePlan(
                    route             = Route  (id=route_id, name=route_name), 
                    driver            = Driver (id=-1,       name="Sin asignar"), 
                    vehicle           = Vehicle(id=-1,       name="Sin asignar"),
                    collection_points = []
                ))
                continue
            
            index, driver, vehicle, b = collections[0]
            
            output.append(RoutePlan(
                route             = Route(id=route_id, name=route_name), 
                driver            = driver, 
                vehicle           = vehicle, 
                collection_points = [branch for __, ___, ____, branch in collections])
            )
        
        return output

    def get_branches(self) -> list[Branch]:
        # id, name
        data: list[tuple[int, str]] = []

        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute(
    """SELECT 
    	s.id branch_id,
    	s.nombre_sucursal branch_name,
    	eu.id customer_id,
    	CASE 
    		WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name
    		WHEN eu.razon_social IS NOT NULL AND TRIM(eu.razon_social) != '' THEN eu.razon_social  
    		ELSE eu.id::text
    	END customer_name
    FROM sucursales s LEFT JOIN end_users eu
    ON s.id_end_user = eu.id 
    WHERE 
    	s.id_cliente_reciclador = %s AND
    	s.id_end_user IS NOT NULL
    ORDER BY
    	customer_name ASC""", (self.recycler_id, ))
                data = cur.fetchall()

        return [Branch(
            branch_id=branch_id,
            branch_name=branch_name,
            customer_id=customer_id,
            customer_name=customer_name
        ) for branch_id, branch_name, customer_id, customer_name in data]

    def get_estimated_containers(self, branch_ids: list[int]) -> list[list[dict]]:
        """
        Returns list of dictionaries with shape: 
        {
            amount: int,
            materialTypeId: int,
            containerTypeId: int,
            unitary_price: float,
            charge_type: int,
            visit_price: float
        }
        """
        if len(branch_ids) == 0:
            return []

        BASE_QUERY = """(SELECT id, estimate_containers FROM recollections WHERE id_cliente_reciclador = %s AND id_sucursal = %s ORDER BY created_at DESC LIMIT 1)"""
        args = []
        full_query = []
        for branch_id in branch_ids:
            args.append(self.recycler_id)
            args.append(branch_id)
            full_query.append(BASE_QUERY)
        full_query = " UNION ALL ".join(full_query)

        output: list[list[dict]] = []
        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                # Tomar la última recolección programada y usar sus contenedores
                cur.execute(full_query, args)
                data: list[tuple[int, list[dict]]] = cur.fetchall()
                if len(data) == 0:
                    for _ in branch_ids:
                        output.append(Data._DEFAULT_ESTIMATE_CONTAINERS.copy())

                for row in data:
                    estimate_containers: list[dict] = []
                    if row is None or row[1] is None or len(row[1]) == 0:
                        output.append(Data._DEFAULT_ESTIMATE_CONTAINERS.copy())
                        continue

                    collection_id, containers = row
                    for c in containers:
                        if c.get("materialTypeId",  False) and c.get("containerTypeId", False) and c.get("amount", False):
                            estimate_containers.append({
                                "amount"         : c["amount"],
                                "weightKg"       : c.get("weightKg", None),

                                "materialTypeId" : c["materialTypeId"],
                                "containerTypeId": c["containerTypeId"],

                                "unitary_price"  : c.get("unitary_price", Data._DEFAULT_ESTIMATE_CONTAINERS[0]["unitary_price"]),
                                "charge_type"    : c.get("charge_type",   Data._DEFAULT_ESTIMATE_CONTAINERS[0]["charge_type"]),
                                "visit_price"    : c.get("visit_price",   Data._DEFAULT_ESTIMATE_CONTAINERS[0]["visit_price"])
                            })
                    output.append(estimate_containers if len(estimate_containers) > 0 else Data._DEFAULT_ESTIMATE_CONTAINERS.copy())
        
        if len(output) == 0:
            print("[OUTPUT WAS EMPTY]", branch_ids)
        return output