mosstool.trip.route
class
RoutingClient:
class RoutingClient:
    """
    Asynchronous client for the Routing service.
    """

    def __init__(self, server_address: str, secure: bool = False):
        """
        Create the asynchronous service stub used by this client.

        Args:
        - server_address (str): Routing server address
        - secure (bool, optional): Whether to use a secure connection. Defaults to False.
        """
        # The channel is only needed to build the stub, so it is not kept
        # as a separate attribute.
        self._aio_stub = routing_grpc.RoutingServiceStub(
            _create_aio_channel(server_address, secure)
        )

    async def GetRoute(
        self,
        req: GetRouteRequest,
    ) -> GetRouteResponse:
        """
        Send a navigation request and wait for the response.

        Args:
        - req (routing_service.GetRouteRequest): https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteRequest

        Returns:
        - https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteResponse
        """
        pending = self._aio_stub.GetRoute(req)
        # cast() only informs the type checker that the stub call is awaitable.
        return await cast(Awaitable[GetRouteResponse], pending)
Client side of Routing service
RoutingClient(server_address: str, secure: bool = False)
def __init__(self, server_address: str, secure: bool = False):
    """
    Create the asynchronous service stub used by this client.

    Args:
    - server_address (str): Routing server address
    - secure (bool, optional): Whether to use a secure connection. Defaults to False.
    """
    # The channel is only needed to build the stub, so it is not kept
    # as a separate attribute.
    self._aio_stub = routing_grpc.RoutingServiceStub(
        _create_aio_channel(server_address, secure)
    )
Constructor of RoutingClient
Args:
- server_address (str): Routing server address
- secure (bool, optional): Whether to use a secure connection. Defaults to False.
async def
GetRoute( self, req: city.routing.v2.routing_service_pb2.GetRouteRequest) -> city.routing.v2.routing_service_pb2.GetRouteResponse:
async def GetRoute(
    self,
    req: GetRouteRequest,
) -> GetRouteResponse:
    """
    Send a navigation request and wait for the response.

    Args:
    - req (routing_service.GetRouteRequest): https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteRequest

    Returns:
    - https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteResponse
    """
    pending = self._aio_stub.GetRoute(req)
    # cast() only informs the type checker that the stub call is awaitable.
    return await cast(Awaitable[GetRouteResponse], pending)
Request navigation
Args:
- req (routing_service.GetRouteRequest): https://cityproto.sim.fiblab.net/#city.routing.v2.GetRouteRequest
Returns:
async def
pre_route( client: RoutingClient, person: city.person.v2.person_pb2.Person, in_place: bool = False) -> city.person.v2.person_pb2.Person:
async def pre_route(
    client: RoutingClient,
    person: Person,
    in_place: bool = False,
) -> Person:
    """
    Fill in the routes for all of the person's schedules.

    Trips are processed in order, starting from ``person.home``; each routed
    trip's ``end`` becomes the start of the next request. The result only
    keeps the trips that were routed (or kept as-is, see below), and only
    the schedules that still contain at least one trip:

    - A trip with no departure time (neither its own nor one inherited from
      its schedule) is kept unchanged and no routing request is made.
    - A trip whose ``end`` equals the current start position is skipped.
    - A trip for which the routing service returns no journey is dropped.
    - A schedule whose ``loop_count`` is not 1 is not routed.

    Args:
    - client (RoutingClient): routing service client
    - person (Person): person object
    - in_place (bool, optional): whether to modify the person object in place. Defaults to False.

    Returns:
    - Person: the person with routes filled in. This is the input object
      itself when ``in_place`` is True, otherwise a modified copy.
    """
    if not in_place:
        p = Person()
        p.CopyFrom(person)  # type:ignore
        person = p  # type:ignore
    start = person.home
    departure_time = None
    # Snapshot the schedules, then rebuild person.schedules from scratch with
    # only the schedules/trips that survive routing.
    all_schedules = list(person.schedules)
    person.ClearField("schedules")
    for schedule in all_schedules:
        schedule = cast(Schedule, schedule)
        if schedule.HasField("departure_time"):
            departure_time = schedule.departure_time
        if schedule.loop_count != 1:
            # Schedule is not a one-time trip, departure time is not accurate, no pre-calculation is performed
            logging.warning(
                "Schedule is not a one-time trip, departure time is not accurate, no pre-calculation is performed"
            )
            # NOTE(review): this schedule is not re-added to person.schedules,
            # so it is absent from the result - confirm this is intended.
            # Guard against a looped schedule without trips (trips[-1] would
            # raise IndexError); in that case the position does not change.
            if schedule.trips:
                start = schedule.trips[-1].end
            continue
        good_trips = []
        for trip in schedule.trips:
            # Remember the time in effect before this trip so it can be
            # restored if routing fails.
            last_departure_time = departure_time
            # Cover departure time
            if trip.HasField("departure_time"):
                departure_time = trip.departure_time
            if departure_time is None:
                # No explicit departure time, no pre-calculation is performed
                logging.warning(
                    "No explicit departure time, no pre-calculation is performed"
                )
                # append directly
                good_trips.append(trip)
                # update start position
                start = trip.end
                continue
            if start == trip.end:
                # Already at the destination: the trip is skipped.
                continue
            # build request
            res = await client.GetRoute(
                GetRouteRequest(
                    type=_TYPE_MAP[trip.mode],
                    start=start,
                    end=trip.end,
                    time=departure_time,
                )
            )
            if res is None or not res.journeys:
                logging.warning("No route found")
                departure_time = last_departure_time
            else:
                # append directly
                good_trips.append(trip)
                # Replace any existing routes with the freshly computed ones.
                trip.ClearField("routes")
                trip.routes.MergeFrom(res.journeys)
                # update start position
                start = trip.end
                # Set departure time invalid
                departure_time = None
        if good_trips:
            good_schedule = cast(Schedule, person.schedules.add())
            good_schedule.CopyFrom(schedule)
            good_schedule.ClearField("trips")
            good_schedule.trips.extend(good_trips)
    return person
Fill in the routes for all of the person's schedules. The function will REMOVE all schedules that cannot be routed.
Args:
- client (RoutingClient): routing service client
- person (Person): person object
- in_place (bool, optional): whether to modify the person object in place. Defaults to False.
Returns:
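- Person: the person with routes filled in (the input object itself if in_place is True, otherwise a modified copy).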