mosstool.trip.generator

 1from .AIGC import AigcGenerator
 2from .generate_from_od import TripGenerator
 3from .gravity import GravityGenerator
 4from .random import PositionMode, RandomGenerator
 5from .template import (CalibratedTemplateGenerator, GaussianTemplateGenerator,
 6                       ProbabilisticTemplateGenerator,
 7                       UniformTemplateGenerator,
 8                       default_bus_template_generator,
 9                       default_person_template_generator)
10
# Public names of this package; each one is imported from a submodule above.
__all__ = [
    "ProbabilisticTemplateGenerator",
    "GaussianTemplateGenerator",
    "UniformTemplateGenerator",
    "CalibratedTemplateGenerator",
    "default_person_template_generator",
    "default_bus_template_generator",
    "RandomGenerator",
    "PositionMode",
    "GravityGenerator",
    "AigcGenerator",
    "TripGenerator",
]
class ProbabilisticTemplateGenerator:
 93class ProbabilisticTemplateGenerator:
 94    def __init__(
 95        self,
 96        max_speed_values: Optional[List[float]] = None,
 97        max_speed_probabilities: Optional[List[float]] = None,
 98        max_acceleration_values: Optional[List[float]] = None,
 99        max_acceleration_probabilities: Optional[List[float]] = None,
100        max_braking_acceleration_values: Optional[List[float]] = None,
101        max_braking_acceleration_probabilities: Optional[List[float]] = None,
102        usual_braking_acceleration_values: Optional[List[float]] = None,
103        usual_braking_acceleration_probabilities: Optional[List[float]] = None,
104        headway_values: Optional[List[float]] = None,
105        headway_probabilities: Optional[List[float]] = None,
106        min_gap_values: Optional[List[float]] = None,
107        min_gap_probabilities: Optional[List[float]] = None,
108        seed: int = 0,
109        template: Optional[Person] = None,
110    ):
111        """
112        Args:
113        - max_speed_values (Optional[List[float]]): A list of possible maximum speeds.
114        - max_speed_probabilities (Optional[List[float]]): Probabilities corresponding to max_speed_values.
115        - max_acceleration_values (Optional[List[float]]): A list of possible maximum accelerations.
116        - max_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_acceleration_values.
117        - max_braking_acceleration_values (Optional[List[float]]): A list of possible maximum braking accelerations.
118        - max_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_braking_acceleration_values.
119        - usual_braking_acceleration_values (Optional[List[float]]): A list of usual braking accelerations.
120        - usual_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to usual_braking_acceleration_values.
121        - headway_values (Optional[List[float]]): A list of safe time headways.
122        - headway_probabilities (Optional[List[float]]): Probabilities corresponding to headway_values.
123        - min_gap_values (Optional[List[float]]): A list of minimum gaps.
124        - min_gap_probabilities (Optional[List[float]]): Probabilities corresponding to min_gap_values.
125        - seed (int): Seed value for the random number generator.
126        - template (Optional[Person]): The template function of generated person object.
127        """
128        self.template_p = template
129        if self.template_p is None:
130            self.template_p = default_person_template_generator()
131        self.template_p.ClearField("schedules")
132        # max speed
133        self.max_speed_values = max_speed_values
134        self.max_speed_probabilities = max_speed_probabilities
135        if max_speed_probabilities is not None and max_speed_values is not None:
136            _sum = sum(max_speed_probabilities)
137            self.max_speed_probabilities = np.array(
138                [d / _sum for d in max_speed_probabilities]
139            )
140            assert len(max_speed_probabilities) == len(
141                max_speed_values
142            ), f"Inconsistent length between max speed values and probabilities"
143        else:
144            self.max_speed_values = None
145            self.max_speed_probabilities = None
146        # max acceleration
147        self.max_acceleration_values = max_acceleration_values
148        self.max_acceleration_probabilities = max_acceleration_probabilities
149        if (
150            max_acceleration_probabilities is not None
151            and max_acceleration_values is not None
152        ):
153            _sum = sum(max_acceleration_probabilities)
154            self.max_acceleration_probabilities = np.array(
155                [d / _sum for d in max_acceleration_probabilities]
156            )
157            assert len(max_acceleration_probabilities) == len(
158                max_acceleration_values
159            ), f"Inconsistent length between max acceleration values and probabilities"
160        else:
161            self.max_acceleration_values = None
162            self.max_acceleration_probabilities = None
163        # max braking acceleration
164        self.max_braking_acceleration_values = max_braking_acceleration_values
165        self.max_braking_acceleration_probabilities = (
166            max_braking_acceleration_probabilities
167        )
168        if (
169            max_braking_acceleration_probabilities is not None
170            and max_braking_acceleration_values is not None
171        ):
172            _sum = sum(max_braking_acceleration_probabilities)
173            self.max_braking_acceleration_probabilities = np.array(
174                [d / _sum for d in max_braking_acceleration_probabilities]
175            )
176            assert len(max_braking_acceleration_probabilities) == len(
177                max_braking_acceleration_values
178            ), f"Inconsistent length between max braking acceleration values and probabilities"
179        else:
180            self.max_braking_acceleration_values = None
181            self.max_braking_acceleration_probabilities = None
182        # usual braking acceleration
183        self.usual_braking_acceleration_values = usual_braking_acceleration_values
184        self.usual_braking_acceleration_probabilities = (
185            usual_braking_acceleration_probabilities
186        )
187        if (
188            usual_braking_acceleration_probabilities is not None
189            and usual_braking_acceleration_values is not None
190        ):
191            _sum = sum(usual_braking_acceleration_probabilities)
192            self.usual_braking_acceleration_probabilities = np.array(
193                [d / _sum for d in usual_braking_acceleration_probabilities]
194            )
195            assert len(usual_braking_acceleration_probabilities) == len(
196                usual_braking_acceleration_values
197            ), f"Inconsistent length between usual braking acceleration values and probabilities"
198        else:
199            self.usual_braking_acceleration_values = None
200            self.usual_braking_acceleration_probabilities = None
201        # safe time headway
202        self.headway_values = headway_values
203        self.headway_probabilities = headway_probabilities
204        if headway_probabilities is not None and headway_values is not None:
205            _sum = sum(headway_probabilities)
206            self.headway_probabilities = np.array(
207                [d / _sum for d in headway_probabilities]
208            )
209            assert len(headway_probabilities) == len(
210                headway_values
211            ), f"Inconsistent length between headway values and probabilities"
212        else:
213            self.headway_values = None
214            self.headway_probabilities = None
215        # min gap
216        self.min_gap_values = min_gap_values
217        self.min_gap_probabilities = min_gap_probabilities
218        if min_gap_probabilities is not None and min_gap_values is not None:
219            _sum = sum(min_gap_probabilities)
220            self.min_gap_probabilities = np.array(
221                [d / _sum for d in min_gap_probabilities]
222            )
223            assert len(min_gap_probabilities) == len(
224                min_gap_values
225            ), f"Inconsistent length between minGap values and probabilities"
226        else:
227            self.min_gap_values = None
228            self.min_gap_probabilities = None
229        # radom engine
230        self.rng = np.random.default_rng(seed)
231
232    def template_generator(
233        self,
234    ) -> Person:
235        rng = self.rng
236        p = Person()
237        assert self.template_p is not None
238        p.CopyFrom(self.template_p)
239        # max speed
240        if (
241            self.max_speed_probabilities is not None
242            and self.max_speed_values is not None
243        ):
244            p.vehicle_attribute.max_speed = rng.choice(
245                self.max_speed_values, p=self.max_speed_probabilities
246            )
247        # max acceleration
248        if (
249            self.max_acceleration_probabilities is not None
250            and self.max_acceleration_values is not None
251        ):
252            p.vehicle_attribute.max_acceleration = rng.choice(
253                self.max_acceleration_values, p=self.max_acceleration_probabilities
254            )
255        # max braking acceleration
256        if (
257            self.max_braking_acceleration_probabilities is not None
258            and self.max_braking_acceleration_values is not None
259        ):
260            p.vehicle_attribute.max_braking_acceleration = rng.choice(
261                self.max_braking_acceleration_values,
262                p=self.max_braking_acceleration_probabilities,
263            )
264        # usual braking acceleration
265        if (
266            self.usual_braking_acceleration_probabilities is not None
267            and self.usual_braking_acceleration_values is not None
268        ):
269            p.vehicle_attribute.usual_braking_acceleration = rng.choice(
270                self.usual_braking_acceleration_values,
271                p=self.usual_braking_acceleration_probabilities,
272            )
273        # safe time headway
274        if self.headway_probabilities is not None and self.headway_values is not None:
275            p.vehicle_attribute.headway = rng.choice(
276                self.headway_values, p=self.headway_probabilities
277            )
278        # min gap
279        if self.min_gap_probabilities is not None and self.min_gap_values is not None:
280            p.vehicle_attribute.min_gap = rng.choice(
281                self.min_gap_values, p=self.min_gap_probabilities
282            )
283        return p
ProbabilisticTemplateGenerator( max_speed_values: Optional[List[float]] = None, max_speed_probabilities: Optional[List[float]] = None, max_acceleration_values: Optional[List[float]] = None, max_acceleration_probabilities: Optional[List[float]] = None, max_braking_acceleration_values: Optional[List[float]] = None, max_braking_acceleration_probabilities: Optional[List[float]] = None, usual_braking_acceleration_values: Optional[List[float]] = None, usual_braking_acceleration_probabilities: Optional[List[float]] = None, headway_values: Optional[List[float]] = None, headway_probabilities: Optional[List[float]] = None, min_gap_values: Optional[List[float]] = None, min_gap_probabilities: Optional[List[float]] = None, seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
 94    def __init__(
 95        self,
 96        max_speed_values: Optional[List[float]] = None,
 97        max_speed_probabilities: Optional[List[float]] = None,
 98        max_acceleration_values: Optional[List[float]] = None,
 99        max_acceleration_probabilities: Optional[List[float]] = None,
100        max_braking_acceleration_values: Optional[List[float]] = None,
101        max_braking_acceleration_probabilities: Optional[List[float]] = None,
102        usual_braking_acceleration_values: Optional[List[float]] = None,
103        usual_braking_acceleration_probabilities: Optional[List[float]] = None,
104        headway_values: Optional[List[float]] = None,
105        headway_probabilities: Optional[List[float]] = None,
106        min_gap_values: Optional[List[float]] = None,
107        min_gap_probabilities: Optional[List[float]] = None,
108        seed: int = 0,
109        template: Optional[Person] = None,
110    ):
111        """
112        Args:
113        - max_speed_values (Optional[List[float]]): A list of possible maximum speeds.
114        - max_speed_probabilities (Optional[List[float]]): Probabilities corresponding to max_speed_values.
115        - max_acceleration_values (Optional[List[float]]): A list of possible maximum accelerations.
116        - max_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_acceleration_values.
117        - max_braking_acceleration_values (Optional[List[float]]): A list of possible maximum braking accelerations.
118        - max_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_braking_acceleration_values.
119        - usual_braking_acceleration_values (Optional[List[float]]): A list of usual braking accelerations.
120        - usual_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to usual_braking_acceleration_values.
121        - headway_values (Optional[List[float]]): A list of safe time headways.
122        - headway_probabilities (Optional[List[float]]): Probabilities corresponding to headway_values.
123        - min_gap_values (Optional[List[float]]): A list of minimum gaps.
124        - min_gap_probabilities (Optional[List[float]]): Probabilities corresponding to min_gap_values.
125        - seed (int): Seed value for the random number generator.
126        - template (Optional[Person]): The template function of generated person object.
127        """
128        self.template_p = template
129        if self.template_p is None:
130            self.template_p = default_person_template_generator()
131        self.template_p.ClearField("schedules")
132        # max speed
133        self.max_speed_values = max_speed_values
134        self.max_speed_probabilities = max_speed_probabilities
135        if max_speed_probabilities is not None and max_speed_values is not None:
136            _sum = sum(max_speed_probabilities)
137            self.max_speed_probabilities = np.array(
138                [d / _sum for d in max_speed_probabilities]
139            )
140            assert len(max_speed_probabilities) == len(
141                max_speed_values
142            ), f"Inconsistent length between max speed values and probabilities"
143        else:
144            self.max_speed_values = None
145            self.max_speed_probabilities = None
146        # max acceleration
147        self.max_acceleration_values = max_acceleration_values
148        self.max_acceleration_probabilities = max_acceleration_probabilities
149        if (
150            max_acceleration_probabilities is not None
151            and max_acceleration_values is not None
152        ):
153            _sum = sum(max_acceleration_probabilities)
154            self.max_acceleration_probabilities = np.array(
155                [d / _sum for d in max_acceleration_probabilities]
156            )
157            assert len(max_acceleration_probabilities) == len(
158                max_acceleration_values
159            ), f"Inconsistent length between max acceleration values and probabilities"
160        else:
161            self.max_acceleration_values = None
162            self.max_acceleration_probabilities = None
163        # max braking acceleration
164        self.max_braking_acceleration_values = max_braking_acceleration_values
165        self.max_braking_acceleration_probabilities = (
166            max_braking_acceleration_probabilities
167        )
168        if (
169            max_braking_acceleration_probabilities is not None
170            and max_braking_acceleration_values is not None
171        ):
172            _sum = sum(max_braking_acceleration_probabilities)
173            self.max_braking_acceleration_probabilities = np.array(
174                [d / _sum for d in max_braking_acceleration_probabilities]
175            )
176            assert len(max_braking_acceleration_probabilities) == len(
177                max_braking_acceleration_values
178            ), f"Inconsistent length between max braking acceleration values and probabilities"
179        else:
180            self.max_braking_acceleration_values = None
181            self.max_braking_acceleration_probabilities = None
182        # usual braking acceleration
183        self.usual_braking_acceleration_values = usual_braking_acceleration_values
184        self.usual_braking_acceleration_probabilities = (
185            usual_braking_acceleration_probabilities
186        )
187        if (
188            usual_braking_acceleration_probabilities is not None
189            and usual_braking_acceleration_values is not None
190        ):
191            _sum = sum(usual_braking_acceleration_probabilities)
192            self.usual_braking_acceleration_probabilities = np.array(
193                [d / _sum for d in usual_braking_acceleration_probabilities]
194            )
195            assert len(usual_braking_acceleration_probabilities) == len(
196                usual_braking_acceleration_values
197            ), f"Inconsistent length between usual braking acceleration values and probabilities"
198        else:
199            self.usual_braking_acceleration_values = None
200            self.usual_braking_acceleration_probabilities = None
201        # safe time headway
202        self.headway_values = headway_values
203        self.headway_probabilities = headway_probabilities
204        if headway_probabilities is not None and headway_values is not None:
205            _sum = sum(headway_probabilities)
206            self.headway_probabilities = np.array(
207                [d / _sum for d in headway_probabilities]
208            )
209            assert len(headway_probabilities) == len(
210                headway_values
211            ), f"Inconsistent length between headway values and probabilities"
212        else:
213            self.headway_values = None
214            self.headway_probabilities = None
215        # min gap
216        self.min_gap_values = min_gap_values
217        self.min_gap_probabilities = min_gap_probabilities
218        if min_gap_probabilities is not None and min_gap_values is not None:
219            _sum = sum(min_gap_probabilities)
220            self.min_gap_probabilities = np.array(
221                [d / _sum for d in min_gap_probabilities]
222            )
223            assert len(min_gap_probabilities) == len(
224                min_gap_values
225            ), f"Inconsistent length between minGap values and probabilities"
226        else:
227            self.min_gap_values = None
228            self.min_gap_probabilities = None
229        # radom engine
230        self.rng = np.random.default_rng(seed)

Args:

  • max_speed_values (Optional[List[float]]): A list of possible maximum speeds.
  • max_speed_probabilities (Optional[List[float]]): Probabilities corresponding to max_speed_values.
  • max_acceleration_values (Optional[List[float]]): A list of possible maximum accelerations.
  • max_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_acceleration_values.
  • max_braking_acceleration_values (Optional[List[float]]): A list of possible maximum braking accelerations.
  • max_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to max_braking_acceleration_values.
  • usual_braking_acceleration_values (Optional[List[float]]): A list of usual braking accelerations.
  • usual_braking_acceleration_probabilities (Optional[List[float]]): Probabilities corresponding to usual_braking_acceleration_values.
  • headway_values (Optional[List[float]]): A list of safe time headways.
  • headway_probabilities (Optional[List[float]]): Probabilities corresponding to headway_values.
  • min_gap_values (Optional[List[float]]): A list of minimum gaps.
  • min_gap_probabilities (Optional[List[float]]): Probabilities corresponding to min_gap_values.
  • seed (int): Seed value for the random number generator.
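  • template (Optional[Person]): Base Person object that each generated person is copied from; when None, default_person_template_generator() is used. Its schedules field is cleared.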
template_p
max_speed_values
max_speed_probabilities
max_acceleration_values
max_acceleration_probabilities
max_braking_acceleration_values
max_braking_acceleration_probabilities
usual_braking_acceleration_values
usual_braking_acceleration_probabilities
headway_values
headway_probabilities
min_gap_values
min_gap_probabilities
rng
def template_generator(self) -> city.person.v2.person_pb2.Person:
232    def template_generator(
233        self,
234    ) -> Person:
235        rng = self.rng
236        p = Person()
237        assert self.template_p is not None
238        p.CopyFrom(self.template_p)
239        # max speed
240        if (
241            self.max_speed_probabilities is not None
242            and self.max_speed_values is not None
243        ):
244            p.vehicle_attribute.max_speed = rng.choice(
245                self.max_speed_values, p=self.max_speed_probabilities
246            )
247        # max acceleration
248        if (
249            self.max_acceleration_probabilities is not None
250            and self.max_acceleration_values is not None
251        ):
252            p.vehicle_attribute.max_acceleration = rng.choice(
253                self.max_acceleration_values, p=self.max_acceleration_probabilities
254            )
255        # max braking acceleration
256        if (
257            self.max_braking_acceleration_probabilities is not None
258            and self.max_braking_acceleration_values is not None
259        ):
260            p.vehicle_attribute.max_braking_acceleration = rng.choice(
261                self.max_braking_acceleration_values,
262                p=self.max_braking_acceleration_probabilities,
263            )
264        # usual braking acceleration
265        if (
266            self.usual_braking_acceleration_probabilities is not None
267            and self.usual_braking_acceleration_values is not None
268        ):
269            p.vehicle_attribute.usual_braking_acceleration = rng.choice(
270                self.usual_braking_acceleration_values,
271                p=self.usual_braking_acceleration_probabilities,
272            )
273        # safe time headway
274        if self.headway_probabilities is not None and self.headway_values is not None:
275            p.vehicle_attribute.headway = rng.choice(
276                self.headway_values, p=self.headway_probabilities
277            )
278        # min gap
279        if self.min_gap_probabilities is not None and self.min_gap_values is not None:
280            p.vehicle_attribute.min_gap = rng.choice(
281                self.min_gap_values, p=self.min_gap_probabilities
282            )
283        return p
class GaussianTemplateGenerator:
class GaussianTemplateGenerator:
    """Build Person objects whose vehicle attributes are drawn from Gaussian distributions.

    Every attribute is configured by a mean and a spread parameter. An
    attribute is only sampled when both are given; otherwise the value already
    stored on the template is kept.
    """

    def __init__(
        self,
        max_speed_mean: Optional[float] = None,
        max_speed_std: Optional[float] = None,
        max_acceleration_mean: Optional[float] = None,
        max_acceleration_std: Optional[float] = None,
        max_braking_acceleration_mean: Optional[float] = None,
        max_braking_acceleration_std: Optional[float] = None,
        usual_braking_acceleration_mean: Optional[float] = None,
        usual_braking_acceleration_std: Optional[float] = None,
        headway_mean: Optional[float] = None,
        headway_std: Optional[float] = None,
        min_gap_mean: Optional[float] = None,
        min_gap_std: Optional[float] = None,
        seed: int = 0,
        template: Optional[Person] = None,
    ) -> None:
        """
        Args:
        - max_speed_mean (Optional[float]): Mean of the Gaussian distribution for maximum speeds.
        - max_speed_std (Optional[float]): Standard deviation corresponding to max_speed_mean.
        - max_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum accelerations.
        - max_acceleration_std (Optional[float]): Standard deviation corresponding to max_acceleration_mean.
        - max_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum braking accelerations.
        - max_braking_acceleration_std (Optional[float]): Standard deviation corresponding to max_braking_acceleration_mean.
        - usual_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for usual braking accelerations.
        - usual_braking_acceleration_std (Optional[float]): Standard deviation corresponding to usual_braking_acceleration_mean.
        - headway_mean (Optional[float]): Mean of the Gaussian distribution for safe time headways.
        - headway_std (Optional[float]): Standard deviation corresponding to headway_mean.
        - min_gap_mean (Optional[float]): Mean of the Gaussian distribution for minimum gaps.
        - min_gap_std (Optional[float]): Standard deviation corresponding to min_gap_mean.
        - seed (int): Seed value for the random number generator.
        - template (Optional[Person]): Base Person object that each generated person is copied from. When None, default_person_template_generator() is used. Its "schedules" field is cleared in place.
        """
        self.template_p = (
            template if template is not None else default_person_template_generator()
        )
        self.template_p.ClearField("schedules")
        # max speed
        self.max_speed_mean, self.max_speed_std = max_speed_mean, max_speed_std
        # max acceleration
        self.max_acceleration_mean = max_acceleration_mean
        self.max_acceleration_std = max_acceleration_std
        # max braking acceleration
        self.max_braking_acceleration_mean = max_braking_acceleration_mean
        self.max_braking_acceleration_std = max_braking_acceleration_std
        # usual braking acceleration
        self.usual_braking_acceleration_mean = usual_braking_acceleration_mean
        self.usual_braking_acceleration_std = usual_braking_acceleration_std
        # safe time headway
        self.headway_mean, self.headway_std = headway_mean, headway_std
        # min gap
        self.min_gap_mean, self.min_gap_std = min_gap_mean, min_gap_std
        # random engine
        self.rng = np.random.default_rng(seed)

    def template_generator(self) -> Person:
        """Return a new Person copied from the template with configured attributes re-sampled.

        Speed, acceleration, headway and min gap are stored as non-negative
        magnitudes; the two braking accelerations are stored as non-positive
        values. Attributes are sampled in a fixed order, so for a given seed
        the sequence of generated persons is reproducible.
        """
        rng = self.rng

        def _magnitude(mean, std):
            # One draw: |sqrt(std) * N(0, 1) + mean|.
            # NOTE(review): the *_std argument goes through np.sqrt, i.e. it is
            # treated as a variance although it is documented as a standard
            # deviation. Kept unchanged for compatibility; confirm the intent.
            return np.abs(np.sqrt(std) * rng.normal() + mean)

        p = Person()
        assert self.template_p is not None
        p.CopyFrom(self.template_p)
        # max speed
        if self.max_speed_mean is not None and self.max_speed_std is not None:
            p.vehicle_attribute.max_speed = _magnitude(
                self.max_speed_mean, self.max_speed_std
            )
        # max acceleration
        if (
            self.max_acceleration_mean is not None
            and self.max_acceleration_std is not None
        ):
            p.vehicle_attribute.max_acceleration = _magnitude(
                self.max_acceleration_mean, self.max_acceleration_std
            )
        # max braking acceleration
        if (
            self.max_braking_acceleration_mean is not None
            and self.max_braking_acceleration_std is not None
        ):
            # Offset by the mean; the previous code added the std here by mistake.
            p.vehicle_attribute.max_braking_acceleration = -_magnitude(
                self.max_braking_acceleration_mean,
                self.max_braking_acceleration_std,
            )
        # usual braking acceleration
        if (
            self.usual_braking_acceleration_mean is not None
            and self.usual_braking_acceleration_std is not None
        ):
            p.vehicle_attribute.usual_braking_acceleration = -_magnitude(
                self.usual_braking_acceleration_mean,
                self.usual_braking_acceleration_std,
            )
        # safe time headway
        if self.headway_mean is not None and self.headway_std is not None:
            p.vehicle_attribute.headway = _magnitude(
                self.headway_mean, self.headway_std
            )
        # min gap
        if self.min_gap_mean is not None and self.min_gap_std is not None:
            p.vehicle_attribute.min_gap = _magnitude(
                self.min_gap_mean, self.min_gap_std
            )
        return p
GaussianTemplateGenerator( max_speed_mean: Optional[float] = None, max_speed_std: Optional[float] = None, max_acceleration_mean: Optional[float] = None, max_acceleration_std: Optional[float] = None, max_braking_acceleration_mean: Optional[float] = None, max_braking_acceleration_std: Optional[float] = None, usual_braking_acceleration_mean: Optional[float] = None, usual_braking_acceleration_std: Optional[float] = None, headway_mean: Optional[float] = None, headway_std: Optional[float] = None, min_gap_mean: Optional[float] = None, min_gap_std: Optional[float] = None, seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
287    def __init__(
288        self,
289        max_speed_mean: Optional[float] = None,
290        max_speed_std: Optional[float] = None,
291        max_acceleration_mean: Optional[float] = None,
292        max_acceleration_std: Optional[float] = None,
293        max_braking_acceleration_mean: Optional[float] = None,
294        max_braking_acceleration_std: Optional[float] = None,
295        usual_braking_acceleration_mean: Optional[float] = None,
296        usual_braking_acceleration_std: Optional[float] = None,
297        headway_mean: Optional[float] = None,
298        headway_std: Optional[float] = None,
299        min_gap_mean: Optional[float] = None,
300        min_gap_std: Optional[float] = None,
301        seed: int = 0,
302        template: Optional[Person] = None,
303    ) -> None:
304        """
305        Args
306        - max_speed_mean (Optional[float]): Mean of the Gaussian distribution for maximum speeds.
307        - max_speed_std (Optional[float]): Standard deviation corresponding to max_speed_mean.
308        - max_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum accelerations.
309        - max_acceleration_std (Optional[float]): Standard deviation corresponding to max_acceleration_mean.
310        - max_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum braking accelerations.
311        - max_braking_acceleration_std (Optional[float]): Standard deviation corresponding to max_braking_acceleration_mean.
312        - usual_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for usual braking accelerations.
313        - usual_braking_acceleration_std (Optional[float]): Standard deviation corresponding to usual_braking_acceleration_mean.
314        - headway_mean (Optional[float]): Mean of the Gaussian distribution for safe time headways.
315        - headway_std (Optional[float]): Standard deviation corresponding to headway_mean.
316        - min_gap_mean (Optional[float]): Mean of the Gaussian distribution for minimum gaps.
317        - min_gap_std (Optional[float]): Standard deviation corresponding to min_gap_mean.
318        - seed (int): Seed value for the random number generator.
319        - template (Optional[Person]): The template function of generated person object.
320        """
321        self.template_p = template
322        if self.template_p is None:
323            self.template_p = default_person_template_generator()
324        self.template_p.ClearField("schedules")
325        # max speed
326        self.max_speed_mean = max_speed_mean
327        self.max_speed_std = max_speed_std
328        # max acceleration
329        self.max_acceleration_mean = max_acceleration_mean
330        self.max_acceleration_std = max_acceleration_std
331        # max braking acceleration
332        self.max_braking_acceleration_mean = max_braking_acceleration_mean
333        self.max_braking_acceleration_std = max_braking_acceleration_std
334        # usual braking acceleration
335        self.usual_braking_acceleration_mean = usual_braking_acceleration_mean
336        self.usual_braking_acceleration_std = usual_braking_acceleration_std
337        # safe time headway
338        self.headway_mean = headway_mean
339        self.headway_std = headway_std
340        # min gap
341        self.min_gap_mean = min_gap_mean
342        self.min_gap_std = min_gap_std
343        # radom engine
344        self.rng = np.random.default_rng(seed)

Args:

  • max_speed_mean (Optional[float]): Mean of the Gaussian distribution for maximum speeds.
  • max_speed_std (Optional[float]): Standard deviation corresponding to max_speed_mean.
  • max_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum accelerations.
  • max_acceleration_std (Optional[float]): Standard deviation corresponding to max_acceleration_mean.
  • max_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for maximum braking accelerations.
  • max_braking_acceleration_std (Optional[float]): Standard deviation corresponding to max_braking_acceleration_mean.
  • usual_braking_acceleration_mean (Optional[float]): Mean of the Gaussian distribution for usual braking accelerations.
  • usual_braking_acceleration_std (Optional[float]): Standard deviation corresponding to usual_braking_acceleration_mean.
  • headway_mean (Optional[float]): Mean of the Gaussian distribution for safe time headways.
  • headway_std (Optional[float]): Standard deviation corresponding to headway_mean.
  • min_gap_mean (Optional[float]): Mean of the Gaussian distribution for minimum gaps.
  • min_gap_std (Optional[float]): Standard deviation corresponding to min_gap_mean.
  • seed (int): Seed value for the random number generator.
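  • template (Optional[Person]): Base Person object that each generated person is copied from; when None, default_person_template_generator() is used. Its schedules field is cleared.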
template_p
max_speed_mean
max_speed_std
max_acceleration_mean
max_acceleration_std
max_braking_acceleration_mean
max_braking_acceleration_std
usual_braking_acceleration_mean
usual_braking_acceleration_std
headway_mean
headway_std
min_gap_mean
min_gap_std
rng
def template_generator(self) -> city.person.v2.person_pb2.Person:
346    def template_generator(
347        self,
348    ) -> Person:
349        rng = self.rng
350        p = Person()
351        assert self.template_p is not None
352        p.CopyFrom(self.template_p)
353        # max speed
354        if self.max_speed_mean is not None and self.max_speed_std is not None:
355            p.vehicle_attribute.max_speed = np.abs(
356                np.sqrt(self.max_speed_std) * rng.normal() + self.max_speed_mean
357            )
358        # max acceleration
359        if (
360            self.max_acceleration_mean is not None
361            and self.max_acceleration_std is not None
362        ):
363            p.vehicle_attribute.max_acceleration = np.abs(
364                np.sqrt(self.max_acceleration_std) * rng.normal()
365                + self.max_acceleration_mean
366            )
367        # max braking acceleration
368        if (
369            self.max_braking_acceleration_mean is not None
370            and self.max_braking_acceleration_std is not None
371        ):
372            p.vehicle_attribute.max_braking_acceleration = -np.abs(
373                np.sqrt(self.max_braking_acceleration_std) * rng.normal()
374                + self.max_braking_acceleration_std
375            )
376        # usual braking acceleration
377        if (
378            self.usual_braking_acceleration_mean is not None
379            and self.usual_braking_acceleration_std is not None
380        ):
381            p.vehicle_attribute.usual_braking_acceleration = -np.abs(
382                np.sqrt(self.usual_braking_acceleration_std) * rng.normal()
383                + self.usual_braking_acceleration_mean
384            )
385        # safe time headway
386        if self.headway_mean is not None and self.headway_std is not None:
387            p.vehicle_attribute.headway = np.abs(
388                np.sqrt(self.headway_std) * rng.normal() + self.headway_mean
389            )
390        # min gap
391        if self.min_gap_mean is not None and self.min_gap_std is not None:
392            p.vehicle_attribute.min_gap = np.abs(
393                np.sqrt(self.min_gap_std) * rng.normal() + self.min_gap_mean
394            )
395        return p
class UniformTemplateGenerator:
class UniformTemplateGenerator:
    """
    Person template generator that samples vehicle attributes uniformly.

    An attribute is only overridden when both of its bounds are given;
    otherwise the value of the template person is kept.
    """

    def __init__(
        self,
        max_speed_min: Optional[float] = None,
        max_speed_max: Optional[float] = None,
        max_acceleration_min: Optional[float] = None,
        max_acceleration_max: Optional[float] = None,
        max_braking_acceleration_min: Optional[float] = None,
        max_braking_acceleration_max: Optional[float] = None,
        usual_braking_acceleration_min: Optional[float] = None,
        usual_braking_acceleration_max: Optional[float] = None,
        headway_min: Optional[float] = None,
        headway_max: Optional[float] = None,
        min_gap_min: Optional[float] = None,
        min_gap_max: Optional[float] = None,
        seed: int = 0,
        template: Optional[Person] = None,
    ) -> None:
        """
        Args:
        - max_speed_min (Optional[float]): Lower bound of the Uniform distribution for maximum speeds.
        - max_speed_max (Optional[float]): Higher bound of the Uniform distribution for maximum speeds.
        - max_acceleration_min (Optional[float]): Lower bound of the Uniform distribution for maximum accelerations.
        - max_acceleration_max (Optional[float]): Higher bound of the Uniform distribution for maximum accelerations.
        - max_braking_acceleration_min (Optional[float]): Lower bound of the Uniform distribution for maximum braking accelerations.
        - max_braking_acceleration_max (Optional[float]): Higher bound of the Uniform distribution for maximum braking accelerations.
        - usual_braking_acceleration_min (Optional[float]): Lower bound of the Uniform distribution for usual braking accelerations.
        - usual_braking_acceleration_max (Optional[float]): Higher bound of the Uniform distribution for usual braking accelerations.
        - headway_min (Optional[float]): Lower bound of the Uniform distribution for safe time headways.
        - headway_max (Optional[float]): Higher bound of the Uniform distribution for safe time headways.
        - min_gap_min (Optional[float]): Lower bound of the Uniform distribution for minimum gaps.
        - min_gap_max (Optional[float]): Higher bound of the Uniform distribution for minimum gaps.
        - seed (int): Seed value for the random number generator.
        - template (Optional[Person]): The person every generated person is copied from. Defaults to `default_person_template_generator()`. Its `schedules` field is cleared in place.
        """
        # Fall back to the default person when no template is supplied.
        self.template_p = (
            default_person_template_generator() if template is None else template
        )
        self.template_p.ClearField("schedules")
        # Sampling bounds, kept as one (min, max) pair per attribute.
        self.max_speed_min, self.max_speed_max = max_speed_min, max_speed_max
        self.max_acceleration_min, self.max_acceleration_max = (
            max_acceleration_min,
            max_acceleration_max,
        )
        self.max_braking_acceleration_min, self.max_braking_acceleration_max = (
            max_braking_acceleration_min,
            max_braking_acceleration_max,
        )
        self.usual_braking_acceleration_min, self.usual_braking_acceleration_max = (
            usual_braking_acceleration_min,
            usual_braking_acceleration_max,
        )
        self.headway_min, self.headway_max = headway_min, headway_max
        self.min_gap_min, self.min_gap_max = min_gap_min, min_gap_max
        # random engine
        self.rng = np.random.default_rng(seed)

    def template_generator(
        self,
    ) -> Person:
        """
        Generate one person with uniformly sampled vehicle attributes.

        Returns:
        - Person: A copy of the template whose configured vehicle attributes
          are replaced by the absolute value of a uniform draw (negated for
          the two braking accelerations).
        """
        rng = self.rng
        person = Person()
        assert self.template_p is not None
        person.CopyFrom(self.template_p)
        vehicle = person.vehicle_attribute
        # (field name, lower bound, upper bound, store as negative value);
        # the order fixes the sequence of random draws.
        bounds = (
            ("max_speed", self.max_speed_min, self.max_speed_max, False),
            (
                "max_acceleration",
                self.max_acceleration_min,
                self.max_acceleration_max,
                False,
            ),
            (
                "max_braking_acceleration",
                self.max_braking_acceleration_min,
                self.max_braking_acceleration_max,
                True,
            ),
            (
                "usual_braking_acceleration",
                self.usual_braking_acceleration_min,
                self.usual_braking_acceleration_max,
                True,
            ),
            ("headway", self.headway_min, self.headway_max, False),
            ("min_gap", self.min_gap_min, self.min_gap_max, False),
        )
        for field, low, high, negate in bounds:
            # An attribute without both bounds keeps the template value and
            # consumes no random number.
            if low is None or high is None:
                continue
            magnitude = np.abs(rng.uniform(low, high))
            setattr(vehicle, field, -magnitude if negate else magnitude)
        return person
UniformTemplateGenerator( max_speed_min: Optional[float] = None, max_speed_max: Optional[float] = None, max_acceleration_min: Optional[float] = None, max_acceleration_max: Optional[float] = None, max_braking_acceleration_min: Optional[float] = None, max_braking_acceleration_max: Optional[float] = None, usual_braking_acceleration_min: Optional[float] = None, usual_braking_acceleration_max: Optional[float] = None, headway_min: Optional[float] = None, headway_max: Optional[float] = None, min_gap_min: Optional[float] = None, min_gap_max: Optional[float] = None, seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
399    def __init__(
400        self,
401        max_speed_min: Optional[float] = None,
402        max_speed_max: Optional[float] = None,
403        max_acceleration_min: Optional[float] = None,
404        max_acceleration_max: Optional[float] = None,
405        max_braking_acceleration_min: Optional[float] = None,
406        max_braking_acceleration_max: Optional[float] = None,
407        usual_braking_acceleration_min: Optional[float] = None,
408        usual_braking_acceleration_max: Optional[float] = None,
409        headway_min: Optional[float] = None,
410        headway_max: Optional[float] = None,
411        min_gap_min: Optional[float] = None,
412        min_gap_max: Optional[float] = None,
413        seed: int = 0,
414        template: Optional[Person] = None,
415    ) -> None:
416        """
417        Args
418        - max_speed_mean (Optional[float]): Lower bound of the Uniform distribution for maximum speeds.
419        - max_speed_std (Optional[float]): Higher bound of the Uniform distribution for maximum speeds.
420        - max_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum accelerations.
421        - max_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum accelerations.
422        - max_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for maximum braking accelerations.
423        - max_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for maximum braking accelerations.
424        - usual_braking_acceleration_mean (Optional[float]): Lower bound of the Uniform distribution for usual braking accelerations.
425        - usual_braking_acceleration_std (Optional[float]): Higher bound of the Uniform distribution for usual braking accelerations.
426        - headway_mean (Optional[float]): Lower bound of the Uniform distribution for safe time headways.
427        - headway_std (Optional[float]): Higher bound of the Uniform distribution for safe time headways.
428        - min_gap_mean (Optional[float]): Lower bound of the Uniform distribution for minimum gaps.
429        - min_gap_std (Optional[float]): Higher bound of the Uniform distribution for minimum gaps.
430        - seed (int): Seed value for the random number generator.
431        - template (Optional[Person]): The template function of generated person object.
432        """
433        self.template_p = template
434        if self.template_p is None:
435            self.template_p = default_person_template_generator()
436        self.template_p.ClearField("schedules")
437        # max speed
438        self.max_speed_min = max_speed_min
439        self.max_speed_max = max_speed_max
440        # max acceleration
441        self.max_acceleration_min = max_acceleration_min
442        self.max_acceleration_max = max_acceleration_max
443        # max braking acceleration
444        self.max_braking_acceleration_min = max_braking_acceleration_min
445        self.max_braking_acceleration_max = max_braking_acceleration_max
446        # usual braking acceleration
447        self.usual_braking_acceleration_min = usual_braking_acceleration_min
448        self.usual_braking_acceleration_max = usual_braking_acceleration_max
449        # safe time headway
450        self.headway_min = headway_min
451        self.headway_max = headway_max
452        # min gap
453        self.min_gap_min = min_gap_min
454        self.min_gap_max = min_gap_max
455        # radom engine
456        self.rng = np.random.default_rng(seed)

Args

  • max_speed_min (Optional[float]): Lower bound of the Uniform distribution for maximum speeds.
  • max_speed_max (Optional[float]): Higher bound of the Uniform distribution for maximum speeds.
  • max_acceleration_min (Optional[float]): Lower bound of the Uniform distribution for maximum accelerations.
  • max_acceleration_max (Optional[float]): Higher bound of the Uniform distribution for maximum accelerations.
  • max_braking_acceleration_min (Optional[float]): Lower bound of the Uniform distribution for maximum braking accelerations.
  • max_braking_acceleration_max (Optional[float]): Higher bound of the Uniform distribution for maximum braking accelerations.
  • usual_braking_acceleration_min (Optional[float]): Lower bound of the Uniform distribution for usual braking accelerations.
  • usual_braking_acceleration_max (Optional[float]): Higher bound of the Uniform distribution for usual braking accelerations.
  • headway_min (Optional[float]): Lower bound of the Uniform distribution for safe time headways.
  • headway_max (Optional[float]): Higher bound of the Uniform distribution for safe time headways.
  • min_gap_min (Optional[float]): Lower bound of the Uniform distribution for minimum gaps.
  • min_gap_max (Optional[float]): Higher bound of the Uniform distribution for minimum gaps.
  • seed (int): Seed value for the random number generator.
  • template (Optional[Person]): The template person that every generated person object is copied from.
template_p
max_speed_min
max_speed_max
max_acceleration_min
max_acceleration_max
max_braking_acceleration_min
max_braking_acceleration_max
usual_braking_acceleration_min
usual_braking_acceleration_max
headway_min
headway_max
min_gap_min
min_gap_max
rng
def template_generator(self) -> city.person.v2.person_pb2.Person:
458    def template_generator(
459        self,
460    ) -> Person:
461        rng = self.rng
462        p = Person()
463        assert self.template_p is not None
464        p.CopyFrom(self.template_p)
465        # max speed
466        if self.max_speed_min is not None and self.max_speed_max is not None:
467            p.vehicle_attribute.max_speed = np.abs(
468                rng.uniform(self.max_speed_min, self.max_speed_max)
469            )
470        # max acceleration
471        if (
472            self.max_acceleration_min is not None
473            and self.max_acceleration_max is not None
474        ):
475            p.vehicle_attribute.max_acceleration = np.abs(
476                rng.uniform(self.max_acceleration_min, self.max_acceleration_max)
477            )
478        # max braking acceleration
479        if (
480            self.max_braking_acceleration_min is not None
481            and self.max_braking_acceleration_max is not None
482        ):
483            p.vehicle_attribute.max_braking_acceleration = -np.abs(
484                rng.uniform(
485                    self.max_braking_acceleration_min, self.max_braking_acceleration_max
486                )
487            )
488        # usual braking acceleration
489        if (
490            self.usual_braking_acceleration_min is not None
491            and self.usual_braking_acceleration_max is not None
492        ):
493            p.vehicle_attribute.usual_braking_acceleration = -np.abs(
494                rng.uniform(
495                    self.usual_braking_acceleration_min,
496                    self.usual_braking_acceleration_max,
497                )
498            )
499        # safe time headway
500        if self.headway_min is not None and self.headway_max is not None:
501            p.vehicle_attribute.headway = np.abs(
502                rng.uniform(self.headway_min, self.headway_max)
503            )
504        # min gap
505        if self.min_gap_min is not None and self.min_gap_max is not None:
506            p.vehicle_attribute.min_gap = np.abs(
507                rng.uniform(self.min_gap_min, self.min_gap_max)
508            )
509        return p
class CalibratedTemplateGenerator:
class CalibratedTemplateGenerator:
    """
    Person template generator that samples from the module's CALIBRATED_* tables.
    """

    def __init__(self, seed: int = 0, template: Optional[Person] = None) -> None:
        """
        Args:
        - seed (int): Seed value for the random number generator.
        - template (Optional[Person]): The person every generated person is copied from. Defaults to `default_person_template_generator()`. Its `schedules` field is cleared in place.
        """
        # Fall back to the default person when no template is supplied.
        self.template_p = (
            default_person_template_generator() if template is None else template
        )
        self.template_p.ClearField("schedules")
        # random engine
        self.rng = np.random.default_rng(seed)

    def template_generator(
        self,
    ) -> Person:
        """
        Generate one person with attributes drawn from the calibrated tables.

        Returns:
        - Person: A copy of the template whose max acceleration, usual braking
          acceleration and headway are each chosen from the matching
          CALIBRATED_*_VALUES list, weighted by its CALIBRATED_*_PDF.
        """
        rng = self.rng
        person = Person()
        assert self.template_p is not None
        person.CopyFrom(self.template_p)
        vehicle = person.vehicle_attribute
        # The draw order (acceleration, braking, headway) fixes the random sequence.
        vehicle.max_acceleration = rng.choice(
            CALIBRATED_MAX_ACC_VALUES, p=CALIBRATED_MAX_ACC_PDF
        )
        vehicle.usual_braking_acceleration = rng.choice(
            CALIBRATED_BRAKE_ACC_VALUES, p=CALIBRATED_BRAKE_ACC_PDF
        )
        vehicle.headway = rng.choice(
            CALIBRATED_HEADWAY_VALUES, p=CALIBRATED_HEADWAY_PDF
        )
        return person
CalibratedTemplateGenerator( seed: int = 0, template: Optional[city.person.v2.person_pb2.Person] = None)
513    def __init__(self, seed: int = 0, template: Optional[Person] = None) -> None:
514        """
515        Args
516        - seed (int): Seed value for the random number generator.
517        - template (Optional[Person]): The template function of generated person object.
518        """
519        self.template_p = template
520        if self.template_p is None:
521            self.template_p = default_person_template_generator()
522        self.template_p.ClearField("schedules")
523        # radom engine
524        self.rng = np.random.default_rng(seed)

Args

  • seed (int): Seed value for the random number generator.
  • template (Optional[Person]): The template person that every generated person object is copied from.
template_p
rng
def template_generator(self) -> city.person.v2.person_pb2.Person:
526    def template_generator(
527        self,
528    ) -> Person:
529        rng = self.rng
530        p = Person()
531        assert self.template_p is not None
532        p.CopyFrom(self.template_p)
533        # max acceleration
534        p.vehicle_attribute.max_acceleration = rng.choice(
535            CALIBRATED_MAX_ACC_VALUES, p=CALIBRATED_MAX_ACC_PDF
536        )
537        # usual braking acceleration
538        p.vehicle_attribute.usual_braking_acceleration = rng.choice(
539            CALIBRATED_BRAKE_ACC_VALUES, p=CALIBRATED_BRAKE_ACC_PDF
540        )
541        # safe time headway
542        p.vehicle_attribute.headway = rng.choice(
543            CALIBRATED_HEADWAY_VALUES, p=CALIBRATED_HEADWAY_PDF
544        )
545        return p
def default_person_template_generator() -> city.person.v2.person_pb2.Person:
def default_person_template_generator() -> Person:
    """
    Build the default person template.

    Returns:
    - Person: A `PERSON_TYPE_NORMAL` person with a fuel-engine vehicle plus
      default pedestrian and bike attributes. No schedules are set.
    """
    # Assemble the nested messages from the inside out.
    engine_efficiency = VehicleEngineEfficiency(
        energy_conversion_efficiency=0.27 * 0.049,
        c_ef=66.98,
    )
    emission = EmissionAttribute(
        weight=2100,
        type=VehicleEngineType.VEHICLE_ENGINE_TYPE_FUEL,
        coefficient_drag=0.251,
        lambda_s=0.29,
        frontal_area=2.52,
        fuel_efficiency=engine_efficiency,
    )
    vehicle = VehicleAttribute(
        length=5,
        width=2,
        # presumably 150 km/h converted to m/s - confirm units
        max_speed=150 / 3.6,
        max_acceleration=3,
        max_braking_acceleration=-10,
        usual_acceleration=2,
        usual_braking_acceleration=-4.5,
        headway=1.5,
        lane_max_speed_recognition_deviation=1.0,
        lane_change_length=10,
        min_gap=1,
        emission_attribute=emission,
        model="normal",
    )
    pedestrian = PedestrianAttribute(speed=1.34, model="normal")
    bike = BikeAttribute(speed=5, model="normal")
    return Person(
        attribute=PersonAttribute(),
        type=PersonType.PERSON_TYPE_NORMAL,
        vehicle_attribute=vehicle,
        pedestrian_attribute=pedestrian,
        bike_attribute=bike,
    )
def default_bus_template_generator() -> city.person.v2.person_pb2.Person:
def default_bus_template_generator() -> Person:
    """
    Build the default bus template.

    Returns:
    - Person: A `PERSON_TYPE_NORMAL` person whose vehicle uses the bus
      dimensions and emission values below, plus default pedestrian and bike
      attributes. No schedules are set.
    """
    # Assemble the nested messages from the inside out.
    engine_efficiency = VehicleEngineEfficiency(
        energy_conversion_efficiency=0.27 * 0.049,
        c_ef=70.27,
    )
    emission = EmissionAttribute(
        weight=18000,
        type=VehicleEngineType.VEHICLE_ENGINE_TYPE_FUEL,
        coefficient_drag=0.251,
        lambda_s=0.29,
        frontal_area=8.67,
        fuel_efficiency=engine_efficiency,
    )
    vehicle = VehicleAttribute(
        length=15,
        width=2,
        # presumably 150 km/h converted to m/s - confirm units
        max_speed=150 / 3.6,
        max_acceleration=3,
        max_braking_acceleration=-10,
        usual_acceleration=2,
        usual_braking_acceleration=-4.5,
        headway=1.5,
        lane_max_speed_recognition_deviation=1.0,
        lane_change_length=10,
        min_gap=1,
        emission_attribute=emission,
        model="normal",
    )
    pedestrian = PedestrianAttribute(speed=1.34, model="normal")
    bike = BikeAttribute(speed=5, model="normal")
    return Person(
        attribute=PersonAttribute(),
        type=PersonType.PERSON_TYPE_NORMAL,
        vehicle_attribute=vehicle,
        pedestrian_attribute=pedestrian,
        bike_attribute=bike,
    )
class RandomGenerator:
 23class RandomGenerator:
 24    """
 25    Random trip generator
 26    """
 27
 28    def __init__(
 29        self,
 30        m: Map,
 31        position_modes: List[PositionMode],
 32        trip_mode: TripMode,
 33        template_func: Callable[[], Person] = default_person_template_generator,
 34    ):
 35        """
 36        Args:
 37        - m (Map): The Map.
 38        - position_modes (List[PositionMode]): The schedules generated will follow the position modes in this list. For example, if the list is [PositionMode.AOI, PositionMode.LANE, PositionMode.AOI], the generated person will start from an AOI, then go to a lane, and finally go to an AOI.
 39        - trip_mode (TripMode): The target trip mode.
 40        - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied.
 41        """
 42        self.m = m
 43        self.modes = position_modes
 44        if len(self.modes) <= 1:
 45            raise ValueError("position_modes should have at least 2 elements")
 46        self.trip_mode = trip_mode
 47        self.template = template_func
 48
 49        # Pre-process the map to build the randomly selected candidate set
 50        walk = is_walking(trip_mode)
 51        self._aoi_candidates = [
 52            aoi
 53            for aoi in self.m.aois
 54            if len(aoi.walking_positions if walk else aoi.driving_positions) > 0
 55        ]
 56        self._lane_candidates = [
 57            lane
 58            for lane in self.m.lanes
 59            if lane.parent_id < JUNC_START_ID
 60            and lane.type
 61            == (LaneType.LANE_TYPE_WALKING if walk else LaneType.LANE_TYPE_DRIVING)
 62        ]
 63        if PositionMode.AOI in position_modes and len(self._aoi_candidates) == 0:
 64            raise ValueError("No available AOI")
 65        if PositionMode.LANE in position_modes and len(self._lane_candidates) == 0:
 66            raise ValueError("No available lane")
 67
 68    def _rand_position(self, candidates: Union[List[Aoi], List[Lane]]):
 69        index = np.random.randint(0, len(candidates))
 70        candidate = candidates[index]
 71        if isinstance(candidate, Aoi):
 72            return Position(aoi_position=AoiPosition(aoi_id=candidate.id))
 73        else:
 74            return Position(
 75                lane_position=LanePosition(
 76                    lane_id=candidate.id, s=np.random.rand() * candidate.length
 77                )
 78            )
 79
 80    def uniform(
 81        self,
 82        num: int,
 83        first_departure_time_range: Tuple[float, float],
 84        schedule_interval_range: Tuple[float, float],
 85        seed: Optional[int] = None,
 86        start_id: Optional[int] = None,
 87    ) -> List[Person]:
 88        """
 89        Generate a person object by uniform random sampling
 90
 91        Args:
 92        - num (int): The number of person objects to generate.
 93        - first_departure_time_range (Tuple[float, float]): The range of the first departure time (uniform random sampling).
 94        - schedule_interval_range (Tuple[float, float]): The range of the interval between schedules (uniform random sampling).
 95        - seed (Optional[int], optional): The random seed. Defaults to None.
 96        - start_id (Optional[int], optional): The start id of the generated person objects. Defaults to None. If None, the `id` will be NOT set.
 97
 98        Returns:
 99        - List[Person]: The generated person objects.
100        """
101        if seed is not None:
102            np.random.seed(seed)
103        persons = []
104        for i in range(num):
105            p = Person()
106            p.CopyFrom(self.template())
107            p.ClearField("schedules")
108            if start_id is not None:
109                p.id = start_id + i
110            p.home.CopyFrom(
111                self._rand_position(
112                    self._aoi_candidates
113                    if self.modes[0] == PositionMode.AOI
114                    else self._lane_candidates
115                )
116            )
117            departure_time = np.random.uniform(*first_departure_time_range)
118            for mode in self.modes[1:]:
119                schedule = cast(Schedule, p.schedules.add())
120                schedule.departure_time = departure_time
121                schedule.loop_count = 1
122                trip = Trip(
123                    mode=self.trip_mode,
124                    end=self._rand_position(
125                        self._aoi_candidates
126                        if mode == PositionMode.AOI
127                        else self._lane_candidates
128                    ),
129                )
130                schedule.trips.append(trip)
131                departure_time += np.random.uniform(*schedule_interval_range)
132            persons.append(p)
133        return persons

Random trip generator

RandomGenerator( m: city.map.v2.map_pb2.Map, position_modes: List[PositionMode], trip_mode: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object>, template_func: Callable[[], city.person.v2.person_pb2.Person] = <function default_person_template_generator>)
28    def __init__(
29        self,
30        m: Map,
31        position_modes: List[PositionMode],
32        trip_mode: TripMode,
33        template_func: Callable[[], Person] = default_person_template_generator,
34    ):
35        """
36        Args:
37        - m (Map): The Map.
38        - position_modes (List[PositionMode]): The schedules generated will follow the position modes in this list. For example, if the list is [PositionMode.AOI, PositionMode.LANE, PositionMode.AOI], the generated person will start from an AOI, then go to a lane, and finally go to an AOI.
39        - trip_mode (TripMode): The target trip mode.
40        - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied.
41        """
42        self.m = m
43        self.modes = position_modes
44        if len(self.modes) <= 1:
45            raise ValueError("position_modes should have at least 2 elements")
46        self.trip_mode = trip_mode
47        self.template = template_func
48
49        # Pre-process the map to build the randomly selected candidate set
50        walk = is_walking(trip_mode)
51        self._aoi_candidates = [
52            aoi
53            for aoi in self.m.aois
54            if len(aoi.walking_positions if walk else aoi.driving_positions) > 0
55        ]
56        self._lane_candidates = [
57            lane
58            for lane in self.m.lanes
59            if lane.parent_id < JUNC_START_ID
60            and lane.type
61            == (LaneType.LANE_TYPE_WALKING if walk else LaneType.LANE_TYPE_DRIVING)
62        ]
63        if PositionMode.AOI in position_modes and len(self._aoi_candidates) == 0:
64            raise ValueError("No available AOI")
65        if PositionMode.LANE in position_modes and len(self._lane_candidates) == 0:
66            raise ValueError("No available lane")

Args:

  • m (Map): The Map.
  • position_modes (List[PositionMode]): The schedules generated will follow the position modes in this list. For example, if the list is [PositionMode.AOI, PositionMode.LANE, PositionMode.AOI], the generated person will start from an AOI, then go to a lane, and finally go to an AOI.
  • trip_mode (TripMode): The target trip mode.
  • template_func (Callable[[],Person]): The template function of generated person object, whose schedules, home will be replaced and others will be copied.
m
modes
trip_mode
template
def uniform( self, num: int, first_departure_time_range: Tuple[float, float], schedule_interval_range: Tuple[float, float], seed: Optional[int] = None, start_id: Optional[int] = None) -> List[city.person.v2.person_pb2.Person]:
 80    def uniform(
 81        self,
 82        num: int,
 83        first_departure_time_range: Tuple[float, float],
 84        schedule_interval_range: Tuple[float, float],
 85        seed: Optional[int] = None,
 86        start_id: Optional[int] = None,
 87    ) -> List[Person]:
 88        """
 89        Generate a person object by uniform random sampling
 90
 91        Args:
 92        - num (int): The number of person objects to generate.
 93        - first_departure_time_range (Tuple[float, float]): The range of the first departure time (uniform random sampling).
 94        - schedule_interval_range (Tuple[float, float]): The range of the interval between schedules (uniform random sampling).
 95        - seed (Optional[int], optional): The random seed. Defaults to None.
 96        - start_id (Optional[int], optional): The start id of the generated person objects. Defaults to None. If None, the `id` will be NOT set.
 97
 98        Returns:
 99        - List[Person]: The generated person objects.
100        """
101        if seed is not None:
102            np.random.seed(seed)
103        persons = []
104        for i in range(num):
105            p = Person()
106            p.CopyFrom(self.template())
107            p.ClearField("schedules")
108            if start_id is not None:
109                p.id = start_id + i
110            p.home.CopyFrom(
111                self._rand_position(
112                    self._aoi_candidates
113                    if self.modes[0] == PositionMode.AOI
114                    else self._lane_candidates
115                )
116            )
117            departure_time = np.random.uniform(*first_departure_time_range)
118            for mode in self.modes[1:]:
119                schedule = cast(Schedule, p.schedules.add())
120                schedule.departure_time = departure_time
121                schedule.loop_count = 1
122                trip = Trip(
123                    mode=self.trip_mode,
124                    end=self._rand_position(
125                        self._aoi_candidates
126                        if mode == PositionMode.AOI
127                        else self._lane_candidates
128                    ),
129                )
130                schedule.trips.append(trip)
131                departure_time += np.random.uniform(*schedule_interval_range)
132            persons.append(p)
133        return persons

Generate person objects by uniform random sampling

Args:

  • num (int): The number of person objects to generate.
  • first_departure_time_range (Tuple[float, float]): The range of the first departure time (uniform random sampling).
  • schedule_interval_range (Tuple[float, float]): The range of the interval between schedules (uniform random sampling).
  • seed (Optional[int], optional): The random seed. Defaults to None.
  • start_id (Optional[int], optional): The start id of the generated person objects. Defaults to None. If None, the id will NOT be set.

Returns:

  • List[Person]: The generated person objects.
class PositionMode(enum.Enum):
class PositionMode(Enum):
    """Kind of map position to sample for a home or a trip end."""

    # Pick the position from the AOI candidates.
    AOI = 0
    # Pick the position from the lane candidates.
    LANE = 1

An enumeration.

AOI = <PositionMode.AOI: 0>
LANE = <PositionMode.LANE: 1>
class GravityGenerator:
  9class GravityGenerator:
 10    """
 11    Generate OD matrix inside the given area, based on the gravity model.
 12    """
 13
 14    def __init__(
 15        self,
 16        Lambda: float,
 17        Alpha: float,
 18        Beta: float,
 19        Gamma: float,
 20    ):
 21        """
 22        Args:
 23        - pops (list[int]): The population of each area.
 24        - dists (np.ndarray): The distance matrix between each pair of areas.
 25        """
 26        self._lambda = Lambda
 27        self._alpha = Alpha
 28        self._beta = Beta
 29        self._gamma = Gamma
 30
 31    def load_area(
 32        self,
 33        area: gpd.GeoDataFrame,
 34    ):
 35        """
 36        Load the area data.
 37
 38        Args:
 39        - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
 40        """
 41        self._area = area
 42
 43    def _get_one_point(self):
 44        """
 45        get one point from the shapefile
 46        """
 47        first_geometry = self._area.geometry.iloc[0]
 48
 49        # for different types of geometry, get the first point
 50        if first_geometry.geom_type == "Polygon":
 51            first_point = first_geometry.exterior.coords[0]
 52        elif first_geometry.geom_type == "MultiPolygon":
 53            first_polygon = list(first_geometry.geoms)[0]
 54            first_point = first_polygon.exterior.coords[0]
 55        else:
 56            raise ValueError("Geometry type not supported")
 57
 58        pointx, pointy = first_point[0], first_point[1]
 59
 60        return pointx, pointy
 61
 62    def _calculate_utm_epsg(self, longitude: float, latitude: float):
 63        """
 64        Calculate the UTM zone and corresponding EPSG code for a given longitude and latitude.
 65
 66        Args:
 67        longitude (float): The longitude of the location.
 68        latitude (float): The latitude of the location.
 69
 70        Returns:
 71        int: The EPSG code for the UTM zone.
 72        """
 73        # Determine the UTM zone from the longitude
 74        utm_zone = int((longitude + 180) / 6) + 1
 75
 76        # Determine the hemisphere and construct the EPSG code
 77        if latitude >= 0:
 78            # Northern Hemisphere
 79            epsg_code = 32600 + utm_zone
 80        else:
 81            # Southern Hemisphere
 82            epsg_code = 32700 + utm_zone
 83
 84        return epsg_code
 85
 86    def cal_distance(self):
 87        """
 88        Euclidean distance matrix
 89        get the distance matrix for regions in the area
 90        based on the shapefile of the area
 91        """
 92        pointx, pointy = self._get_one_point()
 93        epsg = self._calculate_utm_epsg(pointx, pointy)
 94        area = self._area.to_crs(f"EPSG:{epsg}")
 95        area["centroid"] = area["geometry"].centroid  # type:ignore
 96
 97        # dis
 98        points = area["centroid"].apply(lambda p: [p.x, p.y]).tolist()  # type:ignore
 99        dist_matrix = distance_matrix(points, points)
100
101        distance = dist_matrix.astype(np.float32)
102
103        return distance
104
105    def generate(self, pops):  #: list[int]
106        """
107        Generate the OD matrix based on the gravity model.
108
109        Args:
110        - pops (list[int]): The population of each area.
111        - dists (np.ndarray): The distance matrix between each pair of areas.
112
113        Returns:
114        - np.ndarray: The generated OD matrix.
115        """
116        dists = self.cal_distance()
117
118        N = np.power(pops, self._alpha)
119        M = np.power(pops, self._beta)
120        D = np.power(dists, self._gamma)
121
122        N = N.reshape(-1, 1).repeat(len(pops), axis=1)
123        M = M.reshape(1, -1).repeat(len(pops), axis=0)
124
125        od_matrix = self._lambda * N * M / (D + 1e-8)
126        od_matrix = od_matrix.astype(np.int64)
127        od_matrix[od_matrix < 0] = 0
128        for i in range(od_matrix.shape[0]):
129            od_matrix[i, i] = 0
130
131        return od_matrix

Generate OD matrix inside the given area, based on the gravity model.

GravityGenerator(Lambda: float, Alpha: float, Beta: float, Gamma: float)
    def __init__(
        self,
        Lambda: float,
        Alpha: float,
        Beta: float,
        Gamma: float,
    ):
        """
        Store the four coefficients of the gravity model.

        Args:
        - Lambda (float): Global scale factor applied to every entry.
        - Alpha (float): Exponent applied to the population of the row area.
        - Beta (float): Exponent applied to the population of the column area.
        - Gamma (float): Exponent applied to the distance between two areas.
        """
        self._lambda = Lambda
        self._alpha = Alpha
        self._beta = Beta
        self._gamma = Gamma

Args:

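  • Lambda (float): Global scale factor applied to every entry of the OD matrix.
  • Alpha (float): Exponent applied to the population of the row area.
  • Beta (float): Exponent applied to the population of the column area.
  • Gamma (float): Exponent applied to the distance between two areas.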
def load_area(self, area: geopandas.geodataframe.GeoDataFrame):
    def load_area(
        self,
        area: gpd.GeoDataFrame,
    ):
        """
        Load the area data.

        Args:
        - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
        """
        # Only the reference is kept; nothing is computed at load time.
        self._area = area

Load the area data.

Args:

  • area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined crs string.
def cal_distance(self):
 86    def cal_distance(self):
 87        """
 88        Euclidean distance matrix
 89        get the distance matrix for regions in the area
 90        based on the shapefile of the area
 91        """
 92        pointx, pointy = self._get_one_point()
 93        epsg = self._calculate_utm_epsg(pointx, pointy)
 94        area = self._area.to_crs(f"EPSG:{epsg}")
 95        area["centroid"] = area["geometry"].centroid  # type:ignore
 96
 97        # dis
 98        points = area["centroid"].apply(lambda p: [p.x, p.y]).tolist()  # type:ignore
 99        dist_matrix = distance_matrix(points, points)
100
101        distance = dist_matrix.astype(np.float32)
102
103        return distance

Compute the Euclidean distance matrix between the regions of the area, based on the shapefile geometry of the area.

def generate(self, pops):
105    def generate(self, pops):  #: list[int]
106        """
107        Generate the OD matrix based on the gravity model.
108
109        Args:
110        - pops (list[int]): The population of each area.
111        - dists (np.ndarray): The distance matrix between each pair of areas.
112
113        Returns:
114        - np.ndarray: The generated OD matrix.
115        """
116        dists = self.cal_distance()
117
118        N = np.power(pops, self._alpha)
119        M = np.power(pops, self._beta)
120        D = np.power(dists, self._gamma)
121
122        N = N.reshape(-1, 1).repeat(len(pops), axis=1)
123        M = M.reshape(1, -1).repeat(len(pops), axis=0)
124
125        od_matrix = self._lambda * N * M / (D + 1e-8)
126        od_matrix = od_matrix.astype(np.int64)
127        od_matrix[od_matrix < 0] = 0
128        for i in range(od_matrix.shape[0]):
129            od_matrix[i, i] = 0
130
131        return od_matrix

Generate the OD matrix based on the gravity model.

Args:

  • pops (list[int]): The population of each area.
  • (The distance matrix is not an argument; it is computed internally by cal_distance from the loaded area.)

Returns:

  • np.ndarray: The generated OD matrix.
class AigcGenerator:
class AigcGenerator:
    """
    Generate OD matrix inside the given area.

    Thin wrapper: every method forwards to the `generator.Generator`
    instance created in `__init__`.
    """

    def __init__(self):
        # The wrapped object that does the actual work.
        self.generator = generator.Generator()

    def set_satetoken(self, satetoken: str):
        """
        Set the satetoken for the generator.

        Args:
        - satetoken (str): Passed unchanged to the wrapped generator.
          NOTE(review): presumably an access token for an external service -- confirm.
        """
        self.generator.set_satetoken(satetoken)

    def load_area(
        self,
        area: gpd.GeoDataFrame,
    ):
        """
        Load the area data.

        Args:
        - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
        """
        self.generator.load_area(area)

    def generate(self):
        """
        Generate the OD matrix.

        Returns:
        - Whatever the wrapped generator's `generate()` returns.
        """
        return self.generator.generate()

Generate OD matrix inside the given area.

generator
def set_satetoken(self, satetoken: str):
    def set_satetoken(self, satetoken: str):
        """
        Set the satetoken for the generator.

        Args:
        - satetoken (str): Passed unchanged to the wrapped generator.
          NOTE(review): presumably an access token for an external service -- confirm.
        """
        self.generator.set_satetoken(satetoken)

Set the satetoken for the generator.

def load_area(self, area: geopandas.geodataframe.GeoDataFrame):
    def load_area(
        self,
        area: gpd.GeoDataFrame,
    ):
        """
        Load the area data.

        Args:
        - area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
        """
        # Forwarded unchanged to the wrapped generator.
        self.generator.load_area(area)

Load the area data.

Args:

  • area (gpd.GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined crs string.
def generate(self):
    def generate(self):
        """
        Generate the OD matrix.

        Returns:
        - Whatever the wrapped generator's `generate()` returns.
        """
        return self.generator.generate()

Generate the OD matrix.

class TripGenerator:
 400class TripGenerator:
 401    """
 402    generate trip from OD matrix.
 403    """
 404
    def __init__(
        self,
        m: Map,
        pop_tif_path: Optional[str] = None,
        activity_distributions: Optional[dict] = None,
        driving_speed: float = 30 / 3.6,
        parking_fee: float = 20.0,
        driving_penalty: float = 0.0,
        subway_speed: float = 35 / 3.6,
        subway_penalty: float = 600.0,
        subway_expense: float = 10.0,
        bus_speed: float = 15 / 3.6,
        bus_penalty: float = 600.0,
        bus_expense: float = 5.0,
        bike_speed: float = 10 / 3.6,
        bike_penalty: float = 0.0,
        template_func: Callable[[], Person] = default_person_template_generator,
        add_pop: bool = False,
        multiprocessing_chunk_size: int = 500,
        workers: int = cpu_count(),
    ):
        """
        Configure the trip generator.

        The speed/penalty/expense arguments and the chunk size are written to
        module-level globals, so they are shared by every instance created in
        the same process (the last constructed instance wins).

        Args:
        - m (Map): The Map.
        - pop_tif_path (str): path to population tif file.
        - activity_distributions (dict): human mobility mode and its probability. e.g. {"HWH": 18.0, "HWH+": 82.0,}. H for go home, W for go to work, O or + for other activities. Keys that are not in `HUMAN_MODE_STATS` are ignored.
        - driving_speed (float): vehicle speed(m/s) for traffic mode assignment.
        - parking_fee (float): money cost(￥) of parking a car for traffic mode assignment.
        - driving_penalty (float): extra cost(s) of vehicle for traffic mode assignment.
        - subway_speed (float): subway speed(m/s) for traffic mode assignment.
        - subway_penalty (float): extra cost(s) of subway for traffic mode assignment.
        - subway_expense (float): money cost(￥) of subway for traffic mode assignment.
        - bus_speed (float): bus speed(m/s) for traffic mode assignment.
        - bus_penalty (float): extra cost(s) of bus for traffic mode assignment.
        - bus_expense (float): money cost(￥) of bus for traffic mode assignment.
        - bike_speed (float): bike speed(m/s) for traffic mode assignment.
        - bike_penalty (float): extra cost(s) of bike for traffic mode assignment.
        - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied.
        - add_pop (bool): Add population to aois.
        - multiprocessing_chunk_size (int): the maximum size of each multiprocessing chunk
        - workers (int): number of workers.
        """
        # Mode-choice parameters live in module-level globals.
        global SUBWAY_EXPENSE, BUS_EXPENSE, DRIVING_SPEED, DRIVING_PENALTY, SUBWAY_SPEED, SUBWAY_PENALTY, BUS_SPEED, BUS_PENALTY, BIKE_SPEED, BIKE_PENALTY, PARKING_FEE
        SUBWAY_EXPENSE, BUS_EXPENSE = subway_expense, bus_expense
        DRIVING_SPEED, DRIVING_PENALTY, PARKING_FEE = (
            driving_speed,
            driving_penalty,
            parking_fee,
        )
        SUBWAY_SPEED, SUBWAY_PENALTY = subway_speed, subway_penalty
        BUS_SPEED, BUS_PENALTY = bus_speed, bus_penalty
        BIKE_SPEED, BIKE_PENALTY = bike_speed, bike_penalty
        self.m = m
        self.pop_tif_path = pop_tif_path
        self.add_pop = add_pop
        # Projection taken from the map header.
        self.projector = pyproj.Proj(m.header.projection)
        self.workers = workers
        self.template = template_func
        self.persons = []
        global MAX_CHUNK_SIZE
        MAX_CHUNK_SIZE = multiprocessing_chunk_size
        # activity proportion
        if activity_distributions is not None:
            # Keep only the modes that are known to HUMAN_MODE_STATS.
            ori_modes_stat = {
                k: float(v)
                for k, v in activity_distributions.items()
                if k in HUMAN_MODE_STATS
            }
        else:
            ori_modes_stat = HUMAN_MODE_STATS
        self.modes = [mode for mode in ori_modes_stat.keys()]
        self.p = np.array([prob for prob in ori_modes_stat.values()])
        # Normalize the weights so that they sum to 1.
        self.p_mode = self.p / sum(self.p)
 478
 479    def _read_aois(self):
 480        aois = []
 481        # read aois
 482        for i in self.m.aois:
 483            a = {}
 484            a["id"] = i.id
 485            a["external"] = {
 486                "area": i.area,
 487                "urban_land_use": i.urban_land_use,
 488            }
 489            a["geo"] = [self.projector(p.x, p.y, inverse=True) for p in i.positions]
 490            a["shapely"] = geometry.Polygon(
 491                [
 492                    p.x,
 493                    p.y,
 494                ]
 495                for p in i.positions
 496            )
 497            a["has_driving_gates"] = len(i.driving_gates) > 0
 498            aois.append(a)
 499
 500        def get_aoi_catg(urban_land_use: str):
 501            if urban_land_use in {"R"}:
 502                return "residential"
 503            elif urban_land_use in {"B29"}:
 504                return "business"
 505            elif urban_land_use in {
 506                "B1",
 507                "B",
 508            }:
 509                return "commercial"
 510            elif urban_land_use in {"M"}:
 511                return "industrial"
 512            elif urban_land_use in {"S4", "S"}:
 513                return "transportation"
 514            elif urban_land_use in {"A", "A1"}:
 515                return "administrative"
 516            elif urban_land_use in {"A3"}:
 517                return "education"
 518            elif urban_land_use in {"A5"}:
 519                return "medical"
 520            elif urban_land_use in {"B32", "A4", "A2"}:
 521                return "sport and cultual"
 522            elif urban_land_use in {"B31", "G1", "B3", "B13"}:
 523                return "park and leisure"
 524            return "other"
 525
 526        # add catg
 527        for aoi in aois:
 528            aoi["external"]["catg"] = get_aoi_catg(aoi["external"]["urban_land_use"])
 529        # population
 530        if self.add_pop and self.pop_tif_path is not None:
 531            raise NotImplementedError(
 532                "Adding population to AOIs in trip_generator has been removed!"
 533            )
 534            # geos = []
 535            # for aoi_data in aois:
 536            #     geos.append(
 537            #         Feature(
 538            #             geometry=Polygon([[list(c) for c in aoi_data["geo"]]]),
 539            #             properties={
 540            #                 "id": aoi_data["id"],
 541            #             },
 542            #         )
 543            #     )
 544            # geos = FeatureCollection(geos)
 545            # geos = geo2pop(geos, self.pop_tif_path)
 546
 547            # geos = cast(FeatureCollection, geos)
 548            # aoi_id2pop = defaultdict(int)
 549            # for feature in geos["features"]:
 550            #     aoi_id = feature["properties"]["id"]
 551            #     pop = feature["properties"]["population"]
 552            #     aoi_id2pop[aoi_id] = pop
 553            # for aoi in aois:
 554            #     aoi_id = aoi["id"]
 555            #     aoi["external"]["population"] = aoi_id2pop.get(aoi_id, 0)
 556        else:
 557            for aoi in aois:
 558                aoi["external"]["population"] = aoi["external"]["area"]
 559        self.aois = aois
 560
 561    def _read_regions(self):
 562        self.regions = []
 563        for i, poly in enumerate(self.areas.geometry.to_crs(self.m.header.projection)):
 564            self.area_shapes.append(poly)
 565            r = {
 566                "geometry": geo_coords(poly),  # xy coords
 567                "ori_id": i,
 568                "region_id": i,
 569            }
 570            self.regions.append(r)
 571
 572    def _read_od_matrix(self):
 573        logging.info("Reading original ods")
 574        n_region = len(self.regions)
 575        assert (
 576            n_region == self.od_matrix.shape[0] and n_region == self.od_matrix.shape[1]
 577        )
 578        # OD-matrix contains time axis = 2
 579        if len(self.od_matrix.shape) > 2:
 580            od = self.od_matrix
 581            self.LEN_OD_TIMES = od.shape[2]
 582        else:
 583            orig_od = np.expand_dims(self.od_matrix, axis=2)
 584            self.LEN_OD_TIMES = 1
 585            od = np.broadcast_to(
 586                orig_od, (n_region, n_region, self.LEN_OD_TIMES)
 587            ).astype(np.int64)
 588        sum_od_j = np.sum(od, axis=1)
 589        od_prob = od / sum_od_j[:, np.newaxis]
 590        od_prob = np.nan_to_num(od_prob)
 591        self.od = od
 592        self.od_prob = od_prob
 593
 594    def _match_aoi2region(self):
 595        global shapes
 596        shapes = self.area_shapes
 597        aois = self.aois
 598        results = []
 599        _match_aoi_unit_with_arg = partial(_match_aoi_unit, self.projector)
 600        for i in range(0, len(aois), MAX_BATCH_SIZE):
 601            aois_batch = aois[i : i + MAX_BATCH_SIZE]
 602            with Pool(processes=self.workers) as pool:
 603                results += pool.map(
 604                    _match_aoi_unit_with_arg,
 605                    aois_batch,
 606                    chunksize=min(ceil(len(aois_batch) / self.workers), MAX_CHUNK_SIZE),
 607                )
 608        results = [r for r in results if r is not None]
 609        for r in results:
 610            aoi_id, reg_id = r[:2]
 611            self.aoi2region[aoi_id] = reg_id
 612            self.region2aoi[reg_id].append(aoi_id)
 613        logging.info(f"AOI matched: {len(self.aoi2region)}")
 614
 615    def _generate_mobi(
 616        self,
 617        agent_num: int = 10000,
 618        area_pops: Optional[list] = None,
 619        person_profiles: Optional[list] = None,
 620        seed: int = 0,
 621    ):
 622        global region2aoi, aoi_map, aoi_type2ids
 623        global home_dist, work_od, other_od, educate_od
 624        global available_trip_modes
 625        available_trip_modes = self.available_trip_modes
 626        if "bus" in available_trip_modes and "subway" in available_trip_modes:
 627            available_trip_modes.append("bus_subway")
 628        region2aoi = self.region2aoi
 629        aoi_map = {d["id"]: d for d in self.aois}
 630        n_region = len(self.regions)
 631        home_dist, work_od, educate_od, other_od = extract_HWEO_from_od_matrix(
 632            self.aois,
 633            n_region,
 634            self.aoi2region,
 635            self.aoi_type2ids,
 636            self.od_prob,
 637            self.LEN_OD_TIMES,
 638        )
 639        aoi_type2ids = self.aoi_type2ids
 640        agent_args = []
 641        a_home_regions = []
 642        if area_pops is not None:
 643            for ii, pop in enumerate(area_pops):
 644                pop_num = int(pop)
 645                if pop_num > 0:
 646                    a_home_regions += [ii for _ in range(pop_num)]
 647            agent_num = sum(a_home_regions)
 648        else:
 649            a_home_regions = [None for _ in range(agent_num)]
 650        rng = np.random.default_rng(seed)
 651        if person_profiles is not None:
 652            a_profiles = person_profiles
 653        else:
 654            a_profiles = gen_profiles(agent_num, self.workers, MAX_CHUNK_SIZE)
 655        a_modes = [self.modes for _ in range(agent_num)]
 656        a_p_modes = [self.p_mode for _ in range(agent_num)]
 657        for i, (a_home_region, a_profile, a_mode, a_p_mode) in enumerate(
 658            zip(a_home_regions, a_profiles, a_modes, a_p_modes)
 659        ):
 660            agent_args.append(
 661                (
 662                    i,
 663                    rng.integers(0, 2**16 - 1),
 664                    a_home_region,
 665                    a_profile,
 666                    a_mode,
 667                    a_p_mode,
 668                )
 669            )
 670        partial_args = (
 671            len(self.regions),
 672            self.projector,
 673            self.departure_prob,
 674            self.LEN_OD_TIMES,
 675        )
 676        _process_agent_unit_with_arg = partial(_process_agent_unit, partial_args)
 677        raw_persons = []
 678        for i in range(0, len(agent_args), MAX_BATCH_SIZE):
 679            agent_args_batch = agent_args[i : i + MAX_BATCH_SIZE]
 680            with Pool(processes=self.workers) as pool:
 681                raw_persons += pool.map(
 682                    _process_agent_unit_with_arg,
 683                    agent_args_batch,
 684                    chunksize=min(
 685                        ceil(len(agent_args_batch) / self.workers), MAX_CHUNK_SIZE
 686                    ),
 687                )
 688        raw_persons = [r for r in raw_persons]
 689        for agent_id, (
 690            aoi_list,
 691            person_home,
 692            person_work,
 693            a_profile,
 694            times,
 695            trip_modes,
 696            trip_models,
 697            activities,
 698        ) in enumerate(raw_persons):
 699            times = np.array(times) * 3600  # hour->second
 700            p = Person()
 701            p.CopyFrom(self.template())
 702            p.ClearField("schedules")
 703            p.id = agent_id
 704            p.home.CopyFrom(Position(aoi_position=AoiPosition(aoi_id=person_home)))
 705            p.work.CopyFrom(Position(aoi_position=AoiPosition(aoi_id=person_work)))
 706            p.profile.CopyFrom(dict2pb(a_profile, PersonProfile()))
 707            for time, aoi_id, trip_mode, activity, trip_model in zip(
 708                times, aoi_list[1:], trip_modes, activities, trip_models
 709            ):
 710                schedule = cast(Schedule, p.schedules.add())
 711                schedule.departure_time = time
 712                schedule.loop_count = 1
 713                trip = Trip(
 714                    mode=cast(
 715                        TripMode,
 716                        trip_mode,
 717                    ),
 718                    end=Position(aoi_position=AoiPosition(aoi_id=aoi_id)),
 719                    activity=activity,
 720                    model=trip_model,
 721                )
 722                schedule.trips.append(trip)
 723            self.persons.append(p)
 724
 725    def generate_persons(
 726        self,
 727        od_matrix: np.ndarray,
 728        areas: GeoDataFrame,
 729        available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"],
 730        departure_time_curve: Optional[list[float]] = None,
 731        area_pops: Optional[list] = None,
 732        person_profiles: Optional[list[dict]] = None,
 733        seed: int = 0,
 734        agent_num: Optional[int] = None,
 735    ) -> List[Person]:
 736        """
 737        Args:
 738        - od_matrix (numpy.ndarray): The OD matrix.
 739        - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
 740        - available_trip_modes (list[str]): available trip modes for person schedules.
 741        - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h.
 742        - area_pops (list): list of populations in each area. If is not None, # of the persons departs from each home position is exactly equal to the given pop num.
 743        - person_profiles (Optional[List[dict]]): list of profiles in dict format.
 744        - seed (int): The random seed.
 745        - agent_num (int): number of agents to generate.
 746
 747        Returns:
 748        - List[Person]: The generated person objects.
 749        """
 750        # init
 751        self.area_shapes = []
 752        self.aoi2region = defaultdict(list)
 753        self.region2aoi = defaultdict(list)
 754        self.aoi_type2ids = defaultdict(list)
 755        self.persons = []
 756        # user input time curve
 757        if departure_time_curve is not None:
 758            assert len(departure_time_curve) >= 24
 759            sum_times = sum(departure_time_curve)
 760            self.departure_prob = np.array(
 761                [d / sum_times for d in departure_time_curve]
 762            )
 763        else:
 764            self.departure_prob = None
 765        self.od_matrix = od_matrix
 766        self.areas = areas
 767        self.available_trip_modes = available_trip_modes
 768        self._read_aois()
 769        self._read_regions()
 770        self._read_od_matrix()
 771        self._match_aoi2region()
 772        if agent_num is None:
 773            agent_num = int(np.sum(self.od) / self.LEN_OD_TIMES)
 774        if not agent_num >= 1:
 775            logging.warning("agent_num should >=1")
 776            return []
 777        self._generate_mobi(agent_num, area_pops, person_profiles, seed)
 778        return self.persons
 779
 780    def _get_driving_pos_dict(self) -> Dict[Tuple[int, int], LanePosition]:
 781        road_aoi_id2d_pos = {}
 782        road_id2d_lane_ids = {}
 783        lane_id2parent_road_id = {}
 784        m_lanes = {l.id: l for l in self.m.lanes}
 785        m_roads = {r.id: r for r in self.m.roads}
 786        m_aois = {a.id: a for a in self.m.aois}
 787        for road_id, road in m_roads.items():
 788            d_lane_ids = [
 789                lid
 790                for lid in road.lane_ids
 791                if m_lanes[lid].type == mapv2.LANE_TYPE_DRIVING
 792            ]
 793            road_id2d_lane_ids[road_id] = d_lane_ids
 794        for lane_id, lane in m_lanes.items():
 795            parent_id = lane.parent_id
 796            # road lane
 797            if parent_id in m_roads:
 798                lane_id2parent_road_id[lane_id] = parent_id
 799        for aoi_id, aoi in m_aois.items():
 800            for d_pos in aoi.driving_positions:
 801                pos_lane_id, _ = d_pos.lane_id, d_pos.s
 802                # junction lane
 803                assert (
 804                    pos_lane_id in lane_id2parent_road_id
 805                ), f"Bad lane position {d_pos} at AOI {aoi_id}"
 806                parent_road_id = lane_id2parent_road_id[pos_lane_id]
 807                road_aoi_key = (parent_road_id, aoi_id)
 808                road_aoi_id2d_pos[road_aoi_key] = d_pos
 809        return road_aoi_id2d_pos
 810
 811    def generate_public_transport_drivers(
 812        self,
 813        template_func: Optional[Callable[[], Person]] = None,
 814        stop_duration_time: float = 30.0,
 815        seed: int = 0,
 816    ) -> List[Person]:
 817        """
 818        Args:
 819        - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized.
 820        - stop_duration_time (float): The duration time (in second) for bus at each stop.
 821        - seed (int): The random seed.
 822
 823        Returns:
 824        - List[Person]: The generated driver objects.
 825        """
 826        self.persons = []
 827        road_aoi_id2d_pos = self._get_driving_pos_dict()
 828        person_id = PT_START_ID
 829        _template = template_func if template_func is not None else self.template
 830        for sl in self.m.sublines:
 831            departure_times = list(sl.schedules.departure_times)
 832            if not sl.type in {mapv2.SUBLINE_TYPE_BUS, mapv2.SUBLINE_TYPE_SUBWAY}:
 833                continue
 834            person_id, generated_drivers = gen_bus_drivers(
 835                person_id,
 836                _template,
 837                departure_times,
 838                stop_duration_time,
 839                road_aoi_id2d_pos,
 840                sl,
 841            )
 842            self.persons.extend(generated_drivers)
 843        return self.persons
 844
 845    def _generate_schedules(self, input_persons: List[Person], seed: int):
 846        global region2aoi, aoi_map, aoi_type2ids
 847        global home_dist, work_od, other_od, educate_od
 848        global available_trip_modes
 849        available_trip_modes = self.available_trip_modes
 850        if "bus" in available_trip_modes and "subway" in available_trip_modes:
 851            available_trip_modes.append("bus_subway")
 852        region2aoi = self.region2aoi
 853        aoi_map = {d["id"]: d for d in self.aois}
 854        n_region = len(self.regions)
 855        home_dist, work_od, educate_od, other_od = extract_HWEO_from_od_matrix(
 856            self.aois,
 857            n_region,
 858            self.aoi2region,
 859            self.aoi_type2ids,
 860            self.od_prob,
 861            self.LEN_OD_TIMES,
 862        )
 863        orig_persons = deepcopy(input_persons)
 864        person_args = []
 865        bad_person_indexes = set()
 866        rng = np.random.default_rng(seed)
 867        for idx, p in enumerate(orig_persons):
 868            try:
 869                p_home = p.home.aoi_position.aoi_id
 870                if not p_home >= AOI_START_ID:
 871                    logging.warning(
 872                        f"Person {p.id} has no home AOI ID, use random home instead!"
 873                    )
 874                    p_home = rng.choice(self.aoi_type2ids["home"])
 875                p_work = p.work.aoi_position.aoi_id
 876                if not p_work >= AOI_START_ID:
 877                    logging.warning(
 878                        f"Person {p.id} has no work AOI ID, use random work instead!"
 879                    )
 880                    p_work = rng.choice(self.aoi_type2ids["other"])
 881                p_profile = pb2dict(p.profile)
 882                person_args.append(
 883                    [
 884                        p.id,
 885                        p_home,
 886                        self.aoi2region[p_home],  # possibly key error
 887                        p_work,
 888                        self.aoi2region[p_work],  # possibly key error
 889                        p_profile,
 890                        self.modes,
 891                        self.p_mode,
 892                        rng.integers(0, 2**16 - 1),
 893                    ]
 894                )
 895            except Exception as e:
 896                bad_person_indexes.add(idx)
 897                logging.warning(f"{e} when handling Person {p.id}, Skip!")
 898        to_process_persons = [
 899            p for idx, p in enumerate(orig_persons) if idx not in bad_person_indexes
 900        ]
 901        # return directly
 902        no_process_persons = [
 903            p for idx, p in enumerate(orig_persons) if idx in bad_person_indexes
 904        ]
 905        partial_args = (
 906            len(self.regions),
 907            self.projector,
 908            self.departure_prob,
 909            self.LEN_OD_TIMES,
 910        )
 911        filled_schedules = []
 912        _fill_person_schedule_unit_with_arg = partial(
 913            _fill_person_schedule_unit, partial_args
 914        )
 915        for i in range(0, len(person_args), MAX_BATCH_SIZE):
 916            person_args_batch = person_args[i : i + MAX_BATCH_SIZE]
 917            with Pool(processes=self.workers) as pool:
 918                filled_schedules += pool.map(
 919                    _fill_person_schedule_unit_with_arg,
 920                    person_args_batch,
 921                    chunksize=min(
 922                        ceil(len(person_args_batch) / self.workers), MAX_CHUNK_SIZE
 923                    ),
 924                )
 925        for (
 926            aoi_list,
 927            times,
 928            trip_modes,
 929            trip_models,
 930            activities,
 931        ), orig_p in zip(filled_schedules, to_process_persons):
 932            times = np.array(times) * 3600  # hour->second
 933            p = Person()
 934            p.CopyFrom(orig_p)
 935            p.ClearField("schedules")
 936            for time, aoi_id, trip_mode, activity, trip_model in zip(
 937                times, aoi_list[1:], trip_modes, activities, trip_models
 938            ):
 939                schedule = cast(Schedule, p.schedules.add())
 940                schedule.departure_time = time
 941                schedule.loop_count = 1
 942                trip = Trip(
 943                    mode=cast(
 944                        TripMode,
 945                        trip_mode,
 946                    ),
 947                    end=Position(aoi_position=AoiPosition(aoi_id=aoi_id)),
 948                    activity=activity,
 949                    model=trip_model,
 950                )
 951                schedule.trips.append(trip)
 952            self.persons.append(p)
 953        len_filled_persons, len_no_process_persons = len(to_process_persons), len(
 954            no_process_persons
 955        )
 956        logging.info(f"Filled schedules of {len_filled_persons} persons")
 957        if len_no_process_persons > 0:
 958            logging.warning(
 959                f"Unprocessed persons: {len_no_process_persons}, index range [{len_filled_persons}:] in returned results"
 960            )
 961        self.persons.extend(no_process_persons)
 962
 963    def fill_person_schedules(
 964        self,
 965        input_persons: List[Person],
 966        od_matrix: np.ndarray,
 967        areas: GeoDataFrame,
 968        available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"],
 969        departure_time_curve: Optional[list[float]] = None,
 970        seed: int = 0,
 971    ) -> List[Person]:
 972        """
 973        Generate person schedules.
 974
 975        Args:
 976        - input_persons (List[Person]): Input Person objects.
 977        - od_matrix (numpy.ndarray): The OD matrix.
 978        - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
 979        - available_trip_modes (Optional[List[str]]): available trip modes for person schedules.
 980        - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h.
 981        - seed (int): The random seed.
 982
 983        Returns:
 984        - List[Person]: The person objects with generated schedules.
 985        """
 986        # init
 987        self.area_shapes = []
 988        self.aoi2region = defaultdict(list)
 989        self.region2aoi = defaultdict(list)
 990        self.aoi_type2ids = defaultdict(list)
 991        self.persons = []
 992        # user input time curve
 993        if departure_time_curve is not None:
 994            assert len(departure_time_curve) >= 24
 995            sum_times = sum(departure_time_curve)
 996            self.departure_prob = np.array(
 997                [d / sum_times for d in departure_time_curve]
 998            )
 999        else:
1000            self.departure_prob = None
1001        self.od_matrix = od_matrix
1002        self.areas = areas
1003        self.available_trip_modes = available_trip_modes
1004        self._read_aois()
1005        self._read_regions()
1006        self._read_od_matrix()
1007        self._match_aoi2region()
1008        self._generate_schedules(input_persons, seed)
1009
1010        return self.persons
1011
1012    def generate_taxi_drivers(
1013        self,
1014        template_func: Optional[Callable[[], Person]] = None,
1015        parking_positions: Optional[List[Union[LanePosition, AoiPosition]]] = None,
1016        agent_num: Optional[int] = None,
1017        seed: int = 0,
1018    ) -> List[Person]:
1019        """
1020        Args:
1021        - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized.
1022        - parking_positions (Optional[List[Union[LanePosition,AoiPosition]]]): The parking positions of each taxi.
1023        - agent_num (Optional[int]): The taxi driver num.
1024        - seed (int): The random seed.
1025
1026        Returns:
1027        - List[Person]: The generated driver objects.
1028        """
1029        self.persons = []
1030        person_id = TAXI_START_ID
1031        _template = template_func if template_func is not None else self.template
1032        self._read_aois()
1033        if parking_positions is not None:
1034            logging.info(f"")
1035            _taxi_home_positions = parking_positions
1036        elif agent_num is not None:
1037            logging.info(f"")
1038            if not agent_num >= 1:
1039                logging.warning("agent_num should >=1")
1040                return []
1041            rng = np.random.default_rng(seed)
1042            has_driving_aoi_ids = [a["id"] for a in self.aois if a["has_driving_gates"]]
1043            _taxi_home_positions = [
1044                AoiPosition(aoi_id=_id)
1045                for _id in rng.choice(has_driving_aoi_ids, int(agent_num))
1046            ]
1047        else:
1048            logging.warning(
1049                "Either `agent_num` or `parking_positions` should be provided!"
1050            )
1051            return []
1052        for _pos in _taxi_home_positions:
1053            person_id, generated_drivers = gen_taxi_drivers(
1054                person_id,
1055                _template,
1056                _pos,
1057            )
1058            self.persons.extend(generated_drivers)
1059        return self.persons

Generate trips from an OD matrix.

TripGenerator( m: city.map.v2.map_pb2.Map, pop_tif_path: Optional[str] = None, activity_distributions: Optional[dict] = None, driving_speed: float = 8.333333333333334, parking_fee: float = 20.0, driving_penalty: float = 0.0, subway_speed: float = 9.722222222222221, subway_penalty: float = 600.0, subway_expense: float = 10.0, bus_speed: float = 4.166666666666667, bus_penalty: float = 600.0, bus_expense: float = 5.0, bike_speed: float = 2.7777777777777777, bike_penalty: float = 0.0, template_func: Callable[[], city.person.v2.person_pb2.Person] = <function default_person_template_generator>, add_pop: bool = False, multiprocessing_chunk_size: int = 500, workers: int = 4)
405    def __init__(
406        self,
407        m: Map,
408        pop_tif_path: Optional[str] = None,
409        activity_distributions: Optional[dict] = None,
410        driving_speed: float = 30 / 3.6,
411        parking_fee: float = 20.0,
412        driving_penalty: float = 0.0,
413        subway_speed: float = 35 / 3.6,
414        subway_penalty: float = 600.0,
415        subway_expense: float = 10.0,
416        bus_speed: float = 15 / 3.6,
417        bus_penalty: float = 600.0,
418        bus_expense: float = 5.0,
419        bike_speed: float = 10 / 3.6,
420        bike_penalty: float = 0.0,
421        template_func: Callable[[], Person] = default_person_template_generator,
422        add_pop: bool = False,
423        multiprocessing_chunk_size: int = 500,
424        workers: int = cpu_count(),
425    ):
426        """
427        Args:
428        - m (Map): The Map.
429        - pop_tif_path (str): path to population tif file.
430        - activity_distributions (dict): human mobility mode and its probability. e.g. {"HWH": 18.0, "HWH+": 82.0,}. H for go home, W for go to work, O or + for other activities
431        - driving_speed (float): vehicle speed(m/s) for traffic mode assignment.
432        - parking_fee (float): money cost(ï¿¥) of parking a car for traffic mode assignment.
433        - driving_penalty (float): extra cost(s) of vehicle for traffic mode assignment.
434        - subway_speed (float): subway speed(m/s) for traffic mode assignment.
435        - subway_penalty (float): extra cost(s) of subway for traffic mode assignment.
436        - subway_expense (float): money cost(ï¿¥) of subway for traffic mode assignment.
437        - bus_speed (float): bus speed(m/s) for traffic mode assignment.
438        - bus_penalty (float): extra cost(s) of bus for traffic mode assignment.
439        - bus_expense (float): money  cost(ï¿¥) of bus for traffic mode assignment.
440        - bike_speed (float): extra cost(s) of bike for traffic mode assignment.
441        - bike_penalty (float): money  cost(ï¿¥) of bike for traffic mode assignment.
442        - template_func (Callable[[],Person]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied.
443        - add_pop (bool): Add population to aois.
444        - multiprocessing_chunk_size (int): the maximum size of each multiprocessing chunk
445        - workers (int): number of workers.
446        """
447        global SUBWAY_EXPENSE, BUS_EXPENSE, DRIVING_SPEED, DRIVING_PENALTY, SUBWAY_SPEED, SUBWAY_PENALTY, BUS_SPEED, BUS_PENALTY, BIKE_SPEED, BIKE_PENALTY, PARKING_FEE
448        SUBWAY_EXPENSE, BUS_EXPENSE = subway_expense, bus_expense
449        DRIVING_SPEED, DRIVING_PENALTY, PARKING_FEE = (
450            driving_speed,
451            driving_penalty,
452            parking_fee,
453        )
454        SUBWAY_SPEED, SUBWAY_PENALTY = subway_speed, subway_penalty
455        BUS_SPEED, BUS_PENALTY = bus_speed, bus_penalty
456        BIKE_SPEED, BIKE_PENALTY = bike_speed, bike_penalty
457        self.m = m
458        self.pop_tif_path = pop_tif_path
459        self.add_pop = add_pop
460        self.projector = pyproj.Proj(m.header.projection)
461        self.workers = workers
462        self.template = template_func
463        self.persons = []
464        global MAX_CHUNK_SIZE
465        MAX_CHUNK_SIZE = multiprocessing_chunk_size
466        # activity proportion
467        if activity_distributions is not None:
468            ori_modes_stat = {
469                k: float(v)
470                for k, v in activity_distributions.items()
471                if k in HUMAN_MODE_STATS
472            }
473        else:
474            ori_modes_stat = HUMAN_MODE_STATS
475        self.modes = [mode for mode in ori_modes_stat.keys()]
476        self.p = np.array([prob for prob in ori_modes_stat.values()])
477        self.p_mode = self.p / sum(self.p)

Args:

  • m (Map): The Map.
  • pop_tif_path (str): path to population tif file.
  • activity_distributions (dict): human mobility mode and its probability. e.g. {"HWH": 18.0, "HWH+": 82.0,}. H for go home, W for go to work, O or + for other activities
  • driving_speed (float): vehicle speed(m/s) for traffic mode assignment.
  • parking_fee (float): money cost(￥) of parking a car for traffic mode assignment.
  • driving_penalty (float): extra cost(s) of vehicle for traffic mode assignment.
  • subway_speed (float): subway speed(m/s) for traffic mode assignment.
  • subway_penalty (float): extra cost(s) of subway for traffic mode assignment.
  • subway_expense (float): money cost(￥) of subway for traffic mode assignment.
  • bus_speed (float): bus speed(m/s) for traffic mode assignment.
  • bus_penalty (float): extra cost(s) of bus for traffic mode assignment.
  • bus_expense (float): money cost(￥) of bus for traffic mode assignment.
  • bike_speed (float): bike speed(m/s) for traffic mode assignment.
  • bike_penalty (float): extra cost(s) of bike for traffic mode assignment.
  • template_func (Callable[[],Person]): The template function of generated person object, whose schedules, home will be replaced and others will be copied.
  • add_pop (bool): Add population to aois.
  • multiprocessing_chunk_size (int): the maximum size of each multiprocessing chunk
  • workers (int): number of workers.
m
pop_tif_path
add_pop
projector
workers
template
persons
modes
p
p_mode
def generate_persons( self, od_matrix: numpy.ndarray, areas: geopandas.geodataframe.GeoDataFrame, available_trip_modes: List[str] = ['drive', 'walk', 'bus', 'subway', 'taxi'], departure_time_curve: Optional[list[float]] = None, area_pops: Optional[list] = None, person_profiles: Optional[list[dict]] = None, seed: int = 0, agent_num: Optional[int] = None) -> List[city.person.v2.person_pb2.Person]:
725    def generate_persons(
726        self,
727        od_matrix: np.ndarray,
728        areas: GeoDataFrame,
729        available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"],
730        departure_time_curve: Optional[list[float]] = None,
731        area_pops: Optional[list] = None,
732        person_profiles: Optional[list[dict]] = None,
733        seed: int = 0,
734        agent_num: Optional[int] = None,
735    ) -> List[Person]:
736        """
737        Args:
738        - od_matrix (numpy.ndarray): The OD matrix.
739        - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
740        - available_trip_modes (list[str]): available trip modes for person schedules.
741        - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h.
742        - area_pops (list): list of populations in each area. If is not None, # of the persons departs from each home position is exactly equal to the given pop num.
743        - person_profiles (Optional[List[dict]]): list of profiles in dict format.
744        - seed (int): The random seed.
745        - agent_num (int): number of agents to generate.
746
747        Returns:
748        - List[Person]: The generated person objects.
749        """
750        # init
751        self.area_shapes = []
752        self.aoi2region = defaultdict(list)
753        self.region2aoi = defaultdict(list)
754        self.aoi_type2ids = defaultdict(list)
755        self.persons = []
756        # user input time curve
757        if departure_time_curve is not None:
758            assert len(departure_time_curve) >= 24
759            sum_times = sum(departure_time_curve)
760            self.departure_prob = np.array(
761                [d / sum_times for d in departure_time_curve]
762            )
763        else:
764            self.departure_prob = None
765        self.od_matrix = od_matrix
766        self.areas = areas
767        self.available_trip_modes = available_trip_modes
768        self._read_aois()
769        self._read_regions()
770        self._read_od_matrix()
771        self._match_aoi2region()
772        if agent_num is None:
773            agent_num = int(np.sum(self.od) / self.LEN_OD_TIMES)
774        if not agent_num >= 1:
775            logging.warning("agent_num should >=1")
776            return []
777        self._generate_mobi(agent_num, area_pops, person_profiles, seed)
778        return self.persons

Args:

  • od_matrix (numpy.ndarray): The OD matrix.
  • areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined crs string.
  • available_trip_modes (list[str]): available trip modes for person schedules.
  • departure_time_curve (Optional[List[float]]): The departure time distribution over a day (24h). Must contain at least 24 values, i.e. a resolution of 1h or finer.
  • area_pops (list): list of populations in each area. If not None, the number of persons departing from each home position is exactly equal to the given population.
  • person_profiles (Optional[List[dict]]): list of profiles in dict format.
  • seed (int): The random seed.
  • agent_num (int): number of agents to generate.

Returns:

  • List[Person]: The generated person objects.
def generate_public_transport_drivers( self, template_func: Optional[Callable[[], city.person.v2.person_pb2.Person]] = None, stop_duration_time: float = 30.0, seed: int = 0) -> List[city.person.v2.person_pb2.Person]:
811    def generate_public_transport_drivers(
812        self,
813        template_func: Optional[Callable[[], Person]] = None,
814        stop_duration_time: float = 30.0,
815        seed: int = 0,
816    ) -> List[Person]:
817        """
818        Args:
819        - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized.
820        - stop_duration_time (float): The duration time (in second) for bus at each stop.
821        - seed (int): The random seed.
822
823        Returns:
824        - List[Person]: The generated driver objects.
825        """
826        self.persons = []
827        road_aoi_id2d_pos = self._get_driving_pos_dict()
828        person_id = PT_START_ID
829        _template = template_func if template_func is not None else self.template
830        for sl in self.m.sublines:
831            departure_times = list(sl.schedules.departure_times)
832            if not sl.type in {mapv2.SUBLINE_TYPE_BUS, mapv2.SUBLINE_TYPE_SUBWAY}:
833                continue
834            person_id, generated_drivers = gen_bus_drivers(
835                person_id,
836                _template,
837                departure_times,
838                stop_duration_time,
839                road_aoi_id2d_pos,
840                sl,
841            )
842            self.persons.extend(generated_drivers)
843        return self.persons

Args:

  • template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose schedules, home will be replaced and others will be copied. If not provided, the template_func provided in __init__ will be used.
  • stop_duration_time (float): The duration time (in second) for bus at each stop.
  • seed (int): The random seed.

Returns:

  • List[Person]: The generated driver objects.
def fill_person_schedules( self, input_persons: List[city.person.v2.person_pb2.Person], od_matrix: numpy.ndarray, areas: geopandas.geodataframe.GeoDataFrame, available_trip_modes: List[str] = ['drive', 'walk', 'bus', 'subway', 'taxi'], departure_time_curve: Optional[list[float]] = None, seed: int = 0) -> List[city.person.v2.person_pb2.Person]:
 963    def fill_person_schedules(
 964        self,
 965        input_persons: List[Person],
 966        od_matrix: np.ndarray,
 967        areas: GeoDataFrame,
 968        available_trip_modes: List[str] = ["drive", "walk", "bus", "subway", "taxi"],
 969        departure_time_curve: Optional[list[float]] = None,
 970        seed: int = 0,
 971    ) -> List[Person]:
 972        """
 973        Generate person schedules.
 974
 975        Args:
 976        - input_persons (List[Person]): Input Person objects.
 977        - od_matrix (numpy.ndarray): The OD matrix.
 978        - areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined `crs` string.
 979        - available_trip_modes (Optional[List[str]]): available trip modes for person schedules.
 980        - departure_time_curve (Optional[List[float]]): The departure time of a day (24h). The resolution must >=1h.
 981        - seed (int): The random seed.
 982
 983        Returns:
 984        - List[Person]: The person objects with generated schedules.
 985        """
 986        # init
 987        self.area_shapes = []
 988        self.aoi2region = defaultdict(list)
 989        self.region2aoi = defaultdict(list)
 990        self.aoi_type2ids = defaultdict(list)
 991        self.persons = []
 992        # user input time curve
 993        if departure_time_curve is not None:
 994            assert len(departure_time_curve) >= 24
 995            sum_times = sum(departure_time_curve)
 996            self.departure_prob = np.array(
 997                [d / sum_times for d in departure_time_curve]
 998            )
 999        else:
1000            self.departure_prob = None
1001        self.od_matrix = od_matrix
1002        self.areas = areas
1003        self.available_trip_modes = available_trip_modes
1004        self._read_aois()
1005        self._read_regions()
1006        self._read_od_matrix()
1007        self._match_aoi2region()
1008        self._generate_schedules(input_persons, seed)
1009
1010        return self.persons

Generate person schedules.

Args:

  • input_persons (List[Person]): Input Person objects.
  • od_matrix (numpy.ndarray): The OD matrix.
  • areas (GeoDataFrame): The area data. Must contain a 'geometry' column with geometric information and a defined crs string.
  • available_trip_modes (Optional[List[str]]): available trip modes for person schedules.
  • departure_time_curve (Optional[List[float]]): The departure time distribution over a day (24h). Must contain at least 24 values, i.e. a resolution of 1h or finer.
  • seed (int): The random seed.

Returns:

  • List[Person]: The person objects with generated schedules.
def generate_taxi_drivers( self, template_func: Optional[Callable[[], city.person.v2.person_pb2.Person]] = None, parking_positions: Optional[List[Union[city.geo.v2.geo_pb2.LanePosition, city.geo.v2.geo_pb2.AoiPosition]]] = None, agent_num: Optional[int] = None, seed: int = 0) -> List[city.person.v2.person_pb2.Person]:
1012    def generate_taxi_drivers(
1013        self,
1014        template_func: Optional[Callable[[], Person]] = None,
1015        parking_positions: Optional[List[Union[LanePosition, AoiPosition]]] = None,
1016        agent_num: Optional[int] = None,
1017        seed: int = 0,
1018    ) -> List[Person]:
1019        """
1020        Args:
1021        - template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose `schedules`, `home` will be replaced and others will be copied. If not provided, the `temp_func` provided in `__init__` will be utilized.
1022        - parking_positions (Optional[List[Union[LanePosition,AoiPosition]]]): The parking positions of each taxi.
1023        - agent_num (Optional[int]): The taxi driver num.
1024        - seed (int): The random seed.
1025
1026        Returns:
1027        - List[Person]: The generated driver objects.
1028        """
1029        self.persons = []
1030        person_id = TAXI_START_ID
1031        _template = template_func if template_func is not None else self.template
1032        self._read_aois()
1033        if parking_positions is not None:
1034            logging.info(f"")
1035            _taxi_home_positions = parking_positions
1036        elif agent_num is not None:
1037            logging.info(f"")
1038            if not agent_num >= 1:
1039                logging.warning("agent_num should >=1")
1040                return []
1041            rng = np.random.default_rng(seed)
1042            has_driving_aoi_ids = [a["id"] for a in self.aois if a["has_driving_gates"]]
1043            _taxi_home_positions = [
1044                AoiPosition(aoi_id=_id)
1045                for _id in rng.choice(has_driving_aoi_ids, int(agent_num))
1046            ]
1047        else:
1048            logging.warning(
1049                "Either `agent_num` or `parking_positions` should be provided!"
1050            )
1051            return []
1052        for _pos in _taxi_home_positions:
1053            person_id, generated_drivers = gen_taxi_drivers(
1054                person_id,
1055                _template,
1056                _pos,
1057            )
1058            self.persons.extend(generated_drivers)
1059        return self.persons

Args:

  • template_func (Optional[Callable[[],Person]]): The template function of generated person object, whose schedules, home will be replaced and others will be copied. If not provided, the template_func provided in __init__ will be used.
  • parking_positions (Optional[List[Union[LanePosition,AoiPosition]]]): The parking positions of each taxi.
  • agent_num (Optional[int]): The taxi driver num.
  • seed (int): The random seed.

Returns:

  • List[Person]: The generated driver objects.