mosstool.trip.route

1from .client import RoutingClient
2from .preroute import pre_route
3
# Names re-exported as the public API of this package
__all__ = ["RoutingClient", "pre_route"]
class RoutingClient:
class RoutingClient:
    """
    Client side of Routing service
    """

    def __init__(self, server_address: str, secure: bool = False):
        """
        Create the service stub for the given routing server

        Args:
        - server_address (str): Routing server address
        - secure (bool, optional): Whether to use a secure connection. Defaults to False.
        """
        self._aio_stub = routing_grpc.RoutingServiceStub(
            _create_aio_channel(server_address, secure)
        )

    async def GetRoute(
        self,
        req: GetRouteRequest,
    ) -> GetRouteResponse:
        """
        Request navigation

        Args:
        - req (routing_service.GetRouteRequest): https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteRequest

        Returns:
        - https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteResponse
        """
        # Narrow the stub call's result to an awaitable response for type checkers
        pending = self._aio_stub.GetRoute(req)
        return await cast(Awaitable[GetRouteResponse], pending)

Client side of Routing service

RoutingClient(server_address: str, secure: bool = False)
44    def __init__(self, server_address: str, secure: bool = False):
45        """
46        Constructor of RoutingClient
47
48        Args:
49        - server_address (str): Routing server address
50        - secure (bool, optional): Defaults to False. Whether to use a secure connection. Defaults to False.
51        """
52        aio_channel = _create_aio_channel(server_address, secure)
53        self._aio_stub = routing_grpc.RoutingServiceStub(aio_channel)

Constructor of RoutingClient

Args:

  • server_address (str): Routing server address
  • secure (bool, optional): Whether to use a secure connection. Defaults to False.
async def GetRoute( self, req: city.routing.v2.routing_service_pb2.GetRouteRequest) -> city.routing.v2.routing_service_pb2.GetRouteResponse:
55    async def GetRoute(
56        self,
57        req: GetRouteRequest,
58    ) -> GetRouteResponse:
59        """
60        Request navigation
61
62        Args:
63        - req (routing_service.GetRouteRequest): https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteRequest
64
65        Returns:
66        - https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteResponse
67        """
68        res = cast(Awaitable[GetRouteResponse], self._aio_stub.GetRoute(req))
69        return await res
async def pre_route( client: RoutingClient, person: city.person.v2.person_pb2.Person, in_place: bool = False) -> city.person.v2.person_pb2.Person:
 25async def pre_route(
 26    client: RoutingClient,
 27    person: Person,
 28    in_place: bool = False,
 29) -> Person:
 30    """
 31    Fill in the route of the person's all schedules.
 32    The function will REMOVE all schedules that can not be routed.
 33
 34    Args:
 35    - client (RoutingClient): routing service client
 36    - person (Person): person object
 37    - in_place (bool, optional): whether to modify the person object in place. Defaults to False.
 38
 39    Returns:
 40    - None
 41    """
 42    if not in_place:
 43        p = Person()
 44        p.CopyFrom(person)  # type:ignore
 45        person = p  # type:ignore
 46    start = person.home
 47    departure_time = None
 48    all_schedules = list(person.schedules)
 49    person.ClearField("schedules")
 50    for schedule in all_schedules:
 51        schedule = cast(Schedule, schedule)
 52        if schedule.HasField("departure_time"):
 53            departure_time = schedule.departure_time
 54        if schedule.loop_count != 1:
 55            # Schedule is not a one-time trip, departure time is not accurate, no pre-calculation is performed
 56            logging.warning(
 57                "Schedule is not a one-time trip, departure time is not accurate, no pre-calculation is performed"
 58            )
 59            start = schedule.trips[-1].end
 60            continue
 61        good_trips = []
 62        for trip in schedule.trips:
 63            last_departure_time = departure_time
 64            # Cover departure time
 65            if trip.HasField("departure_time"):
 66                departure_time = trip.departure_time
 67            if departure_time is None:
 68                # No explicit departure time, no pre-calculation is performed
 69                logging.warning(
 70                    "No explicit departure time, no pre-calculation is performed"
 71                )
 72                # append directly
 73                good_trips.append(trip)
 74                # update start position
 75                start = trip.end
 76                # Set departure time invalid
 77                departure_time = None
 78                continue
 79            if start == trip.end:
 80                continue
 81            # build request
 82            res = await client.GetRoute(
 83                GetRouteRequest(
 84                    type=_TYPE_MAP[trip.mode],
 85                    start=start,
 86                    end=trip.end,
 87                    time=departure_time,
 88                )
 89            )
 90            if res is None or len(res.journeys) == 0:
 91                logging.warning("No route found")
 92                departure_time = last_departure_time
 93            else:
 94                # append directly
 95                good_trips.append(trip)
 96                trip.ClearField("routes")
 97                trip.routes.MergeFrom(res.journeys)
 98                # update start position
 99                start = trip.end
100                # Set departure time invalid
101                departure_time = None
102        if len(good_trips) > 0:
103            good_schedule = cast(Schedule, person.schedules.add())
104            good_schedule.CopyFrom(schedule)
105            good_schedule.ClearField("trips")
106            good_schedule.trips.extend(good_trips)
107    return person

Fill in the routes for all of the person's schedules. The function will REMOVE all schedules that cannot be routed.

Args:

  • client (RoutingClient): routing service client
  • person (Person): person object
  • in_place (bool, optional): whether to modify the person object in place. Defaults to False.

Returns:

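  • Person: the person with routes filled in (the input object itself when in_place is True, otherwise a copy)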