Source code for pybtls.traffic.traffic_loader
"""
The methods and classes that are not defined in Python are defined in C++ py_main.cpp.
"""
from ..lib.BTLS import (
_TrafficLoader,
_VehicleTrafficFile,
Vehicle,
_Vehicle,
_VehClassPattern,
_VehClassAxle,
)
from typing import Literal, Union
from pathlib import Path
__all__ = ["TrafficLoader"]
[docs]
class TrafficLoader:
def __init__(self, no_lane: int):
"""
The TrafficLoader instance stores the information for creating CTrafficLoader instances for each lane.
Parameters
----------
no_lane : int\n
The number of lanes on the bridge.
"""
self._tag = "HistoryTraffic"
self._no_lane = no_lane
self._sim_day = None
self._no_dir = None
self._no_lane_dir_1 = None
self._no_lane_dir_2 = None
self._vehicle_classifier = 1
self._lanes_vehicles = [[] for _ in range(no_lane)]
def __getstate__(self):
attribute_dict = {}
attribute_dict["tag"] = self._tag
attribute_dict["no_lane"] = self._no_lane
attribute_dict["sim_day"] = self._sim_day
attribute_dict["no_dir"] = self._no_dir
attribute_dict["no_lane_dir_1"] = self._no_lane_dir_1
attribute_dict["no_lane_dir_2"] = self._no_lane_dir_2
attribute_dict["vehicle_classifier"] = self._vehicle_classifier
attribute_dict["lanes_vehicles"] = self._lanes_vehicles
return attribute_dict
def __setstate__(self, attribute_dict):
self._tag = attribute_dict["tag"]
self._no_lane = attribute_dict["no_lane"]
self._sim_day = attribute_dict["sim_day"]
self._no_dir = attribute_dict["no_dir"]
self._no_lane_dir_1 = attribute_dict["no_lane_dir_1"]
self._no_lane_dir_2 = attribute_dict["no_lane_dir_2"]
self._vehicle_classifier = attribute_dict["vehicle_classifier"]
self._lanes_vehicles = attribute_dict["lanes_vehicles"]
[docs]
def add_traffic(
self,
traffic: Union[Path, list[_Vehicle]],
traffic_format: Literal[1, 2, 3, 4] = None,
use_average_speed: bool = False,
use_const_speed: bool = False,
const_speed_value: float = 0.0,
**kwargs,
) -> None:
"""
Add a recorded traffic from either a .txt file or a vehicle list.
Parameters
----------
traffic: Union[Path,list[_Vehicle]]\n
The path to the traffic file,
or a list of vehicles.
traffic_format : Literal[1, 2, 3, 4], optional\n
The format of the .txt traffic file.\n
1: CASTOR format.\n
2: BEDIT format.\n
3: DITIS format.\n
4: MON format.
use_average_speed : bool, optional\n
Whether to use the average speed of the vehicle. \n
The default is False.
use_const_speed : bool, optional\n
Whether to use a constant speed for all vehicles. \n
The default is False.
const_speed_value : float, optional\n
The constant speed value (in km/h). This will be an essential input if use_const_speed is True. \n
The default is 0.0.
Keyword Arguments
-----------------
classifier_type : str, optional\n
The vehicle classifier type. The default is "pattern".
Returns
-------
None.
"""
if use_average_speed and use_const_speed:
raise ValueError(
"use_average_speed and use_const_speed cannot both be True."
)
if use_const_speed and const_speed_value <= 0.0:
raise ValueError(
"const_speed_value should be positive non-zero if use_const_speed is True."
)
if kwargs.get("classifier_type") == "axle":
vehicle_classifier = _VehClassAxle()
self._vehicle_classifier = 0
else:
vehicle_classifier = _VehClassPattern()
self._vehicle_classifier = 1
traffic_data = _VehicleTrafficFile(
vehicle_classifier, use_const_speed, use_average_speed, const_speed_value
)
if isinstance(traffic, (Path, str)):
if traffic_format is None:
raise ValueError("Argument traffic_format is not specified.")
traffic = Path(traffic) if not isinstance(traffic, Path) else traffic
traffic_data.read(
traffic, traffic_format
) # The simulation requires traffic information before _get_traffic_loader is called.
elif isinstance(traffic, list):
if all(isinstance(vehicle, (Vehicle, _Vehicle)) for vehicle in traffic):
traffic_data.assignTraffic(traffic)
else:
raise ValueError("Existing non-Vehicle object in the traffic.")
else:
raise ValueError("Invalid traffic data for traffic loader.")
if self._no_lane != traffic_data.getNoLanes():
raise RuntimeError(
f"Number of lanes included in traffic file is not equal to {self._no_lane}."
)
self._sim_day = traffic_data.getNoDays()
self._no_dir = traffic_data.getNoDirn()
self._no_lane_dir_1 = traffic_data.getNoLanesDir1()
self._no_lane_dir_2 = traffic_data.getNoLanesDir2()
self._check_traffic()
for _ in range(traffic_data.getNoVehicles()):
temp_vehicle = traffic_data.getNextVehicle()
self._lanes_vehicles[temp_vehicle._getGlobalLane(self._no_lane) - 1].append(
temp_vehicle
)
def _get_traffic_loader(self) -> list[_TrafficLoader]:
"""
Get a list of CTrafficLoader instances for each lane.
"""
loader_list = [None] * self._no_lane
for i in range(self._no_lane):
traffic_loader = _TrafficLoader()
if i < self._no_lane_dir_1:
lane_dir = 1
else:
lane_dir = 2
traffic_loader.setLaneData(lane_dir, i)
for vehicle in self._lanes_vehicles[i]:
traffic_loader.addVehicle(vehicle)
if traffic_loader.getNoVehicles() > 0:
traffic_loader.setFirstArrivalTime()
else:
raise Warning(f"No vehicle in lane {i+1}.")
loader_list[i] = traffic_loader
return loader_list
@property
def sim_day(self) -> int:
return self._sim_day
@property
def tag(self) -> str:
return self._tag
@property
def no_lane(self) -> int:
return self._no_lane
@property
def no_dir(self) -> int:
return self._no_dir
@property
def no_lane_dir_1(self) -> int:
return self._no_lane_dir_1
@property
def no_lane_dir_2(self) -> int:
return self._no_lane_dir_2
@property
def vehicle_classifier(self) -> int:
return self._vehicle_classifier
def _check_traffic(self) -> None:
if self._sim_day == 0:
raise ValueError("No traffic in vehicle file.")
if self._no_lane == 0:
raise ValueError("No lanes in vehicle file.")
if self._no_dir == 0:
raise ValueError("No directions in vehicle file.")
if self._no_dir == 2 and self._no_lane == 1:
raise ValueError(
"Two directions traffic found but only one lane specified."
)