mosstool.trip.generator
1from .AIGC import AigcGenerator 2from .generate_from_od import TripGenerator 3from .gravity import GravityGenerator 4from .random import PositionMode, RandomGenerator 5from .template import (CalibratedTemplateGenerator, GaussianTemplateGenerator, 6 ProbabilisticTemplateGenerator, 7 UniformTemplateGenerator, 8 default_bus_template_generator, 9 default_person_template_generator) 10 11__all__ = [ 12 "ProbabilisticTemplateGenerator", 13 "GaussianTemplateGenerator", 14 "UniformTemplateGenerator", 15 "CalibratedTemplateGenerator", 16 "default_person_template_generator", 17 "default_bus_template_generator", 18 "RandomGenerator", 19 "PositionMode", 20 "GravityGenerator", 21 "AigcGenerator", 22 "TripGenerator", 23]
class
ProbabilisticTemplateGenerator:
93class ProbabilisticTemplateGenerator: 94 def __init__( 95 self, 96 max_speed_values: Optional[List[float]] = None, 97 max_speed_probabilities: Optional[List[float]] = None, 98 max_acceleration_values: Optional[List[float]] = None, 99 max_acceleration_probabilities: Optional[List[float]] = None, 100 max_braking_acceleration_values: Optional[List[float]] = None, 101 max_braking_acceleration_probabilities: Optional[List[float]] = None, 102 usual_braking_acceleration_values: Optional[List[float]] = None, 103 usual_braking_acceleration_probabilities: Optional[List[float]] = None, 104 headway_values: Optional[List[float]] = None, 105 headway_probabilities: Optional[List[float]] = None, 106 min_gap_values: Optional[List[float]] = None, 107 min_gap_probabilities: Optional[List[float]] = None, 108 seed: int = 0, 109 template: Optional[Person] = None, 110 ): 111 """ 112 Args: 113 - max_speed_values (Optional[List[float]]): A list of possible maximum speeds. 114 - max_speed_probabilities (Optional[List[float]]): Probabilities corresponding to max_speed_values. 115 - max_acceleration_values (Optional[List[float]]): A list of possible maximum accelerations. 116 - max_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_acceleration_values. 117 - max_braking_acceleration_values (Optional[List[float]]): A list of possible maximum braking accelerations. 118 - max_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_braking_acceleration_values. 119 - usual_braking_acceleration_values (Optional[List[float]]): A list of usual braking accelerations. 120 - usual_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to usual_braking_acceleration_values. 121 - headway_values (Optional[List[float]]): A list of safe time headways. 122 - headway_probabilities (Optional[List[float]]): Probabilities corresponding to headway_values. 123 - min_gap_values (Optional[List[float]]): A list of minimum gaps. 124 - min_gap_probabilities (Optional[List[float]]): Probabilities corresponding to min_gap_values. 125 - seed (int): Seed value for the random number generator. 126 - template (Optional[Person]): The template function of generated person object. 127 """ 128 self.template_p = template 129 if self.template_p is None: 130 self.template_p = default_person_template_generator() 131 self.template_p.ClearField("schedules") 132 # max speed 133 self.max_speed_values = max_speed_values 134 self.max_speed_probabilities = max_speed_probabilities 135 if max_speed_probabilities is not None and max_speed_values is not None: 136 _sum = sum(max_speed_probabilities) 137 self.max_speed_probabilities = np.array( 138 [d / _sum for d in max_speed_probabilities] 139 ) 140 assert len(max_speed_probabilities) == len( 141 max_speed_values 142 ), f"Inconsistent length between max speed values and probabilities" 143 else: 144 self.max_speed_values = None 145 self.max_speed_probabilities = None 146 # max acceleration 147 self.max_acceleration_values = max_acceleration_values 148 self.max_acceleration_probabilities = max_acceleration_probabilities 149 if ( 150 max_acceleration_probabilities is not None 151 and max_acceleration_values is not None 152 ): 153 _sum = sum(max_acceleration_probabilities) 154 self.max_acceleration_probabilities = np.array( 155 [d / _sum for d in max_acceleration_probabilities] 156 ) 157 assert len(max_acceleration_probabilities) == len( 158 max_acceleration_values 159 ), f"Inconsistent length between max acceleration values and probabilities" 160 else: 161 self.max_acceleration_values = None 162 self.max_acceleration_probabilities = None 163 # max braking acceleration 164 self.max_braking_acceleration_values = max_braking_acceleration_values 165 self.max_braking_acceleration_probabilities = ( 166 max_braking_acceleration_probabilities 167 ) 168 if ( 169 max_braking_acceleration_probabilities is not None 170 and max_braking_acceleration_values is not None 171 ): 172 _sum = sum(max_braking_acceleration_probabilities) 173 self.max_braking_acceleration_probabilities = np.array( 174 [d / _sum for d in max_braking_acceleration_probabilities] 175 ) 176 assert len(max_braking_acceleration_probabilities) == len( 177 max_braking_acceleration_values 178 ), f"Inconsistent length between max braking acceleration values and probabilities" 179 else: 180 self.max_braking_acceleration_values = None 181 self.max_braking_acceleration_probabilities = None 182 # usual braking acceleration 183 self.usual_braking_acceleration_values = usual_braking_acceleration_values 184 self.usual_braking_acceleration_probabilities = ( 185 usual_braking_acceleration_probabilities 186 ) 187 if ( 188 usual_braking_acceleration_probabilities is not None 189 and usual_braking_acceleration_values is not None 190 ): 191 _sum = sum(usual_braking_acceleration_probabilities) 192 self.usual_braking_acceleration_probabilities = np.array( 193 [d / _sum for d in usual_braking_acceleration_probabilities] 194 ) 195 assert len(usual_braking_acceleration_probabilities) == len( 196 usual_braking_acceleration_values 197 ), f"Inconsistent length between usual braking acceleration values and probabilities" 198 else: 199 self.usual_braking_acceleration_values = None 200 self.usual_braking_acceleration_probabilities = None 201 # safe time headway 202 self.headway_values = headway_values 203 self.headway_probabilities = headway_probabilities 204 if headway_probabilities is not None and headway_values is not None: 205 _sum = sum(headway_probabilities) 206 self.headway_probabilities = np.array( 207 [d / _sum for d in headway_probabilities] 208 ) 209 assert len(headway_probabilities) == len( 210 headway_values 211 ), f"Inconsistent length between headway values and probabilities" 212 else: 213 self.headway_values = None 214 self.headway_probabilities = None 215 # min gap 216 self.min_gap_values = min_gap_values 217 self.min_gap_probabilities = min_gap_probabilities 218 if min_gap_probabilities is not None and min_gap_values is not None: 219 _sum = sum(min_gap_probabilities) 220 self.min_gap_probabilities = np.array( 221 [d / _sum for d in min_gap_probabilities] 222 ) 223 assert len(min_gap_probabilities) == len( 224 min_gap_values 225 ), f"Inconsistent length between minGap values and probabilities" 226 else: 227 self.min_gap_values = None 228 self.min_gap_probabilities = None 229 # radom engine 230 self.rng = np.random.default_rng(seed) 231 232 def template_generator( 233 self, 234 ) -> Person: 235 rng = self.rng 236 p = Person() 237 assert self.template_p is not None 238 p.CopyFrom(self.template_p) 239 # max speed 240 if ( 241 self.max_speed_probabilities is not None 242 and self.max_speed_values is not None 243 ): 244 p.vehicle_attribute.max_speed = rng.choice( 245 self.max_speed_values, p=self.max_speed_probabilities 246 ) 247 # max acceleration 248 if ( 249 self.max_acceleration_probabilities is not None 250 and self.max_acceleration_values is not None 251 ): 252 p.vehicle_attribute.max_acceleration = rng.choice( 253 self.max_acceleration_values, p=self.max_acceleration_probabilities 254 ) 255 # max braking acceleration 256 if ( 257 self.max_braking_acceleration_probabilities is not None 258 and self.max_braking_acceleration_values is not None 259 ): 260 p.vehicle_attribute.max_braking_acceleration = rng.choice( 261 self.max_braking_acceleration_values, 262 p=self.max_braking_acceleration_probabilities, 263 ) 264 # usual braking acceleration 265 if ( 266 self.usual_braking_acceleration_probabilities is not None 267 and self.usual_braking_acceleration_values is not None 268 ): 269 p.vehicle_attribute.usual_braking_acceleration = rng.choice( 270 self.usual_braking_acceleration_values, 271 p=self.usual_braking_acceleration_probabilities, 272 ) 273 # safe time headway 274 if self.headway_probabilities is not None and self.headway_values is not None: 275 p.vehicle_attribute.headway = rng.choice( 276 self.headway_values, p=self.headway_probabilities 277 ) 278 # min gap 279 if self.min_gap_probabilities is not None and self.min_gap_values is not None: 280 p.vehicle_attribute.min_gap = rng.choice( 281 self.min_gap_values, p=self.min_gap_probabilities 282 ) 283 return p
ProbabilisticTemplateGenerator( max_speed_values: Optional[List[float]] = None, max_speed_probabilities: Optional[List[float]] = None, max_acceleration_values: Optional[List[float]] = None, max_acceleration_probabilities: Optional[List[float]] = None, max_braking_acceleration_values: Optional[List[float]] = None, max_braking_acceleration_probabilities: Optional[List[float]] = None, usual_braking_acceleration_values: Optional[List[float]] = None, usual_braking_acceleration_probabilities: Optional[List[float]] = None, headway_values: Optional[List[float]] = None, headway_probabilities: Optional[List[float]] = None, min_gap_values: Optional[List[float]] = None, min_gap_probabilities: Optional[List[float]] = None, seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
94 def __init__( 95 self, 96 max_speed_values: Optional[List[float]] = None, 97 max_speed_probabilities: Optional[List[float]] = None, 98 max_acceleration_values: Optional[List[float]] = None, 99 max_acceleration_probabilities: Optional[List[float]] = None, 100 max_braking_acceleration_values: Optional[List[float]] = None, 101 max_braking_acceleration_probabilities: Optional[List[float]] = None, 102 usual_braking_acceleration_values: Optional[List[float]] = None, 103 usual_braking_acceleration_probabilities: Optional[List[float]] = None, 104 headway_values: Optional[List[float]] = None, 105 headway_probabilities: Optional[List[float]] = None, 106 min_gap_values: Optional[List[float]] = None, 107 min_gap_probabilities: Optional[List[float]] = None, 108 seed: int = 0, 109 template: Optional[Person] = None, 110 ): 111 """ 112 Args: 113 - max_speed_values (Optional[List[float]]): A list of possible maximum speeds. 114 - max_speed_probabilities (Optional[List[float]]): Probabilities corresponding to max_speed_values. 115 - max_acceleration_values (Optional[List[float]]): A list of possible maximum accelerations. 116 - max_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_acceleration_values. 117 - max_braking_acceleration_values (Optional[List[float]]): A list of possible maximum braking accelerations. 118 - max_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_braking_acceleration_values. 119 - usual_braking_acceleration_values (Optional[List[float]]): A list of usual braking accelerations. 120 - usual_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to usual_braking_acceleration_values. 121 - headway_values (Optional[List[float]]): A list of safe time headways. 122 - headway_probabilities (Optional[List[float]]): Probabilities corresponding to headway_values. 123 - min_gap_values (Optional[List[float]]): A list of minimum gaps. 124 - min_gap_probabilities (Optional[List[float]]): Probabilities corresponding to min_gap_values. 125 - seed (int): Seed value for the random number generator. 126 - template (Optional[Person]): The template function of generated person object. 127 """ 128 self.template_p = template 129 if self.template_p is None: 130 self.template_p = default_person_template_generator() 131 self.template_p.ClearField("schedules") 132 # max speed 133 self.max_speed_values = max_speed_values 134 self.max_speed_probabilities = max_speed_probabilities 135 if max_speed_probabilities is not None and max_speed_values is not None: 136 _sum = sum(max_speed_probabilities) 137 self.max_speed_probabilities = np.array( 138 [d / _sum for d in max_speed_probabilities] 139 ) 140 assert len(max_speed_probabilities) == len( 141 max_speed_values 142 ), f"Inconsistent length between max speed values and probabilities" 143 else: 144 self.max_speed_values = None 145 self.max_speed_probabilities = None 146 # max acceleration 147 self.max_acceleration_values = max_acceleration_values 148 self.max_acceleration_probabilities = max_acceleration_probabilities 149 if ( 150 max_acceleration_probabilities is not None 151 and max_acceleration_values is not None 152 ): 153 _sum = sum(max_acceleration_probabilities) 154 self.max_acceleration_probabilities = np.array( 155 [d / _sum for d in max_acceleration_probabilities] 156 ) 157 assert len(max_acceleration_probabilities) == len( 158 max_acceleration_values 159 ), f"Inconsistent length between max acceleration values and probabilities" 160 else: 161 self.max_acceleration_values = None 162 self.max_acceleration_probabilities = None 163 # max braking acceleration 164 self.max_braking_acceleration_values = max_braking_acceleration_values 165 self.max_braking_acceleration_probabilities = ( 166 max_braking_acceleration_probabilities 167 ) 168 if ( 169 max_braking_acceleration_probabilities is not None 170 and max_braking_acceleration_values is not None 171 ): 172 _sum = sum(max_braking_acceleration_probabilities) 173 self.max_braking_acceleration_probabilities = np.array( 174 [d / _sum for d in max_braking_acceleration_probabilities] 175 ) 176 assert len(max_braking_acceleration_probabilities) == len( 177 max_braking_acceleration_values 178 ), f"Inconsistent length between max braking acceleration values and probabilities" 179 else: 180 self.max_braking_acceleration_values = None 181 self.max_braking_acceleration_probabilities = None 182 # usual braking acceleration 183 self.usual_braking_acceleration_values = usual_braking_acceleration_values 184 self.usual_braking_acceleration_probabilities = ( 185 usual_braking_acceleration_probabilities 186 ) 187 if ( 188 usual_braking_acceleration_probabilities is not None 189 and usual_braking_acceleration_values is not None 190 ): 191 _sum = sum(usual_braking_acceleration_probabilities) 192 self.usual_braking_acceleration_probabilities = np.array( 193 [d / _sum for d in usual_braking_acceleration_probabilities] 194 ) 195 assert len(usual_braking_acceleration_probabilities) == len( 196 usual_braking_acceleration_values 197 ), f"Inconsistent length between usual braking acceleration values and probabilities" 198 else: 199 self.usual_braking_acceleration_values = None 200 self.usual_braking_acceleration_probabilities = None 201 # safe time headway 202 self.headway_values = headway_values 203 self.headway_probabilities = headway_probabilities 204 if headway_probabilities is not None and headway_values is not None: 205 _sum = sum(headway_probabilities) 206 self.headway_probabilities = np.array( 207 [d / _sum for d in headway_probabilities] 208 ) 209 assert len(headway_probabilities) == len( 210 headway_values 211 ), f"Inconsistent length between headway values and probabilities" 212 else: 213 self.headway_values = None 214 self.headway_probabilities = None 215 # min gap 216 self.min_gap_values = min_gap_values 217 self.min_gap_probabilities = min_gap_probabilities 218 if min_gap_probabilities is not None and min_gap_values is not None: 219 _sum = sum(min_gap_probabilities) 220 self.min_gap_probabilities = np.array( 221 [d / _sum for d in min_gap_probabilities] 222 ) 223 assert len(min_gap_probabilities) == len( 224 min_gap_values 225 ), f"Inconsistent length between minGap values and probabilities" 226 else: 227 self.min_gap_values = None 228 self.min_gap_probabilities = None 229 # radom engine 230 self.rng = np.random.default_rng(seed)
Args:
- max_speed_values (Optional[List[float]]): A list of possible maximum speeds.
- max_speed_probabilities (Optional[List[float]]): Probabilities corresponding to max_speed_values.
- max_acceleration_values (Optional[List[float]]): A list of possible maximum accelerations.
- max_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_acceleration_values.
- max_braking_acceleration_values (Optional[List[float]]): A list of possible maximum braking accelerations.
- max_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_braking_acceleration_values.
- usual_braking_acceleration_values (Optional[List[float]]): A list of usual braking accelerations.
- usual_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to usual_braking_acceleration_values.
- headway_values (Optional[List[float]]): A list of safe time headways.
- headway_probabilities (Optional[List[float]]): Probabilities corresponding to headway_values.
- min_gap_values (Optional[List[float]]): A list of minimum gaps.
- min_gap_probabilities (Optional[List[float]]): Probabilities corresponding to min_gap_values.
- seed (int): Seed value for the random number generator.
- template (Optional[Person]): The template function of generated person object.
def
template_generator(self) -> city.person.v2.person_pb2.Person:
232 def template_generator( 233 self, 234 ) -> Person: 235 rng = self.rng 236 p = Person() 237 assert self.template_p is not None 238 p.CopyFrom(self.template_p) 239 # max speed 240 if ( 241 self.max_speed_probabilities is not None 242 and self.max_speed_values is not None 243 ): 244 p.vehicle_attribute.max_speed = rng.choice( 245 self.max_speed_values, p=self.max_speed_probabilities 246 ) 247 # max acceleration 248 if ( 249 self.max_acceleration_probabilities is not None 250 and self.max_acceleration_values is not None 251 ): 252 p.vehicle_attribute.max_acceleration = rng.choice( 253 self.max_acceleration_values, p=self.max_acceleration_probabilities 254 ) 255 # max braking acceleration 256 if ( 257 self.max_braking_acceleration_probabilities is not None 258 and self.max_braking_acceleration_values is not None 259 ): 260 p.vehicle_attribute.max_braking_acceleration = rng.choice( 261 self.max_braking_acceleration_values, 262 p=self.max_braking_acceleration_probabilities, 263 ) 264 # usual braking acceleration 265 if ( 266 self.usual_braking_acceleration_probabilities is not None 267 and self.usual_braking_acceleration_values is not None 268 ): 269 p.vehicle_attribute.usual_braking_acceleration = rng.choice( 270 self.usual_braking_acceleration_values, 271 p=self.usual_braking_acceleration_probabilities, 272 ) 273 # safe time headway 274 if self.headway_probabilities is not None and self.headway_values is not None: 275 p.vehicle_attribute.headway = rng.choice( 276 self.headway_values, p=self.headway_probabilities 277 ) 278 # min gap 279 if self.min_gap_probabilities is not None and self.min_gap_values is not None: 280 p.vehicle_attribute.min_gap = rng.choice( 281 self.min_gap_values, p=self.min_gap_probabilities 282 ) 283 return p
class
GaussianTemplateGenerator:
286class GaussianTemplateGenerator: 287 def __init__( 288 self, 289 max_speed_mean: Optional[float] = None, 290 max_speed_std: Optional[float] = None, 291 max_acceleration_mean: Optional[float] = None, 292 max_acceleration_std: Optional[float] = None, 293 max_braking_acceleration_mean: Optional[float] = None, 294 max_braking_acceleration_std: Optional[float] = None, 295 usual_braking_acceleration_mean: Optional[float] = None, 296 usual_braking_acceleration_std: Optional[float] = None, 297 headway_mean: Optional[float] = None, 298 headway_std: Optional[float] = None, 299 min_gap_mean: Optional[float] = None, 300 min_gap_std: Optional[float] = None, 301 seed: int = 0, 302 template: Optional[Person] = None, 303 ) -> None: 304 """ 305 Args 306 - max_speed_mean (Optional[float]): Mean of the Gaussian distribution for maximum speeds. 307 - max_speed_std (Optional[float]): Standard deviation corresponding to max_speed_mean. 308 - max_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum accelerations. 309 - max_acceleration_std (Optional[float]): Standard deviation corresponding to max_acceleration_mean. 310 - max_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum braking accelerations. 311 - max_braking_acceleration_std (Optional[float]): Standard deviation corresponding to max_braking_acceleration_mean. 312 - usual_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for usual braking accelerations. 313 - usual_braking_acceleration_std (Optional[float]): Standard deviation corresponding to usual_braking_acceleration_mean. 314 - headway_mean (Optional[float]): Mean of the Gaussian distribution for safe time headways. 315 - headway_std (Optional[float]): Standard deviation corresponding to headway_mean. 316 - min_gap_mean (Optional[float]): Mean of the Gaussian distribution for minimum gaps. 317 - min_gap_std (Optional[float]): Standard deviation corresponding to min_gap_mean. 318 - seed (int): Seed value for the random number generator. 319 - template (Optional[Person]): The template function of generated person object. 320 """ 321 self.template_p = template 322 if self.template_p is None: 323 self.template_p = default_person_template_generator() 324 self.template_p.ClearField("schedules") 325 # max speed 326 self.max_speed_mean = max_speed_mean 327 self.max_speed_std = max_speed_std 328 # max acceleration 329 self.max_acceleration_mean = max_acceleration_mean 330 self.max_acceleration_std = max_acceleration_std 331 # max braking acceleration 332 self.max_braking_acceleration_mean = max_braking_acceleration_mean 333 self.max_braking_acceleration_std = max_braking_acceleration_std 334 # usual braking acceleration 335 self.usual_braking_acceleration_mean = usual_braking_acceleration_mean 336 self.usual_braking_acceleration_std = usual_braking_acceleration_std 337 # safe time headway 338 self.headway_mean = headway_mean 339 self.headway_std = headway_std 340 # min gap 341 self.min_gap_mean = min_gap_mean 342 self.min_gap_std = min_gap_std 343 # radom engine 344 self.rng = np.random.default_rng(seed) 345 346 def template_generator( 347 self, 348 ) -> Person: 349 rng = self.rng 350 p = Person() 351 assert self.template_p is not None 352 p.CopyFrom(self.template_p) 353 # max speed 354 if self.max_speed_mean is not None and self.max_speed_std is not None: 355 p.vehicle_attribute.max_speed = np.abs( 356 np.sqrt(self.max_speed_std) * rng.normal() + self.max_speed_mean 357 ) 358 # max acceleration 359 if ( 360 self.max_acceleration_mean is not None 361 and self.max_acceleration_std is not None 362 ): 363 p.vehicle_attribute.max_acceleration = np.abs( 364 np.sqrt(self.max_acceleration_std) * rng.normal() 365 + self.max_acceleration_mean 366 ) 367 # max braking acceleration 368 if ( 369 self.max_braking_acceleration_mean is not None 370 and self.max_braking_acceleration_std is not None 371 ): 372 p.vehicle_attribute.max_braking_acceleration = -np.abs( 373 np.sqrt(self.max_braking_acceleration_std) * rng.normal() 374 + self.max_braking_acceleration_std 375 ) 376 # usual braking acceleration 377 if ( 378 self.usual_braking_acceleration_mean is not None 379 and self.usual_braking_acceleration_std is not None 380 ): 381 p.vehicle_attribute.usual_braking_acceleration = -np.abs( 382 np.sqrt(self.usual_braking_acceleration_std) * rng.normal() 383 + self.usual_braking_acceleration_mean 384 ) 385 # safe time headway 386 if self.headway_mean is not None and self.headway_std is not None: 387 p.vehicle_attribute.headway = np.abs( 388 np.sqrt(self.headway_std) * rng.normal() + self.headway_mean 389 ) 390 # min gap 391 if self.min_gap_mean is not None and self.min_gap_std is not None: 392 p.vehicle_attribute.min_gap = np.abs( 393 np.sqrt(self.min_gap_std) * rng.normal() + self.min_gap_mean 394 ) 395 return p
GaussianTemplateGenerator( max_speed_mean: Optional[float] = None, max_speed_std: Optional[float] = None, max_acceleration_mean: Optional[float] = None, max_acceleration_std: Optional[float] = None, max_braking_acceleration_mean: Optional[float] = None, max_braking_acceleration_std: Optional[float] = None, usual_braking_acceleration_mean: Optional[float] = None, usual_braking_acceleration_std: Optional[float] = None, headway_mean: Optional[float] = None, headway_std: Optional[float] = None, min_gap_mean: Optional[float] = None, min_gap_std: Optional[float] = None, seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
287 def __init__( 288 self, 289 max_speed_mean: Optional[float] = None, 290 max_speed_std: Optional[float] = None, 291 max_acceleration_mean: Optional[float] = None, 292 max_acceleration_std: Optional[float] = None, 293 max_braking_acceleration_mean: Optional[float] = None, 294 max_braking_acceleration_std: Optional[float] = None, 295 usual_braking_acceleration_mean: Optional[float] = None, 296 usual_braking_acceleration_std: Optional[float] = None, 297 headway_mean: Optional[float] = None, 298 headway_std: Optional[float] = None, 299 min_gap_mean: Optional[float] = None, 300 min_gap_std: Optional[float] = None, 301 seed: int = 0, 302 template: Optional[Person] = None, 303 ) -> None: 304 """ 305 Args 306 - max_speed_mean (Optional[float]): Mean of the Gaussian distribution for maximum speeds. 307 - max_speed_std (Optional[float]): Standard deviation corresponding to max_speed_mean. 308 - max_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum accelerations. 309 - max_acceleration_std (Optional[float]): Standard deviation corresponding to max_acceleration_mean. 310 - max_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum braking accelerations. 311 - max_braking_acceleration_std (Optional[float]): Standard deviation corresponding to max_braking_acceleration_mean. 312 - usual_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for usual braking accelerations. 313 - usual_braking_acceleration_std (Optional[float]): Standard deviation corresponding to usual_braking_acceleration_mean. 314 - headway_mean (Optional[float]): Mean of the Gaussian distribution for safe time headways. 315 - headway_std (Optional[float]): Standard deviation corresponding to headway_mean. 316 - min_gap_mean (Optional[float]): Mean of the Gaussian distribution for minimum gaps. 317 - min_gap_std (Optional[float]): Standard deviation corresponding to min_gap_mean. 318 - seed (int): Seed value for the random number generator. 319 - template (Optional[Person]): The template function of generated person object. 320 """ 321 self.template_p = template 322 if self.template_p is None: 323 self.template_p = default_person_template_generator() 324 self.template_p.ClearField("schedules") 325 # max speed 326 self.max_speed_mean = max_speed_mean 327 self.max_speed_std = max_speed_std 328 # max acceleration 329 self.max_acceleration_mean = max_acceleration_mean 330 self.max_acceleration_std = max_acceleration_std 331 # max braking acceleration 332 self.max_braking_acceleration_mean = max_braking_acceleration_mean 333 self.max_braking_acceleration_std = max_braking_acceleration_std 334 # usual braking acceleration 335 self.usual_braking_acceleration_mean = usual_braking_acceleration_mean 336 self.usual_braking_acceleration_std = usual_braking_acceleration_std 337 # safe time headway 338 self.headway_mean = headway_mean 339 self.headway_std = headway_std 340 # min gap 341 self.min_gap_mean = min_gap_mean 342 self.min_gap_std = min_gap_std 343 # radom engine 344 self.rng = np.random.default_rng(seed)
Args
- max_speed_mean (Optional[float]): Mean of the Gaussian distribution for maximum speeds.
- max_speed_std (Optional[float]): Standard deviation corresponding to max_speed_mean.
- max_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum accelerations.
- max_acceleration_std (Optional[float]): Standard deviation corresponding to max_acceleration_mean.
- max_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum braking accelerations.
- max_braking_acceleration_std (Optional[float]): Standard deviation corresponding to max_braking_acceleration_mean.
- usual_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for usual braking accelerations.
- usual_braking_acceleration_std (Optional[float]): Standard deviation corresponding to usual_braking_acceleration_mean.
- headway_mean (Optional[float]): Mean of the Gaussian distribution for safe time headways.
- headway_std (Optional[float]): Standard deviation corresponding to headway_mean.
- min_gap_mean (Optional[float]): Mean of the Gaussian distribution for minimum gaps.
- min_gap_std (Optional[float]): Standard deviation corresponding to min_gap_mean.
- seed (int): Seed value for the random number generator.
- template (Optional[Person]): The template function of generated person object.
def
template_generator(self) -> city.person.v2.person_pb2.Person:
346 def template_generator( 347 self, 348 ) -> Person: 349 rng = self.rng 350 p = Person() 351 assert self.template_p is not None 352 p.CopyFrom(self.template_p) 353 # max speed 354 if self.max_speed_mean is not None and self.max_speed_std is not None: 355 p.vehicle_attribute.max_speed = np.abs( 356 np.sqrt(self.max_speed_std) * rng.normal() + self.max_speed_mean 357 ) 358 # max acceleration 359 if ( 360 self.max_acceleration_mean is not None 361 and self.max_acceleration_std is not None 362 ): 363 p.vehicle_attribute.max_acceleration = np.abs( 364 np.sqrt(self.max_acceleration_std) * rng.normal() 365 + self.max_acceleration_mean 366 ) 367 # max braking acceleration 368 if ( 369 self.max_braking_acceleration_mean is not None 370 and self.max_braking_acceleration_std is not None 371 ): 372 p.vehicle_attribute.max_braking_acceleration = -np.abs( 373 np.sqrt(self.max_braking_acceleration_std) * rng.normal() 374 + self.max_braking_acceleration_std 375 ) 376 # usual braking acceleration 377 if ( 378 self.usual_braking_acceleration_mean is not None 379 and self.usual_braking_acceleration_std is not None 380 ): 381 p.vehicle_attribute.usual_braking_acceleration = -np.abs( 382 np.sqrt(self.usual_braking_acceleration_std) * rng.normal() 383 + self.usual_braking_acceleration_mean 384 ) 385 # safe time headway 386 if self.headway_mean is not None and self.headway_std is not None: 387 p.vehicle_attribute.headway = np.abs( 388 np.sqrt(self.headway_std) * rng.normal() + self.headway_mean 389 ) 390 # min gap 391 if self.min_gap_mean is not None and self.min_gap_std is not None: 392 p.vehicle_attribute.min_gap = np.abs( 393 np.sqrt(self.min_gap_std) * rng.normal() + self.min_gap_mean 394 ) 395 return p
class
UniformTemplateGenerator:
398class UniformTemplateGenerator: 399 def __init__( 400 self, 401 max_speed_min: Optional[float] = None, 402 max_speed_max: Optional[float] = None, 403 max_acceleration_min: Optional[float] = None, 404 max_acceleration_max: Optional[float] = None, 405 max_braking_acceleration_min: Optional[float] = None, 406 max_braking_acceleration_max: Optional[float] = None, 407 usual_braking_acceleration_min: Optional[float] = None, 408 usual_braking_acceleration_max: Optional[float] = None, 409 headway_min: Optional[float] = None, 410 headway_max: Optional[float] = None, 411 min_gap_min: Optional[float] = None, 412 min_gap_max: Optional[float] = None, 413 seed: int = 0, 414 template: Optional[Person] = None, 415 ) -> None: 416 """ 417 Args 418 - max_speed_mean (Optional[float]): Lower bound of the Uniform distribution for maximum speeds. 419 - max_speed_std (Optional[float]): Higher bound of the Uniform distribution for maximum speeds. 420 - max_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum accelerations. 421 - max_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum accelerations. 422 - max_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum braking accelerations. 423 - max_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum braking accelerations. 424 - usual_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for usual braking accelerations. 425 - usual_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for usual braking accelerations. 426 - headway_mean (Optional[float]): Lower bound of the Uniform distribution for safe time headways. 427 - headway_std (Optional[float]): Higher bound of the Uniform distribution for safe time headways. 428 - min_gap_mean (Optional[float]): Lower bound of the Uniform distribution for minimum gaps. 429 - min_gap_std (Optional[float]): Higher bound of the Uniform distribution for minimum gaps. 430 - seed (int): Seed value for the random number generator. 431 - template (Optional[Person]): The template function of generated person object. 432 """ 433 self.template_p = template 434 if self.template_p is None: 435 self.template_p = default_person_template_generator() 436 self.template_p.ClearField("schedules") 437 # max speed 438 self.max_speed_min = max_speed_min 439 self.max_speed_max = max_speed_max 440 # max acceleration 441 self.max_acceleration_min = max_acceleration_min 442 self.max_acceleration_max = max_acceleration_max 443 # max braking acceleration 444 self.max_braking_acceleration_min = max_braking_acceleration_min 445 self.max_braking_acceleration_max = max_braking_acceleration_max 446 # usual braking acceleration 447 self.usual_braking_acceleration_min = usual_braking_acceleration_min 448 self.usual_braking_acceleration_max = usual_braking_acceleration_max 449 # safe time headway 450 self.headway_min = headway_min 451 self.headway_max = headway_max 452 # min gap 453 self.min_gap_min = min_gap_min 454 self.min_gap_max = min_gap_max 455 # radom engine 456 self.rng = np.random.default_rng(seed) 457 458 def template_generator( 459 self, 460 ) -> Person: 461 rng = self.rng 462 p = Person() 463 assert self.template_p is not None 464 p.CopyFrom(self.template_p) 465 # max speed 466 if self.max_speed_min is not None and self.max_speed_max is not None: 467 p.vehicle_attribute.max_speed = np.abs( 468 rng.uniform(self.max_speed_min, self.max_speed_max) 469 ) 470 # max acceleration 471 if ( 472 self.max_acceleration_min is not None 473 and self.max_acceleration_max is not None 474 ): 475 p.vehicle_attribute.max_acceleration = np.abs( 476 rng.uniform(self.max_acceleration_min, self.max_acceleration_max) 477 ) 478 # max braking acceleration 479 if ( 480 self.max_braking_acceleration_min is not None 481 and self.max_braking_acceleration_max is not None 482 ): 483 p.vehicle_attribute.max_braking_acceleration = -np.abs( 484 rng.uniform( 485 self.max_braking_acceleration_min, self.max_braking_acceleration_max 486 ) 487 ) 488 # usual braking acceleration 489 if ( 490 self.usual_braking_acceleration_min is not None 491 and self.usual_braking_acceleration_max is not None 492 ): 493 p.vehicle_attribute.usual_braking_acceleration = -np.abs( 494 rng.uniform( 495 self.usual_braking_acceleration_min, 496 self.usual_braking_acceleration_max, 497 ) 498 ) 499 # safe time headway 500 if self.headway_min is not None and self.headway_max is not None: 501 p.vehicle_attribute.headway = np.abs( 502 rng.uniform(self.headway_min, self.headway_max) 503 ) 504 # min gap 505 if self.min_gap_min is not None and self.min_gap_max is not None: 506 p.vehicle_attribute.min_gap = np.abs( 507 rng.uniform(self.min_gap_min, self.min_gap_max) 508 ) 509 return p
UniformTemplateGenerator( max_speed_min: Optional[float] = None, max_speed_max: Optional[float] = None, max_acceleration_min: Optional[float] = None, max_acceleration_max: Optional[float] = None, max_braking_acceleration_min: Optional[float] = None, max_braking_acceleration_max: Optional[float] = None, usual_braking_acceleration_min: Optional[float] = None, usual_braking_acceleration_max: Optional[float] = None, headway_min: Optional[float] = None, headway_max: Optional[float] = None, min_gap_min: Optional[float] = None, min_gap_max: Optional[float] = None, seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
399 def __init__( 400 self, 401 max_speed_min: Optional[float] = None, 402 max_speed_max: Optional[float] = None, 403 max_acceleration_min: Optional[float] = None, 404 max_acceleration_max: Optional[float] = None, 405 max_braking_acceleration_min: Optional[float] = None, 406 max_braking_acceleration_max: Optional[float] = None, 407 usual_braking_acceleration_min: Optional[float] = None, 408 usual_braking_acceleration_max: Optional[float] = None, 409 headway_min: Optional[float] = None, 410 headway_max: Optional[float] = None, 411 min_gap_min: Optional[float] = None, 412 min_gap_max: Optional[float] = None, 413 seed: int = 0, 414 template: Optional[Person] = None, 415 ) -> None: 416 """ 417 Args 418 - max_speed_mean (Optional[float]): Lower bound of the Uniform distribution for maximum speeds. 419 - max_speed_std (Optional[float]): Higher bound of the Uniform distribution for maximum speeds. 420 - max_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum accelerations. 421 - max_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum accelerations. 422 - max_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum braking accelerations. 423 - max_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum braking accelerations. 424 - usual_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for usual braking accelerations. 425 - usual_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for usual braking accelerations. 426 - headway_mean (Optional[float]): Lower bound of the Uniform distribution for safe time headways. 427 - headway_std (Optional[float]): Higher bound of the Uniform distribution for safe time headways. 428 - min_gap_mean (Optional[float]): Lower bound of the Uniform distribution for minimum gaps. 429 - min_gap_std (Optional[float]): Higher bound of the Uniform distribution for minimum gaps. 430 - seed (int): Seed value for the random number generator. 431 - template (Optional[Person]): The template function of generated person object. 432 """ 433 self.template_p = template 434 if self.template_p is None: 435 self.template_p = default_person_template_generator() 436 self.template_p.ClearField("schedules") 437 # max speed 438 self.max_speed_min = max_speed_min 439 self.max_speed_max = max_speed_max 440 # max acceleration 441 self.max_acceleration_min = max_acceleration_min 442 self.max_acceleration_max = max_acceleration_max 443 # max braking acceleration 444 self.max_braking_acceleration_min = max_braking_acceleration_min 445 self.max_braking_acceleration_max = max_braking_acceleration_max 446 # usual braking acceleration 447 self.usual_braking_acceleration_min = usual_braking_acceleration_min 448 self.usual_braking_acceleration_max = usual_braking_acceleration_max 449 # safe time headway 450 self.headway_min = headway_min 451 self.headway_max = headway_max 452 # min gap 453 self.min_gap_min = min_gap_min 454 self.min_gap_max = min_gap_max 455 # radom engine 456 self.rng = np.random.default_rng(seed)
Args
- max_speed_mean (Optional[float]): Lower bound of the Uniform distribution for maximum speeds.
- max_speed_std (Optional[float]): Higher bound of the Uniform distribution for maximum speeds.
- max_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum accelerations.
- max_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum accelerations.
- max_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum braking accelerations.
- max_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum braking accelerations.
- usual_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for usual braking accelerations.
- usual_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for usual braking accelerations.
- headway_mean (Optional[float]): Lower bound of the Uniform distribution for safe time headways.
- headway_std (Optional[float]): Higher bound of the Uniform distribution for safe time headways.
- min_gap_mean (Optional[float]): Lower bound of the Uniform distribution for minimum gaps.
- min_gap_std (Optional[float]): Higher bound of the Uniform distribution for minimum gaps.
- seed (int): Seed value for the random number generator.
- template (Optional[Person]): The template function of generated person object.
def
template_generator(self) -> city.person.v2.person_pb2.Person:
458 def template_generator( 459 self, 460 ) -> Person: 461 rng = self.rng 462 p = Person() 463 assert self.template_p is not None 464 p.CopyFrom(self.template_p) 465 # max speed 466 if self.max_speed_min is not None and self.max_speed_max is not None: 467 p.vehicle_attribute.max_speed = np.abs( 468 rng.uniform(self.max_speed_min, self.max_speed_max) 469 ) 470 # max acceleration 471 if ( 472 self.max_acceleration_min is not None 473 and self.max_acceleration_max is not None 474 ): 475 p.vehicle_attribute.max_acceleration = np.abs( 476 rng.uniform(self.max_acceleration_min, self.max_acceleration_max) 477 ) 478 # max braking acceleration 479 if ( 480 self.max_braking_acceleration_min is not None 481 and self.max_braking_acceleration_max is not None 482 ): 483 p.vehicle_attribute.max_braking_acceleration = -np.abs( 484 rng.uniform( 485 self.max_braking_acceleration_min, self.max_braking_acceleration_max 486 ) 487 ) 488 # usual braking acceleration 489 if ( 490 self.usual_braking_acceleration_min is not None 491 and self.usual_braking_acceleration_max is not None 492 ): 493 p.vehicle_attribute.usual_braking_acceleration = -np.abs( 494 rng.uniform( 495 self.usual_braking_acceleration_min, 496 self.usual_braking_acceleration_max, 497 ) 498 ) 499 # safe time headway 500 if self.headway_min is not None and self.headway_max is not None: 501 p.vehicle_attribute.headway = np.abs( 502 rng.uniform(self.headway_min, self.headway_max) 503 ) 504 # min gap 505 if self.min_gap_min is not None and self.min_gap_max is not None: 506 p.vehicle_attribute.min_gap = np.abs( 507 rng.uniform(self.min_gap_min, self.min_gap_max) 508 ) 509 return p
class
CalibratedTemplateGenerator:
512class CalibratedTemplateGenerator: 513 def __init__(self, seed: int = 0, template: Optional[Person] = None) -> None: 514 """ 515 Args 516 - seed (int): Seed value for the random number generator. 517 - template (Optional[Person]): The template function of generated person object. 518 """ 519 self.template_p = template 520 if self.template_p is None: 521 self.template_p = default_person_template_generator() 522 self.template_p.ClearField("schedules") 523 # radom engine 524 self.rng = np.random.default_rng(seed) 525 526 def template_generator( 527 self, 528 ) -> Person: 529 rng = self.rng 530 p = Person() 531 assert self.template_p is not None 532 p.CopyFrom(self.template_p) 533 # max acceleration 534 p.vehicle_attribute.max_acceleration = rng.choice( 535 CALIBRATED_MAX_ACC_VALUES, p=CALIBRATED_MAX_ACC_PDF 536 ) 537 # usual braking acceleration 538 p.vehicle_attribute.usual_braking_acceleration = rng.choice( 539 CALIBRATED_BRAKE_ACC_VALUES, p=CALIBRATED_BRAKE_ACC_PDF 540 ) 541 # safe time headway 542 p.vehicle_attribute.headway = rng.choice( 543 CALIBRATED_HEADWAY_VALUES, p=CALIBRATED_HEADWAY_PDF 544 ) 545 return p
CalibratedTemplateGenerator( seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
513 def __init__(self, seed: int = 0, template: Optional[Person] = None) -> None: 514 """ 515 Args 516 - seed (int): Seed value for the random number generator. 517 - template (Optional[Person]): The template function of generated person object. 518 """ 519 self.template_p = template 520 if self.template_p is None: 521 self.template_p = default_person_template_generator() 522 self.template_p.ClearField("schedules") 523 # radom engine 524 self.rng = np.random.default_rng(seed)
Args
- seed (int): Seed value for the random number generator.
- template (Optional[Person]): The template function of generated person object.
def
template_generator(self) -> city.person.v2.person_pb2.Person:
526 def template_generator( 527 self, 528 ) -> Person: 529 rng = self.rng 530 p = Person() 531 assert self.template_p is not None 532 p.CopyFrom(self.template_p) 533 # max acceleration 534 p.vehicle_attribute.max_acceleration = rng.choice( 535 CALIBRATED_MAX_ACC_VALUES, p=CALIBRATED_MAX_ACC_PDF 536 ) 537 # usual braking acceleration 538 p.vehicle_attribute.usual_braking_acceleration = rng.choice( 539 CALIBRATED_BRAKE_ACC_VALUES, p=CALIBRATED_BRAKE_ACC_PDF 540 ) 541 # safe time headway 542 p.vehicle_attribute.headway = rng.choice( 543 CALIBRATED_HEADWAY_VALUES, p=CALIBRATED_HEADWAY_PDF 544 ) 545 return p
def
default_person_template_generator() -> city.person.v2.person_pb2.Person:
25def default_person_template_generator() -> Person: 26 return Person( 27 attribute=PersonAttribute(), 28 type=PersonType.PERSON_TYPE_NORMAL, 29 vehicle_attribute=VehicleAttribute( 30 length=5, 31 width=2, 32 max_speed=150 / 3.6, 33 max_acceleration=3, 34 max_braking_acceleration=-10, 35 usual_acceleration=2, 36 usual_braking_acceleration=-4.5, 37 headway=1.5, 38 lane_max_speed_recognition_deviation=1.0, 39 lane_change_length=10, 40 min_gap=1, 41 emission_attribute=EmissionAttribute( 42 weight=2100, 43 type=VehicleEngineType.VEHICLE_ENGINE_TYPE_FUEL, 44 coefficient_drag=0.251, 45 lambda_s=0.29, 46 frontal_area=2.52, 47 fuel_efficiency=VehicleEngineEfficiency( 48 energy_conversion_efficiency=0.27 * 0.049, 49 c_ef=66.98, 50 ), 51 ), 52 model="normal", 53 ), 54 pedestrian_attribute=PedestrianAttribute(speed=1.34, model="normal"), 55 bike_attribute=BikeAttribute(speed=5, model="normal"), 56 )
def
default_bus_template_generator() -> city.person.v2.person_pb2.Person:
59def default_bus_template_generator() -> Person: 60 return Person( 61 attribute=PersonAttribute(), 62 type=PersonType.PERSON_TYPE_NORMAL, 63 vehicle_attribute=VehicleAttribute( 64 length=15, 65 width=2, 66 max_speed=150 / 3.6, 67 max_acceleration=3, 68 max_braking_acceleration=-10, 69 usual_acceleration=2, 70 usual_braking_acceleration=-4.5, 71 headway=1.5, 72 lane_max_speed_recognition_deviation=1.0, 73 lane_change_length=10, 74 min_gap=1, 75 emission_attribute=EmissionAttribute( 76 weight=18000, 77 type=VehicleEngineType.VEHICLE_ENGINE_TYPE_FUEL, 78 coefficient_drag=0.251, 79 lambda_s=0.29, 80 frontal_area=8.67, 81 fuel_efficiency=VehicleEngineEfficiency( 82 energy_conversion_efficiency=0.27 * 0.049, 83 c_ef=70.27, 84 ), 85 ), 86 model="normal", 87 ), 88 pedestrian_attribute=PedestrianAttribute(speed=1.34, model="normal"), 89 bike_attribute=BikeAttribute(speed=5, model="normal"), 90 )
class
RandomGenerator:
23class RandomGenerator: 24 """ 25 Random trip generator 26 """ 27 28 def __init__( 29 self, 30 m: Map, 31 position_modes: List[PositionMode], 32 trip_mode: TripMode, 33 template_func: Callable[[], Person] = default_person_template_generator, 34 ): 35 """ 36 Args: 37 - m (Map): The Map. 38 - position_modes (List[PositionMode]): The schedules generated will follow the position modes in this list. For example, if the list is [PositionMode.AOI, PositionMode.LANE, PositionMode.AOI], the generated person will start from an AOI, then go to a lane, and finally go to an AOI. 39 - trip_mode (TripMode): The target trip mode. 40 - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. 41 """ 42 self.m = m 43 self.modes = position_modes 44 if len(self.modes) <= 1: 45 raise ValueError("position_modes should have at least 2 elements") 46 self.trip_mode = trip_mode 47 self.template = template_func 48 49 # Pre-process the map to build the randomly selected candidate set 50 walk = is_walking(trip_mode) 51 self._aoi_candidates = [ 52 aoi 53 for aoi in self.m.aois 54 if len(aoi.walking_positions if walk else aoi.driving_positions) > 0 55 ] 56 self._lane_candidates = [ 57 lane 58 for lane in self.m.lanes 59 if lane.parent_id < JUNC_START_ID 60 and lane.type 61 == (LaneType.LANE_TYPE_WALKING if walk else LaneType.LANE_TYPE_DRIVING) 62 ] 63 if PositionMode.AOI in position_modes and len(self._aoi_candidates) == 0: 64 raise ValueError("No available AOI") 65 if PositionMode.LANE in position_modes and len(self._lane_candidates) == 0: 66 raise ValueError("No available lane") 67 68 def _rand_position(self, candidates: Union[List[Aoi], List[Lane]]): 69 index = np.random.randint(0, len(candidates)) 70 candidate = candidates[index] 71 if isinstance(candidate, Aoi): 72 return Position(aoi_position=AoiPosition(aoi_id=candidate.id)) 73 else: 74 return Position( 75 lane_position=LanePosition( 76 lane_id=candidate.id, s=np.random.rand() * candidate.length 77 ) 78 ) 79 80 def uniform( 81 self, 82 num: int, 83 first_departure_time_range: Tuple[float, float], 84 schedule_interval_range: Tuple[float, float], 85 seed: Optional[int] = None, 86 start_id: Optional[int] = None, 87 ) -> List[Person]: 88 """ 89 Generate a person object by uniform random sampling 90 91 Args: 92 - num (int): The number of person objects to generate. 93 - first_departure_time_range (Tuple[float, float]): The range of the first departure time (uniform random sampling). 94 - schedule_interval_range (Tuple[float, float]): The range of the interval between schedules (uniform random sampling). 95 - seed (Optional[int], optional): The random seed. Defaults to None. 96 - start_id (Optional[int], optional): The start id of the generated person objects. Defaults to None. If None, the `id` will be NOT set. 97 98 Returns: 99 - List[Person]: The generated person objects. 100 """ 101 if seed is not None: 102 np.random.seed(seed) 103 persons = [] 104 for i in range(num): 105 p = Person() 106 p.CopyFrom(self.template()) 107 p.ClearField("schedules") 108 if start_id is not None: 109 p.id = start_id + i 110 p.home.CopyFrom( 111 self._rand_position( 112 self._aoi_candidates 113 if self.modes[0] == PositionMode.AOI 114 else self._lane_candidates 115 ) 116 ) 117 departure_time = np.random.uniform(*first_departure_time_range) 118 for mode in self.modes[1:]: 119 schedule = cast(Schedule, p.schedules.add()) 120 schedule.departure_time = departure_time 121 schedule.loop_count = 1 122 trip = Trip( 123 mode=self.trip_mode, 124 end=self._rand_position( 125 self._aoi_candidates 126 if mode == PositionMode.AOI 127 else self._lane_candidates 128 ), 129 ) 130 schedule.trips.append(trip) 131 departure_time += np.random.uniform(*schedule_interval_range) 132 persons.append(p) 133 return persons
Random trip generator
RandomGenerator( m: city.map.v2.map_pb2.Map, position_modes: List[PositionMode], trip_mode: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object>, template_func: Callable[[], city.person.v2.person_pb2.Person] = <function default_person_template_generator>)
28 def __init__( 29 self, 30 m: Map, 31 position_modes: List[PositionMode], 32 trip_mode: TripMode, 33 template_func: Callable[[], Person] = default_person_template_generator, 34 ): 35 """ 36 Args: 37 - m (Map): The Map. 38 - position_modes (List[PositionMode]): The schedules generated will follow the position modes in this list. For example, if the list is [PositionMode.AOI, PositionMode.LANE, PositionMode.AOI], the generated person will start from an AOI, then go to a lane, and finally go to an AOI. 39 - trip_mode (TripMode): The target trip mode. 40 - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. 41 """ 42 self.m = m 43 self.modes = position_modes 44 if len(self.modes) <= 1: 45 raise ValueError("position_modes should have at least 2 elements") 46 self.trip_mode = trip_mode 47 self.template = template_func 48 49 # Pre-process the map to build the randomly selected candidate set 50 walk = is_walking(trip_mode) 51 self._aoi_candidates = [ 52 aoi 53 for aoi in self.m.aois 54 if len(aoi.walking_positions if walk else aoi.driving_positions) > 0 55 ] 56 self._lane_candidates = [ 57 lane 58 for lane in self.m.lanes 59 if lane.parent_id < JUNC_START_ID 60 and lane.type 61 == (LaneType.LANE_TYPE_WALKING if walk else LaneType.LANE_TYPE_DRIVING) 62 ] 63 if PositionMode.AOI in position_modes and len(self._aoi_candidates) == 0: 64 raise ValueError("No available AOI") 65 if PositionMode.LANE in position_modes and len(self._lane_candidates) == 0: 66 raise ValueError("No available lane")
Args:
- m (Map): The Map.
- position_modes (List[PositionMode]): The schedules generated will follow the position modes in this list. For example, if the list is [PositionMode.AOI, PositionMode.LANE, PositionMode.AOI], the generated person will start from an AOI, then go to a lane, and finally go to an AOI.
- trip_mode (TripMode): The target trip mode.
- template_func (Callable[[],Person]): The template function of generated person object, whose
schedules
,home
will be replaced and others will be copied.
def
uniform( self, num: int, first_departure_time_range: Tuple[float, float], schedule_interval_range: Tuple[float, float], seed: Optional[int] = None, start_id: Optional[int] = None) -> List[city.person.v2.person_pb2.Person]:
80 def uniform( 81 self, 82 num: int, 83 first_departure_time_range: Tuple[float, float], 84 schedule_interval_range: Tuple[float, float], 85 seed: Optional[int] = None, 86 start_id: Optional[int] = None, 87 ) -> List[Person]: 88 """ 89 Generate a person object by uniform random sampling 90 91 Args: 92 - num (int): The number of person objects to generate. 93 - first_departure_time_range (Tuple[float, float]): The range of the first departure time (uniform random sampling). 94 - schedule_interval_range (Tuple[float, float]): The range of the interval between schedules (uniform random sampling). 95 - seed (Optional[int], optional): The random seed. Defaults to None. 96 - start_id (Optional[int], optional): The start id of the generated person objects. Defaults to None. If None, the `id` will be NOT set. 97 98 Returns: 99 - List[Person]: The generated person objects. 100 """ 101 if seed is not None: 102 np.random.seed(seed) 103 persons = [] 104 for i in range(num): 105 p = Person() 106 p.CopyFrom(self.template()) 107 p.ClearField("schedules") 108 if start_id is not None: 109 p.id = start_id + i 110 p.home.CopyFrom( 111 self._rand_position( 112 self._aoi_candidates 113 if self.modes[0] == PositionMode.AOI 114 else self._lane_candidates 115 ) 116 ) 117 departure_time = np.random.uniform(*first_departure_time_range) 118 for mode in self.modes[1:]: 119 schedule = cast(Schedule, p.schedules.add()) 120 schedule.departure_time = departure_time 121 schedule.loop_count = 1 122 trip = Trip( 123 mode=self.trip_mode, 124 end=self._rand_position( 125 self._aoi_candidates 126 if mode == PositionMode.AOI 127 else self._lane_candidates 128 ), 129 ) 130 schedule.trips.append(trip) 131 departure_time += np.random.uniform(*schedule_interval_range) 132 persons.append(p) 133 return persons
Generate a person object by uniform random sampling
Args:
- num (int): The number of person objects to generate.
- first_departure_time_range (Tuple[float, float]): The range of the first departure time (uniform random sampling).
- schedule_interval_range (Tuple[float, float]): The range of the interval between schedules (uniform random sampling).
- seed (Optional[int], optional): The random seed. Defaults to None.
- start_id (Optional[int], optional): The start id of the generated person objects. Defaults to None. If None, the
id
will be NOT set.
Returns:
- List[Person]: The generated person objects.
class
PositionMode(enum.Enum):
An enumeration.
AOI =
<PositionMode.AOI: 0>
LANE =
<PositionMode.LANE: 1>
class
GravityGenerator:
9class GravityGenerator: 10 """ 11 Generate OD matrix inside the given area, based on the gravity model. 12 """ 13 14 def __init__( 15 self, 16 Lambda: float, 17 Alpha: float, 18 Beta: float, 19 Gamma: float, 20 ): 21 """ 22 Args: 23 - pops (list[int]): The population of each area. 24 - dists (np.ndarray): The distance matrix between each pair of areas. 25 """ 26 self._lambda = Lambda 27 self._alpha = Alpha 28 self._beta = Beta 29 self._gamma = Gamma 30 31 def load_area( 32 self, 33 area: gpd.GeoDataFrame, 34 ): 35 """ 36 Load the area data. 37 38 Args: 39 - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 40 """ 41 self._area = area 42 43 def _get_one_point(self): 44 """ 45 get one point from the shapefile 46 """ 47 first_geometry = self._area.geometry.iloc[0] 48 49 # for different types of geometry, get the first point 50 if first_geometry.geom_type == "Polygon": 51 first_point = first_geometry.exterior.coords[0] 52 elif first_geometry.geom_type == "MultiPolygon": 53 first_polygon = list(first_geometry.geoms)[0] 54 first_point = first_polygon.exterior.coords[0] 55 else: 56 raise ValueError("Geometry type not supported") 57 58 pointx, pointy = first_point[0], first_point[1] 59 60 return pointx, pointy 61 62 def _calculate_utm_epsg(self, longitude: float, latitude: float): 63 """ 64 Calculate the UTM zone and corresponding EPSG code for a given longitude and latitude. 65 66 Args: 67 longitude (float): The longitude of the location. 68 latitude (float): The latitude of the location. 69 70 Returns: 71 int: The EPSG code for the UTM zone. 72 """ 73 # Determine the UTM zone from the longitude 74 utm_zone = int((longitude + 180) / 6) + 1 75 76 # Determine the hemisphere and construct the EPSG code 77 if latitude >= 0: 78 # Northern Hemisphere 79 epsg_code = 32600 + utm_zone 80 else: 81 # Southern Hemisphere 82 epsg_code = 32700 + utm_zone 83 84 return epsg_code 85 86 def cal_distance(self): 87 """ 88 Euclidean distance matrix 89 get the distance matrix for regions in the area 90 based on the shapefile of the area 91 """ 92 pointx, pointy = self._get_one_point() 93 epsg = self._calculate_utm_epsg(pointx, pointy) 94 area = self._area.to_crs(f"EPSG:{epsg}") 95 area["centroid"] = area["geometry"].centroid # type:ignore 96 97 # dis 98 points = area["centroid"].apply(lambda p: [p.x, p.y]).tolist() # type:ignore 99 dist_matrix = distance_matrix(points, points) 100 101 distance = dist_matrix.astype(np.float32) 102 103 return distance 104 105 def generate(self, pops): #: list[int] 106 """ 107 Generate the OD matrix based on the gravity model. 108 109 Args: 110 - pops (list[int]): The population of each area. 111 - dists (np.ndarray): The distance matrix between each pair of areas. 112 113 Returns: 114 - np.ndarray: The generated OD matrix. 115 """ 116 dists = self.cal_distance() 117 118 N = np.power(pops, self._alpha) 119 M = np.power(pops, self._beta) 120 D = np.power(dists, self._gamma) 121 122 N = N.reshape(-1, 1).repeat(len(pops), axis=1) 123 M = M.reshape(1, -1).repeat(len(pops), axis=0) 124 125 od_matrix = self._lambda * N * M / (D + 1e-8) 126 od_matrix = od_matrix.astype(np.int64) 127 od_matrix[od_matrix < 0] = 0 128 for i in range(od_matrix.shape[0]): 129 od_matrix[i, i] = 0 130 131 return od_matrix
Generate OD matrix inside the given area, based on the gravity model.
GravityGenerator(Lambda: float, Alpha: float, Beta: float, Gamma: float)
14 def __init__( 15 self, 16 Lambda: float, 17 Alpha: float, 18 Beta: float, 19 Gamma: float, 20 ): 21 """ 22 Args: 23 - pops (list[int]): The population of each area. 24 - dists (np.ndarray): The distance matrix between each pair of areas. 25 """ 26 self._lambda = Lambda 27 self._alpha = Alpha 28 self._beta = Beta 29 self._gamma = Gamma
Args:
- pops (list[int]): The population of each area.
- dists (np.ndarray): The distance matrix between each pair of areas.
def
load_area(self, area: geopandas.geodataframe.GeoDataFrame):
31 def load_area( 32 self, 33 area: gpd.GeoDataFrame, 34 ): 35 """ 36 Load the area data. 37 38 Args: 39 - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 40 """ 41 self._area = area
Load the area data.
Args:
- area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined
crs
string.
def
cal_distance(self):
86 def cal_distance(self): 87 """ 88 Euclidean distance matrix 89 get the distance matrix for regions in the area 90 based on the shapefile of the area 91 """ 92 pointx, pointy = self._get_one_point() 93 epsg = self._calculate_utm_epsg(pointx, pointy) 94 area = self._area.to_crs(f"EPSG:{epsg}") 95 area["centroid"] = area["geometry"].centroid # type:ignore 96 97 # dis 98 points = area["centroid"].apply(lambda p: [p.x, p.y]).tolist() # type:ignore 99 dist_matrix = distance_matrix(points, points) 100 101 distance = dist_matrix.astype(np.float32) 102 103 return distance
Euclidean distance matrix get the distance matrix for regions in the area based on the shapefile of the area
def
generate(self, pops):
105 def generate(self, pops): #: list[int] 106 """ 107 Generate the OD matrix based on the gravity model. 108 109 Args: 110 - pops (list[int]): The population of each area. 111 - dists (np.ndarray): The distance matrix between each pair of areas. 112 113 Returns: 114 - np.ndarray: The generated OD matrix. 115 """ 116 dists = self.cal_distance() 117 118 N = np.power(pops, self._alpha) 119 M = np.power(pops, self._beta) 120 D = np.power(dists, self._gamma) 121 122 N = N.reshape(-1, 1).repeat(len(pops), axis=1) 123 M = M.reshape(1, -1).repeat(len(pops), axis=0) 124 125 od_matrix = self._lambda * N * M / (D + 1e-8) 126 od_matrix = od_matrix.astype(np.int64) 127 od_matrix[od_matrix < 0] = 0 128 for i in range(od_matrix.shape[0]): 129 od_matrix[i, i] = 0 130 131 return od_matrix
Generate the OD matrix based on the gravity model.
Args:
- pops (list[int]): The population of each area.
- dists (np.ndarray): The distance matrix between each pair of areas.
Returns:
- np.ndarray: The generated OD matrix.
class
AigcGenerator:
8class AigcGenerator: 9 """ 10 Generate OD matrix inside the given area. 11 """ 12 13 def __init__(self): 14 self.generator = generator.Generator() 15 16 def set_satetoken(self, satetoken: str): 17 """ 18 Set the satetoken for the generator. 19 """ 20 self.generator.set_satetoken(satetoken) 21 22 def load_area( 23 self, 24 area: gpd.GeoDataFrame, 25 ): 26 """ 27 Load the area data. 28 29 Args: 30 - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 31 """ 32 self.generator.load_area(area) 33 34 def generate(self): 35 """ 36 Generate the OD matrix. 37 """ 38 return self.generator.generate()
Generate OD matrix inside the given area.
def
set_satetoken(self, satetoken: str):
16 def set_satetoken(self, satetoken: str): 17 """ 18 Set the satetoken for the generator. 19 """ 20 self.generator.set_satetoken(satetoken)
Set the satetoken for the generator.
def
load_area(self, area: geopandas.geodataframe.GeoDataFrame):
22 def load_area( 23 self, 24 area: gpd.GeoDataFrame, 25 ): 26 """ 27 Load the area data. 28 29 Args: 30 - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 31 """ 32 self.generator.load_area(area)
Load the area data.
Args:
- area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined
crs
string.
class
TripGenerator:
400class TripGenerator: 401 """ 402 generate trip from OD matrix. 403 """ 404 405 def __init__( 406 self, 407 m: Map, 408 pop_tif_path: Optional[str] = None, 409 activity_distributions: Optional[dict] = None, 410 driving_speed: float = 30 / 3.6, 411 parking_fee: float = 20.0, 412 driving_penalty: float = 0.0, 413 subway_speed: float = 35 / 3.6, 414 subway_penalty: float = 600.0, 415 subway_expense: float = 10.0, 416 bus_speed: float = 15 / 3.6, 417 bus_penalty: float = 600.0, 418 bus_expense: float = 5.0, 419 bike_speed: float = 10 / 3.6, 420 bike_penalty: float = 0.0, 421 template_func: Callable[[], Person] = default_person_template_generator, 422 add_pop: bool = False, 423 multiprocessing_chunk_size: int = 500, 424 workers: int = cpu_count(), 425 ): 426 """ 427 Args: 428 - m (Map): The Map. 429 - pop_tif_path (str): path to population tif file. 430 - activity_distributions (dict): human mobility mode and its probability. e.g. {"HWH": 18.0, "HWH+": 82.0,}. H for go home, W for go to work, O or + for other activities 431 - driving_speed (float): vehicle speed(m/s) for traffic mode assignment. 432 - parking_fee (float): money cost(ï¿¥) of parking a car for traffic mode assignment. 433 - driving_penalty (float): extra cost(s) of vehicle for traffic mode assignment. 434 - subway_speed (float): subway speed(m/s) for traffic mode assignment. 435 - subway_penalty (float): extra cost(s) of subway for traffic mode assignment. 436 - subway_expense (float): money cost(ï¿¥) of subway for traffic mode assignment. 437 - bus_speed (float): bus speed(m/s) for traffic mode assignment. 438 - bus_penalty (float): extra cost(s) of bus for traffic mode assignment. 439 - bus_expense (float): money cost(ï¿¥) of bus for traffic mode assignment. 440 - bike_speed (float): extra cost(s) of bike for traffic mode assignment. 441 - bike_penalty (float): money cost(ï¿¥) of bike for traffic mode assignment. 442 - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. 443 - add_pop (bool): Add population to aois. 444 - multiprocessing_chunk_size (int): the maximum size of each multiprocessing chunk 445 - workers (int): number of workers. 446 """ 447 global SUBWAY_EXPENSE, BUS_EXPENSE, DRIVING_SPEED, DRIVING_PENALTY, SUBWAY_SPEED, SUBWAY_PENALTY, BUS_SPEED, BUS_PENALTY, BIKE_SPEED, BIKE_PENALTY, PARKING_FEE 448 SUBWAY_EXPENSE, BUS_EXPENSE = subway_expense, bus_expense 449 DRIVING_SPEED, DRIVING_PENALTY, PARKING_FEE = ( 450 driving_speed, 451 driving_penalty, 452 parking_fee, 453 ) 454 SUBWAY_SPEED, SUBWAY_PENALTY = subway_speed, subway_penalty 455 BUS_SPEED, BUS_PENALTY = bus_speed, bus_penalty 456 BIKE_SPEED, BIKE_PENALTY = bike_speed, bike_penalty 457 self.m = m 458 self.pop_tif_path = pop_tif_path 459 self.add_pop = add_pop 460 self.projector = pyproj.Proj(m.header.projection) 461 self.workers = workers 462 self.template = template_func 463 self.persons = [] 464 global MAX_CHUNK_SIZE 465 MAX_CHUNK_SIZE = multiprocessing_chunk_size 466 # activity proportion 467 if activity_distributions is not None: 468 ori_modes_stat = { 469 k: float(v) 470 for k, v in activity_distributions.items() 471 if k in HUMAN_MODE_STATS 472 } 473 else: 474 ori_modes_stat = HUMAN_MODE_STATS 475 self.modes = [mode for mode in ori_modes_stat.keys()] 476 self.p = np.array([prob for prob in ori_modes_stat.values()]) 477 self.p_mode = self.p / sum(self.p) 478 479 def _read_aois(self): 480 aois = [] 481 # read aois 482 for i in self.m.aois: 483 a = {} 484 a["id"] = i.id 485 a["external"] = { 486 "area": i.area, 487 "urban_land_use": i.urban_land_use, 488 } 489 a["geo"] = [self.projector(p.x, p.y, inverse=True) for p in i.positions] 490 a["shapely"] = geometry.Polygon( 491 [ 492 p.x, 493 p.y, 494 ] 495 for p in i.positions 496 ) 497 a["has_driving_gates"] = len(i.driving_gates) > 0 498 aois.append(a) 499 500 def get_aoi_catg(urban_land_use: str): 501 if urban_land_use in {"R"}: 502 return "residential" 503 elif urban_land_use in {"B29"}: 504 return "business" 505 elif urban_land_use in { 506 "B1", 507 "B", 508 }: 509 return "commercial" 510 elif urban_land_use in {"M"}: 511 return "industrial" 512 elif urban_land_use in {"S4", "S"}: 513 return "transportation" 514 elif urban_land_use in {"A", "A1"}: 515 return "administrative" 516 elif urban_land_use in {"A3"}: 517 return "education" 518 elif urban_land_use in {"A5"}: 519 return "medical" 520 elif urban_land_use in {"B32", "A4", "A2"}: 521 return "sport and cultual" 522 elif urban_land_use in {"B31", "G1", "B3", "B13"}: 523 return "park and leisure" 524 return "other" 525 526 # add catg 527 for aoi in aois: 528 aoi["external"]["catg"] = get_aoi_catg(aoi["external"]["urban_land_use"]) 529 # population 530 if self.add_pop and self.pop_tif_path is not None: 531 raise NotImplementedError( 532 "Adding population to AOIs in trip_generator has been removed!" 533 ) 534 # geos = [] 535 # for aoi_data in aois: 536 # geos.append( 537 # Feature( 538 # geometry=Polygon([[list(c) for c in aoi_data["geo"]]]), 539 # properties={ 540 # "id": aoi_data["id"], 541 # }, 542 # ) 543 # ) 544 # geos = FeatureCollection(geos) 545 # geos = geo2pop(geos, self.pop_tif_path) 546 547 # geos = cast(FeatureCollection, geos) 548 # aoi_id2pop = defaultdict(int) 549 # for feature in geos["features"]: 550 # aoi_id = feature["properties"]["id"] 551 # pop = feature["properties"]["population"] 552 # aoi_id2pop[aoi_id] = pop 553 # for aoi in aois: 554 # aoi_id = aoi["id"] 555 # aoi["external"]["population"] = aoi_id2pop.get(aoi_id, 0) 556 else: 557 for aoi in aois: 558 aoi["external"]["population"] = aoi["external"]["area"] 559 self.aois = aois 560 561 def _read_regions(self): 562 self.regions = [] 563 for i, poly in enumerate(self.areas.geometry.to_crs(self.m.header.projection)): 564 self.area_shapes.append(poly) 565 r = { 566 "geometry": geo_coords(poly), # xy coords 567 "ori_id": i, 568 "region_id": i, 569 } 570 self.regions.append(r) 571 572 def _read_od_matrix(self): 573 logging.info("Reading original ods") 574 n_region = len(self.regions) 575 assert ( 576 n_region == self.od_matrix.shape[0] and n_region == self.od_matrix.shape[1] 577 ) 578 # OD-matrix contains time axis = 2 579 if len(self.od_matrix.shape) > 2: 580 od = self.od_matrix 581 self.LEN_OD_TIMES = od.shape[2] 582 else: 583 orig_od = np.expand_dims(self.od_matrix, axis=2) 584 self.LEN_OD_TIMES = 1 585 od = np.broadcast_to( 586 orig_od, (n_region, n_region, self.LEN_OD_TIMES) 587 ).astype(np.int64) 588 sum_od_j = np.sum(od, axis=1) 589 od_prob = od / sum_od_j[:, np.newaxis] 590 od_prob = np.nan_to_num(od_prob) 591 self.od = od 592 self.od_prob = od_prob 593 594 def _match_aoi2region(self): 595 global shapes 596 shapes = self.area_shapes 597 aois = self.aois 598 results = [] 599 _match_aoi_unit_with_arg = partial(_match_aoi_unit, self.projector) 600 for i in range(0, len(aois), MAX_BATCH_SIZE): 601 aois_batch = aois[i : i + MAX_BATCH_SIZE] 602 with Pool(processes=self.workers) as pool: 603 results += pool.map( 604 _match_aoi_unit_with_arg, 605 aois_batch, 606 chunksize=min(ceil(len(aois_batch) / self.workers), MAX_CHUNK_SIZE), 607 ) 608 results = [r for r in results if r is not None] 609 for r in results: 610 aoi_id, reg_id = r[:2] 611 self.aoi2region[aoi_id] = reg_id 612 self.region2aoi[reg_id].append(aoi_id) 613 logging.info(f"AOI matched: {len(self.aoi2region)}") 614 615 def _generate_mobi( 616 self, 617 agent_num: int = 10000, 618 area_pops: Optional[list] = None, 619 person_profiles: Optional[list] = None, 620 seed: int = 0, 621 ): 622 global region2aoi, aoi_map, aoi_type2ids 623 global home_dist, work_od, other_od, educate_od 624 global available_trip_modes 625 available_trip_modes = self.available_trip_modes 626 if "bus" in available_trip_modes and "subway" in available_trip_modes: 627 available_trip_modes.append("bus_subway") 628 region2aoi = self.region2aoi 629 aoi_map = {d["id"]: d for d in self.aois} 630 n_region = len(self.regions) 631 home_dist, work_od, educate_od, other_od = extract_HWEO_from_od_matrix( 632 self.aois, 633 n_region, 634 self.aoi2region, 635 self.aoi_type2ids, 636 self.od_prob, 637 self.LEN_OD_TIMES, 638 ) 639 aoi_type2ids = self.aoi_type2ids 640 agent_args = [] 641 a_home_regions = [] 642 if area_pops is not None: 643 for ii, pop in enumerate(area_pops): 644 pop_num = int(pop) 645 if pop_num > 0: 646 a_home_regions += [ii for _ in range(pop_num)] 647 agent_num = sum(a_home_regions) 648 else: 649 a_home_regions = [None for _ in range(agent_num)] 650 rng = np.random.default_rng(seed) 651 if person_profiles is not None: 652 a_profiles = person_profiles 653 else: 654 a_profiles = gen_profiles(agent_num, self.workers, MAX_CHUNK_SIZE) 655 a_modes = [self.modes for _ in range(agent_num)] 656 a_p_modes = [self.p_mode for _ in range(agent_num)] 657 for i, (a_home_region, a_profile, a_mode, a_p_mode) in enumerate( 658 zip(a_home_regions, a_profiles, a_modes, a_p_modes) 659 ): 660 agent_args.append( 661 ( 662 i, 663 rng.integers(0, 2**16 - 1), 664 a_home_region, 665 a_profile, 666 a_mode, 667 a_p_mode, 668 ) 669 ) 670 partial_args = ( 671 len(self.regions), 672 self.projector, 673 self.departure_prob, 674 self.LEN_OD_TIMES, 675 ) 676 _process_agent_unit_with_arg = partial(_process_agent_unit, partial_args) 677 raw_persons = [] 678 for i in range(0, len(agent_args), MAX_BATCH_SIZE): 679 agent_args_batch = agent_args[i : i + MAX_BATCH_SIZE] 680 with Pool(processes=self.workers) as pool: 681 raw_persons += pool.map( 682 _process_agent_unit_with_arg, 683 agent_args_batch, 684 chunksize=min( 685 ceil(len(agent_args_batch) / self.workers), MAX_CHUNK_SIZE 686 ), 687 ) 688 raw_persons = [r for r in raw_persons] 689 for agent_id, ( 690 aoi_list, 691 person_home, 692 person_work, 693 a_profile, 694 times, 695 trip_modes, 696 trip_models, 697 activities, 698 ) in enumerate(raw_persons): 699 times = np.array(times) * 3600 # hour->second 700 p = Person() 701 p.CopyFrom(self.template()) 702 p.ClearField("schedules") 703 p.id = agent_id 704 p.home.CopyFrom(Position(aoi_position=AoiPosition(aoi_id=person_home))) 705 p.work.CopyFrom(Position(aoi_position=AoiPosition(aoi_id=person_work))) 706 p.profile.CopyFrom(dict2pb(a_profile, PersonProfile())) 707 for time, aoi_id, trip_mode, activity, trip_model in zip( 708 times, aoi_list[1:], trip_modes, activities, trip_models 709 ): 710 schedule = cast(Schedule, p.schedules.add()) 711 schedule.departure_time = time 712 schedule.loop_count = 1 713 trip = Trip( 714 mode=cast( 715 TripMode, 716 trip_mode, 717 ), 718 end=Position(aoi_position=AoiPosition(aoi_id=aoi_id)), 719 activity=activity, 720 model=trip_model, 721 ) 722 schedule.trips.append(trip) 723 self.persons.append(p) 724 725 def generate_persons( 726 self, 727 od_matrix: np.ndarray, 728 areas: GeoDataFrame, 729 available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"], 730 departure_time_curve: Optional[list[float]] = None, 731 area_pops: Optional[list] = None, 732 person_profiles: Optional[list[dict]] = None, 733 seed: int = 0, 734 agent_num: Optional[int] = None, 735 ) -> List[Person]: 736 """ 737 Args: 738 - od_matrix (numpy.ndarray): The OD matrix. 739 - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 740 - available_trip_modes (list[str]): available trip modes for person schedules. 741 - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h. 742 - area_pops (list): list of populations in each area. If is not None, # of the persons departs from each home position is exactly equal to the given pop num. 743 - person_profiles (Optional[List[dict]]): list of profiles in dict format. 744 - seed (int): The random seed. 745 - agent_num (int): number of agents to generate. 746 747 Returns: 748 - List[Person]: The generated person objects. 749 """ 750 # init 751 self.area_shapes = [] 752 self.aoi2region = defaultdict(list) 753 self.region2aoi = defaultdict(list) 754 self.aoi_type2ids = defaultdict(list) 755 self.persons = [] 756 # user input time curve 757 if departure_time_curve is not None: 758 assert len(departure_time_curve) >= 24 759 sum_times = sum(departure_time_curve) 760 self.departure_prob = np.array( 761 [d / sum_times for d in departure_time_curve] 762 ) 763 else: 764 self.departure_prob = None 765 self.od_matrix = od_matrix 766 self.areas = areas 767 self.available_trip_modes = available_trip_modes 768 self._read_aois() 769 self._read_regions() 770 self._read_od_matrix() 771 self._match_aoi2region() 772 if agent_num is None: 773 agent_num = int(np.sum(self.od) / self.LEN_OD_TIMES) 774 if not agent_num >= 1: 775 logging.warning("agent_num should >=1") 776 return [] 777 self._generate_mobi(agent_num, area_pops, person_profiles, seed) 778 return self.persons 779 780 def _get_driving_pos_dict(self) -> Dict[Tuple[int, int], LanePosition]: 781 road_aoi_id2d_pos = {} 782 road_id2d_lane_ids = {} 783 lane_id2parent_road_id = {} 784 m_lanes = {l.id: l for l in self.m.lanes} 785 m_roads = {r.id: r for r in self.m.roads} 786 m_aois = {a.id: a for a in self.m.aois} 787 for road_id, road in m_roads.items(): 788 d_lane_ids = [ 789 lid 790 for lid in road.lane_ids 791 if m_lanes[lid].type == mapv2.LANE_TYPE_DRIVING 792 ] 793 road_id2d_lane_ids[road_id] = d_lane_ids 794 for lane_id, lane in m_lanes.items(): 795 parent_id = lane.parent_id 796 # road lane 797 if parent_id in m_roads: 798 lane_id2parent_road_id[lane_id] = parent_id 799 for aoi_id, aoi in m_aois.items(): 800 for d_pos in aoi.driving_positions: 801 pos_lane_id, _ = d_pos.lane_id, d_pos.s 802 # junction lane 803 assert ( 804 pos_lane_id in lane_id2parent_road_id 805 ), f"Bad lane position {d_pos} at AOI {aoi_id}" 806 parent_road_id = lane_id2parent_road_id[pos_lane_id] 807 road_aoi_key = (parent_road_id, aoi_id) 808 road_aoi_id2d_pos[road_aoi_key] = d_pos 809 return road_aoi_id2d_pos 810 811 def generate_public_transport_drivers( 812 self, 813 template_func: Optional[Callable[[], Person]] = None, 814 stop_duration_time: float = 30.0, 815 seed: int = 0, 816 ) -> List[Person]: 817 """ 818 Args: 819 - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized. 820 - stop_duration_time (float): The duration time (in second) for bus at each stop. 821 - seed (int): The random seed. 822 823 Returns: 824 - List[Person]: The generated driver objects. 825 """ 826 self.persons = [] 827 road_aoi_id2d_pos = self._get_driving_pos_dict() 828 person_id = PT_START_ID 829 _template = template_func if template_func is not None else self.template 830 for sl in self.m.sublines: 831 departure_times = list(sl.schedules.departure_times) 832 if not sl.type in {mapv2.SUBLINE_TYPE_BUS, mapv2.SUBLINE_TYPE_SUBWAY}: 833 continue 834 person_id, generated_drivers = gen_bus_drivers( 835 person_id, 836 _template, 837 departure_times, 838 stop_duration_time, 839 road_aoi_id2d_pos, 840 sl, 841 ) 842 self.persons.extend(generated_drivers) 843 return self.persons 844 845 def _generate_schedules(self, input_persons: List[Person], seed: int): 846 global region2aoi, aoi_map, aoi_type2ids 847 global home_dist, work_od, other_od, educate_od 848 global available_trip_modes 849 available_trip_modes = self.available_trip_modes 850 if "bus" in available_trip_modes and "subway" in available_trip_modes: 851 available_trip_modes.append("bus_subway") 852 region2aoi = self.region2aoi 853 aoi_map = {d["id"]: d for d in self.aois} 854 n_region = len(self.regions) 855 home_dist, work_od, educate_od, other_od = extract_HWEO_from_od_matrix( 856 self.aois, 857 n_region, 858 self.aoi2region, 859 self.aoi_type2ids, 860 self.od_prob, 861 self.LEN_OD_TIMES, 862 ) 863 orig_persons = deepcopy(input_persons) 864 person_args = [] 865 bad_person_indexes = set() 866 rng = np.random.default_rng(seed) 867 for idx, p in enumerate(orig_persons): 868 try: 869 p_home = p.home.aoi_position.aoi_id 870 if not p_home >= AOI_START_ID: 871 logging.warning( 872 f"Person {p.id} has no home AOI ID, use random home instead!" 873 ) 874 p_home = rng.choice(self.aoi_type2ids["home"]) 875 p_work = p.work.aoi_position.aoi_id 876 if not p_work >= AOI_START_ID: 877 logging.warning( 878 f"Person {p.id} has no work AOI ID, use random work instead!" 879 ) 880 p_work = rng.choice(self.aoi_type2ids["other"]) 881 p_profile = pb2dict(p.profile) 882 person_args.append( 883 [ 884 p.id, 885 p_home, 886 self.aoi2region[p_home], # possibly key error 887 p_work, 888 self.aoi2region[p_work], # possibly key error 889 p_profile, 890 self.modes, 891 self.p_mode, 892 rng.integers(0, 2**16 - 1), 893 ] 894 ) 895 except Exception as e: 896 bad_person_indexes.add(idx) 897 logging.warning(f"{e} when handling Person {p.id}, Skip!") 898 to_process_persons = [ 899 p for idx, p in enumerate(orig_persons) if idx not in bad_person_indexes 900 ] 901 # return directly 902 no_process_persons = [ 903 p for idx, p in enumerate(orig_persons) if idx in bad_person_indexes 904 ] 905 partial_args = ( 906 len(self.regions), 907 self.projector, 908 self.departure_prob, 909 self.LEN_OD_TIMES, 910 ) 911 filled_schedules = [] 912 _fill_person_schedule_unit_with_arg = partial( 913 _fill_person_schedule_unit, partial_args 914 ) 915 for i in range(0, len(person_args), MAX_BATCH_SIZE): 916 person_args_batch = person_args[i : i + MAX_BATCH_SIZE] 917 with Pool(processes=self.workers) as pool: 918 filled_schedules += pool.map( 919 _fill_person_schedule_unit_with_arg, 920 person_args_batch, 921 chunksize=min( 922 ceil(len(person_args_batch) / self.workers), MAX_CHUNK_SIZE 923 ), 924 ) 925 for ( 926 aoi_list, 927 times, 928 trip_modes, 929 trip_models, 930 activities, 931 ), orig_p in zip(filled_schedules, to_process_persons): 932 times = np.array(times) * 3600 # hour->second 933 p = Person() 934 p.CopyFrom(orig_p) 935 p.ClearField("schedules") 936 for time, aoi_id, trip_mode, activity, trip_model in zip( 937 times, aoi_list[1:], trip_modes, activities, trip_models 938 ): 939 schedule = cast(Schedule, p.schedules.add()) 940 schedule.departure_time = time 941 schedule.loop_count = 1 942 trip = Trip( 943 mode=cast( 944 TripMode, 945 trip_mode, 946 ), 947 end=Position(aoi_position=AoiPosition(aoi_id=aoi_id)), 948 activity=activity, 949 model=trip_model, 950 ) 951 schedule.trips.append(trip) 952 self.persons.append(p) 953 len_filled_persons, len_no_process_persons = len(to_process_persons), len( 954 no_process_persons 955 ) 956 logging.info(f"Filled schedules of {len_filled_persons} persons") 957 if len_no_process_persons > 0: 958 logging.warning( 959 f"Unprocessed persons: {len_no_process_persons}, index range [{len_filled_persons}:] in returned results" 960 ) 961 self.persons.extend(no_process_persons) 962 963 def fill_person_schedules( 964 self, 965 input_persons: List[Person], 966 od_matrix: np.ndarray, 967 areas: GeoDataFrame, 968 available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"], 969 departure_time_curve: Optional[list[float]] = None, 970 seed: int = 0, 971 ) -> List[Person]: 972 """ 973 Generate person schedules. 974 975 Args: 976 - input_persons (List[Person]): Input Person objects. 977 - od_matrix (numpy.ndarray): The OD matrix. 978 - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 979 - available_trip_modes (Optional[List[str]]): available trip modes for person schedules. 980 - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h. 981 - seed (int): The random seed. 982 983 Returns: 984 - List[Person]: The person objects with generated schedules. 985 """ 986 # init 987 self.area_shapes = [] 988 self.aoi2region = defaultdict(list) 989 self.region2aoi = defaultdict(list) 990 self.aoi_type2ids = defaultdict(list) 991 self.persons = [] 992 # user input time curve 993 if departure_time_curve is not None: 994 assert len(departure_time_curve) >= 24 995 sum_times = sum(departure_time_curve) 996 self.departure_prob = np.array( 997 [d / sum_times for d in departure_time_curve] 998 ) 999 else: 1000 self.departure_prob = None 1001 self.od_matrix = od_matrix 1002 self.areas = areas 1003 self.available_trip_modes = available_trip_modes 1004 self._read_aois() 1005 self._read_regions() 1006 self._read_od_matrix() 1007 self._match_aoi2region() 1008 self._generate_schedules(input_persons, seed) 1009 1010 return self.persons 1011 1012 def generate_taxi_drivers( 1013 self, 1014 template_func: Optional[Callable[[], Person]] = None, 1015 parking_positions: Optional[List[Union[LanePosition, AoiPosition]]] = None, 1016 agent_num: Optional[int] = None, 1017 seed: int = 0, 1018 ) -> List[Person]: 1019 """ 1020 Args: 1021 - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized. 1022 - parking_positions (Optional[List[Union[LanePosition,AoiPosition]]]): The parking positions of each taxi. 1023 - agent_num (Optional[int]): The taxi driver num. 1024 - seed (int): The random seed. 1025 1026 Returns: 1027 - List[Person]: The generated driver objects. 1028 """ 1029 self.persons = [] 1030 person_id = TAXI_START_ID 1031 _template = template_func if template_func is not None else self.template 1032 self._read_aois() 1033 if parking_positions is not None: 1034 logging.info(f"") 1035 _taxi_home_positions = parking_positions 1036 elif agent_num is not None: 1037 logging.info(f"") 1038 if not agent_num >= 1: 1039 logging.warning("agent_num should >=1") 1040 return [] 1041 rng = np.random.default_rng(seed) 1042 has_driving_aoi_ids = [a["id"] for a in self.aois if a["has_driving_gates"]] 1043 _taxi_home_positions = [ 1044 AoiPosition(aoi_id=_id) 1045 for _id in rng.choice(has_driving_aoi_ids, int(agent_num)) 1046 ] 1047 else: 1048 logging.warning( 1049 "Either `agent_num` or `parking_positions` should be provided!" 1050 ) 1051 return [] 1052 for _pos in _taxi_home_positions: 1053 person_id, generated_drivers = gen_taxi_drivers( 1054 person_id, 1055 _template, 1056 _pos, 1057 ) 1058 self.persons.extend(generated_drivers) 1059 return self.persons
generate trip from OD matrix.
TripGenerator( m: city.map.v2.map_pb2.Map, pop_tif_path: Optional[str] = None, activity_distributions: Optional[dict] = None, driving_speed: float = 8.333333333333334, parking_fee: float = 20.0, driving_penalty: float = 0.0, subway_speed: float = 9.722222222222221, subway_penalty: float = 600.0, subway_expense: float = 10.0, bus_speed: float = 4.166666666666667, bus_penalty: float = 600.0, bus_expense: float = 5.0, bike_speed: float = 2.7777777777777777, bike_penalty: float = 0.0, template_func: Callable[[], city.person.v2.person_pb2.Person] = <function default_person_template_generator>, add_pop: bool = False, multiprocessing_chunk_size: int = 500, workers: int = 4)
405 def __init__( 406 self, 407 m: Map, 408 pop_tif_path: Optional[str] = None, 409 activity_distributions: Optional[dict] = None, 410 driving_speed: float = 30 / 3.6, 411 parking_fee: float = 20.0, 412 driving_penalty: float = 0.0, 413 subway_speed: float = 35 / 3.6, 414 subway_penalty: float = 600.0, 415 subway_expense: float = 10.0, 416 bus_speed: float = 15 / 3.6, 417 bus_penalty: float = 600.0, 418 bus_expense: float = 5.0, 419 bike_speed: float = 10 / 3.6, 420 bike_penalty: float = 0.0, 421 template_func: Callable[[], Person] = default_person_template_generator, 422 add_pop: bool = False, 423 multiprocessing_chunk_size: int = 500, 424 workers: int = cpu_count(), 425 ): 426 """ 427 Args: 428 - m (Map): The Map. 429 - pop_tif_path (str): path to population tif file. 430 - activity_distributions (dict): human mobility mode and its probability. e.g. {"HWH": 18.0, "HWH+": 82.0,}. H for go home, W for go to work, O or + for other activities 431 - driving_speed (float): vehicle speed(m/s) for traffic mode assignment. 432 - parking_fee (float): money cost(ï¿¥) of parking a car for traffic mode assignment. 433 - driving_penalty (float): extra cost(s) of vehicle for traffic mode assignment. 434 - subway_speed (float): subway speed(m/s) for traffic mode assignment. 435 - subway_penalty (float): extra cost(s) of subway for traffic mode assignment. 436 - subway_expense (float): money cost(ï¿¥) of subway for traffic mode assignment. 437 - bus_speed (float): bus speed(m/s) for traffic mode assignment. 438 - bus_penalty (float): extra cost(s) of bus for traffic mode assignment. 439 - bus_expense (float): money cost(ï¿¥) of bus for traffic mode assignment. 440 - bike_speed (float): extra cost(s) of bike for traffic mode assignment. 441 - bike_penalty (float): money cost(ï¿¥) of bike for traffic mode assignment. 442 - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. 443 - add_pop (bool): Add population to aois. 444 - multiprocessing_chunk_size (int): the maximum size of each multiprocessing chunk 445 - workers (int): number of workers. 446 """ 447 global SUBWAY_EXPENSE, BUS_EXPENSE, DRIVING_SPEED, DRIVING_PENALTY, SUBWAY_SPEED, SUBWAY_PENALTY, BUS_SPEED, BUS_PENALTY, BIKE_SPEED, BIKE_PENALTY, PARKING_FEE 448 SUBWAY_EXPENSE, BUS_EXPENSE = subway_expense, bus_expense 449 DRIVING_SPEED, DRIVING_PENALTY, PARKING_FEE = ( 450 driving_speed, 451 driving_penalty, 452 parking_fee, 453 ) 454 SUBWAY_SPEED, SUBWAY_PENALTY = subway_speed, subway_penalty 455 BUS_SPEED, BUS_PENALTY = bus_speed, bus_penalty 456 BIKE_SPEED, BIKE_PENALTY = bike_speed, bike_penalty 457 self.m = m 458 self.pop_tif_path = pop_tif_path 459 self.add_pop = add_pop 460 self.projector = pyproj.Proj(m.header.projection) 461 self.workers = workers 462 self.template = template_func 463 self.persons = [] 464 global MAX_CHUNK_SIZE 465 MAX_CHUNK_SIZE = multiprocessing_chunk_size 466 # activity proportion 467 if activity_distributions is not None: 468 ori_modes_stat = { 469 k: float(v) 470 for k, v in activity_distributions.items() 471 if k in HUMAN_MODE_STATS 472 } 473 else: 474 ori_modes_stat = HUMAN_MODE_STATS 475 self.modes = [mode for mode in ori_modes_stat.keys()] 476 self.p = np.array([prob for prob in ori_modes_stat.values()]) 477 self.p_mode = self.p / sum(self.p)
Args:
- m (Map): The Map.
- pop_tif_path (str): path to population tif file.
- activity_distributions (dict): human mobility mode and its probability. e.g. {"HWH": 18.0, "HWH+": 82.0,}. H for go home, W for go to work, O or + for other activities
- driving_speed (float): vehicle speed(m/s) for traffic mode assignment.
- parking_fee (float): money cost(ï¿¥) of parking a car for traffic mode assignment.
- driving_penalty (float): extra cost(s) of vehicle for traffic mode assignment.
- subway_speed (float): subway speed(m/s) for traffic mode assignment.
- subway_penalty (float): extra cost(s) of subway for traffic mode assignment.
- subway_expense (float): money cost(ï¿¥) of subway for traffic mode assignment.
- bus_speed (float): bus speed(m/s) for traffic mode assignment.
- bus_penalty (float): extra cost(s) of bus for traffic mode assignment.
- bus_expense (float): money cost(ï¿¥) of bus for traffic mode assignment.
- bike_speed (float): extra cost(s) of bike for traffic mode assignment.
- bike_penalty (float): money cost(ï¿¥) of bike for traffic mode assignment.
- template_func (Callable[[],Person]): The template function of generated person object, whose
schedules
,home
will be replaced and others will be copied. - add_pop (bool): Add population to aois.
- multiprocessing_chunk_size (int): the maximum size of each multiprocessing chunk
- workers (int): number of workers.
def
generate_persons( self, od_matrix: numpy.ndarray, areas: geopandas.geodataframe.GeoDataFrame, available_trip_modes: List[str] = ['drive', 'walk', 'bus', 'subway', 'taxi'], departure_time_curve: Optional[list[float]] = None, area_pops: Optional[list] = None, person_profiles: Optional[list[dict]] = None, seed: int = 0, agent_num: Optional[int] = None) -> List[city.person.v2.person_pb2.Person]:
725 def generate_persons( 726 self, 727 od_matrix: np.ndarray, 728 areas: GeoDataFrame, 729 available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"], 730 departure_time_curve: Optional[list[float]] = None, 731 area_pops: Optional[list] = None, 732 person_profiles: Optional[list[dict]] = None, 733 seed: int = 0, 734 agent_num: Optional[int] = None, 735 ) -> List[Person]: 736 """ 737 Args: 738 - od_matrix (numpy.ndarray): The OD matrix. 739 - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 740 - available_trip_modes (list[str]): available trip modes for person schedules. 741 - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h. 742 - area_pops (list): list of populations in each area. If is not None, # of the persons departs from each home position is exactly equal to the given pop num. 743 - person_profiles (Optional[List[dict]]): list of profiles in dict format. 744 - seed (int): The random seed. 745 - agent_num (int): number of agents to generate. 746 747 Returns: 748 - List[Person]: The generated person objects. 749 """ 750 # init 751 self.area_shapes = [] 752 self.aoi2region = defaultdict(list) 753 self.region2aoi = defaultdict(list) 754 self.aoi_type2ids = defaultdict(list) 755 self.persons = [] 756 # user input time curve 757 if departure_time_curve is not None: 758 assert len(departure_time_curve) >= 24 759 sum_times = sum(departure_time_curve) 760 self.departure_prob = np.array( 761 [d / sum_times for d in departure_time_curve] 762 ) 763 else: 764 self.departure_prob = None 765 self.od_matrix = od_matrix 766 self.areas = areas 767 self.available_trip_modes = available_trip_modes 768 self._read_aois() 769 self._read_regions() 770 self._read_od_matrix() 771 self._match_aoi2region() 772 if agent_num is None: 773 agent_num = int(np.sum(self.od) / self.LEN_OD_TIMES) 774 if not agent_num >= 1: 775 logging.warning("agent_num should >=1") 776 return [] 777 self._generate_mobi(agent_num, area_pops, person_profiles, seed) 778 return self.persons
Args:
- od_matrix (numpy.ndarray): The OD matrix.
- areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined
crs
string. - available_trip_modes (list[str]): available trip modes for person schedules.
- departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h.
- area_pops (list): list of populations in each area. If is not None, # of the persons departs from each home position is exactly equal to the given pop num.
- person_profiles (Optional[List[dict]]): list of profiles in dict format.
- seed (int): The random seed.
- agent_num (int): number of agents to generate.
Returns:
- List[Person]: The generated person objects.
def
generate_public_transport_drivers( self, template_func: Optional[Callable[[], city.person.v2.person_pb2.Person]] = None, stop_duration_time: float = 30.0, seed: int = 0) -> List[city.person.v2.person_pb2.Person]:
811 def generate_public_transport_drivers( 812 self, 813 template_func: Optional[Callable[[], Person]] = None, 814 stop_duration_time: float = 30.0, 815 seed: int = 0, 816 ) -> List[Person]: 817 """ 818 Args: 819 - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized. 820 - stop_duration_time (float): The duration time (in second) for bus at each stop. 821 - seed (int): The random seed. 822 823 Returns: 824 - List[Person]: The generated driver objects. 825 """ 826 self.persons = [] 827 road_aoi_id2d_pos = self._get_driving_pos_dict() 828 person_id = PT_START_ID 829 _template = template_func if template_func is not None else self.template 830 for sl in self.m.sublines: 831 departure_times = list(sl.schedules.departure_times) 832 if not sl.type in {mapv2.SUBLINE_TYPE_BUS, mapv2.SUBLINE_TYPE_SUBWAY}: 833 continue 834 person_id, generated_drivers = gen_bus_drivers( 835 person_id, 836 _template, 837 departure_times, 838 stop_duration_time, 839 road_aoi_id2d_pos, 840 sl, 841 ) 842 self.persons.extend(generated_drivers) 843 return self.persons
Args:
- template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose
schedules
,home
will be replaced and others will be copied. If not provided, thetemp_func
provided in__init__
will be utilized. - stop_duration_time (float): The duration time (in second) for bus at each stop.
- seed (int): The random seed.
Returns:
- List[Person]: The generated driver objects.
def
fill_person_schedules( self, input_persons: List[city.person.v2.person_pb2.Person], od_matrix: numpy.ndarray, areas: geopandas.geodataframe.GeoDataFrame, available_trip_modes: List[str] = ['drive', 'walk', 'bus', 'subway', 'taxi'], departure_time_curve: Optional[list[float]] = None, seed: int = 0) -> List[city.person.v2.person_pb2.Person]:
963 def fill_person_schedules( 964 self, 965 input_persons: List[Person], 966 od_matrix: np.ndarray, 967 areas: GeoDataFrame, 968 available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"], 969 departure_time_curve: Optional[list[float]] = None, 970 seed: int = 0, 971 ) -> List[Person]: 972 """ 973 Generate person schedules. 974 975 Args: 976 - input_persons (List[Person]): Input Person objects. 977 - od_matrix (numpy.ndarray): The OD matrix. 978 - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string. 979 - available_trip_modes (Optional[List[str]]): available trip modes for person schedules. 980 - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h. 981 - seed (int): The random seed. 982 983 Returns: 984 - List[Person]: The person objects with generated schedules. 985 """ 986 # init 987 self.area_shapes = [] 988 self.aoi2region = defaultdict(list) 989 self.region2aoi = defaultdict(list) 990 self.aoi_type2ids = defaultdict(list) 991 self.persons = [] 992 # user input time curve 993 if departure_time_curve is not None: 994 assert len(departure_time_curve) >= 24 995 sum_times = sum(departure_time_curve) 996 self.departure_prob = np.array( 997 [d / sum_times for d in departure_time_curve] 998 ) 999 else: 1000 self.departure_prob = None 1001 self.od_matrix = od_matrix 1002 self.areas = areas 1003 self.available_trip_modes = available_trip_modes 1004 self._read_aois() 1005 self._read_regions() 1006 self._read_od_matrix() 1007 self._match_aoi2region() 1008 self._generate_schedules(input_persons, seed) 1009 1010 return self.persons
Generate person schedules.
Args:
- input_persons (List[Person]): Input Person objects.
- od_matrix (numpy.ndarray): The OD matrix.
- areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined
crs
string. - available_trip_modes (Optional[List[str]]): available trip modes for person schedules.
- departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h.
- seed (int): The random seed.
Returns:
- List[Person]: The person objects with generated schedules.
def
generate_taxi_drivers( self, template_func: Optional[Callable[[], city.person.v2.person_pb2.Person]] = None, parking_positions: Optional[List[Union[city.geo.v2.geo_pb2.LanePosition, city.geo.v2.geo_pb2.AoiPosition]]] = None, agent_num: Optional[int] = None, seed: int = 0) -> List[city.person.v2.person_pb2.Person]:
1012 def generate_taxi_drivers( 1013 self, 1014 template_func: Optional[Callable[[], Person]] = None, 1015 parking_positions: Optional[List[Union[LanePosition, AoiPosition]]] = None, 1016 agent_num: Optional[int] = None, 1017 seed: int = 0, 1018 ) -> List[Person]: 1019 """ 1020 Args: 1021 - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized. 1022 - parking_positions (Optional[List[Union[LanePosition,AoiPosition]]]): The parking positions of each taxi. 1023 - agent_num (Optional[int]): The taxi driver num. 1024 - seed (int): The random seed. 1025 1026 Returns: 1027 - List[Person]: The generated driver objects. 1028 """ 1029 self.persons = [] 1030 person_id = TAXI_START_ID 1031 _template = template_func if template_func is not None else self.template 1032 self._read_aois() 1033 if parking_positions is not None: 1034 logging.info(f"") 1035 _taxi_home_positions = parking_positions 1036 elif agent_num is not None: 1037 logging.info(f"") 1038 if not agent_num >= 1: 1039 logging.warning("agent_num should >=1") 1040 return [] 1041 rng = np.random.default_rng(seed) 1042 has_driving_aoi_ids = [a["id"] for a in self.aois if a["has_driving_gates"]] 1043 _taxi_home_positions = [ 1044 AoiPosition(aoi_id=_id) 1045 for _id in rng.choice(has_driving_aoi_ids, int(agent_num)) 1046 ] 1047 else: 1048 logging.warning( 1049 "Either `agent_num` or `parking_positions` should be provided!" 1050 ) 1051 return [] 1052 for _pos in _taxi_home_positions: 1053 person_id, generated_drivers = gen_taxi_drivers( 1054 person_id, 1055 _template, 1056 _pos, 1057 ) 1058 self.persons.extend(generated_drivers) 1059 return self.persons
Args:
- template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose
schedules
,home
will be replaced and others will be copied. If not provided, thetemp_func
provided in__init__
will be utilized. - parking_positions (Optional[List[Union[LanePosition,AoiPosition]]]): The parking positions of each taxi.
- agent_num (Optional[int]): The taxi driver num.
- seed (int): The random seed.
Returns:
- List[Person]: The generated driver objects.