import json
import math
import os
import sys

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from notion_client import Client
from specklepy.api import operations
from specklepy.api.client import SpeckleClient
from specklepy.api.credentials import get_default_account, get_local_accounts
from specklepy.objects import Base
from specklepy.objects.geometry import Polyline, Point
from specklepy.transports.server import ServerTransport

from config import landuseColumnName
from config import subdomainColumnName
from config import sqmPerEmployeeColumnName
from config import thresholdsColumnName
from config import maxPointsColumnName
from config import domainColumnName
from config import landuseDatabaseId, streamId, dmBranchName, dmCommitId, luBranchName, luCommitId

import speckle_utils
import data_utils

notionToken = os.getenv('notionToken')
speckleToken = os.getenv('speckleToken')

# Fallback decay parameters used by getDataFromGrasshopper when the input JSON
# carries no value. These names were referenced but not defined in the visible
# part of this module; the values mirror the defaults of computeAccessibility.
# NOTE(review): if they are defined further down or meant to come from config,
# that later definition wins at import time - confirm the intended values.
alphaDefault = 0.0038
thresholdDefault = 600


# ----------------------------------------------------------------------------------
# Notion helpers

def fetch_all_database_pages(client, database_id):
    """
    Fetches all pages from a specified Notion database.

    Results are requested repeatedly, following ``next_cursor`` for as long as
    the response reports ``has_more``.

    :param client: Initialized Notion client.
    :param database_id: The ID of the Notion database to query.
    :return: A list containing all pages from the database.
    """
    all_pages = []
    # "start_cursor" is only added once a cursor is known, so the first
    # request never carries an explicit null cursor.
    query = {"database_id": database_id}

    while True:
        response = client.databases.query(**query)
        all_pages.extend(response['results'])

        next_cursor = response.get('next_cursor')
        # Stop when there is nothing more to fetch; also stop if the response
        # claims more data but gives no cursor, which would loop forever.
        if not response['has_more'] or not next_cursor:
            return all_pages
        query["start_cursor"] = next_cursor


def _rich_text_content(text_block):
    """Return the text of one rich-text item, falling back to ``plain_text``
    for items that carry no ``text`` payload."""
    text = text_block.get('text')
    if text is not None:
        return text['content']
    return text_block.get('plain_text', '')


def get_property_value(page, property_name):
    """
    Extracts the value from a specific property in a Notion page based on its type.

    Supported property types: ``title``, ``rich_text``, ``number``, ``select``,
    ``multi_select``, ``date``, ``relation`` and ``people``.

    :param page: The Notion page data as retrieved from the API.
    :param property_name: The name of the property whose value is to be fetched.
    :return: The value or values contained in the specified property, depending
        on type: a string for title/rich_text, the raw value for number, the
        option name (or None) for select, a list of names for multi_select, the
        start string or a ``(start, end)`` tuple (or None when empty) for date,
        a list of ids for relation, a list of names for people. None if the
        property is missing or its type is not supported.
    """
    properties = page['properties']
    if property_name not in properties:
        return None

    property_data = properties[property_name]
    prop_type = property_data['type']
    value = property_data.get(prop_type)

    if prop_type in ('title', 'rich_text'):
        return ''.join(_rich_text_content(text_block) for text_block in value)

    if prop_type == 'number':
        return value

    if prop_type == 'select':
        return value['name'] if value else None

    if prop_type == 'multi_select':
        return [option['name'] for option in value]

    if prop_type == 'date':
        # An empty date property has no payload to index into.
        if not value:
            return None
        if value['end']:
            return (value['start'], value['end'])
        return value['start']

    if prop_type == 'relation':
        return [relation['id'] for relation in value]

    if prop_type == 'people':
        return [person['name'] for person in value if 'name' in person]

    # Unsupported property type.
    return None


def get_page_by_id(notion_db_pages, page_id):
    """
    Finds a page by its id in a list of Notion pages.

    :param notion_db_pages: Iterable of page dicts, each with an "id" entry.
    :param page_id: The id to look for.
    :return: The first page whose "id" equals ``page_id``, or None if there is none.
    """
    for pg in notion_db_pages:
        if pg["id"] == page_id:
            return pg
    return None


# ---------------------------------------------------------------------------------------------
# Speckle helpers

def _matrix_to_dataframe(matrix_data):
    """Build a DataFrame (origins as index, destinations as columns) from the
    chunked rows of one Speckle matrix object."""
    origin_uuids = matrix_data["@originUUID"]
    destination_uuids = matrix_data["@destinationUUID"]
    rows = [
        row["@row"]
        for chunk in matrix_data["@chunks"]
        for row in chunk["@rows"]
    ]
    return pd.DataFrame(np.array(rows), index=origin_uuids, columns=destination_uuids)


def getDataFromSpeckle(
        speckleClient,
        streamID,
        matrixBranchName,
        landuseBranchName,
        matrixComitID="",
        landuseComitID="",
        pathToData=("@Data", "@{0}"),
        uuidColumn="uuid",
        landuseColumns="lu+"
        ):
    """
    Loads the land-use table and the distance matrices from Speckle.

    :param speckleClient: Speckle client passed on to ``speckle_utils.getSpeckleStream``.
    :param streamID: Stream id; when empty the ``streamId`` from config is used.
    :param matrixBranchName: Branch holding the matrices; falsy skips loading them.
    :param landuseBranchName: Branch holding the land uses; falsy skips loading them.
    :param matrixComitID: Commit id for the matrix branch; when empty the
        ``dmCommitId`` from config is used.
    :param landuseComitID: Commit id for the land-use branch; when empty the
        ``luCommitId`` from config is used.
    :param pathToData: Two keys ``(container, items)`` locating the data inside
        the received stream object.
    :param uuidColumn: Column of the land-use table used as its index.
    :param landuseColumns: Either a column-name prefix (str) selecting all
        columns that start with it, or an explicit list/tuple of column names.
    :return: ``(dfLanduses_filtered, matrices)``. The first is the selected
        land-use columns with every "lu+" removed from their names, or None if
        no land-use branch was given. The second is a dict mapping matrix name
        to DataFrame, empty if no matrix branch was given.
    :raises TypeError: If ``landuseColumns`` is neither a str nor a list/tuple.
    """
    stream_id = streamID or streamId
    container_key, items_key = pathToData

    dfLanduses_filtered = None
    matrices = {}

    if landuseBranchName:
        streamLanduses = speckle_utils.getSpeckleStream(
            stream_id, landuseBranchName, speckleClient, landuseComitID or luCommitId
        )
        streamData = streamLanduses[container_key][items_key]

        dfLanduses = speckle_utils.get_dataframe(streamData, return_original_df=False)
        dfLanduses = dfLanduses.set_index(uuidColumn, drop=False)

        if isinstance(landuseColumns, str):
            # Treat the string as a prefix and collect every matching column.
            landuse_columns = [
                name for name in dfLanduses.columns
                if isinstance(name, str) and name.startswith(landuseColumns)
            ]
        elif isinstance(landuseColumns, (list, tuple)):
            # The caller provided an explicit list of columns.
            landuse_columns = list(landuseColumns)
        else:
            raise TypeError(
                "landuseColumns must be a prefix string or a list of column names"
            )

        dfLanduses_filtered = dfLanduses[landuse_columns].copy()
        dfLanduses_filtered.columns = [
            col.replace('lu+', '') for col in dfLanduses_filtered.columns
        ]

    if matrixBranchName:
        streamObj = speckle_utils.getSpeckleStream(
            stream_id, matrixBranchName, speckleClient, matrixComitID or dmCommitId
        )
        try:
            # Layout 1: the items key holds a list of objects, each exposing
            # one attribute whose name contains "matrix".
            data_part = streamObj[container_key][items_key]
            for matrix in data_part:
                matrix_name = next(
                    (attr for attr in dir(matrix) if "matrix" in attr), None
                )
                if not matrix_name:
                    continue
                matrices[matrix_name] = _matrix_to_dataframe(getattr(matrix, matrix_name))
        except KeyError:
            # Layout 2: the matrices are attributes of the container itself.
            data_part = streamObj[container_key].__dict__
            print(data_part.keys())
            for name, matrix_data in data_part.items():
                if "matrix" in name:
                    matrices[name] = _matrix_to_dataframe(matrix_data)

    return dfLanduses_filtered, matrices


def getDataFromNotion(
        notion,
        notionToken,
        landuseDatabaseID,
        subdomainDatabaseID,
        landuseColumnName="LANDUSE",
        subdomainColumnName="SUBDOMAIN_LIVABILITY",
        sqmPerEmployeeColumnName="SQM PER EMPL",
        thresholdsColumnName="MANHATTAN THRESHOLD",
        maxPointsColumnName="LIVABILITY MAX POINT",
        domainColumnName="DOMAIN_LIVABILITY"
        ):
    """
    Builds the land-use and livability mapper dictionaries from two Notion databases.

    :param notion: Initialized Notion client.
    :param notionToken: Not used by this function; kept for interface compatibility.
    :param landuseDatabaseID: Id of the database describing land uses.
    :param subdomainDatabaseID: Id of the database describing livability subdomains.
    :param landuseColumnName: Property holding the land-use name.
    :param subdomainColumnName: Property holding the subdomain name.
    :param sqmPerEmployeeColumnName: Property holding the area per employee.
    :param thresholdsColumnName: Property holding the threshold.
    :param maxPointsColumnName: Property holding the maximum livability points.
    :param domainColumnName: Property holding the domain(s).
    :return: ``(landuseMapperDict, livabilityMapperDict)``. The first maps each
        land use to ``{'subdomain livability', 'is origin'}``; only pages with
        both a land use and a subdomain are included. The second maps each
        subdomain to ``{'sqmPerEmpl', 'thresholds', 'max_points', 'domain'}``;
        only pages with a truthy threshold are included.
    """
    landuse_attributes = fetch_all_database_pages(notion, landuseDatabaseID)
    livability_attributes = fetch_all_database_pages(notion, subdomainDatabaseID)

    landuseMapperDict = {}
    livabilityMapperDict = {}

    for page in landuse_attributes:
        value_landuse = get_property_value(page, landuseColumnName)
        value_subdomain = get_property_value(page, subdomainColumnName)
        # Any falsy/missing value is stored as the string "false".
        origin = get_property_value(page, "is_origin_mask") or "false"
        if value_subdomain and value_landuse:
            landuseMapperDict[value_landuse] = {
                'subdomain livability': value_subdomain,
                'is origin': origin
            }

    for page in livability_attributes:
        subdomain = get_property_value(page, subdomainColumnName)
        sqm_per_employee = get_property_value(page, sqmPerEmployeeColumnName)
        thresholds = get_property_value(page, thresholdsColumnName)
        max_points = get_property_value(page, maxPointsColumnName)
        domain = get_property_value(page, domainColumnName)
        if thresholds:
            livabilityMapperDict[subdomain] = {
                'sqmPerEmpl': sqm_per_employee if sqm_per_employee != "" else 0,
                'thresholds': thresholds,
                'max_points': max_points,
                # Empty domain text is stored as the placeholder 0.
                'domain': [domain if domain != "" else 0]
            }

    return landuseMapperDict, livabilityMapperDict


# ---------------------------------------------------------------------------------------------
# Grasshopper helpers

def _float_or_default(inputs, name, default):
    """Return ``float(inputs[name])``, or ``default`` when ``name`` is None or
    the entry is missing/None."""
    if name is None:
        return default
    raw = inputs.get(name)
    if raw is None:
        return default
    return float(raw)


def getDataFromGrasshopper(
        inputJson,
        inputNameMatrix,
        inputNameLanduse,
        inputNameAttributeMapper,
        inputNameLanduseMapper,
        inputNameAlpha="alpha",
        inputNameThreshold="threshold"
        ):
    """
    Extracts matrix, land uses, mappers and decay parameters from a Grasshopper JSON payload.

    All values are read from ``inputJson['input']``. Passing None as an input
    name skips that input.

    :param inputJson: Parsed JSON payload with an "input" mapping.
    :param inputNameMatrix: Key of the distance matrix, or None.
    :param inputNameLanduse: Key of the land-use table, or None.
    :param inputNameAttributeMapper: Key of the attribute mapper, or None.
    :param inputNameLanduseMapper: Key of the land-use mapper, or None.
    :param inputNameAlpha: Key of alpha; missing/None values use ``alphaDefault``.
    :param inputNameThreshold: Key of the threshold; missing/None values use
        ``thresholdDefault``.
    :return: ``(dfMatrix_gh, dfLanduses_gh, attributeMapperDict_gh,
        landuseMapperDict_gh, alpha, threshold)``; skipped inputs are None.
        The matrix has infinities replaced by 10000 and NaN by 0; the land uses
        have both replaced by 0; both frames are rounded and cast to int. When a
        matrix is present, land uses are restricted to the matrix index.
    """
    inputs = inputJson['input']

    if inputNameMatrix is not None:
        dfMatrix_gh = pd.DataFrame(inputs[inputNameMatrix]).T
        dfMatrix_gh = dfMatrix_gh.apply(pd.to_numeric, errors='coerce')
        dfMatrix_gh = dfMatrix_gh.replace([np.inf, -np.inf], 10000).fillna(0)
        dfMatrix_gh = dfMatrix_gh.round(0).astype(int)
        mask_connected = dfMatrix_gh.index.tolist()
    else:
        dfMatrix_gh = None
        mask_connected = []

    if inputNameLanduse is not None:
        dfLanduses_gh = pd.DataFrame(inputs[inputNameLanduse]).T
        dfLanduses_gh = dfLanduses_gh.apply(pd.to_numeric, errors='coerce')
        dfLanduses_gh = dfLanduses_gh.replace([np.inf, -np.inf], 0).fillna(0)
        dfLanduses_gh = dfLanduses_gh.round(0).astype(int)

        if dfMatrix_gh is not None:
            valid_indexes = [idx for idx in mask_connected if idx in dfLanduses_gh.index]

            # Identify and report matrix rows that have no land-use record.
            missing_indexes = set(mask_connected) - set(valid_indexes)
            if missing_indexes:
                print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}")

            # Keep only the rows that are part of the matrix, in matrix order.
            dfLanduses_gh = dfLanduses_gh.loc[valid_indexes]
    else:
        dfLanduses_gh = None

    if inputNameAttributeMapper is not None:
        attributeMapperDict_gh = inputs[inputNameAttributeMapper]
    else:
        attributeMapperDict_gh = None

    if inputNameLanduseMapper is not None:
        landuseMapperDict_gh = inputs[inputNameLanduseMapper]
    else:
        landuseMapperDict_gh = None

    # The None check must precede the float conversion, otherwise a null value
    # raises before the default can be applied.
    alpha = _float_or_default(inputs, inputNameAlpha, alphaDefault)
    threshold = _float_or_default(inputs, inputNameThreshold, thresholdDefault)

    return dfMatrix_gh, dfLanduses_gh, attributeMapperDict_gh, landuseMapperDict_gh, alpha, threshold


def splitDictByStrFragmentInColumnName(original_dict, substrings):
    """
    Splits a nested dict into one nested dict per substring.

    :param original_dict: Mapping ``key -> {subkey: value}``.
    :param substrings: Iterable of string fragments to look for in the subkeys.
    :return: Dict mapping every substring to ``{key: {subkey: value}}`` holding
        only the subkeys that contain that substring. A subkey matching several
        substrings appears under each of them.
    """
    result_dicts = {substring: {} for substring in substrings}

    for key, nested_dict in original_dict.items():
        for subkey, value in nested_dict.items():
            for substring in substrings:
                if substring in subkey:
                    result_dicts[substring].setdefault(key, {})[subkey] = value

    return result_dicts


# ---------------------------------------------------------------------------------------------
# Aggregation and scoring

def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList):
    """
    Sums land-use columns into their livability subdomains.

    :param DistanceMatrix: DataFrame whose index defines the rows of the result.
    :param LanduseDf: DataFrame with one column per land use.
    :param LanduseToSubdomainDict: Mapping ``land use -> {'subdomain livability': ...}``.
    :param UniqueSubdomainsList: Subdomains to produce columns for.
    :return: DataFrame indexed like ``DistanceMatrix`` with one column per
        subdomain, holding the sum of all land-use columns mapped to it.
        Land uses missing from ``LanduseDf`` are reported with a warning.
    """
    df_LivabilitySubdomainsArea = pd.DataFrame(
        0, index=DistanceMatrix.index, columns=UniqueSubdomainsList
    )

    for subdomain in UniqueSubdomainsList:
        for lu, attributes in LanduseToSubdomainDict.items():
            if attributes["subdomain livability"] != subdomain:
                continue
            if lu not in LanduseDf.columns:
                print(f"Warning: Column '{lu}' not found in landuse database")
                continue
            # Columns that are entirely NaN contribute nothing.
            if LanduseDf[lu].notna().any():
                df_LivabilitySubdomainsArea[subdomain] = (
                    df_LivabilitySubdomainsArea[subdomain].add(LanduseDf[lu], fill_value=0)
                )

    return df_LivabilitySubdomainsArea


def FindWorkplacesNumber(DistanceMatrix, livabilityMapperDict, destinationWeights, UniqueSubdomainsList):
    """
    Estimates the number of jobs per row from subdomain areas.

    For every subdomain that has a positive ``sqmPerEmpl`` in the mapper and a
    column in ``destinationWeights``, the column divided by ``sqmPerEmpl``
    (rounded to 2 decimals, NaN treated as 0) is added to the job count.

    :param DistanceMatrix: DataFrame whose index defines the rows of the result.
    :param livabilityMapperDict: Mapping ``subdomain -> {'sqmPerEmpl': ...}``.
    :param destinationWeights: DataFrame with one column per subdomain.
    :param UniqueSubdomainsList: Subdomains to consider.
    :return: DataFrame indexed like ``DistanceMatrix`` with a single "jobs" column.
    """
    df_LivabilitySubdomainsWorkplaces = pd.DataFrame(
        0, index=DistanceMatrix.index, columns=['jobs']
    )

    for subdomain in UniqueSubdomainsList:
        # Direct lookup: subdomains absent from the mapper, or without an
        # area-per-employee figure, simply contribute no jobs.
        attributes = livabilityMapperDict.get(subdomain)
        if not subdomain or not attributes or not attributes.get('sqmPerEmpl'):
            continue
        if subdomain not in destinationWeights.columns:
            continue

        sqm_per_empl = float(attributes['sqmPerEmpl'])
        if sqm_per_empl <= 0:
            continue

        jobs = round(destinationWeights[subdomain] / sqm_per_empl, 2).fillna(0)
        # fill_value keeps rows missing from the weights at their current
        # value instead of turning them into NaN through index alignment.
        df_LivabilitySubdomainsWorkplaces['jobs'] = (
            df_LivabilitySubdomainsWorkplaces['jobs'].add(jobs, fill_value=0)
        )

    return df_LivabilitySubdomainsWorkplaces


def computeAccessibility(DistanceMatrix, destinationWeights=None, alpha=0.0038, threshold=600):
    """
    Computes weighted accessibility with exponential distance decay.

    The decay factor of a cell is ``exp(-alpha * distance)`` when the distance
    is at most ``threshold`` and 0 otherwise.

    :param DistanceMatrix: DataFrame of distances (rows: origins, columns: destinations).
    :param destinationWeights: DataFrame with one weight column per category.
        Weights are applied by position, so its rows are assumed to follow the
        column order of ``DistanceMatrix``.
    :param alpha: Decay rate.
    :param threshold: Maximum distance that still counts.
    :return: DataFrame indexed like ``DistanceMatrix`` with one accessibility
        column per weight column, or None (after printing a message) when
        ``destinationWeights`` is None.
    """
    if destinationWeights is None:
        print("Destination weights parameter is None")
        return None

    decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)

    subdomainsAccessibility = pd.DataFrame(
        index=DistanceMatrix.index, columns=destinationWeights.columns
    )
    for col in destinationWeights.columns:
        subdomainsAccessibility[col] = (
            decay_factors * destinationWeights[col].values
        ).sum(axis=1)

    return subdomainsAccessibility


def computeAccessibility_pointOfInterest(DistanceMatrix, columnName, alpha=0.0038, threshold=600):
    """
    Computes unweighted accessibility (every destination counts as 1).

    :param DistanceMatrix: DataFrame of distances (rows: origins, columns: destinations).
    :param columnName: Name of the single result column.
    :param alpha: Decay rate.
    :param threshold: Maximum distance that still counts.
    :return: DataFrame indexed like ``DistanceMatrix`` with one column
        ``columnName`` holding the row sums of the decay factors.
    """
    decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)

    pointOfInterestAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=[columnName])
    pointOfInterestAccessibility[columnName] = (decay_factors * 1).sum(axis=1)

    return pointOfInterestAccessibility


def remap(value, B_min, B_max, C_min, C_max):
    """
    Linearly maps ``value`` from the range [B_min, B_max] to [C_min, C_max].

    No clamping is applied, so values outside the source range map outside the
    target range. ``B_min == B_max`` divides by zero.

    :param value: Scalar or array-like value to remap.
    :param B_min: Lower bound of the source range.
    :param B_max: Upper bound of the source range.
    :param C_min: Lower bound of the target range.
    :param C_max: Upper bound of the target range.
    :return: The remapped value.
    """
    return C_min + (((value - B_min) / (B_max - B_min)) * (C_max - C_min))


def accessibilityToLivability(DistanceMatrix, accessibilityInputs, SubdomainAttributeDict, UniqueDomainsList):
    """
    Converts accessibility values to livability points per subdomain and domain.

    For every subdomain with a column in ``accessibilityInputs`` the
    accessibility is remapped linearly from [0, threshold] to [0, max_points]
    and capped at max_points. The score is written to the subdomain column and
    added to each of its domains except "Workplaces". The "commercial"
    subdomain is scored from the "jobs" column instead and written (not added)
    to its first domain.

    :param DistanceMatrix: DataFrame whose index defines the rows of the result.
    :param accessibilityInputs: DataFrame with one accessibility column per
        subdomain (and "jobs" when "commercial" is mapped).
    :param SubdomainAttributeDict: Mapping ``subdomain -> {'thresholds',
        'max_points', 'domain'}`` where 'domain' is a list.
    :param UniqueDomainsList: Domains to create aggregate columns for.
    :return: DataFrame with the subdomain columns followed by the domain columns.
    """
    # Float from the start: scores are fractional and are written with .loc,
    # which must not have to upcast integer/object columns.
    livability = pd.DataFrame(
        0.0, index=DistanceMatrix.index, columns=accessibilityInputs.columns
    )
    for domain in UniqueDomainsList:
        livability[domain] = 0.0

    for key, attributes in SubdomainAttributeDict.items():
        threshold = float(attributes['thresholds'])
        max_livability = float(attributes['max_points'])
        # Drop placeholder entries (0 / None / "") that stand for "no domain".
        domains = [str(item) for item in attributes['domain'] if item]

        if key in accessibilityInputs.columns and key != 'commercial':
            accessibility = accessibilityInputs[key]
            reached = accessibility >= threshold
            below = accessibility < threshold
            livability_score = remap(accessibility, 0, threshold, 0, max_livability)

            livability.loc[reached, key] = max_livability
            livability.loc[below, key] = livability_score

            for domain in domains:
                if domain != 'Workplaces':
                    livability.loc[reached, domain] += max_livability
                    livability.loc[below, domain] += livability_score

        elif key == 'commercial':
            if not domains:
                # Without a domain there is no column to write the score to.
                continue
            jobs = accessibilityInputs['jobs']
            livability_score = remap(jobs, 0, threshold, 0, max_livability)
            livability.loc[jobs >= threshold, domains[0]] = max_livability
            livability.loc[jobs < threshold, domains[0]] = livability_score

    return livability


def findUniqueDomains(livabilityMapperDict):
    """
    Finds the unique domains to which subdomains are aggregated.

    Comma-separated domain strings are split, and the split list replaces the
    'domain' entry of the corresponding subdomain in ``livabilityMapperDict``
    (the input is modified in place).

    :param livabilityMapperDict: Mapping ``subdomain -> {'domain': [...]}``.
    :return: List of unique domain names in first-seen order; placeholder
        entries (0 / None / "") are skipped.
    """
    collected = []

    for attributes in livabilityMapperDict.values():
        for item in attributes['domain']:
            if isinstance(item, str) and ',' in item:
                domain_list = item.split(',')
                attributes['domain'] = domain_list
                collected.extend(domain_list)
            elif item:
                # Checked after the string test so that the numeric
                # placeholder 0 is never searched for a comma.
                collected.append(item)

    return list(dict.fromkeys(collected))


def findUniqueSubdomains(landuseMapperDict):
    """
    Finds the unique subdomains to which land uses are aggregated.

    :param landuseMapperDict: Mapping ``land use -> {'subdomain livability': ...}``.
    :return: List of unique subdomain names (as strings) in first-seen order;
        placeholder entries (0 / None / "") are skipped.
    """
    collected = []

    for attributes in landuseMapperDict.values():
        subdomain = attributes["subdomain livability"]
        # Test the raw value: once converted to str it can never equal 0.
        if subdomain:
            collected.append(str(subdomain))

    return list(dict.fromkeys(collected))