Source code for fasttrips.Assignment

from __future__ import print_function
from __future__ import division
from past.builtins import execfile
from future import standard_library
standard_library.install_aliases()
from builtins import zip
from builtins import str
from builtins import range
from builtins import object

__copyright__ = "Copyright 2015-2017 Contributing Entities"
__license__   = """
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""
import configparser
import datetime
import math
import multiprocessing
import os
import queue
import sys
import traceback

import numpy as np
import pandas as pd

import _fasttrips
from .Error import ConfigurationError
from .Logger import FastTripsLogger, setupLogging
from .Passenger import Passenger
from .PathSet import PathSet
from .Performance import Performance
from .Trip import Trip
from .Util import Util


[docs]class Assignment(object): """ Assignment class. Documentation forthcoming. """ #: Configuration file for fasttrips CONFIGURATION_FILE = None #: Configuration functions CONFIGURATION_FUNCTIONS_FILE = None #: Output copy of the configuration file in case anything got overridden #: (Hmm naming conventions are a bit awkward here) CONFIGURATION_OUTPUT_FILE = 'ft_output_config.txt' #: Configuration: Input network directory INPUT_NETWORK_ARCHIVE = None #: Configuration: Input demand directory INPUT_DEMAND_DIR = None #: Configuration: Pathweight parameters INPUT_WEIGHTS = None #: Configuration: Run Configuration OUTPUT_DIR = None #: Configuration: Maximum number of iterations to remove capacity violations. When #: the transit system is not crowded or when capacity constraint is #: relaxed the model will terminate after the first iteration MAX_ITERATIONS = None MAX_PF_ITERATIONS = 10 CONVERGENCE_GAP = None NETWORK_BUILD_DATE = datetime.datetime.today() NETWORK_BUILD_DATE_START_TIME = datetime.datetime.combine(NETWORK_BUILD_DATE, datetime.time()) #: Find paths deterministically, using shortest path search based on travel time. PATHFINDING_TYPE_DETERMINISTIC = 'deterministic' #: Find paths stochastically using trip-based hyperpath PATHFINDING_TYPE_STOCHASTIC = 'stochastic' #: Don't find paths; read :py:attr:`Passenger.PF_PATHS_CSV` and :py:attr:`Passenger.PF_LINKS_CSV`. PATHFINDING_TYPE_READ_FILE = 'file' #: Configuration: Pathfinding Type. Should be one of `Deterministic`, `Stochastic` or `File` PATHFINDING_TYPE = None #: Configuration: Do simulation? It should be True for iterative assignment. In a one shot #: assignment with simulation flag off, the passengers are assigned to #: paths but are not loaded to the network. Boolean. SIMULATION = None #: Configuration: Passenger trajectory output flag. Passengers' path and time will be #: reported if this flag is on. Note that the simulation flag should be on for #: passengers' time. Boolean. OUTPUT_PASSENGER_TRAJECTORIES = None #: Configuration: If true, outputs pathset every simulation iteration. If false, #: outputs pathset every path-finding iteration. OUTPUT_PATHSET_PER_SIM_ITER = None #: Configuration: Path time-window. This is the time in which the paths are generated. #: E.g. with a typical 30 min window, any path within 30 min of the #: departure time will be checked. A :py:class:`datetime.timedelta` instance. TIME_WINDOW = None #: Configuration: Create skims flag. This is specific to the travel demand models #: (not working in this version). Boolean. CREATE_SKIMS = None #: Configuration: Beginning of the time period for which the skim is required. #: (specify as 'HH:MM'). A :py:class:`datetime.datetime` instance. SKIM_START_TIME = None #: Configuration: End of the time period for which the skim is required #: (specify as 'HH:MM'). A :py:class:`datetime.datetime` instance. SKIM_END_TIME = None #: Route choice configuration: Max number of paths in a pathset. #: Used in conjuntion with :py:attr:`Assignment.MIN_PATH_PROBABILITY` MAX_NUM_PATHS = None #: Route choice configuration: Minimum path probability for the path to be used. #: Used in conjucntion with :py:attr:`Assignment.MAX_NUM_PATHS`, so it only #: kicks in if that is specified AND we hit it, then we start dropping using #: this threshhold. MIN_PATH_PROBABILITY = None #: Route choice configuration: Dispersion parameter in the logit function. #: Higher values result in less stochasticity. Must be nonnegative. #: If unknown use a value between 0.5 and 1. Float. STOCH_DISPERSION = None #: Stop labeling configuration: Multiplies the utilities by this factor #: so that there are not negative costs labels which can result in #: lengthy and ineffective path-finding. Must be positive; should be greater #: than 1.0. Double. UTILS_CONVERSION = None #: In path-finding, suppress trying to adjust fares using transfer fare rules. #: This is for performance testing. TRANSFER_FARE_IGNORE_PATHFINDING = None #: In path-enumeration, suppress trying to adjust fares using transfer fare rules. #: This is for performance testing. TRANSFER_FARE_IGNORE_PATHENUM = None #: Route choice configuration: How many times max times should we process a stop #: during labeling? Use -1 to specify no max. Int. #: Setting this to a positive value may increase runtime but may decrease #: pathset quality. (Todo: test/quantify this.) STOCH_MAX_STOP_PROCESS_COUNT = None #: Route choice configuration: How many stochastic paths will we generate #: (not necessarily unique) to define a path choice set? Int. STOCH_PATHSET_SIZE = None #: Route choice configuration: Use vehicle capacity constraints. Boolean. CAPACITY_CONSTRAINT = None #: Debug mode: only run trace passengers DEBUG_TRACE_ONLY = False #: Debug mode: only run this number of trips, -1 to run all. Int. DEBUG_NUM_TRIPS = -1 #: Debug: include debug columns in output DEBUG_OUTPUT_COLUMNS = False #: Fare zone symmetry. If True, will assume fare zone symmetry. That is, if fare_id X is # configured from origin zone A to destination zone B, and there is no fare configured # from zone B to zone A, we'll assume that fare_id X also applies. FARE_ZONE_SYMMETRY = False #: Skip these passengers SKIP_PERSON_IDS = None #: Trace these persons/person trips (a list of tuples) TRACE_IDS = [] #: Prepend the route id to the trip id? This is for readability in debugging, since #: route IDs are typically more readable and trip ids are inscrutable PREPEND_ROUTE_ID_TO_TRIP_ID = False #: Number of processes to use for path finding (via :py:mod:`multiprocessing`) #: Set to 1 to run everything in this process #: Set to less than 1 to use the result of :py:func:`multiprocessing.cpu_count` #: Set to positive integer greater than 1 to set a fixed number of processes NUMBER_OF_PROCESSES = None #: Extra time so passengers don't get bumped (?). A :py:class:`datetime.timedelta` instance. BUMP_BUFFER = None #: This is the only simulation state that exists across iterations #: It's a dictionary of (trip_id, stop_id) -> earliest time a bumped passenger started waiting bump_wait = {} bump_wait_df = None #: Simulation: bump one stop at a time (slower, more accurate) #: #: When addressing capacity constraints in simulation, we look at all the (trip, stop)-pairs #: where the boards are not allowed since vehicle is over capacity. The faster way to address #: this is to bump all of those passengers, which means we call the assigned path bad and try #: to reassign. #: #: However, this could over-bump passengers, because if a passenger rides multiple #: crowded vehicles, then bumping her frees up space on other vehicles and so some other bumping #: may not be necessary. Thus, the more accurate (but slower) method is to bump passengers from #: each (trip,stop) at a time, in order of the full vehicle arrival time, and then recalculate #: loads, and iterate until we have no capacity issues. Boolean. BUMP_ONE_AT_A_TIME = None #: MSA the results that affect the next iteration to avoid oscillation: boards, alights, overcap onboard at stops MSA_RESULTS = False #: Are we finding paths for everyone right now? Or just un-arrived folks? PATHFINDING_EVERYONE = True #: How many Simulation Iterations should we do before going back to path-finding? MAX_SIMULATION_ITERS = 10 #: Column names for simulation SIM_COL_PAX_BOARD_TIME = 'board_time' #: Board time on the transit vehicle SIM_COL_PAX_ALIGHT_TIME = 'alight_time' #: Alight time from the transit vehicle SIM_COL_PAX_ALIGHT_DELAY_MIN = 'alight_delay_min' #: Delay in alight_time from original pathfinding understanding of alight time SIM_COL_PAX_A_TIME = 'new_A_time' #: Time of arrival at A SIM_COL_PAX_B_TIME = 'new_B_time' #: Time of arrival at B SIM_COL_PAX_LINK_TIME = 'new_linktime' #: Link time (SIM_COL_PAX_B_TIME - SIM_COL_PAX_A_TIME) SIM_COL_PAX_WAIT_TIME = 'new_waittime' #: Wait time SIM_COL_PAX_MISSED_XFER = 'missed_xfer' #: Is this link a missed transfer SIM_COL_PAX_OVERCAP = Trip.SIM_COL_VEH_OVERCAP #: SIM_COL_PAX_OVERCAP_FRAC = Trip.SIM_COL_VEH_OVERCAP_FRAC #: If board at an overcap stop, fraction of boards that are overcap SIM_COL_PAX_BUMP_ITER = 'bump_iter' SIM_COL_PAX_BOARD_STATE = 'board_state' #: NaN if not relevent, 1 if lucky enough to board at an at- or over-capacity stop, 0 if bumped. Set by :py:meth:`Assignment.flag_bump_overcap_passengers` SIM_COL_PAX_DISTANCE = "distance" #: Link distance SIM_COL_PAX_FARE = "fare" #: Link fare in currency SIM_COL_PAX_FARE_PERIOD = "fare_period" #: Fare period id SIM_COL_PAX_FREE_TRANSFER = "free_transfer" #: Free transfer? NaN, 0.0 or 1.0, only free based on `fare_attributes_ft.txt` SIM_COL_PAX_COST = 'sim_cost' #: Link cost. (Cannot be `cost` because it collides with TAZ.DRIVE_ACCESS_COLUMN_COST) SIM_COL_PAX_LNPS = 'ln_PS' #: log(PathSize) SIM_COL_PAX_PROBABILITY = 'probability' #: Probability of this path SIM_COL_PAX_LOGSUM = 'logsum' #: Logsum of all paths #: Is this link/path a missed transfer? #: Set in both pathset links and pathset paths, this is a 1 or 0 SIM_COL_MISSED_XFER = 'missed_xfer' #: Values for :py:attr:`Assignment.SIM_COL_PAX_BOARD_STATE` column BOARD_STATE_CATEGORICAL = [ \ "board_easy", #: path chosen and no capacity problems "boarded", #: path chosen and lucky enough to board an at-capacity or over-capacity vehicle "bumped", #: path chosen but bumped due to capacity problems "bumped_othertrip", #: path invalidated due to being bumped on another link of this person's trip "bumped_unchosen"] #: path invalidated before ever chosen due to capacity problems #: Chosen status for path SIM_COL_PAX_CHOSEN = 'chosen' #: categories for SIM_COL_PAX_CHOSEN CHOSEN_NOT_CHOSEN_YET = "unchosen" CHOSEN_REJECTED = "rejected" #: These will be ordered, so to select chosen, choose those > CHOSEN_NOT_CHOSEN_YET CHOSEN_CATEGORIES = [CHOSEN_REJECTED,CHOSEN_NOT_CHOSEN_YET]
[docs] def __init__(self): """ This does nothing. Assignment methods are static methods for now. """ pass
[docs] @staticmethod def read_functions(func_file): """ Read the functions from :py:attr:`Assignment.CONFIGURATION_FUNCTIONS_FILE """ # Functions are defined in here -- read this and eval it if func_file and os.path.exists(func_file): my_globals = {} FastTripsLogger.info("Reading %s" % func_file) execfile(func_file, my_globals, PathSet.CONFIGURED_FUNCTIONS) FastTripsLogger.info("PathSet.CONFIGURED_FUNCTIONS = %s" % str(PathSet.CONFIGURED_FUNCTIONS))
[docs] @staticmethod def read_configuration(config_fullpath): """ Read the configuration parameters from :py:attr:`Assignment.CONFIGURATION_FILE` """ pd.set_option('display.width', 1000) # pd.set_option('display.height', 1000) pd.set_option('display.max_rows', 1000) pd.set_option('display.max_columns', 100) parser = configparser.RawConfigParser( defaults={'max_iterations' :1, 'max_pf_iterations' :1, 'network_build_date' : datetime.date.today().strftime("%m/%d/%Y"), 'simulation' :'True', 'learning_convergence' :'False', 'learning_rate' : 0.01, 'convergence_gap' : 0.001, 'output_pathset_per_sim_iter' :'False', 'output_passenger_trajectories' :'True', 'create_skims' :'False', 'skim_start_time' :'5:00', 'skim_end_time' :'10:00', 'capacity_constraint' :'False', 'skip_person_ids' :'None', 'trace_ids' :[], 'debug_trace_only' :'False', 'debug_num_trips' :-1, 'debug_output_columns' :'False', 'fare_zone_symmetry' :'False', 'prepend_route_id_to_trip_id' :'False', 'number_of_processes' :0, 'bump_buffer' :5, 'bump_one_at_a_time' :'False', # pathfinding 'max_num_paths' :-1, 'min_path_probability' :0.005, 'min_transfer_penalty' :0.1, 'overlap_chunk_size' :500, 'overlap_scale_parameter' :1.0, 'overlap_split_transit' :'False', 'overlap_variable' :'count', 'pathfinding_type' :Assignment.PATHFINDING_TYPE_STOCHASTIC, 'pathweights_fixed_width' :'False', 'utils_conversion_factor' :1.0, 'stochastic_dispersion' :1.0, 'stochastic_max_stop_process_count':20, 'stochastic_pathset_size' :1000, 'time_window' :30, 'transfer_fare_ignore_pathfinding' :'False', 'transfer_fare_ignore_pathenum' :'False', 'user_class_function' :'generic_user_class', 'arrive_late_allowed_min' : 0, 'depart_early_allowed_min' : 0, }) # Read configuration from specified configuration directory FastTripsLogger.info("Reading configuration file %s" % config_fullpath) parser.read(config_fullpath) Assignment.MAX_ITERATIONS = parser.getint ('fasttrips','max_iterations') Assignment.MAX_PF_ITERATIONS = parser.getint ('fasttrips','max_pf_iterations') Assignment.NETWORK_BUILD_DATE = datetime.datetime.strptime( parser.get('fasttrips', 'network_build_date'), '%m/%d/%Y').date() Assignment.NETWORK_BUILD_DATE_START_TIME = datetime.datetime.combine(Assignment.NETWORK_BUILD_DATE, datetime.time()) Assignment.SIMULATION = parser.getboolean('fasttrips','simulation') PathSet.LEARN_ROUTES = parser.getboolean('fasttrips', 'learning_convergence') PathSet.LEARN_ROUTES_RATE = parser.getfloat('fasttrips', 'learning_rate') Assignment.CONVERGENCE_GAP = parser.getfloat('fasttrips', 'convergence_gap') Assignment.OUTPUT_PASSENGER_TRAJECTORIES = parser.getboolean('fasttrips','output_passenger_trajectories') Assignment.OUTPUT_PATHSET_PER_SIM_ITER = parser.getboolean('fasttrips','output_pathset_per_sim_iter') Assignment.CREATE_SKIMS = parser.getboolean('fasttrips','create_skims') Assignment.SKIM_START_TIME = datetime.datetime.strptime( parser.get ('fasttrips','skim_start_time'),'%H:%M') Assignment.SKIM_END_TIME = datetime.datetime.strptime( parser.get ('fasttrips','skim_end_time'),'%H:%M') Assignment.CAPACITY_CONSTRAINT = parser.getboolean('fasttrips','capacity_constraint') Assignment.SKIP_PERSON_IDS = eval(parser.get ('fasttrips','skip_person_ids')) try: Assignment.TRACE_IDS = eval(parser.get ('fasttrips','trace_ids')) except: e = "Must have a TRACE_IDS line in config_ft.txt; even if it is trace_ids = []" print(e) FastTripsLogger.error(e) raise Assignment.DEBUG_TRACE_ONLY = parser.getboolean('fasttrips','debug_trace_only') Assignment.DEBUG_NUM_TRIPS = parser.getint ('fasttrips','debug_num_trips') Assignment.DEBUG_OUTPUT_COLUMNS = parser.getboolean('fasttrips','debug_output_columns') Assignment.FARE_ZONE_SYMMETRY = parser.getboolean('fasttrips','fare_zone_symmetry') Assignment.PREPEND_ROUTE_ID_TO_TRIP_ID = parser.getboolean('fasttrips','prepend_route_id_to_trip_id') Assignment.NUMBER_OF_PROCESSES = parser.getint ('fasttrips','number_of_processes') Assignment.BUMP_BUFFER = datetime.timedelta( minutes = parser.getfloat ('fasttrips','bump_buffer')) Assignment.BUMP_ONE_AT_A_TIME = parser.getboolean('fasttrips','bump_one_at_a_time') # pathfinding Assignment.MAX_NUM_PATHS = parser.getint ('pathfinding','max_num_paths') Assignment.MIN_PATH_PROBABILITY = parser.getfloat ('pathfinding','min_path_probability') PathSet.MIN_TRANSFER_PENALTY = parser.getfloat ('pathfinding','min_transfer_penalty') PathSet.OVERLAP_CHUNK_SIZE = parser.getint ('pathfinding','overlap_chunk_size') PathSet.OVERLAP_SCALE_PARAMETER = parser.getfloat ('pathfinding','overlap_scale_parameter') PathSet.OVERLAP_SPLIT_TRANSIT = parser.getboolean('pathfinding','overlap_split_transit') PathSet.OVERLAP_VARIABLE = parser.get ('pathfinding','overlap_variable') Assignment.PATHFINDING_TYPE = parser.get ('pathfinding','pathfinding_type') PathSet.WEIGHTS_FIXED_WIDTH = parser.getboolean('pathfinding','pathweights_fixed_width') Assignment.STOCH_DISPERSION = parser.getfloat ('pathfinding','stochastic_dispersion') Assignment.UTILS_CONVERSION = parser.getfloat ('pathfinding','utils_conversion_factor') Assignment.STOCH_MAX_STOP_PROCESS_COUNT = parser.getint ('pathfinding','stochastic_max_stop_process_count') Assignment.STOCH_PATHSET_SIZE = parser.getint ('pathfinding','stochastic_pathset_size') Assignment.TIME_WINDOW = datetime.timedelta( minutes = parser.getfloat ('pathfinding','time_window')) Assignment.TRANSFER_FARE_IGNORE_PATHFINDING = parser.getboolean('pathfinding','transfer_fare_ignore_pathfinding') Assignment.TRANSFER_FARE_IGNORE_PATHENUM = parser.getboolean('pathfinding','transfer_fare_ignore_pathenum') PathSet.USER_CLASS_FUNCTION = parser.get ('pathfinding','user_class_function') PathSet.DEPART_EARLY_ALLOWED_MIN = datetime.timedelta( minutes = parser.getfloat('pathfinding', 'depart_early_allowed_min')) PathSet.ARRIVE_LATE_ALLOWED_MIN = datetime.timedelta( minutes = parser.getfloat ('pathfinding','arrive_late_allowed_min')) if Assignment.PATHFINDING_TYPE not in [Assignment.PATHFINDING_TYPE_STOCHASTIC, \ Assignment.PATHFINDING_TYPE_DETERMINISTIC, \ Assignment.PATHFINDING_TYPE_READ_FILE]: msg = "pathfinding type [%s] not available. Expected values: %s" % (Assignment.PATHFINDING_TYPE, str([Assignment.PATHFINDING_TYPE_STOCHASTIC, Assignment.PATHFINDING_TYPE_DETERMINISTIC, Assignment.PATHFINDING_TYPE_READ_FILE])) FastTripsLogger.fatal(msg) raise ConfigurationError(config_fullpath, msg) if PathSet.OVERLAP_VARIABLE not in PathSet.OVERLAP_VARIABLE_OPTIONS: msg = "pathfinding.overlap_variable [%s] not defined. Expected values: %s" % (PathSet.OVERLAP_VARIABLE, str(PathSet.OVERLAP_VARIABLE_OPTIONS)) FastTripsLogger.fatal(msg) raise ConfigurationError(config_fullpath, msg) if PathSet.USER_CLASS_FUNCTION not in PathSet.CONFIGURED_FUNCTIONS: msg = "User class function [%s] not defined. Please check your function file [%s]" % (PathSet.USER_CLASS_FUNCTION, Assignment.CONFIGURATION_FUNCTIONS_FILE) FastTripsLogger.fatal(msg) raise ConfigurationError(config_fullpath, msg)
[docs] @staticmethod def read_weights(weights_file = INPUT_WEIGHTS): """ Read the weights from :py:attr:`Assignment.INPUT_WEIGHTS """ if not os.path.exists(weights_file): FastTripsLogger.fatal("No path weights file %s" % weights_file) sys.exit(2) if PathSet.WEIGHTS_FIXED_WIDTH: weights = pd.read_fwf(weights_file) weights[PathSet.WEIGHTS_COLUMN_PURPOSE] = weights[PathSet.WEIGHTS_COLUMN_PURPOSE].astype(str) else: print (PathSet.WEIGHTS_COLUMN_PURPOSE) ##LMZ weights = pd.read_csv(weights_file, dtype={PathSet.WEIGHTS_COLUMN_PURPOSE:'S'}, skipinitialspace=True) PathSet.WEIGHTS_DF = Assignment.process_weight_qualifiers(weights) FastTripsLogger.debug("Weights =\n%s" % str(PathSet.WEIGHTS_DF)) FastTripsLogger.debug("Weight types = \n%s" % str(PathSet.WEIGHTS_DF.dtypes))
[docs] @staticmethod def process_weight_qualifiers(weights): """ Qualifiers are used to change the default behavior of weight_names. Qualifiers are added by adding a period (.) after the weight_name and specifying the qualifier name. Qualifier attributes are specified after a second dot. For example: depart_early_cost_min.logistic.growth_rate depart_early_cost_min is being qualified as a logistic penalty instead of the default behavior. growth_rate is an attribute of the logistic qualifier. :param weights: vertically oriented qualifiers :return: pivoted weights table with the qualifiers normalized horizontally. """ growth_type = weights[weights[PathSet.WEIGHTS_COLUMN_WEIGHT_NAME].str.count('\.') == 1].copy() qualifiers = weights[weights[PathSet.WEIGHTS_COLUMN_WEIGHT_NAME].str.count('\.') == 2].copy() weights = weights[~weights[PathSet.WEIGHTS_COLUMN_WEIGHT_NAME].str.contains('\.')].copy() weights[PathSet.WEIGHTS_GROWTH_TYPE] = PathSet.CONSTANT_GROWTH_MODEL # if only 'linear' this process is done and can return if growth_type.shape[0] == 0: return weights growth_type[PathSet.WEIGHTS_GROWTH_TYPE] = growth_type[PathSet.WEIGHTS_COLUMN_WEIGHT_NAME].str.extract('((?<=\.)\w+)', expand=False) growth_type[PathSet.WEIGHTS_COLUMN_WEIGHT_NAME] = growth_type[PathSet.WEIGHTS_COLUMN_WEIGHT_NAME].str.extract('(\w+(?=\.))', expand=False) weights = pd.concat([weights, growth_type]) if (~weights[PathSet.WEIGHTS_GROWTH_TYPE].isin(PathSet.PENALTY_GROWTH_MODELS)).any(): FastTripsLogger.fatal("Invalid qualifier type specified.") raise KeyError('Invalid qualifier type specified.') # if only 'constant' and 'exponential' this process is done and can return. if qualifiers.shape[0] == 0: return weights qualifiers = qualifiers.rename(columns={PathSet.WEIGHTS_COLUMN_WEIGHT_NAME: 'qualifier'}) qualifiers[PathSet.WEIGHTS_COLUMN_WEIGHT_NAME] = qualifiers['qualifier'].str.extract('(^\w+)', expand=False) qualifiers['variable'] = qualifiers['qualifier'].str.extract('(\w+$)', expand=False) qualifier_values = qualifiers.pivot(columns='variable', values=PathSet.WEIGHTS_COLUMN_WEIGHT_VALUE) qualifier_columns = qualifier_values.columns.values qualifiers = pd.concat([qualifiers, qualifier_values], axis=1) merge_cols = [ PathSet.WEIGHTS_COLUMN_USER_CLASS, PathSet.WEIGHTS_COLUMN_PURPOSE, PathSet.WEIGHTS_COLUMN_DEMAND_MODE_TYPE, PathSet.WEIGHTS_COLUMN_DEMAND_MODE, PathSet.WEIGHTS_COLUMN_SUPPLY_MODE, PathSet.WEIGHTS_COLUMN_WEIGHT_NAME, ] qualifiers = qualifiers.groupby(merge_cols)[qualifier_columns].max().reset_index() weights_df = pd.merge(weights, qualifiers, on=merge_cols, how='left') #Check for required logarithmic attributes and that they are all non-zero and positive if len(weights_df[weights_df[PathSet.WEIGHTS_GROWTH_TYPE] == PathSet.LOGARITHMIC_GROWTH_MODEL]) > 0: assert(PathSet.WEIGHTS_GROWTH_LOG_BASE in weights_df) assert( len(weights_df[ (weights_df[PathSet.WEIGHTS_GROWTH_TYPE] == PathSet.LOGARITHMIC_GROWTH_MODEL) & (weights_df[PathSet.WEIGHTS_GROWTH_LOG_BASE] <= 0) ]) == 0) if len(weights_df[weights_df[PathSet.WEIGHTS_GROWTH_TYPE] == PathSet.LOGISTIC_GROWTH_MODEL]) > 0: assert(PathSet.WEIGHTS_GROWTH_LOGISTIC_MID in weights_df) assert (PathSet.WEIGHTS_GROWTH_LOGISTIC_MAX in weights_df) assert( len(weights_df[ (weights_df[PathSet.WEIGHTS_GROWTH_TYPE] == PathSet.LOGISTIC_GROWTH_MODEL) & (weights_df[PathSet.WEIGHTS_GROWTH_LOGISTIC_MAX] <= 0) ]) == 0) assert ( len(weights_df[ (weights_df[PathSet.WEIGHTS_GROWTH_TYPE] == PathSet.LOGISTIC_GROWTH_MODEL) & (weights_df[PathSet.WEIGHTS_GROWTH_LOGISTIC_MID] <= 0) ]) == 0) return weights_df
[docs] @staticmethod def write_configuration(output_dir): """ Write the configuration parameters to function as a record with the output. """ parser = configparser.SafeConfigParser() parser.add_section('fasttrips') parser.set('fasttrips','input_demand_dir', Assignment.INPUT_DEMAND_DIR) parser.set('fasttrips','input_network_dir', Assignment.INPUT_NETWORK_ARCHIVE) parser.set('fasttrips','input_weights', Assignment.INPUT_WEIGHTS) if Assignment.CONFIGURATION_FUNCTIONS_FILE: parser.set('fasttrips','input_functions', Assignment.CONFIGURATION_FUNCTIONS_FILE) parser.set('fasttrips','run_config', Assignment.CONFIGURATION_FILE) parser.set('fasttrips','max_iterations', '%d' % Assignment.MAX_ITERATIONS) parser.set('fasttrips','max_pf_iterations', '%d' % Assignment.MAX_PF_ITERATIONS) parser.set('fasttrips','simulation', 'True' if Assignment.SIMULATION else 'False') parser.set('fasttrips','output_dir', Assignment.OUTPUT_DIR) parser.set('fasttrips','output_passenger_trajectories', 'True' if Assignment.OUTPUT_PASSENGER_TRAJECTORIES else 'False') parser.set('fasttrips','output_pathset_per_sim_iter', 'True' if Assignment.OUTPUT_PATHSET_PER_SIM_ITER else 'False') parser.set('fasttrips','create_skims', 'True' if Assignment.CREATE_SKIMS else 'False') parser.set('fasttrips','skim_start_time', Assignment.SKIM_START_TIME.strftime('%H:%M')) parser.set('fasttrips','skim_end_time', Assignment.SKIM_END_TIME.strftime('%H:%M')) parser.set('fasttrips','capacity_constraint', 'True' if Assignment.CAPACITY_CONSTRAINT else 'False') parser.set('fasttrips','skip_person_ids', '%s' % str(Assignment.SKIP_PERSON_IDS)) parser.set('fasttrips','trace_ids', '%s' % str(Assignment.TRACE_IDS)) parser.set('fasttrips','debug_trace_only', 'True' if Assignment.DEBUG_TRACE_ONLY else 'False') parser.set('fasttrips','debug_num_trips', '%d' % Assignment.DEBUG_NUM_TRIPS) parser.set('fasttrips','debug_output_columns', 'True' if Assignment.DEBUG_OUTPUT_COLUMNS else 'False') parser.set('fasttrips','fare_zone_symmetry', 'True' if Assignment.FARE_ZONE_SYMMETRY else 'False') parser.set('fasttrips','prepend_route_id_to_trip_id', 'True' if Assignment.PREPEND_ROUTE_ID_TO_TRIP_ID else 'False') parser.set('fasttrips','number_of_processes', '%d' % Assignment.NUMBER_OF_PROCESSES) parser.set('fasttrips','bump_buffer', '%f' % (Assignment.BUMP_BUFFER.total_seconds()/60.0)) parser.set('fasttrips','bump_one_at_a_time', 'True' if Assignment.BUMP_ONE_AT_A_TIME else 'False') #pathfinding parser.add_section('pathfinding') parser.set('pathfinding','max_num_paths', '%d' % Assignment.MAX_NUM_PATHS) parser.set('pathfinding','min_path_probability', '%f' % Assignment.MIN_PATH_PROBABILITY) parser.set('pathfinding','min_transfer_penalty', '%f' % PathSet.MIN_TRANSFER_PENALTY) parser.set('pathfinding','overlap_chunk_size', '%d' % PathSet.OVERLAP_CHUNK_SIZE) parser.set('pathfinding','overlap_scale_parameter', '%f' % PathSet.OVERLAP_SCALE_PARAMETER) parser.set('pathfinding','overlap_split_transit', 'True' if PathSet.OVERLAP_SPLIT_TRANSIT else 'False') parser.set('pathfinding','overlap_variable', '%s' % PathSet.OVERLAP_VARIABLE) parser.set('pathfinding','pathfinding_type', Assignment.PATHFINDING_TYPE) parser.set('pathfinding','pathweights_fixed_width', 'True' if PathSet.WEIGHTS_FIXED_WIDTH else 'False') parser.set('pathfinding','stochastic_dispersion', '%f' % Assignment.STOCH_DISPERSION) parser.set('pathfinding','utils_conversion_factor', '%f' % Assignment.UTILS_CONVERSION) parser.set('pathfinding','stochastic_max_stop_process_count', '%d' % Assignment.STOCH_MAX_STOP_PROCESS_COUNT) parser.set('pathfinding','stochastic_pathset_size', '%d' % Assignment.STOCH_PATHSET_SIZE) parser.set('pathfinding','time_window', '%f' % (Assignment.TIME_WINDOW.total_seconds()/60.0)) parser.set('pathfinding','transfer_fare_ignore_pathfinding', 'True' if Assignment.TRANSFER_FARE_IGNORE_PATHFINDING else 'False') parser.set('pathfinding','transfer_fare_ignore_pathenum', 'True' if Assignment.TRANSFER_FARE_IGNORE_PATHENUM else 'False') parser.set('pathfinding','user_class_function', '%s' % PathSet.USER_CLASS_FUNCTION) parser.set('pathfinding','arrive_late_allowed_min', '%f' % (PathSet.ARRIVE_LATE_ALLOWED_MIN.total_seconds()/60.0)) parser.set('pathfinding','depart_early_allowed_min', '%f' % (PathSet.DEPART_EARLY_ALLOWED_MIN.total_seconds()/60.0)) output_file = open(os.path.join(output_dir, Assignment.CONFIGURATION_OUTPUT_FILE), 'w') parser.write(output_file) output_file.close()
[docs] @staticmethod def initialize_fasttrips_extension(process_number, output_dir, stop_times_df): """ Initialize the C++ fasttrips extension by passing it the network supply. """ FastTripsLogger.debug("Initializing fasttrips extension for process number %d" % process_number) # this may not be set yet if it is iter1 overcap_col = Trip.SIM_COL_VEH_OVERCAP if Assignment.MSA_RESULTS: overcap_col = Trip.SIM_COL_VEH_MSA_OVERCAP if overcap_col not in list(stop_times_df.columns.values): stop_times_df[overcap_col] = 0 FastTripsLogger.debug("initialize_fasttrips_extension() overcap sum: %d" % stop_times_df[overcap_col].sum()) FastTripsLogger.debug("initialize_fasttrips_extension() STOPTIMES_COLUMN_DEPARTURE_TIME_MIN len: %d mean: %f" % \ (len(stop_times_df), stop_times_df[Trip.STOPTIMES_COLUMN_DEPARTURE_TIME_MIN].mean())) _fasttrips.initialize_supply(output_dir, process_number, stop_times_df[[Trip.STOPTIMES_COLUMN_TRIP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, Trip.STOPTIMES_COLUMN_STOP_ID_NUM]].as_matrix().astype('int32'), stop_times_df[[Trip.STOPTIMES_COLUMN_ARRIVAL_TIME_MIN, Trip.STOPTIMES_COLUMN_DEPARTURE_TIME_MIN, Trip.STOPTIMES_COLUMN_SHAPE_DIST_TRAVELED, overcap_col]].as_matrix().astype('float64')) _fasttrips.initialize_parameters(Assignment.TIME_WINDOW.total_seconds()/ 60.0, Assignment.BUMP_BUFFER.total_seconds()/ 60.0, Assignment.UTILS_CONVERSION, PathSet.DEPART_EARLY_ALLOWED_MIN.total_seconds()/ 60.0, PathSet.ARRIVE_LATE_ALLOWED_MIN.total_seconds()/ 60.0, Assignment.STOCH_PATHSET_SIZE, Assignment.STOCH_DISPERSION, Assignment.STOCH_MAX_STOP_PROCESS_COUNT, 1 if Assignment.TRANSFER_FARE_IGNORE_PATHFINDING else 0, 1 if Assignment.TRANSFER_FARE_IGNORE_PATHENUM else 0, Assignment.MAX_NUM_PATHS, Assignment.MIN_PATH_PROBABILITY)
[docs] @staticmethod def set_fasttrips_bump_wait(bump_wait_df): """ Sends the bump wait information to the fasttrips extension """ # send a clear message? if type(bump_wait_df)==type(None): return if len(bump_wait_df) == 0: return _fasttrips.set_bump_wait(bump_wait_df[[Trip.STOPTIMES_COLUMN_TRIP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, Trip.STOPTIMES_COLUMN_STOP_ID_NUM]].as_matrix().astype('int32'), bump_wait_df[Passenger.PF_COL_PAX_A_TIME_MIN].values.astype('float64'))
[docs] @staticmethod def write_vehicle_trips(output_dir, iteration, pathfinding_iteration, simulation_iteration, veh_trips_df): """ """ columns = ["iteration", # we'll add "pathfinding_iteration", "simulation_iteration", Trip.TRIPS_COLUMN_DIRECTION_ID, Trip.TRIPS_COLUMN_SERVICE_ID, Trip.TRIPS_COLUMN_ROUTE_ID, Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, Trip.STOPTIMES_COLUMN_STOP_ID, Trip.STOPTIMES_COLUMN_ARRIVAL_TIME, Trip.STOPTIMES_COLUMN_ARRIVAL_TIME_MIN, Trip.STOPTIMES_COLUMN_DEPARTURE_TIME, Trip.STOPTIMES_COLUMN_DEPARTURE_TIME_MIN, Trip.STOPTIMES_COLUMN_TRAVEL_TIME_SEC, Trip.STOPTIMES_COLUMN_DWELL_TIME_SEC, Trip.VEHICLES_COLUMN_TOTAL_CAPACITY, Trip.SIM_COL_VEH_BOARDS, Trip.SIM_COL_VEH_ALIGHTS, Trip.SIM_COL_VEH_ONBOARD, Trip.SIM_COL_VEH_STANDEES, Trip.SIM_COL_VEH_FRICTION, Trip.SIM_COL_VEH_OVERCAP, # Trip.SIM_COL_VEH_MSA_BOARDS, # Trip.SIM_COL_VEH_MSA_ALIGHTS, # Trip.SIM_COL_VEH_MSA_ONBOARD, # Trip.SIM_COL_VEH_MSA_STANDEES, # Trip.SIM_COL_VEH_MSA_FRICTION, # Trip.SIM_COL_VEH_MSA_OVERCAP ] # these may not be in there since they're optional for optional_col in [Trip.TRIPS_COLUMN_DIRECTION_ID, Trip.VEHICLES_COLUMN_TOTAL_CAPACITY]: if optional_col not in veh_trips_df.columns.values: columns.remove(optional_col) veh_trips_df[ "iteration"] = iteration veh_trips_df["pathfinding_iteration"] = pathfinding_iteration veh_trips_df[ "simulation_iteration"] = simulation_iteration Util.write_dataframe(veh_trips_df[columns], "veh_trips_df", os.path.join(output_dir, "veh_trips.csv"), append=(iteration>0 or pathfinding_iteration>0)) veh_trips_df.drop(["iteration","pathfinding_iteration","simulation_iteration"], axis=1, inplace=True)
[docs] @staticmethod def merge_pathsets(pathfind_trip_list_df, pathset_paths_df, pathset_links_df, new_pathset_paths_df, new_pathset_links_df): """ Merge the given new pathset paths and links into the existing """ FastTripsLogger.debug("merge_pathsets(): pathset_paths_df len=%d head=\n%s" % (len( pathset_paths_df), pathset_paths_df.head().to_string())) FastTripsLogger.debug("merge_pathsets(): new_pathset_paths_df len=%d head=\n%s" % (len(new_pathset_paths_df), new_pathset_paths_df.head().to_string())) FastTripsLogger.debug("merge_pathsets() dtypes=\n%s" % str(pathset_paths_df.dtypes)) FastTripsLogger.debug("merge_pathsets(): pathset_links_df len=%d head=\n%s" % (len( pathset_links_df), pathset_links_df.head().to_string())) FastTripsLogger.debug("merge_pathsets(): new_pathset_links_df len=%d head=\n%s" % (len(new_pathset_links_df), new_pathset_links_df.head().to_string())) FastTripsLogger.debug("merge_pathsets() dtypes=\n%s" % str(pathset_links_df.dtypes)) FastTripsLogger.debug("merge_pathsets(): pathfind_trip_list_df len=%d head=\n%s" % (len( pathfind_trip_list_df), pathfind_trip_list_df.head().to_string())) # TODO: This might be inefficient... # filter out the new pathset person trips from pathset_paths_df pathset_paths_df = pd.merge(left =pathset_paths_df, right =pathfind_trip_list_df[[Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM]], how ="left", indicator=True) pathset_paths_df = pathset_paths_df.loc[pathset_paths_df["_merge"]=="left_only"] pathset_paths_df.drop(["_merge"], axis=1, inplace=True) FastTripsLogger.debug("Filtered to %d pathset_paths_df rows" % len(pathset_paths_df)) # TODO: error prone, make this cleaner with where it's initialized elsewhere new_pathset_paths_df[Assignment.SIM_COL_PAX_CHOSEN ] = pd.Categorical([Assignment.CHOSEN_NOT_CHOSEN_YET]*len(new_pathset_paths_df), categories=Assignment.CHOSEN_CATEGORIES, ordered=True) new_pathset_paths_df[Assignment.SIM_COL_MISSED_XFER] = 0 # append pathset_paths_df = pd.concat([pathset_paths_df, new_pathset_paths_df], axis=0) FastTripsLogger.debug("Concatenated so pathset_paths_df has %d rows" % len(pathset_paths_df)) # filter out the new pathset person trips from pathset_links_df pathset_links_df = pd.merge(left =pathset_links_df, right =pathfind_trip_list_df[[Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM]], how ="left", indicator=True) pathset_links_df = pathset_links_df.loc[pathset_links_df["_merge"]=="left_only"] pathset_links_df.drop(["_merge"], axis=1, inplace=True) FastTripsLogger.debug("Filtered to %d pathset_links_df rows" % len(pathset_links_df)) # append pathset_links_df = pd.concat([pathset_links_df, new_pathset_links_df], axis=0) FastTripsLogger.debug("Concatenated so pathset_links_df has %d rows" % len(pathset_links_df)) FastTripsLogger.debug("merge_pathsets(): pathset_paths_df len=%d head=\n%s\ntail=\n%s" % (len(pathset_paths_df), pathset_paths_df.head().to_string(),pathset_paths_df.tail().to_string())) FastTripsLogger.debug("merge_pathsets(): pathset_links_df len=%d head=\n%s\ntail=\n%s" % (len(pathset_links_df), pathset_links_df.head().to_string(),pathset_links_df.tail().to_string())) # done with this return (pathset_paths_df, pathset_links_df)
[docs] @staticmethod def number_of_pathsets(pathset_paths_df): """ Counts the number of passenger trips with pathsets and returns it. """ return len(pathset_paths_df.groupby([Passenger.PERSONS_COLUMN_PERSON_ID,Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID]))
[docs] @staticmethod def assign_paths(output_dir, FT): """ Finds the paths for the passengers. """ # clear any state _fasttrips.reset() # write the initial load profile, iteration 0 veh_trips_df = FT.trips.get_full_trips() pathset_paths_df = None pathset_links_df = None last_chosen_df = pd.DataFrame(columns=[ Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION ]) success_df = pd.DataFrame(columns=[ Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION, PathSet.SUCCESS_FLAG_COLUMN ]) bump_df = pd.DataFrame(columns=[ Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION, PathSet.BUMP_FLAG_COLUMN ]) success_df[PathSet.SUCCESS_FLAG_COLUMN] = success_df[PathSet.SUCCESS_FLAG_COLUMN].astype(np.float64) bump_df[PathSet.BUMP_FLAG_COLUMN] = bump_df[PathSet.BUMP_FLAG_COLUMN].astype(np.float64) # write 0-iter vehicle trips Assignment.write_vehicle_trips(output_dir, 0, 0, 0, veh_trips_df) for iteration in range(1, Assignment.MAX_ITERATIONS+1): for pathfinding_iteration in range(1, Assignment.MAX_PF_ITERATIONS + 1): # First pathfinding_iteration, find paths for everyone if pathfinding_iteration == 1: Assignment.PATHFINDING_EVERYONE = True # Subsequent: just find paths for those without paths else: Assignment.PATHFINDING_EVERYONE = False FastTripsLogger.info("***************************** ITERATION %d PATHFINDING ITERATION %d **************************************" % (iteration, pathfinding_iteration)) if (Assignment.PATHFINDING_TYPE == Assignment.PATHFINDING_TYPE_READ_FILE) and (iteration == 1) and (pathfinding_iteration == 1): FastTripsLogger.info("Reading paths from file") FT.performance.record_step_start(iteration, pathfinding_iteration, -1, "pathreading") (new_pathset_paths_df, new_pathset_links_df) = FT.passengers.read_passenger_pathsets(output_dir, FT.stops, FT.routes.modes_df, include_asgn=False) num_new_paths_found = Assignment.number_of_pathsets(new_pathset_paths_df) # todo: what about subsequent iterations Assignment.PATHFINDING_TYPE = Assignment.PATHFINDING_TYPE_STOCHASTIC else: FT.performance.record_step_start(iteration, pathfinding_iteration, -1, "pathfinding") num_new_paths_found = Assignment.generate_pathsets(FT, pathset_paths_df, veh_trips_df, output_dir, iteration, pathfinding_iteration) (new_pathset_paths_df, new_pathset_links_df) = FT.passengers.setup_passenger_pathsets(iteration, pathfinding_iteration, FT.stops, FT.trips.trip_id_df, FT.trips.trips_df, FT.routes.modes_df, FT.transfers, FT.tazs, Assignment.PREPEND_ROUTE_ID_TO_TRIP_ID) # write pathfinding results to special PF results file Passenger.write_paths(output_dir, iteration, pathfinding_iteration, -1, new_pathset_paths_df, False, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, False) Passenger.write_paths(output_dir, iteration, pathfinding_iteration, -1, new_pathset_links_df, True, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, False) # write performance info right away in case we crash, quit, etc FT.performance.write_pathfinding(output_dir, append=((iteration>1) or (pathfinding_iteration>1))) # If we found paths for everyone, excellent if Assignment.PATHFINDING_EVERYONE: pathset_paths_df = new_pathset_paths_df pathset_links_df = new_pathset_links_df # Otherwise, merge with those for whom we already have else: (pathset_paths_df, pathset_links_df) = Assignment.merge_pathsets(FT.passengers.pathfind_trip_list_df, pathset_paths_df, pathset_links_df, new_pathset_paths_df, new_pathset_links_df) # if we have new paths, simulate them if num_new_paths_found > 0: pathset_paths_df, pathset_links_df = Assignment.merge_prior_choices(pathset_paths_df, pathset_links_df, success_df) if Assignment.SIMULATION: FastTripsLogger.info("***************************** ITERATION %d PATHFINDING ITERATION %d *** SIMULATING ***********************" % (iteration, pathfinding_iteration)) FT.performance.record_step_start(iteration, pathfinding_iteration, -1, "simulating") (num_passengers_arrived, pathset_paths_df, pathset_links_df, veh_trips_df) = \ Assignment.simulate(FT, output_dir, iteration, pathfinding_iteration, pathset_paths_df, pathset_links_df, veh_trips_df) else: # if we're not simulating, we can still calculate costs and choose paths FastTripsLogger.info("***************************** ITERATION %d PATHFINDING ITERATION %d *****CHOOSING PATHS WITHOUT SIMULATING" % (iteration, pathfinding_iteration)) FT.performance.record_step_start(iteration, pathfinding_iteration, -1, "choosing_without_simulating") (num_passengers_arrived, pathset_paths_df, pathset_links_df) = \ Assignment.choose_paths_without_simulation(FT, output_dir, iteration, pathfinding_iteration, pathset_paths_df, pathset_links_df, veh_trips_df) FT.performance.record_step_start(iteration, pathfinding_iteration, -1, "output_per_pathfinding_iteration") # Set new schedule FT.trips.stop_times_df = veh_trips_df # todo: pass back correct simulation iteration? Assignment.write_vehicle_trips(output_dir, iteration, pathfinding_iteration, "final", veh_trips_df) if Assignment.OUTPUT_PASSENGER_TRAJECTORIES: PathSet.write_path_times(Passenger.get_chosen_links(pathset_links_df), output_dir) # capacity gap stuff num_paths_found = Assignment.number_of_pathsets(pathset_paths_df) num_bumped_passengers = num_paths_found - num_passengers_arrived FastTripsLogger.info("") FastTripsLogger.info(" Length of trip list: %10d" % len(FT.passengers.trip_list_df)) FastTripsLogger.info(" Number of pathsets found: %10d" % num_paths_found) FastTripsLogger.info(" ARRIVED PASSENGERS: %10d" % num_passengers_arrived) FastTripsLogger.info(" MISSED PASSENGERS: %10d" % num_bumped_passengers) FT.performance.record_step_end(iteration, pathfinding_iteration, -1) # if no new paths found, pathfinding_iteration loop is done if num_new_paths_found == 0: break success_df, bump_df = Assignment.save_choices(pathset_paths_df, success_df, bump_df) new_choices, last_chosen_df = Assignment.compare_choices(pathset_paths_df, last_chosen_df) capacity_gap = (1.0 * new_choices + num_bumped_passengers) / len(FT.passengers.trip_list_df) FastTripsLogger.info("###OUTER LOOP: Iteration {}###".format(iteration)) FastTripsLogger.info(" CAPACITY GAP: %10.5f" % capacity_gap) FastTripsLogger.info(" NEW CHOICE FROM PRIOR: %10d" % new_choices) success_df, bump_df = Assignment.save_choices(pathset_paths_df, success_df, bump_df) new_choices, last_chosen_df = Assignment.compare_choices(pathset_paths_df, last_chosen_df) capacity_gap = (1.0 * new_choices + num_bumped_passengers) / len(FT.passengers.trip_list_df) FastTripsLogger.info("###OUTER LOOP: Iteration {}###".format(iteration)) FastTripsLogger.info(" CAPACITY GAP: %10.5f" % capacity_gap) FastTripsLogger.info(" NEW CHOICE FROM PRIOR: %10d" % new_choices) success_df, bump_df = Assignment.save_choices(pathset_paths_df, success_df, bump_df) new_choices, last_chosen_df = Assignment.compare_choices(pathset_paths_df, last_chosen_df) capacity_gap = (1.0 * new_choices + num_bumped_passengers) / len(FT.passengers.trip_list_df) FastTripsLogger.info("###OUTER LOOP: Iteration {}###".format(iteration)) FastTripsLogger.info(" CAPACITY GAP: %10.5f" % capacity_gap) FastTripsLogger.info(" NEW CHOICE FROM PRIOR: %10d" % new_choices) # end condition for iterations loop if capacity_gap < Assignment.CONVERGENCE_GAP: break # end for loop return {"capacity_gap": capacity_gap, "paths_found": num_paths_found, "passengers_arrived": num_passengers_arrived, "passengers_missed": num_bumped_passengers, "passengers_demand": len(FT.passengers.trip_list_df) }
[docs] @staticmethod def compare_choices(pathset_paths_df, prior_choice): """ Reads in a pathset_path_df and the pathset_path_df of a prior iteration and returns the number of new choices made in the current iteration along with a simplified chosen df to use in the next iteration. :param pathset_paths_df: Current iteration of Pathset_Paths_DF :param prior_choice: Pathset_Paths_DF of a prior iteration. :return: new_choices_count: Total number of new choices in this iteration compared to the input prior choice df. chosen: Simplified DF of 'person_id', 'person_trip_id_num', and 'description' that can be used in subsequent invocations of this method. """ chosen = Passenger.get_chosen_links(pathset_paths_df)[ [Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION] ] match_choices = pd.merge(chosen, prior_choice, on=[ Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION], how='inner') new_choices_count = chosen.shape[0] - match_choices.shape[0] return new_choices_count, chosen
[docs] @staticmethod def save_choices(pathset_paths_df, success_df, bump_df): """ Returns an updated count of successfully chosen paths and paths that were bumped for each user. :param pathset_paths_df: Current iteration of pathset_paths_df after simulation. :param success_df: Count of chosen paths for each user across prior iterations :param bump_df: Count of bumped paths for each user across prior iterations :return: success_df: Updated count of chosen paths for each user across each iteration bump_df: Updated count of bumped paths for each user across each iteration """ chosen = Passenger.get_chosen_links(pathset_paths_df) iter_bump_df = chosen[chosen[Assignment.SIM_COL_PAX_BUMP_ITER] >= 0][ [Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION] ] iter_bump_df[PathSet.BUMP_FLAG_COLUMN] = 1 iter_bump_df = pd.concat([bump_df, iter_bump_df]) bump_df = iter_bump_df.groupby([Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION])[PathSet.BUMP_FLAG_COLUMN].sum().reset_index() iter_success_df = chosen[chosen[Assignment.SIM_COL_PAX_BUMP_ITER].isnull()][ [Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION] ] iter_success_df[PathSet.SUCCESS_FLAG_COLUMN] = 1 iter_success_df = pd.concat([success_df, iter_success_df]) success_df = iter_success_df.groupby([Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION] )[PathSet.SUCCESS_FLAG_COLUMN].sum().reset_index() return success_df, bump_df
[docs] @staticmethod def merge_prior_choices(pathset_paths_df, pathset_links_df, flag_df): """ Join success or bump flag count information onto a pathset_paths_df and pathset_links_df :param pathset_paths_df: Current iteration of pathset_paths_df. :param pathset_links_df: Current iteration of pathset_links_df :param flag_df: success or bump df :return: pathset_paths_df and pathset_links_df with additional flag column. """ assert((PathSet.SUCCESS_FLAG_COLUMN in flag_df) or (PathSet.BUMP_FLAG_COLUMN in flag_df)) flag_col = PathSet.SUCCESS_FLAG_COLUMN if PathSet.SUCCESS_FLAG_COLUMN in flag_df else PathSet.BUMP_FLAG_COLUMN if flag_col in pathset_paths_df: pathset_paths_df.drop(labels=[flag_col], axis=1, inplace=True) if flag_col in pathset_links_df: pathset_links_df.drop(labels=[flag_col], axis=1, inplace=True) pathset_paths_df = pd.merge(pathset_paths_df, flag_df, on = [ Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_DESCRIPTION], how = 'left') pathset_paths_df.loc[pathset_paths_df[flag_col].isnull(), flag_col] = 0 pathset_links_df = pd.merge(pathset_links_df,pathset_paths_df[ ['trip_list_id_num', Passenger.PF_COL_PATH_NUM, flag_col, ]], on = ['trip_list_id_num', Passenger.PF_COL_PATH_NUM], how = 'left') return pathset_paths_df, pathset_links_df
[docs] @staticmethod def filter_trip_list_to_not_arrived(trip_list_df, pathset_paths_df): """ Filter the given trip list to only those that have not arrived according to *pathset_paths_df*. """ FastTripsLogger.debug("filter_trip_list_to_not_arrived(): trip_list_df len=%d head()=\n%s" % (len(trip_list_df), trip_list_df.head().to_string())) FastTripsLogger.debug("filter_trip_list_to_not_arrived(): pathset_paths_df len=%d head()=\n%s" % (len(pathset_paths_df), pathset_paths_df.head().to_string())) FastTripsLogger.debug("pathset_paths_df.dtypes") # filter to only the chosen paths chosen_paths_df = pathset_paths_df.loc[pathset_paths_df[Assignment.SIM_COL_PAX_CHOSEN] > Assignment.CHOSEN_NOT_CHOSEN_YET, [Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM, Assignment.SIM_COL_PAX_CHOSEN]] # add chosen index trip_list_df_to_return = pd.merge(left =trip_list_df, right =chosen_paths_df, how ="left") # use it to filter to null chosen trip_list_df_to_return = trip_list_df_to_return.loc[pd.isnull(trip_list_df_to_return[Assignment.SIM_COL_PAX_CHOSEN])] # remove chosen column trip_list_df_to_return.drop([Assignment.SIM_COL_PAX_CHOSEN], axis=1, inplace=True) FastTripsLogger.debug("filter_trip_list_to_not_arrived(): trip_list_df_to_return len=%d head()=\n%s" % (len(trip_list_df_to_return), trip_list_df_to_return.head().to_string())) return trip_list_df_to_return
[docs] @staticmethod def generate_pathsets(FT, pathset_paths_df, veh_trips_df, output_dir, iteration, pathfinding_iteration): """ Figures out which person trips for whom to generate_pathsets, stored in :py:attr:`Passenger.pathfind_trip_list_df` Generates paths sets for those person trips using deterministic trip-based shortest path (TBSP) or stochastic trip-based hyperpath (TBHP). Returns the number of pathsets found. """ FastTripsLogger.info("**************************** GENERATING PATHS **********************************************************") start_time = datetime.datetime.now() process_dict = {} # workernum -> {"process":process, "alive":alive bool, "done":done bool, "working_on":(person_id, trip_list_num)} todo_queue = None done_queue = None # We only need to do this once if iteration == 1 and pathfinding_iteration == 1: if Assignment.DEBUG_TRACE_ONLY: FT.passengers.trip_list_df = FT.passengers.trip_list_df.loc[FT.passengers.trip_list_df[Passenger.TRIP_LIST_COLUMN_TRACE]==True] else: if Assignment.DEBUG_NUM_TRIPS > 0 and len(FT.passengers.trip_list_df) > Assignment.DEBUG_NUM_TRIPS: FastTripsLogger.info("Truncating trip list to %d trips" % Assignment.DEBUG_NUM_TRIPS) FT.passengers.trip_list_df = FT.passengers.trip_list_df.iloc[:Assignment.DEBUG_NUM_TRIPS] # Skip someone? if Assignment.SKIP_PERSON_IDS and len(Assignment.SKIP_PERSON_IDS) > 0: FT.passengers.trip_list_df = FT.passengers.trip_list_df.loc[~FT.passengers.trip_list_df[Passenger.TRIP_LIST_COLUMN_PERSON_ID].isin(Assignment.SKIP_PERSON_IDS)] # these are the trips for which we'll find paths FT.passengers.pathfind_trip_list_df = FT.passengers.trip_list_df if Assignment.PATHFINDING_EVERYONE: # we're starting over with empty vehicles Trip.reset_onboard(veh_trips_df) else: FastTripsLogger.info("Finding paths for trips for those that haven't arrived yet") FT.passengers.pathfind_trip_list_df = Assignment.filter_trip_list_to_not_arrived(FT.passengers.trip_list_df, pathset_paths_df) est_paths_to_find = len(FT.passengers.pathfind_trip_list_df) FastTripsLogger.info("Finding pathsets for %d trips" % est_paths_to_find) if est_paths_to_find == 0: return 0 info_freq = pow(10, int(math.log(est_paths_to_find+1,10)-1)) if info_freq < 1: info_freq = 1 # info_freq = 1 # DEBUG CRASH num_processes = Assignment.NUMBER_OF_PROCESSES if Assignment.NUMBER_OF_PROCESSES < 1: num_processes = multiprocessing.cpu_count() # it's not worth it unless each process does 3 if num_processes > est_paths_to_find*3: num_processes = int(est_paths_to_find//3) # this is probalby time consuming... put in a try block try: # Setup multiprocessing processes if num_processes > 1: todo_queue = multiprocessing.Queue() done_queue = multiprocessing.Queue() for process_idx in range(1, 1+num_processes): FastTripsLogger.info("Starting worker process %2d" % process_idx) process_dict[process_idx] = { "process":multiprocessing.Process(target=find_trip_based_paths_process_worker, args=(iteration, pathfinding_iteration, process_idx, Assignment.INPUT_NETWORK_ARCHIVE, Assignment.INPUT_DEMAND_DIR, Assignment.CONFIGURATION_FILE, Assignment.CONFIGURATION_FUNCTIONS_FILE, Assignment.OUTPUT_DIR, todo_queue, done_queue, Assignment.PATHFINDING_TYPE==Assignment.PATHFINDING_TYPE_STOCHASTIC, Assignment.bump_wait_df, veh_trips_df)), "alive":True, "done":False } process_dict[process_idx]["process"].start() else: Assignment.initialize_fasttrips_extension(0, output_dir, veh_trips_df) # process tasks or send tasks to workers for processing num_paths_found_prev = 0 num_paths_found_now = 0 num_paths_sought = 0 path_cols = list(FT.passengers.pathfind_trip_list_df.columns.values) for path_tuple in FT.passengers.pathfind_trip_list_df.itertuples(index=False): path_dict = dict(list(zip(path_cols, path_tuple))) trip_list_id = path_dict[Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM] person_id = path_dict[Passenger.TRIP_LIST_COLUMN_PERSON_ID] person_trip_id = path_dict[Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID] do_trace = path_dict[Passenger.TRIP_LIST_COLUMN_TRACE] if Assignment.DEBUG_TRACE_ONLY and not do_trace: continue # first iteration -- create path objects if iteration==1: trip_pathset = PathSet(path_dict) FT.passengers.add_pathset(trip_list_id, trip_pathset) else: trip_pathset = FT.passengers.get_pathset(trip_list_id) if not trip_pathset.goes_somewhere(): continue # find pathsets for everyone -- dwell times have changed # if iteration > 1 and trip_list_id not in Assignment.bumped_trip_list_nums: # num_paths_found_prev += 1 # continue if num_processes > 1: todo_queue.put( trip_pathset ) else: if do_trace: FastTripsLogger.debug("Tracing assignment of person_id %s and trip %s" % (person_id, person_trip_id)) # do the work (pathdict, perf_dict) = \ Assignment.find_trip_based_pathset(iteration, pathfinding_iteration, trip_pathset, Assignment.PATHFINDING_TYPE==Assignment.PATHFINDING_TYPE_STOCHASTIC, trace=do_trace) num_paths_sought += 1 trip_pathset.pathdict = pathdict FT.performance.add_info(iteration, pathfinding_iteration, person_id, person_trip_id, perf_dict) if trip_pathset.path_found(): num_paths_found_now += 1 if num_paths_sought % info_freq == 0: time_elapsed = datetime.datetime.now() - start_time FastTripsLogger.info(" %6d paths sought, %6d paths found of %d paths total. Time elapsed: %2dh:%2dm:%2ds" % ( num_paths_sought, num_paths_found_now, est_paths_to_find, int( time_elapsed.total_seconds()/ 3600), int( (time_elapsed.total_seconds() % 3600)/ 60), time_elapsed.total_seconds() % 60)) # multiprocessing follow-up if num_processes > 1: # we're done, let each process know for process_idx in list(process_dict.keys()): todo_queue.put('DONE') # get results done_procs = 0 # where done means not alive while done_procs < len(process_dict): try: result = done_queue.get(True, 30) worker_num = result[0] # FastTripsLogger.debug("Received %s" % str(result)) if result[1] == "DONE": FastTripsLogger.debug("Received done from process %d" % worker_num) process_dict[worker_num]["done"] = True elif result[1] == "STARTING": process_dict[worker_num]["working_on"] = (result[2],result[3]) elif result[1] == "COMPLETED": trip_list_id = result[2] pathset = FT.passengers.get_pathset(trip_list_id) pathset.pathdict= result[3] perf_dict = result[4] FT.performance.add_info(iteration, pathfinding_iteration, pathset.person_id, pathset.person_trip_id, perf_dict) num_paths_sought += 1 if pathset.path_found(): num_paths_found_now += 1 if num_paths_sought % info_freq == 0: time_elapsed = datetime.datetime.now() - start_time FastTripsLogger.info(" %6d paths sought, %6d paths found of %d paths total. Time elapsed: %2dh:%2dm:%2ds" % ( num_paths_sought, num_paths_found_now, est_paths_to_find, int( time_elapsed.total_seconds()/ 3600), int( (time_elapsed.total_seconds() % 3600)/ 60), time_elapsed.total_seconds() % 60)) del process_dict[worker_num]["working_on"] else: print("Unexpected done queue contents: " + str(result)) except queue.Empty: # This is normal pass except: FastTripsLogger.error("Caught exception: %s" % str(sys.exc_info())) pass # check if any processes are not alive for process_idx in list(process_dict.keys()): if process_dict[process_idx]["alive"] and not process_dict[process_idx]["process"].is_alive(): FastTripsLogger.debug("Process %d is not alive" % process_idx) process_dict[process_idx]["alive"] = False done_procs += 1 # join up my processes for process_idx in list(process_dict.keys()): process_dict[process_idx]["process"].join() # check if any processes crashed for process_idx in list(process_dict.keys()): if not process_dict[process_idx]["done"]: if "working_on" in process_dict[process_idx]: FastTripsLogger.info("Process %d appears to have crashed; it was working on %s" % \ (process_idx, str(process_dict[process_idx]["working_on"]))) else: FastTripsLogger.info("Process %d appears to have crashed; see ft_debug_worker%02d.log" % (process_idx, process_idx)) except (KeyboardInterrupt, SystemExit): exc_type, exc_value, exc_tb = sys.exc_info() FastTripsLogger.error("Exception caught: %s" % str(exc_type)) error_lines = traceback.format_exception(exc_type, exc_value, exc_tb) for e in error_lines: FastTripsLogger.error(e) FastTripsLogger.error("Terminating processes") # terminating my processes for proc in process_dict: proc.terminate() raise except: # some other error exc_type, exc_value, exc_tb = sys.exc_info() error_lines = traceback.format_exception(exc_type, exc_value, exc_tb) for e in error_lines: FastTripsLogger.error(e) raise time_elapsed = datetime.datetime.now() - start_time FastTripsLogger.info("Finished finding %6d passenger paths. Time elapsed: %2dh:%2dm:%2ds" % ( num_paths_found_now, int( time_elapsed.total_seconds()/ 3600), int( (time_elapsed.total_seconds() % 3600)/ 60), time_elapsed.total_seconds() % 60)) return num_paths_found_now + num_paths_found_prev
[docs] @staticmethod def find_trip_based_pathset(iteration, pathfinding_iteration, pathset, hyperpath, trace): """ Perform trip-based path set search. Will do so either backwards (destination to origin) if :py:attr:`PathSet.direction` is :py:attr:`PathSet.DIR_OUTBOUND` or forwards (origin to destination) if :py:attr:`PathSet.direction` is :py:attr:`PathSet.DIR_INBOUND`. Returns (pathdict, performance_dict) Where pathdict maps {pathnum:{PATH_KEY_COST:cost, PATH_KEY_PROBABILITY:probability, PATH_KEY_STATES:[state list]}} Where performance_dict includes: pathfinding return status, number of label iterations, max number of times a stop was processed, seconds spent in labeling, seconds spend in enumeration :param pathset: the path to fill in :type pathset: a :py:class:`PathSet` instance :param hyperpath: pass True to use a stochastic hyperpath-finding algorithm, otherwise a deterministic shortest path search algorithm will be use. :type hyperpath: bool :param trace: pass True if this path should be traced to the debug log :type trace: bool """ # FastTripsLogger.debug("C++ extension start") # send it to the C++ extension (ret_ints, ret_doubles, path_costs, process_num, pf_returnstatus, label_iterations, num_labeled_stops, max_label_process_count, ms_labeling, ms_enumerating, bytes_workingset, bytes_privateusage, mem_timestamp) = \ _fasttrips.find_pathset(iteration, pathfinding_iteration, hyperpath, pathset.person_id, pathset.person_trip_id, pathset.user_class, pathset.purpose, pathset.access_mode, pathset.transit_mode, pathset.egress_mode, pathset.o_taz_num, pathset.d_taz_num, 1 if pathset.outbound else 0, float(pathset.pref_time_min), pathset.vot, 1 if trace else 0) # FastTripsLogger.debug("C++ extension complete") FastTripsLogger.debug("Finished finding path for person %s trip %s" % (pathset.person_id, pathset.person_trip_id)) pathdict = {} row_num = 0 for path_num in range(path_costs.shape[0]): pathdict[path_num] = {} pathdict[path_num][PathSet.PATH_KEY_COST ] = path_costs[path_num, 0] pathdict[path_num][PathSet.PATH_KEY_FARE ] = path_costs[path_num, 1] pathdict[path_num][PathSet.PATH_KEY_PROBABILITY] = path_costs[path_num, 2] pathdict[path_num][PathSet.PATH_KEY_INIT_COST ] = path_costs[path_num, 3] pathdict[path_num][PathSet.PATH_KEY_INIT_FARE ] = path_costs[path_num, 4] # List of (stop_id, stop_state) pathdict[path_num][PathSet.PATH_KEY_STATES ] = [] # print "path_num %d" % path_num # while we have unprocessed rows and the row is still relevant for this path_num while (row_num < ret_ints.shape[0]) and (ret_ints[row_num, 0] == path_num): # print row_num mode = ret_ints[row_num,2] # todo if mode == -100: mode = PathSet.STATE_MODE_ACCESS elif mode == -101: mode = PathSet.STATE_MODE_EGRESS elif mode == -102: mode = PathSet.STATE_MODE_TRANSFER elif mode == -103: mode = Passenger.MODE_GENERIC_TRANSIT_NUM if hyperpath: pathdict[path_num][PathSet.PATH_KEY_STATES].append( (ret_ints[row_num, 1], [ ret_doubles[row_num,0], # label, Assignment.NETWORK_BUILD_DATE_START_TIME + datetime.timedelta(minutes=ret_doubles[row_num,1]), # departure/arrival time mode, # departure/arrival mode ret_ints[row_num,3], # trip id ret_ints[row_num,4], # successor/predecessor ret_ints[row_num,5], # sequence ret_ints[row_num,6], # sequence succ/pred datetime.timedelta(minutes=ret_doubles[row_num,2]), # link time ret_doubles[row_num,3], # link fare ret_doubles[row_num,4], # link cost ret_doubles[row_num,5], # link distance ret_doubles[row_num,6], # cost Assignment.NETWORK_BUILD_DATE_START_TIME + datetime.timedelta(minutes=ret_doubles[row_num,7]) # arrival/departure time ] ) ) else: pathdict[path_num][PathSet.PATH_KEY_STATES].append( (ret_ints[row_num, 1], [ datetime.timedelta(minutes=ret_doubles[row_num,0]), # label, Assignment.NETWORK_BUILD_DATE_START_TIME + datetime.timedelta(minutes=ret_doubles[row_num,1]), # departure/arrival time mode, # departure/arrival mode ret_ints[row_num,3], # trip id ret_ints[row_num,4], # successor/predecessor ret_ints[row_num,5], # sequence ret_ints[row_num,6], # sequence succ/pred datetime.timedelta(minutes=ret_doubles[row_num,2]), # link time ret_doubles[row_num,3], # link fare datetime.timedelta(minutes=ret_doubles[row_num,4]), # link cost ret_doubles[row_num,5], # link dist datetime.timedelta(minutes=ret_doubles[row_num,6]), # cost Assignment.NETWORK_BUILD_DATE_START_TIME + datetime.timedelta(minutes=ret_doubles[row_num,7]) # arrival/departure time ] ) ) row_num += 1 perf_dict = { \ Performance.PERFORMANCE_PF_COL_PROCESS_NUM : process_num, Performance.PERFORMANCE_PF_COL_PATHFINDING_STATUS : pf_returnstatus, Performance.PERFORMANCE_PF_COL_LABEL_ITERATIONS : label_iterations, Performance.PERFORMANCE_PF_COL_NUM_LABELED_STOPS : num_labeled_stops, Performance.PERFORMANCE_PF_COL_MAX_STOP_PROCESS_COUNT: max_label_process_count, Performance.PERFORMANCE_PF_COL_TIME_LABELING_MS : ms_labeling, Performance.PERFORMANCE_PF_COL_TIME_ENUMERATING_MS : ms_enumerating, Performance.PERFORMANCE_PF_COL_TRACED : trace, Performance.PERFORMANCE_PF_COL_WORKING_SET_BYTES : bytes_workingset, Performance.PERFORMANCE_PF_COL_PRIVATE_USAGE_BYTES : bytes_privateusage, Performance.PERFORMANCE_PF_COL_MEM_TIMESTAMP : datetime.datetime.fromtimestamp(mem_timestamp) } return (pathdict, perf_dict)
[docs] @staticmethod def find_passenger_vehicle_times(pathset_links_df, veh_trips_df): """ Given a dataframe of passenger links and a dataframe of vehicle trip links, adds two new columns to the passenger links for board and alight time. - Takes the trip links of pathset_links_df (columns: person_id, trip_list_id_num, pathnum, linkmode, trip_id_num, A_id_num, B_id_num, A_seq, B_seq, pf_A_time, pf_B_time, pf_linktime, A_id, B_id, trip_id) - Joins with vehicle trips on trip id num, A_id, A_seq to add: * board time (Assignment.SIM_COL_PAX_BOARD_TIME) * overcap (Assignment.SIM_COL_PAX_OVERCAP) * overcap_frac (Assignment.SIM_COL_PAX_OVERCAP_FRAC) - Joins with vehicle trips on trip id num, B_id, B_seq to add: * alight time (Assignment.SIM_COL_PAX_ALIGHT_TIME) Returns the same dataframe but with four additional columns (replacing them if they're already there). """ if False and len(Assignment.TRACE_IDS) > 0: FastTripsLogger.debug("find_passenger_vehicle_times(): input pathset_links_df len=%d\n%s" % \ (len(pathset_links_df), pathset_links_df.loc[pathset_links_df[Passenger.TRIP_LIST_COLUMN_TRACE]==True].to_string())) if Assignment.SIM_COL_PAX_BOARD_TIME in list(pathset_links_df.columns.values): pathset_links_df.drop([Assignment.SIM_COL_PAX_BOARD_TIME, Assignment.SIM_COL_PAX_ALIGHT_TIME, Assignment.SIM_COL_PAX_OVERCAP], axis=1, inplace=True) if Assignment.SIM_COL_PAX_OVERCAP_FRAC in list(pathset_links_df.columns.values): pathset_links_df.drop([Assignment.SIM_COL_PAX_OVERCAP_FRAC], axis=1, inplace=True) # FastTripsLogger.debug("pathset_links_df:\n%s\n" % pathset_links_df.head().to_string()) if False: FastTripsLogger.debug("veh_trips_df:\n%s\n" % veh_trips_df.head().to_string()) veh_trip_cols = [Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, Trip.STOPTIMES_COLUMN_STOP_ID, Trip.STOPTIMES_COLUMN_DEPARTURE_TIME, Trip.SIM_COL_VEH_OVERCAP, # TODO: what about msa_overcap? Trip.SIM_COL_VEH_OVERCAP_FRAC] # this one may not be here -- it's only present during capacity stuff if Trip.SIM_COL_VEH_OVERCAP_FRAC not in list(veh_trips_df.columns.values): veh_trip_cols.remove(Trip.SIM_COL_VEH_OVERCAP_FRAC) #This is a little long winded, but it cuts down on memory dramatically, but only copying #what is actually needed during the merges. intermediate = pd.merge(left=pathset_links_df[[Passenger.PERSONS_COLUMN_PERSON_ID, Trip.STOPTIMES_COLUMN_TRIP_ID,'A_id','A_seq', Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM, 'B_id', 'B_seq']], right=veh_trips_df[veh_trip_cols], left_on=[Trip.STOPTIMES_COLUMN_TRIP_ID, 'A_id', 'A_seq'], right_on=[Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE], how ='inner') intermediate = intermediate.drop(columns=[Trip.STOPTIMES_COLUMN_STOP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, 'A_id', 'A_seq']) intermediate = pd.merge(left=intermediate, right=veh_trips_df[[Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, Trip.STOPTIMES_COLUMN_STOP_ID, Trip.STOPTIMES_COLUMN_ARRIVAL_TIME]], left_on =[Trip.STOPTIMES_COLUMN_TRIP_ID,'B_id','B_seq'], right_on=[Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE], how ='inner',) intermediate = intermediate.drop(columns=[Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, 'B_id','B_seq']) intermediate = intermediate.rename(columns={ Trip.STOPTIMES_COLUMN_DEPARTURE_TIME:Assignment.SIM_COL_PAX_BOARD_TIME, # transit vehicle depart time (at A) = board time for pax Trip.STOPTIMES_COLUMN_ARRIVAL_TIME :Assignment.SIM_COL_PAX_ALIGHT_TIME, # transit vehicle arrive time (at B) = alight time for pax }) if False and len(Assignment.TRACE_IDS) > 0: FastTripsLogger.debug("find_passenger_vehicle_times(): output pathset_links_df len=%d\n%s" % \ (len(pathset_links_df), pathset_links_df.loc[pathset_links_df[Passenger.TRIP_LIST_COLUMN_TRACE]==True].to_string())) return pd.merge(pathset_links_df, intermediate, on=[Passenger.PERSONS_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM,], how='left')
[docs] @staticmethod def put_passengers_on_vehicles(pathset_links_df, veh_trips_df): """ Puts the chosen passenger trips specified in pathset_links_df onto the transit vehicle trips specified by veh_trip_df. Returns veh_trips_df but with updated columns - :py:attr:`Trip.SIM_COL_VEH_BOARDS` - :py:attr:`Trip.SIM_COL_VEH_ALIGHTS` - :py:attr:`Trip.SIM_COL_VEH_ONBOARD` - :py:attr:`Trip.SIM_COL_VEH_OVERCAP` - :py:attr:`Trip.SIM_COL_VEH_OVERCAP_FRAC` """ # drop these -- we'll set them if Trip.SIM_COL_VEH_BOARDS in list(veh_trips_df.columns.values): veh_trips_df.drop([Trip.SIM_COL_VEH_BOARDS, Trip.SIM_COL_VEH_ALIGHTS, Trip.SIM_COL_VEH_ONBOARD], axis=1, inplace=True) veh_trips_df_len = len(veh_trips_df) passengers_df = Passenger.get_chosen_links(pathset_links_df, transit_only=True, copy=False) # only care about trips #passengers_df = passengers_df.loc[passengers_df[Passenger.PF_COL_ROUTE_ID].notnull()] # Group to boards by counting trip_list_id_nums for a (trip_id, A_id as stop_id) passenger_trips_boards = passengers_df.loc[passengers_df[Assignment.SIM_COL_PAX_BUMP_ITER].isnull(), # unbumped passengers [Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM, Trip.STOPTIMES_COLUMN_TRIP_ID_NUM,'A_id_num','A_seq']].groupby([Trip.STOPTIMES_COLUMN_TRIP_ID_NUM,'A_id_num','A_seq']).count() passenger_trips_boards.index.names = [Trip.STOPTIMES_COLUMN_TRIP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE] # And alights by counting path_ids for a (trip_id, B_id as stop_id) passenger_trips_alights = passengers_df.loc[passengers_df[Assignment.SIM_COL_PAX_BUMP_ITER].isnull(), [Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM, Trip.TRIPS_COLUMN_TRIP_ID_NUM,'B_id_num','B_seq']].groupby([Trip.TRIPS_COLUMN_TRIP_ID_NUM,'B_id_num','B_seq']).count() passenger_trips_alights.index.names = [Trip.STOPTIMES_COLUMN_TRIP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE] # Join them to the transit vehicle trips so we can put people on vehicles (boards) veh_loaded_df = pd.merge(left = veh_trips_df, right = passenger_trips_boards, left_on = [Trip.STOPTIMES_COLUMN_TRIP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE], right_index = True, how = 'left') veh_loaded_df.rename(columns={Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM:Trip.SIM_COL_VEH_BOARDS}, inplace=True) # Join for alights veh_loaded_df = pd.merge(left = veh_loaded_df, right = passenger_trips_alights, left_on = [Trip.TRIPS_COLUMN_TRIP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE], right_index = True, how ='left') veh_loaded_df.rename(columns={Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM:Trip.SIM_COL_VEH_ALIGHTS}, inplace=True) veh_loaded_df.fillna(value=0, inplace=True) assert(len(veh_loaded_df)==veh_trips_df_len) # these are ints, not floats veh_loaded_df[[Trip.SIM_COL_VEH_BOARDS, Trip.SIM_COL_VEH_ALIGHTS]] = \ veh_loaded_df[[Trip.SIM_COL_VEH_BOARDS, Trip.SIM_COL_VEH_ALIGHTS]].astype(int) veh_loaded_df.set_index([Trip.TRIPS_COLUMN_TRIP_ID_NUM,Trip.STOPTIMES_COLUMN_STOP_SEQUENCE],inplace=True) veh_loaded_df[Trip.SIM_COL_VEH_ONBOARD ] = veh_loaded_df[Trip.SIM_COL_VEH_BOARDS ] - veh_loaded_df[Trip.SIM_COL_VEH_ALIGHTS ] # on board is the cumulative sum of boards - alights trips_cumsum = veh_loaded_df[[Trip.SIM_COL_VEH_ONBOARD]].groupby(level=[0]).cumsum() veh_loaded_df.drop([Trip.SIM_COL_VEH_ONBOARD], axis=1, inplace=True) # replace with cumsum veh_loaded_df = pd.merge(left = veh_loaded_df, right = trips_cumsum, left_index = True, right_index = True, how = 'left') assert(len(veh_loaded_df)==veh_trips_df_len) # print veh_trips_df.loc[5123368] veh_loaded_df.reset_index(inplace=True) # overcap = how many people are problematic, or onboard-totalcap. If negative, we have space. # overcap_frac = what percentage of boards are problematic veh_loaded_df[Trip.SIM_COL_VEH_OVERCAP ] = veh_loaded_df[Trip.SIM_COL_VEH_ONBOARD] - veh_loaded_df[Trip.VEHICLES_COLUMN_TOTAL_CAPACITY] veh_loaded_df[Trip.SIM_COL_VEH_OVERCAP_FRAC] = 0.0 veh_loaded_df.loc[veh_loaded_df[Trip.SIM_COL_VEH_BOARDS ]>0, Trip.SIM_COL_VEH_OVERCAP_FRAC] = veh_loaded_df[Trip.SIM_COL_VEH_OVERCAP] / veh_loaded_df[Trip.SIM_COL_VEH_BOARDS] FastTripsLogger.debug("veh_loaded_df with onboard>0: (showing head)\n" + \ veh_loaded_df.loc[veh_loaded_df[Trip.SIM_COL_VEH_ONBOARD]>0].head().to_string()) return veh_loaded_df
[docs] @staticmethod def flag_missed_transfers(pathset_paths_df, pathset_links_df): """ Given passenger pathset links with the vehicle board_time and alight_time attached to trip links, this method will add columns to determine if there are missed transfers. This works on all paths in the pathset rather than just the chosen paths because then we can choose a path without missed transfers. The following columns are used in pathset_links_df: * Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM, * Passenger.PF_COL_PATH_NUM, * Passenger.PF_COL_LINK_NUM, * Assignment.SIM_COL_PAX_BOARD_TIME * Assignment.SIM_COL_PAX_ALIGHT_TIME * Passenger.PF_COL_PAX_B_TIME In particular, the following columns are added (or replaced if they're already there) to *pathset_links_df*: ======================================================= ============================================================================== Column Name Column Description ======================================================= ============================================================================== :py:attr:`Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN` Delay in alight_time from original pathfinding understanding of alight time :py:attr:`Assignment.SIM_COL_PAX_A_TIME` New A time given the trip board/alight times for the trip links :py:attr:`Assignment.SIM_COL_PAX_B_TIME` New B time given the trip board/alight times for the trip links :py:attr:`Assignment.SIM_COL_PAX_LINK_TIME` New link time from B time - A time :py:attr:`Assignment.SIM_COL_PAX_WAIT_TIME` New waittime given the trip board/alight times for the trip links :py:attr:`Assignment.SIM_COL_PAX_MISSED_XFER` Is this link a missed xfer ======================================================= ============================================================================== The column, :py:attr:`AssignmentSIM_COL_PAX_MISSED_XFER`, is also added to *pathset_paths_df*. """ # Drop these, we'll set them again if Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN in list(pathset_links_df.columns.values): pathset_links_df.drop([Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN, Assignment.SIM_COL_PAX_A_TIME, Assignment.SIM_COL_PAX_B_TIME, Assignment.SIM_COL_PAX_LINK_TIME, Assignment.SIM_COL_PAX_WAIT_TIME, Assignment.SIM_COL_MISSED_XFER], axis=1, inplace=True) # Set alight delay (min) FastTripsLogger.debug("flag_missed_transfers() pathset_links_df (%d):\n%s" % (len(pathset_links_df), pathset_links_df.head().to_string())) pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN] = 0.0 pathset_links_df.loc[pd.notnull(pathset_links_df[Trip.TRIPS_COLUMN_TRIP_ID]), Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN] = \ ((pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_TIME]-pathset_links_df[Passenger.PF_COL_PAX_B_TIME]) / np.timedelta64(1, 'm')) #: todo: is there a more elegant way to take care of this? some trips have times after midnight so they're the next day pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN]>22*60, Assignment.SIM_COL_PAX_BOARD_TIME ] = pathset_links_df[Assignment.SIM_COL_PAX_BOARD_TIME] - np.timedelta64(24, 'h') pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN]>22*60, Assignment.SIM_COL_PAX_ALIGHT_TIME] = pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_TIME] - np.timedelta64(24, 'h') pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN]>22*60, Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN] = \ ((pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_TIME]-pathset_links_df[Passenger.PF_COL_PAX_B_TIME]) / np.timedelta64(1, 'm')) max_alight_delay_min = pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN].max() FastTripsLogger.debug("Biggest alight_delay = %f" % max_alight_delay_min) if max_alight_delay_min > 0: FastTripsLogger.debug("\n%s" % pathset_links_df.sort_values(by=Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN, ascending=False).head().to_string()) # For trips, alight_time is the new B_time # Set A_time for links AFTER trip links by joining to next leg next_trips = pathset_links_df[[ Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM, Assignment.SIM_COL_PAX_ALIGHT_TIME]].copy() next_trips[Passenger.PF_COL_LINK_NUM] = next_trips[Passenger.PF_COL_LINK_NUM] + 1 next_trips.rename(columns={Assignment.SIM_COL_PAX_ALIGHT_TIME:Assignment.SIM_COL_PAX_A_TIME}, inplace=True) # Add it to passenger trips. Now A time is set for links after trip links (note this will never be a trip link) pathset_links_df = pd.merge(left =pathset_links_df, right=next_trips, how ="left", on =[Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM]) FastTripsLogger.debug(str(pathset_links_df.dtypes)) # Set the new B time for those links -- link time for access/egress/xfer is travel time since wait times are in trip links pathset_links_df[Assignment.SIM_COL_PAX_B_TIME] = pathset_links_df[Assignment.SIM_COL_PAX_A_TIME] + pathset_links_df[Passenger.PF_COL_LINK_TIME] # For trip links, it's alight time pathset_links_df.loc[pathset_links_df[Passenger.PF_COL_LINK_MODE]==PathSet.STATE_MODE_TRIP, Assignment.SIM_COL_PAX_B_TIME] = pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_TIME] # For access links, it doesn't change from the original pathfinding result pathset_links_df.loc[pathset_links_df[Passenger.PF_COL_LINK_MODE]==PathSet.STATE_MODE_ACCESS, Assignment.SIM_COL_PAX_A_TIME] = pathset_links_df[Passenger.PF_COL_PAX_A_TIME] pathset_links_df.loc[pathset_links_df[Passenger.PF_COL_LINK_MODE]==PathSet.STATE_MODE_ACCESS, Assignment.SIM_COL_PAX_B_TIME] = pathset_links_df[Passenger.PF_COL_PAX_B_TIME] # Now we only need to set the trip link's A time from the previous link's new_B_time next_trips = pathset_links_df[[ Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM, Assignment.SIM_COL_PAX_B_TIME]].copy() next_trips[Passenger.PF_COL_LINK_NUM] = next_trips[Passenger.PF_COL_LINK_NUM] + 1 next_trips.rename(columns={Assignment.SIM_COL_PAX_B_TIME:"new_trip_A_time"}, inplace=True) # Add it to passenger trips. Now new_trip_A_time is set for trip links pathset_links_df = pd.merge(left =pathset_links_df, right =next_trips, how ="left", on =[Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM]) pathset_links_df.loc[pathset_links_df[Passenger.PF_COL_LINK_MODE]==PathSet.STATE_MODE_TRIP, Assignment.SIM_COL_PAX_A_TIME] = pathset_links_df["new_trip_A_time"] pathset_links_df.drop(["new_trip_A_time"], axis=1, inplace=True) pathset_links_df[Assignment.SIM_COL_PAX_LINK_TIME] = pathset_links_df[Assignment.SIM_COL_PAX_B_TIME] - pathset_links_df[Assignment.SIM_COL_PAX_A_TIME] #: todo: is there a more elegant way to take care of this? some trips have times after midnight so they're the next day #: if the linktime > 22 hours then the trip time is probably off by a day, so it's right after midnight -- back it up pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_LINK_TIME]/ np.timedelta64(1, 'h') > 22, Assignment.SIM_COL_PAX_B_TIME ] = pathset_links_df[Assignment.SIM_COL_PAX_B_TIME] - np.timedelta64(24, 'h') pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_LINK_TIME]/ np.timedelta64(1, 'h') > 22, Assignment.SIM_COL_PAX_LINK_TIME] = pathset_links_df[Assignment.SIM_COL_PAX_B_TIME] - pathset_links_df[Assignment.SIM_COL_PAX_A_TIME] #: if the linktime < -22 hours then the trip time is probably off by a day, so it's right before midnight -- back it up pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_LINK_TIME]/ np.timedelta64(-1, 'h') < -22, Assignment.SIM_COL_PAX_A_TIME ] = pathset_links_df[Assignment.SIM_COL_PAX_A_TIME] - np.timedelta64(24, 'h') pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_LINK_TIME]/ np.timedelta64(-1, 'h') < -22, Assignment.SIM_COL_PAX_LINK_TIME] = pathset_links_df[Assignment.SIM_COL_PAX_B_TIME] - pathset_links_df[Assignment.SIM_COL_PAX_A_TIME] # new wait time pathset_links_df.loc[pd.notnull(pathset_links_df[Trip.TRIPS_COLUMN_TRIP_ID]), Assignment.SIM_COL_PAX_WAIT_TIME] = pathset_links_df[Assignment.SIM_COL_PAX_BOARD_TIME] - pathset_links_df[Assignment.SIM_COL_PAX_A_TIME] # invalid trips have negative wait time pathset_links_df[Assignment.SIM_COL_MISSED_XFER] = 0 pathset_links_df.loc[pathset_links_df[Assignment.SIM_COL_PAX_WAIT_TIME]/ np.timedelta64(1,'m') < 0, Assignment.SIM_COL_MISSED_XFER] = 1 # count how many are valid (sum of invalid = 0 for the trip list id + path) pathset_links_df_grouped = pathset_links_df.groupby([Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM, # sort by this Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM]).aggregate({Assignment.SIM_COL_MISSED_XFER:"sum" }) pathset_links_df_grouped.loc[pathset_links_df_grouped[Assignment.SIM_COL_MISSED_XFER]> 0, Assignment.SIM_COL_MISSED_XFER] = 1 FastTripsLogger.info(" flag_missed_transfers found %d missed transfer trip legs for %d paths" % \ (pathset_links_df[Assignment.SIM_COL_MISSED_XFER].sum(), pathset_links_df_grouped[Assignment.SIM_COL_MISSED_XFER].sum())) # add missed_xfer to pathset_paths_df (replacing if it was there already) if Assignment.SIM_COL_MISSED_XFER in list(pathset_paths_df.columns.values): pathset_paths_df.drop([Assignment.SIM_COL_MISSED_XFER], axis=1, inplace=True) pathset_paths_df = pd.merge(left =pathset_paths_df, right =pathset_links_df_grouped.reset_index()[[Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Assignment.SIM_COL_MISSED_XFER]], how ="left") FastTripsLogger.debug("flag_missed_transfers() pathset_paths_df (%d):\n%s" % (len(pathset_paths_df), pathset_paths_df.head(30).to_string())) return (pathset_paths_df, pathset_links_df)
[docs] @staticmethod def load_passengers_on_vehicles_with_cap(iteration, pathfinding_iteration, simulation_iteration, trips, pathset_paths_df, pathset_links_df, veh_loaded_df): """ Check if we have boards on over-capacity vehicles. Mark them and mark the boards. If :py:attr:`Assignment.CAPACITY_CONSTRAINT`, then bump off overcapacity passengers. The process is: 1) Look at which vehicle links are over capacity, adding columns named :py:attr:`Trip.SIM_COL_VEH_OVERCAP` and py:attr:`Trip.SIM_COL_VEH_OVERCAP_FRAC` to *veh_loaded_df* 2) Look at the stops where the first people board after we're at capacity (impossible boards) if any 3) If :py:attr:`Assignment.BUMP_ONE_AT_A_TIME`, select the first such stop by arrival time Otherwise, select the first such stop for each vehicle trip 4) Join these stops to pathset_links_df, so pathset_links_df now has column Assignment.SIM_COL_PAX_OVERCAP_FRAC 5) If not :py:attr:`Assignment.CAPACITY_CONSTRAINT`, return (and drop the column named :py:attr:`Trip.SIM_COL_VEH_OVERCAP` from veh_loaded_df) 6) Figure out which passenger trips are actually getting bumped. Some people can get on at these stops, but not all, so let the first ones that arrive at the stop get on and filter to the ones we'll actually bump. Update the column named :py:attr:`Assignmment.SIM_COL_PAX_BUMP_ITER`. If non-null, this represents the iteration the passenger got bumped. Return (chosen_paths_bumped, pathset_paths_df, pathset_links_df, veh_loaded_df) """ # these are the relevant vehicle columns vehicle_trip_debug_columns = [ \ Trip.TRIPS_COLUMN_ROUTE_ID, Trip.TRIPS_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, Trip.STOPTIMES_COLUMN_STOP_ID, Trip.VEHICLES_COLUMN_TOTAL_CAPACITY, Trip.SIM_COL_VEH_BOARDS, Trip.SIM_COL_VEH_ALIGHTS, Trip.SIM_COL_VEH_ONBOARD, Trip.SIM_COL_VEH_OVERCAP, Trip.SIM_COL_VEH_OVERCAP_FRAC ] # these are the relevant pathset links colums pax_links_debug_columns = [ \ Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM, Passenger.PF_COL_ROUTE_ID, Passenger.PF_COL_TRIP_ID, Passenger.PF_COL_PAX_A_TIME, "A_id","A_id_num","A_seq", Assignment.SIM_COL_PAX_A_TIME, Assignment.SIM_COL_PAX_OVERCAP, Assignment.SIM_COL_PAX_OVERCAP_FRAC, Assignment.SIM_COL_PAX_BOARD_STATE, Assignment.SIM_COL_PAX_BUMP_ITER, Assignment.SIM_COL_PAX_CHOSEN, ] current_pf_iter = 0.01*pathfinding_iteration + iteration current_sim_iter = "iter%.2f sim%d" % (current_pf_iter, simulation_iteration) # this will involve looping # no one is bumped yet bump_iter = 0 pathset_links_df[Assignment.SIM_COL_PAX_OVERCAP_FRAC] = np.NaN if simulation_iteration==0: # For those we just found paths for, no one is bumped or going on overcap vehicles yet # pathset_paths_df.loc[pathset_paths_df[Passenger.PF_COL_PF_ITERATION]==current_pf_iter, Assignment.SIM_COL_PAX_BUMP_ITER ] = np.NaN # pathset_links_df.loc[pathset_links_df[Passenger.PF_COL_PF_ITERATION]==current_pf_iter, Assignment.SIM_COL_PAX_BUMP_ITER ] = np.NaN # pathset_links_df.loc[pathset_links_df[Passenger.PF_COL_PF_ITERATION]==current_pf_iter, Assignment.SIM_COL_PAX_BOARD_STATE ] = np.NaN # anyone can be bumped, including those from previous pathfinding iters. Otherwise, we wouldn't be able to ride to an earlier stop and bump them pathset_paths_df[ Assignment.SIM_COL_PAX_BUMP_ITER ] = np.NaN pathset_links_df[ Assignment.SIM_COL_PAX_BUMP_ITER ] = np.NaN pathset_links_df[ Assignment.SIM_COL_PAX_BOARD_STATE ] = np.NaN # make sure BOARD_STATE and CHOSEN are categorical pathset_paths_df[Assignment.SIM_COL_PAX_CHOSEN ] = pd.Categorical( pathset_paths_df[Assignment.SIM_COL_PAX_CHOSEN ], categories=Assignment.CHOSEN_CATEGORIES, ordered=True) pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN ] = pd.Categorical( pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN ], categories=Assignment.CHOSEN_CATEGORIES, ordered=True) pathset_links_df[Assignment.SIM_COL_PAX_BOARD_STATE] = pd.Categorical( pathset_links_df[Assignment.SIM_COL_PAX_BOARD_STATE], categories=Assignment.BOARD_STATE_CATEGORICAL) while True: # loop for capacity constraint FastTripsLogger.info(" Step 5.1 Put passengers on transit vehicles.") # Put passengers on vehicles, updating the vehicle's boards, alights, onboard, overcap, overcap_frac veh_loaded_df = Assignment.put_passengers_on_vehicles(pathset_links_df, veh_loaded_df) FastTripsLogger.debug("after putting passengers on vehicles, veh_loaded_df with onboard.head(30) = \n%s" % veh_loaded_df.loc[ veh_loaded_df[Trip.SIM_COL_VEH_ONBOARD]>0, vehicle_trip_debug_columns].head(30).to_string()) if not Assignment.CAPACITY_CONSTRAINT: # We can't do anything about capacity so assume everyone boarded pathset_links_df.loc[ (pathset_links_df[Passenger.PF_COL_PF_ITERATION]==current_pf_iter)& (pathset_links_df[Passenger.PF_COL_TRIP_ID].notnull()), Assignment.SIM_COL_PAX_BOARD_STATE ] = "board_easy" break FastTripsLogger.info(" Step 5.2 Capacity constraints on transit vehicles.") if bump_iter == 0: FastTripsLogger.info(" Bumping one at a time? %s" % ("true" if Assignment.BUMP_ONE_AT_A_TIME else "false")) # This will update board time, alight time, overcap, overcap_frac pathset_links_df = Assignment.find_passenger_vehicle_times(pathset_links_df, veh_loaded_df) FastTripsLogger.debug("pathset_links_df.head(20)=\n%s" % pathset_links_df[pax_links_debug_columns].head(20).to_string()) # make sure BOARD_STATE and CHOSEN are categorical pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN ] = pd.Categorical( pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN ], categories=Assignment.CHOSEN_CATEGORIES, ordered=True) pathset_links_df[Assignment.SIM_COL_PAX_BOARD_STATE] = pd.Categorical( pathset_links_df[Assignment.SIM_COL_PAX_BOARD_STATE], categories=Assignment.BOARD_STATE_CATEGORICAL) # CHOSEN: Everyone who can board easily, do so pathset_links_df.loc[ pathset_links_df[Passenger.PF_COL_ROUTE_ID].notnull() & # trip links only pathset_links_df[Assignment.SIM_COL_PAX_BUMP_ITER].isnull() & # not already bumped (pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN]>Assignment.CHOSEN_NOT_CHOSEN_YET)& # chosen (pathset_links_df[Assignment.SIM_COL_PAX_OVERCAP]<0), # can board Assignment.SIM_COL_PAX_BOARD_STATE ] = "board_easy" # CHOSEN: Everyone who can squeeze in, do so pathset_links_df.loc[ pathset_links_df[Passenger.PF_COL_ROUTE_ID].notnull() & # trip links only pathset_links_df[Assignment.SIM_COL_PAX_BUMP_ITER].isnull() & # not already bumped (pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN]>Assignment.CHOSEN_NOT_CHOSEN_YET)& # chosen (pathset_links_df[Assignment.SIM_COL_PAX_OVERCAP]==0), # can barely board Assignment.SIM_COL_PAX_BOARD_STATE ] = "boarded" # UNCHOSEN: paths that are overcap -- nope pathset_links_df.loc[ pathset_links_df[Passenger.PF_COL_ROUTE_ID].notnull() & # trip links only pathset_links_df[Assignment.SIM_COL_PAX_BUMP_ITER].isnull() & # not already bumped (pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN]==Assignment.CHOSEN_NOT_CHOSEN_YET)& # unchosen (pathset_links_df[Assignment.SIM_COL_PAX_OVERCAP]>=0), # overcap Assignment.SIM_COL_PAX_BOARD_STATE ] = "bumped_unchosen" pathset_links_df.loc[ pathset_links_df[Passenger.PF_COL_ROUTE_ID].notnull() & # trip links only pathset_links_df[Assignment.SIM_COL_PAX_BUMP_ITER].isnull() & # not already bumped (pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN]==Assignment.CHOSEN_NOT_CHOSEN_YET)& # unchosen (pathset_links_df[Assignment.SIM_COL_PAX_OVERCAP]>=0), # overcap Assignment.SIM_COL_PAX_BUMP_ITER ] = bump_iter # For those trying to board overcap, choose the winners and losers # These are trips/stops over capacity overcap_df = veh_loaded_df.loc[veh_loaded_df[Trip.SIM_COL_VEH_OVERCAP] > 0] FastTripsLogger.debug("load_passengers_on_vehicles_with_cap() %d vehicle trip/stops over capacity: (showing head)\n%s" % \ (len(overcap_df), overcap_df[vehicle_trip_debug_columns].head().to_string())) # If none, we're done if len(overcap_df) == 0: FastTripsLogger.info(" No over-capacity vehicles") break # 2) Look at the trip-stops where the *first people* board after we're at capacity (impossible boards) if any bump_stops_df = overcap_df.groupby([Trip.STOPTIMES_COLUMN_TRIP_ID]).aggregate('first').reset_index() FastTripsLogger.debug("load_passengers_on_vehicles_with_cap() bump_stops_df iter=%d pf_iter=%d sim_iter=%d bump_iter=%d (%d rows, showing head):\n%s" % (iteration, pathfinding_iteration, simulation_iteration, bump_iter, len(bump_stops_df), bump_stops_df[vehicle_trip_debug_columns].head().to_string())) if Assignment.BUMP_ONE_AT_A_TIME: bump_stops_df.sort_values(by=[Trip.STOPTIMES_COLUMN_ARRIVAL_TIME], inplace=True) bump_stops_df = bump_stops_df.iloc[:1] FastTripsLogger.info(" Need to bump %d passengers from %d trip-stops" % (bump_stops_df.overcap.sum(), len(bump_stops_df))) # debug -- see the whole trip if True: FastTripsLogger.debug("load_passengers_on_vehicles_with_cap() Trips with bump stops:\n%s\n" % \ pd.merge( left=veh_loaded_df[vehicle_trip_debug_columns], right=bump_stops_df[[Trip.STOPTIMES_COLUMN_TRIP_ID]], how='inner').to_string()) # make sure CHOSEN is categorical pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN ] = pd.Categorical( pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN ], categories=Assignment.CHOSEN_CATEGORIES, ordered=True) # join CHOSEN pathset links to bump_stops_df; now passenger links boarding at a bump stop will have Trip.STOPTIMES_COLUMN_STOP_SEQUENCE set bumpstop_boards = pd.merge(left =pathset_links_df.loc[ pathset_links_df[Passenger.PF_COL_ROUTE_ID].notnull() & # trip links only pathset_links_df[Assignment.SIM_COL_PAX_BUMP_ITER].isnull() & # not already bumped (pathset_links_df[Assignment.SIM_COL_PAX_CHOSEN]>Assignment.CHOSEN_NOT_CHOSEN_YET) ], # chosen left_on =[Trip.STOPTIMES_COLUMN_TRIP_ID, "A_seq"], right =bump_stops_df[[Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE]], right_on=[Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE], how ="left") # bump candidates: boarding at bump stops, chosen paths bumpstop_boards = bumpstop_boards.loc[ bumpstop_boards[Trip.STOPTIMES_COLUMN_STOP_SEQUENCE].notnull(), # board at bump_stops_df stop pax_links_debug_columns].copy() # bump off later arrivals, later trip_list_num bumpstop_boards.sort_values(by=[ \ Assignment.SIM_COL_PAX_A_TIME, # I think this is correct Trip.STOPTIMES_COLUMN_TRIP_ID, "A_seq", Passenger.PF_COL_PAX_A_TIME, Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM], ascending=[True, True, True, False, False], inplace=True) bumpstop_boards.reset_index(drop=True, inplace=True) # For each trip_id, stop_seq, stop_id, we want the first *overcap* rows # group to trip_id, stop_seq, stop_id and count off bpb_count = bumpstop_boards.groupby([Trip.STOPTIMES_COLUMN_TRIP_ID,"A_seq","A_id_num"]).cumcount() bpb_count.name = 'bump_index' # Add the bump index to our passenger-paths/stops bumpstop_boards = pd.concat([bumpstop_boards, bpb_count], axis=1) # bump or board them bumpstop_boards[ Assignment.SIM_COL_PAX_BOARD_STATE ] = pd.Categorical(["boarded"]*len(bumpstop_boards), categories=Assignment.BOARD_STATE_CATEGORICAL) bumpstop_boards.loc[ bumpstop_boards["bump_index"] < bumpstop_boards[Trip.SIM_COL_VEH_OVERCAP], Assignment.SIM_COL_PAX_BOARD_STATE ] = "bumped" # these folks got bumped bumpstop_boards.loc[ bumpstop_boards["bump_index"] < bumpstop_boards[Trip.SIM_COL_VEH_OVERCAP], Assignment.SIM_COL_PAX_BUMP_ITER ] = bump_iter # these folks got bumped FastTripsLogger.debug("load_passengers_on_vehicles_with_cap() bumpstop_boards (%d rows, showing head):\n%s" % \ (len(bumpstop_boards), bumpstop_boards.head(50).to_string())) # filter to unique passengers/paths who got bumped bump_paths = bumpstop_boards.loc[ bumpstop_boards[ Assignment.SIM_COL_PAX_BOARD_STATE ] == "bumped", [Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_TRIP_LIST_ID_NUM, Passenger.PF_COL_PATH_NUM]].drop_duplicates() chosen_paths_bumped = len(bump_paths) # figure when the wait time starts for the bump stops new_bump_wait = bumpstop_boards[[Trip.STOPTIMES_COLUMN_TRIP_ID, "A_seq", "A_id_num", Passenger.PF_COL_PAX_A_TIME]].groupby( \ [Trip.STOPTIMES_COLUMN_TRIP_ID, "A_seq","A_id_num"]).first().reset_index(drop=False) new_bump_wait.rename(columns={"A_seq" :Trip.STOPTIMES_COLUMN_STOP_SEQUENCE, "A_id_num":Trip.STOPTIMES_COLUMN_STOP_ID_NUM}, inplace=True) # need trip id num new_bump_wait = trips.add_numeric_trip_id(new_bump_wait, Trip.STOPTIMES_COLUMN_TRIP_ID, Trip.STOPTIMES_COLUMN_TRIP_ID_NUM) FastTripsLogger.debug("new_bump_wait (%d rows, showing head):\n%s" % (len(new_bump_wait), new_bump_wait.head().to_string())) # incorporate it into the bump wait df if type(Assignment.bump_wait_df) == type(None): Assignment.bump_wait_df = new_bump_wait else: Assignment.bump_wait_df = pd.concat([Assignment.bump_wait_df, new_bump_wait], axis=0) FastTripsLogger.debug("load_passengers_on_vehicles_with_cap() bump_wait_df (%d rows, showing head):\n%s" % (len(Assignment.bump_wait_df), Assignment.bump_wait_df.head().to_string())) Assignment.bump_wait_df.drop_duplicates(subset=[Trip.STOPTIMES_COLUMN_TRIP_ID_NUM, Trip.STOPTIMES_COLUMN_STOP_SEQUENCE], inplace=True) # finally, incorporate the board state and bump_iter to the full pathset_links_df pathset_links_df = pd.merge(left =pathset_links_df, right =bumpstop_boards[[Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM, Assignment.SIM_COL_PAX_BOARD_STATE, Assignment.SIM_COL_PAX_BUMP_ITER]], on =[Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM, Passenger.PF_COL_LINK_NUM], how ="left", suffixes=[""," bb"], indicator=True) pathset_links_df.loc[ pathset_links_df["_merge"]=="both", Assignment.SIM_COL_PAX_BOARD_STATE ] = pathset_links_df["%s bb" % Assignment.SIM_COL_PAX_BOARD_STATE] pathset_links_df.loc[ pathset_links_df["_merge"]=="both", Assignment.SIM_COL_PAX_BUMP_ITER ] = pathset_links_df["%s bb" % Assignment.SIM_COL_PAX_BUMP_ITER ] pathset_links_df.drop(["_merge","%s bb" % Assignment.SIM_COL_PAX_BOARD_STATE, "%s bb" % Assignment.SIM_COL_PAX_BUMP_ITER], axis=1, inplace=True) FastTripsLogger.debug(pathset_links_df[pax_links_debug_columns].head()) # bump the whole path bump_paths_df = pathset_links_df.loc[ pathset_links_df[Assignment.SIM_COL_PAX_BUMP_ITER]==bump_iter, [Passenger.TRIP_LIST_COLUMN_PERSON_ID, Passenger.TRIP_LIST_COLUMN_PERSON_TRIP_ID, Passenger.PF_COL_PATH_NUM]].drop_duplicates() pathset_paths_df = pd.merge(left =pathset_paths_df, right =bump_paths_df, how ="left", indicator=True) pathset_paths_df.loc[ pathset_paths_df["_merge"]=="both", Assignment.SIM_COL_PAX_BUMP_ITER ] = bump_iter pathset_paths_df.drop(["_merge"], axis=1, inplace=True) # communicate back to other links in the same path too pathset_links_df = pd.merge(left =pathset_links_df, right =bump_paths_df, how ="left", indicator=True) pathset_links_df.loc[ pathset_links_df["_merge"]=="both", Assignment.SIM_COL_PAX_BUMP_ITER ] = bump_iter pathset_links_df.loc[ (pathset_links_df["_merge"]=="both")& (pathset_links_df[Assignment.SIM_COL_PAX_BOARD_STATE].isnull()| (pathset_links_df[Assignment.SIM_COL_PAX_BOARD_STATE]=="boarded")| (pathset_links_df[Assignment.SIM_COL_PAX_BOARD_STATE]=="board_easy"))& pathset_links_df[Passenger.PF_COL_ROUTE_ID].notnull(), Assignment.SIM_COL_PAX_BOARD_STATE ] = "bumped_othertrip" pathset_links_df.drop(["_merge"], axis=1, inplace=True) FastTripsLogger.info(" -> completed loop bump_iter %d and bumped %d chosen paths" % (bump_iter, chosen_paths_bumped)) if chosen_paths_bumped == 0: break bump_iter += 1 if type(Assignment.bump_wait_df) == pd.DataFrame and len(Assignment.bump_wait_df) > 0: Assignment.bump_wait_df[Passenger.PF_COL_PAX_A_TIME_MIN] = \ Assignment.bump_wait_df[Passenger.PF_COL_PAX_A_TIME].map(lambda x: (60.0*x.hour) + x.minute + (x.second/60.0)) if type(Assignment.bump_wait_df) == pd.DataFrame and len(Assignment.bump_wait_df) > 0: FastTripsLogger.debug("Bump_wait_df:\n%s" % Assignment.bump_wait_df.to_string()) return (pathset_paths_df, pathset_links_df, veh_loaded_df)
[docs] @staticmethod def choose_paths_without_simulation(FT, output_dir, iteration, pathfinding_iteration, pathset_paths_df, pathset_links_df, veh_trips_df): """ Given a pathset for each passernger, choose a path (if relevant). That's it. Returns (valid_linked_trips, pathset_paths_df, pathset_links_df) """ simulation_iteration = 0 num_passengers_arrived = 0 ###################################################################################################### FastTripsLogger.info(" Step 1. Find out board/alight times for all pathset links from vehicle times") # could do this just to chosen path links but let's do this to the whole pathset pathset_links_df = Assignment.find_passenger_vehicle_times(pathset_links_df, veh_trips_df) # instead of flag_missed_transfers(), set these to pathfinding results pathset_links_df[Assignment.SIM_COL_PAX_ALIGHT_DELAY_MIN] = 0 pathset_links_df[Assignment.SIM_COL_PAX_A_TIME ] = pathset_links_df[Passenger.PF_COL_PAX_A_TIME] pathset_links_df[Assignment.SIM_COL_PAX_B_TIME ] = pathset_links_df[Passenger.PF_COL_PAX_B_TIME] pathset_links_df[Assignment.SIM_COL_PAX_LINK_TIME ] = pathset_links_df[Passenger.PF_COL_LINK_TIME] pathset_links_df[Assignment.SIM_COL_PAX_WAIT_TIME ] = pathset_links_df[Passenger.PF_COL_WAIT_TIME] pathset_links_df[Assignment.SIM_COL_PAX_MISSED_XFER ] = 0 ###################################################################################################### FastTripsLogger.info(" Step 2. Calculate costs and probabilities for all pathset paths") (pathset_paths_df, pathset_links_df) = PathSet.calculate_cost( Assignment.STOCH_DISPERSION, pathset_paths_df, pathset_links_df, FT.veh_trips_df, FT.passengers.trip_list_df, FT.routes, FT.tazs, FT.transfers, stops=FT.stops, reset_bump_iter=simulation_iteration==0) ###################################################################################################### FastTripsLogger.info(" Step 3. Choose a path for each passenger from their pathset") # Choose path for each passenger -- pathset_paths_df and pathset_links_df will now have # SIM_COL_PAX_CHOSEN >=0 for chosen paths/path links (num_passengers_arrived, num_chosen, pathset_paths_df, pathset_links_df) = Passenger.choose_paths( True, # choose for everyone iteration, pathfinding_iteration, simulation_iteration, pathset_paths_df, pathset_links_df) # Write the pathsets Passenger.write_paths(output_dir, iteration, pathfinding_iteration, simulation_iteration, pathset_paths_df, False, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, not Assignment.DEBUG_OUTPUT_COLUMNS) Passenger.write_paths(output_dir, iteration, pathfinding_iteration, simulation_iteration, pathset_links_df, True, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, not Assignment.DEBUG_OUTPUT_COLUMNS) # write the final chosen paths for this iteration chosen_links_df = Passenger.get_chosen_links(pathset_links_df) chosen_links_df["iteration"] = iteration Util.write_dataframe(chosen_links_df, "chosen_links_df", os.path.join(output_dir, "chosenpaths_links.csv"), append=(iteration>1), drop_debug_columns =not Assignment.DEBUG_OUTPUT_COLUMNS, drop_pathfinding_columns=not Assignment.DEBUG_OUTPUT_COLUMNS) chosen_links_df.drop(["iteration"], axis=1, inplace=True) chosen_paths_df = Passenger.get_chosen_links(pathset_paths_df) chosen_paths_df["iteration"] = iteration Util.write_dataframe(chosen_paths_df, "chosen_paths_df", os.path.join(output_dir, "chosenpaths_paths.csv"), append=(iteration>1), drop_debug_columns =not Assignment.DEBUG_OUTPUT_COLUMNS, drop_pathfinding_columns=not Assignment.DEBUG_OUTPUT_COLUMNS) chosen_paths_df.drop(["iteration"], axis=1, inplace=True) return (num_passengers_arrived, pathset_paths_df, pathset_links_df)
[docs] @staticmethod def simulate(FT, output_dir, iteration, pathfinding_iteration, pathset_paths_df, pathset_links_df, veh_trips_df): """ Given a pathset for each passenger, choose a path (if relevant) and then actually assign the passengers trips to the vehicles. Returns (valid_linked_trips, pathset_paths_df, pathset_links_df, veh_loaded_df) """ simulation_iteration = 0 num_passengers_arrived = 0 # will get returned from choose_paths while True: FT.performance.record_step_start(iteration, pathfinding_iteration, simulation_iteration, "simulation iteration") FastTripsLogger.info("Simulation Iteration %d" % simulation_iteration) # for trace_tuple in Assignment.TRACE_PERSON_IDS: # FastTripsLogger.debug("Initial pathset_links_df for %s\n%s" % \ # (str(trace_pax), pathset_links_df.loc[pathset_links_df.person_id==trace_pax].to_string())) # FastTripsLogger.debug("Initial pathset_paths_df for %s\n%s" % \ # (str(trace_pax), pathset_paths_df.loc[pathset_paths_df.person_id==trace_pax].to_string())) ###################################################################################################### FastTripsLogger.info(" Step 1. Find out board/alight times for all pathset links from vehicle times") # could do this just to chosen path links but let's do this to the whole pathset pathset_links_df = Assignment.find_passenger_vehicle_times(pathset_links_df, veh_trips_df) ###################################################################################################### FastTripsLogger.info(" Step 2. Flag missed transfer links and paths in the pathsets") (pathset_paths_df, pathset_links_df) = Assignment.flag_missed_transfers(pathset_paths_df, pathset_links_df) ###################################################################################################### FastTripsLogger.info(" Step 3. Calculate costs and probabilities for all pathset paths") (pathset_paths_df, pathset_links_df) = PathSet.calculate_cost( Assignment.STOCH_DISPERSION, pathset_paths_df, pathset_links_df, veh_trips_df, FT.passengers.trip_list_df, FT.routes, FT.tazs, FT.transfers, stops=FT.stops, reset_bump_iter=simulation_iteration == 0) ###################################################################################################### FastTripsLogger.info(" Step 4. Choose a path for each passenger from their pathset") # Choose path for each passenger -- pathset_paths_df and pathset_links_df will now have # SIM_COL_PAX_CHOSEN >=0 for chosen paths/path links (num_passengers_arrived, num_chosen, pathset_paths_df, pathset_links_df) = Passenger.choose_paths( Assignment.PATHFINDING_EVERYONE and simulation_iteration==0, # choose for everyone if we just re-found all paths iteration, pathfinding_iteration, simulation_iteration, pathset_paths_df, pathset_links_df) ###################################################################################################### FastTripsLogger.info(" Step 5. Put passenger paths on transit vehicles to get vehicle boards/alights/load and assess capacity constraints") (pathset_paths_df, pathset_links_df, veh_trips_df) = Assignment.load_passengers_on_vehicles_with_cap( iteration, pathfinding_iteration, simulation_iteration, FT.trips, pathset_paths_df, pathset_links_df, veh_trips_df) ###################################################################################################### FastTripsLogger.info(" Step 6. Update dwell and travel times for transit vehicles") # update the trip times -- accel/decel rates + stops affect travel times, and boards/alights affect dwell times veh_trips_df = Trip.update_trip_times(veh_trips_df, Assignment.MSA_RESULTS) ###################################################################################################### if Assignment.OUTPUT_PATHSET_PER_SIM_ITER: FastTripsLogger.info(" Step 7. Write pathsets (paths and links)") Passenger.write_paths(output_dir, iteration, pathfinding_iteration, simulation_iteration, pathset_paths_df, False, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, not Assignment.DEBUG_OUTPUT_COLUMNS) Passenger.write_paths(output_dir, iteration, pathfinding_iteration, simulation_iteration, pathset_links_df, True, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, not Assignment.DEBUG_OUTPUT_COLUMNS) # and vehicle trips Assignment.write_vehicle_trips(output_dir, iteration, pathfinding_iteration, simulation_iteration, veh_trips_df) FT.performance.record_step_end(iteration, pathfinding_iteration, simulation_iteration) simulation_iteration += 1 if num_chosen <= 0: FastTripsLogger.info(" No more path choices to make => Ending simulation loop") break if simulation_iteration > Assignment.MAX_SIMULATION_ITERS: FastTripsLogger.info(" Maximum simulation iterations reached (%d) => Ending simulation loop" % Assignment.MAX_SIMULATION_ITERS) break FT.performance.record_step_start(iteration, pathfinding_iteration, simulation_iteration, "output_per_simulation") # Write the pathsets (if we haven't been already) if Assignment.OUTPUT_PATHSET_PER_SIM_ITER == False: Passenger.write_paths(output_dir, iteration, pathfinding_iteration, simulation_iteration, pathset_paths_df, False, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, not Assignment.DEBUG_OUTPUT_COLUMNS) Passenger.write_paths(output_dir, iteration, pathfinding_iteration, simulation_iteration, pathset_links_df, True, Assignment.OUTPUT_PATHSET_PER_SIM_ITER, not Assignment.DEBUG_OUTPUT_COLUMNS, not Assignment.DEBUG_OUTPUT_COLUMNS) # write the final chosen paths for this iteration chosen_links_df = Passenger.get_chosen_links(pathset_links_df) chosen_links_df["iteration"] = iteration chosen_links_df["pathfinding_iteration"] = pathfinding_iteration Util.write_dataframe(chosen_links_df, "chosen_links_df", os.path.join(output_dir, "chosenpaths_links.csv"), append=((iteration>1) or (pathfinding_iteration>1)), drop_debug_columns =not Assignment.DEBUG_OUTPUT_COLUMNS, drop_pathfinding_columns=not Assignment.DEBUG_OUTPUT_COLUMNS) chosen_links_df.drop(["iteration", "pathfinding_iteration"], axis=1, inplace=True) chosen_paths_df = Passenger.get_chosen_links(pathset_paths_df) chosen_paths_df["iteration" ] = iteration chosen_paths_df["pathfinding_iteration"] = pathfinding_iteration Util.write_dataframe(chosen_paths_df, "chosen_paths_df", os.path.join(output_dir, "chosenpaths_paths.csv"), append=((iteration>1) or (pathfinding_iteration>1)), drop_debug_columns =not Assignment.DEBUG_OUTPUT_COLUMNS, drop_pathfinding_columns=not Assignment.DEBUG_OUTPUT_COLUMNS) chosen_paths_df.drop(["iteration", "pathfinding_iteration"], axis=1, inplace=True) FT.performance.record_step_end(iteration, pathfinding_iteration, simulation_iteration) return (num_passengers_arrived, pathset_paths_df, pathset_links_df, veh_trips_df)
def find_trip_based_paths_process_worker(iteration, pathfinding_iteration, worker_num, input_network_dir, input_demand_dir, run_config, func_file, output_dir, todo_pathset_queue, done_queue, hyperpath, bump_wait_df, stop_times_df): """ Process worker function. Processes all the paths in queue. todo_queue has (passenger_id, path object) """ worker_str = "_worker%02d" % worker_num from .FastTrips import FastTrips setupLogging(infoLogFilename = None, debugLogFilename = os.path.join(output_dir, FastTrips.DEBUG_LOG % worker_str), logToConsole = False, append = False if ((iteration==1) and (pathfinding_iteration==1)) else True) FastTripsLogger.info("Iteration %d Pathfinding Iteration %d Worker %2d starting" % (iteration, pathfinding_iteration, worker_num)) # the child process doesn't have these set so read them Assignment.CONFIGURATION_FILE = run_config Assignment.CONFIGURATION_FUNCTIONS_FILE = func_file Assignment.read_functions(func_file) Assignment.read_configuration(run_config) # this passes those read parameters and the stop times to the C++ extension Assignment.initialize_fasttrips_extension(worker_num, output_dir, stop_times_df) # the extension has it now, so we're done stop_times_df = None if iteration > 1: Assignment.set_fasttrips_bump_wait(bump_wait_df) while True: # go through my queue -- check if we're done todo = todo_pathset_queue.get() if todo == 'DONE': done_queue.put( (worker_num, 'DONE') ) FastTripsLogger.debug("Received DONE from the todo_pathset_queue") return # do the work pathset = todo FastTripsLogger.info("Processing person %20s trip %20s" % (pathset.person_id, pathset.person_trip_id)) # communicate it to the parent done_queue.put( (worker_num, "STARTING", pathset.person_id, pathset.person_trip_id )) trace_person = False if (pathset.person_id, pathset.person_trip_id) in Assignment.TRACE_IDS: FastTripsLogger.debug("Tracing assignment of person %s trip %s" % (pathset.person_id, pathset.person_trip_id)) trace_person = True try: (pathdict, perf_dict) = Assignment.find_trip_based_pathset(iteration, pathfinding_iteration, pathset, hyperpath, trace=trace_person) done_queue.put( (worker_num, "COMPLETED", pathset.trip_list_id_num, pathdict, perf_dict) ) except: FastTripsLogger.exception("Exception") # call it a day done_queue.put( (worker_num, "EXCEPTION", str(sys.exc_info()) ) ) return