Browse Source

apply u_turn logic to preprocess.py

master
김선중 1 year ago
parent
commit
24afd87a15
2 changed files with 189 additions and 1122 deletions
  1. +95
    -1091
      Analysis/0411_unp-left_p-right-uturn/0411_uturn.ipynb
  2. +94
    -31
      Scripts/preprocess_daily.py

+ 95
- 1091
Analysis/0411_unp-left_p-right-uturn/0411_uturn.ipynb
File diff suppressed because it is too large
View File


+ 94
- 31
Scripts/preprocess_daily.py View File

@ -428,60 +428,123 @@ class DailyPreprocessor():
: inter_no, phase_no, ring_type, move_no, inc_dire, out_dire, inc_angle, out_angle, inc_edge_id, out_edge_id, node_id : inter_no, phase_no, ring_type, move_no, inc_dire, out_dire, inc_angle, out_angle, inc_edge_id, out_edge_id, node_id
''' '''
self.u_turn = pd.merge(self.u_turn, self.u_condition, on='child_id')
# p2inc_edge2angle : node_id to inc_edge2angle
p2inc_edge2angle = dict()
# p2out_edge2angle : node_id to out_edge2angle
p2out_edge2angle = dict()
# p2inc_angle2edge : node_id to inc_angle2edge
p2inc_angle2edge = dict()
# p2out_angle2edge : node_id to out_angle2edge
p2out_angle2edge = dict()
for node_id in self.parent_ids:
m5 = self.match5[self.match5.node_id==node_id]
m5 = m5.dropna(subset=['inc_edge_id', 'out_edge_id'])
# inc_edge2angle : inc_edge_id to inc_angle
inc_edge2angle = dict(zip(m5.inc_edge_id, m5.inc_angle.astype(int)))
p2inc_edge2angle[node_id] = inc_edge2angle
# out_edge2angle : out_edge_id to out_angle
out_edge2angle = dict(zip(m5.out_edge_id, m5.out_angle.astype(int)))
p2out_edge2angle[node_id] = out_edge2angle
# inc_angle2edge : inc_angle to inc_edge_id
inc_angle2edge = dict(zip(m5.inc_angle.astype(int), m5.inc_edge_id))
p2inc_angle2edge[node_id] = inc_angle2edge
# out_angle2edge : out_angle to out_edge_id
out_angle2edge = dict(zip(m5.out_angle.astype(int), m5.out_edge_id))
p2out_angle2edge[node_id] = out_angle2edge
# 각 uturn node에 대하여 (inc_edge_id, out_edge_id) 부여 # 각 uturn node에 대하여 (inc_edge_id, out_edge_id) 부여
cmatches = [] cmatches = []
for _, row in self.uturn.iterrows():
child_id = row.child_id
for row in self.u_turn.itertuples():
parent_id = row.parent_id parent_id = row.parent_id
dire = row.direction
child_id = row.child_id
condition = row.condition condition = row.condition
inc_edge_id = row.inc_edge_id
out_edge_id = row.out_edge_id
inc_edge_id = row.from_edge_id
out_edge_id = row.to_edge_id
adj_inc_edge_id = row.adj_from_edge_id
adj_out_edge_id = row.adj_to_edge_id
# match5에서 부모노드id에 해당하는 행들을 가져옴 (cmatch) # match5에서 부모노드id에 해당하는 행들을 가져옴 (cmatch)
cmatch = self.match5.copy()[self.match5.node_id==parent_id] # match dataframe for a child node cmatch = self.match5.copy()[self.match5.node_id==parent_id] # match dataframe for a child node
cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True) cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True)
cmatch['node_id'] = child_id cmatch['node_id'] = child_id
cmatch[['inc_edge_id', 'out_edge_id']] = np.nan
# 보행신호시/좌회전시 진입/진출방향
ind = self.dires.index(dire)
inc_dire_pedes = self.dires[(ind + 2) % len(self.dires)]
out_dire_pedes = self.dires[(ind - 2) % len(self.dires)]
inc_dire_right = dire
out_dire_right = self.dires[(ind + 2) % len(self.dires)]
# 진입엣지 각도
inc_angle = p2inc_edge2angle[parent_id][adj_inc_edge_id]
# 이격각도
self.angle_separation = 10
# 진입로 각도 목록
inc_angles = cmatch.dropna(subset=['inc_angle', 'out_angle']).inc_angle.astype(int).unique()
inc_angles = np.sort(inc_angles)
inc_angles = list(inc_angles - 360) + list(inc_angles) + list(inc_angles + 360)
inc_angles = np.array(inc_angles)
# 보행신호시의 진입로 각도
inc_angles_left = inc_angles[inc_angles >= inc_angle + self.angle_separation]
inc_angle_pedes = np.sort(inc_angles_left)[0] % 360
# 보행신호시의 진입로 엣지id
inc_angle2edge = p2inc_angle2edge[parent_id]
inc_edge_id_pedes = inc_angle2edge[inc_angle_pedes]
# 보행신호시/좌회전시 조건
pedes_exists = (cmatch.inc_dire==inc_dire_pedes) & (cmatch.out_dire==out_dire_pedes)
right_exists = (cmatch.inc_dire==inc_dire_right) & (cmatch.out_dire==out_dire_right)
# 진출로 각도 목록
out_angles = cmatch.dropna(subset=['inc_angle', 'out_angle']).out_angle.astype(int).unique()
out_angles = np.sort(out_angles)
out_angles = list(out_angles - 360) + list(out_angles) + list(out_angles + 360)
out_angles = np.array(out_angles)
# 보행신호시의 진입로 각도
out_angles_right = out_angles[out_angles <= inc_angle - self.angle_separation]
out_angle_pedes = np.sort(out_angles_right)[-1] % 360
# 보행신호시의 진입로 엣지id
out_angle2edge = p2out_angle2edge[parent_id]
out_edge_id_pedes = out_angle2edge[out_angle_pedes]
# 진입엣지/진출엣지 포함 조건
inc_true = (cmatch.inc_edge_id==adj_inc_edge_id)
out_true = (cmatch.out_edge_id==adj_out_edge_id)
# 보행신호시 조건
pedes_flag = (cmatch.inc_edge_id==inc_edge_id_pedes) & (cmatch.out_edge_id==out_edge_id_pedes)
# 좌회전시 조건
right_flag = inc_true & (cmatch.turn_type=='left')
# 보행신호이동류(17) 조건
crosswalk_on = (cmatch.move_no==17) & ~ out_true
# 신호없음이동류(18) 조건
all_redsigns = (cmatch.move_no==18) & ~ out_true
# 보행신호시/좌회전시 진입/진출 엣지id 배정 # 보행신호시/좌회전시 진입/진출 엣지id 배정
ind = self.dires.index(dire)
cmatch[['inc_edge_id', 'out_edge_id']] = np.nan
if condition == "보행신호시": if condition == "보행신호시":
cmatch.loc[pedes_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
elif condition == "좌회전시": elif condition == "좌회전시":
cmatch.loc[right_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 신호없음이동류발생시/보행신호이동류발생시 조건
all_redsigns = cmatch.move_no == 18
crosswalk_on = cmatch.move_no == 17
cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 만약 어떤 유턴신호도 배정되지 않았다면
# 좌회전시 → 보행신호시 → 보행신호이동류발생시 → 신호없음이동류발생시 순으로 진입/진출 엣지id 배정
uturn_not_assigned = cmatch[['inc_edge_id','out_edge_id']].isna().any(axis=1).all() uturn_not_assigned = cmatch[['inc_edge_id','out_edge_id']].isna().any(axis=1).all()
if uturn_not_assigned: if uturn_not_assigned:
# 좌회전시
if right_exists.any():
cmatch.loc[right_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 보행신호시 # 보행신호시
elif pedes_exists.any():
cmatch.loc[pedes_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
if pedes_flag.any():
cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 좌회전시
elif right_flag.any():
cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 보행신호이동류(17) 발생시 # 보행신호이동류(17) 발생시
elif crosswalk_on.any(): elif crosswalk_on.any():
cmatch.loc[crosswalk_on & (cmatch.out_dire!=dire), ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
cmatch.loc[crosswalk_on, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 신호없음이동류(18) 발생시 # 신호없음이동류(18) 발생시
elif all_redsigns.any(): elif all_redsigns.any():
cmatch.loc[all_redsigns & (cmatch.out_dire!=dire), ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
cmatch.loc[all_redsigns, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 진출엣지 미포함시
elif out_true.any():
cmatch.loc[~ out_true, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
cmatches.append(cmatch) cmatches.append(cmatch)
# 각 연등교차로(coordination node)에 대하여 (inc_edge_id, out_edge_id) 부여 # 각 연등교차로(coordination node)에 대하여 (inc_edge_id, out_edge_id) 부여

Loading…
Cancel
Save