In [1]:
import os
import pandas as pd
import numpy as np
import sys
sys.path.append('../../Scripts')
from preprocess_daily import DailyPreprocessor
from generate_signals import SignalGenerator

비보호좌회전, 신호우회전, 유턴

줄임말 목록
- `i` : 교차로번호, `inter_no`
- `f` : 진입, from, `inc_edge_id`
- `t` : 진출, to, `out_edge_id`
- `vec` : 방향벡터, unit vector (`np.array([0.6, 0.8])`)
- `dire` : 방위, direction (동, 서, 남, 북, 북동, 북서, 남동, 남서)
- `rvec` : 정방향 방향벡터, unit vector to the right direction (`np.array([0,1])`)

필요한 객체들 목록

- `inter2dire2rvec` : `inter_no` $\mapsto$ `dire2rvec`
  - `dire2rvec` : `dire` $\mapsto$ `rvec`
- `inter2incs` : `inter_no` $\mapsto$ `inc_edge_ids`
- `inter2outs` : `inter_no` $\mapsto$ `out_edge_ids`
- `inter2inc2dire` : `inter_no` $\mapsto$ `int2dire`
  - `inc2dire` : `out_edge_id` $\mapsto$ `dire`
- `inter2out2dire` : `inter_no` $\mapsto$ `out2dire`
  - `out2dire` : `inc_edge_id` $\mapsto$ `dire`
- `inter2inc2vec` : `inter_no` $\mapsto$ `int2vec`
  - `inc2vec` : `out_edge_id` $\mapsto$ `vec`
- `inter2out2vec` : `inter_no` $\mapsto$ `out2vec`
  - `out2vec` : `inc_edge_id` $\mapsto$ `vec`

좌회전 판단

Given `inter_no`, `inc_edge_id` and `out_edge_id`, we have `inc_vec = inter2inc2vec[inter_no][inc_edge_id]` and `out_vec = inter2out2vec[inter_no][out_edge_id]`.
Rotate `inc_vec` by 90, 180 and 270 degrees clockwise, to define 
`out_vec_left`, `out_vec_straight` and `out_vec_right`.
Define `out_vecs={'right':out_vec_left, 'straight':out_vec_straight, 'right':out_vec_right}`.
Select the key that maximize the similarity of the corresponding value of the key and `inc_vec`.

In [2]:
self = DailyPreprocessor()
self.load_data()
self.make_match1()
self.make_match2()
self.make_match3()
self.make_match4()
self.make_match5()
display(self.uturn)
display(self.u_turn)
self.u_turn = pd.merge(self.u_turn, self.u_condition, on='child_id')
self.u_turn

1. 데이터를 로드합니다.
1-1. 네트워크가 로드되었습니다.
1-2. 테이블들이 로드되었습니다.
1-3. 네트워크의 모든 clean state requirement들을 체크했습니다.
1-4. 테이블들의 무결성 검사를 완료했습니다.
1-5. 주요 객체 (리스트, 딕셔너리)들을 저장했습니다.


Unnamed: 0,parent_id,child_id,direction,condition,inc_edge_id,out_edge_id
0,i0,u00,북,좌회전시,571500487_02,571500487_01.32
1,i2,u20,북,보행신호시,571542810_01.51,571542810_02
2,i3,u30,북,보행신호시,571556452_01,571556452_02
3,i3,u31,동,보행신호시,571500475_02,571500475_01.26
4,i3,u32,서,보행신호시,571540303_02,-571540303_02
5,i6,u60,서,좌회전시,571500535_02,-571500535_02


Unnamed: 0,parent_id,child_id,head_edge_id,from_edge_id,to_edge_id
0,i0,u00,-571500487_01,571500487_02,571500487_01.32
1,i2,u20,571542811_02,571542810_01.51,571542810_02
2,i3,u30,571556450_02,571556452_01,571556452_02
3,i3,u31,-571500475_01,571500475_02,571500475_01.26
4,i3,u32,571540303_02.21,571540303_02,-571540303_02
5,i6,u60,571500535_02.18,571500535_02,-571500535_02


Unnamed: 0,parent_id,child_id,head_edge_id,from_edge_id,to_edge_id,condition
0,i0,u00,-571500487_01,571500487_02,571500487_01.32,좌회전시
1,i2,u20,571542811_02,571542810_01.51,571542810_02,보행신호시
2,i3,u30,571556450_02,571556452_01,571556452_02,보행신호시
3,i3,u31,-571500475_01,571500475_02,571500475_01.26,보행신호시
4,i3,u32,571540303_02.21,571540303_02,-571540303_02,보행신호시
5,i6,u60,571500535_02.18,571500535_02,-571500535_02,좌회전시


In [85]:
# n2inc_edge2angle : node_id to inc_edge2angle
n2inc_edge2angle = dict()
# n2out_edge2angle : node_id to out_edge2angle
n2out_edge2angle = dict()
# n2inc_angle2edge : node_id to inc_angle2edge
n2inc_angle2edge = dict()
# n2out_angle2edge : node_id to out_angle2edge
n2out_angle2edge = dict()
for node_id in self.parent_ids:
    m5 = self.match5[self.match5.node_id==node_id]
    m5 = m5.dropna(subset=['inc_edge_id', 'out_edge_id'])
    # inc_edge2angle : inc_edge_id to inc_angle
    inc_edge2angle = dict(zip(m5.inc_edge_id, m5.inc_angle.astype(int)))
    n2inc_edge2angle[node_id] = inc_edge2angle
    # out_edge2angle : out_edge_id to out_angle
    out_edge2angle = dict(zip(m5.out_edge_id, m5.out_angle.astype(int)))
    n2out_edge2angle[node_id] = out_edge2angle
    # inc_angle2edge : inc_angle to inc_edge_id
    inc_angle2edge = dict(zip(m5.inc_angle.astype(int), m5.inc_edge_id))
    n2inc_angle2edge[node_id] = inc_angle2edge
    # out_angle2edge : out_angle to out_edge_id
    out_angle2edge = dict(zip(m5.out_angle.astype(int), m5.out_edge_id))
    n2out_angle2edge[node_id] = out_angle2edge

In [173]:
i = 1
row = self.u_turn.iloc[i]

parent_id = row.parent_id
child_id = row.child_id
condition = row.condition
inc_edge_id = row.from_edge_id
out_edge_id = row.to_edge_id
head_edge_id = row.head_edge_id
row

parent_id                    i2
child_id                    u20
head_edge_id       571542811_02
from_edge_id    571542810_01.51
to_edge_id         571542810_02
condition                 보행신호시
Name: 1, dtype: object

In [174]:
# match5에서 부모노드id에 해당하는 행들을 가져옴 (cmatch)
cmatch = self.match5.copy()[self.match5.node_id==parent_id] # match dataframe for a child node
cmatch = cmatch.dropna(subset=['inc_angle', 'out_angle'])
cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True)
cmatch['node_id'] = child_id
print(parent_id, child_id, condition)
display(cmatch)

i2 u20 보행신호시


Unnamed: 0,inter_no,phase_no,ring_type,move_no,inc_dire,out_dire,inc_angle,out_angle,inc_edge_id,out_edge_id,node_id,turn_type
0,177,1,A,8,남,북,179,0,-571542809_01,571542811_01,u20,left
1,177,1,B,4,북,남,0,180,571542811_02,571542809_01,u20,straight
2,177,2,A,7,북,동,0,90,571542811_02,571542107_01,u20,left
3,177,2,B,3,남,서,179,270,-571542809_01,571542809_01,u20,left
4,177,4,A,5,서,북,268,0,-571542809_01,571542811_01,u20,left
5,177,4,B,1,동,남,90,180,571542107_02,571542809_01,u20,left


In [184]:
self.turn_type[self.turn_type.node_id=='i2']

Unnamed: 0,node_id,inc_edge_id,out_edge_id,turn_type
13,i2,-571542809_01,571542811_01,straight
14,i2,571542811_02,571542809_01,straight
15,i2,571542811_02,571542107_01,left
16,i2,-571542809_01,571542809_01,left
17,i2,-571542809_01,571542811_01,left
18,i2,571542107_02,571542809_01,left


In [181]:
# dictionary that maps node_id to io2turn
n2io2turn = dict()
for node_id in self.parent_ids:
    print(node_id)
    turn = self.turn_type[self.turn_type.node_id==node_id]
    io = list(zip(turn.inc_edge_id, turn.out_edge_id))
    # dictionary that maps (inc_edge_id, out_edge_id) to turn_type
    io2turn = dict(zip(io, turn.turn_type))
    n2io2turn[node_id] = io2turn
    print(io2turn)

for i, row in self.match5.iterrows():
    node_id = row.node_id
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id
    if not (pd.isna(inc_edge_id) and pd.isna(out_edge_id)):
        turn_type = n2io2turn[node_id][(inc_edge_id, out_edge_id)]
        self.match5.at[i, 'turn_type'] = turn_type


i0
{('-571542797_02', '571500487_01'): 'straight', ('-571500487_01', '571542797_02'): 'straight', ('-571500487_01', '571545870_01'): 'left', ('-571542797_02', '571510153_01'): 'left', ('571545870_02', '571510153_01'): 'straight', ('571545870_02', '571542797_02'): 'left', ('571510153_02', '571500487_01'): 'left', ('571510153_02', '571545870_01'): 'straight'}
i1
{('-571542810_01', '-571542797_02.99'): 'straight', ('571542797_02.99', '571542810_01'): 'straight', ('-571542810_01', '571543469_01'): 'left', ('571543469_02', '-571542797_02.99'): 'left'}
i2
{('-571542809_01', '571542811_01'): 'left', ('571542811_02', '571542809_01'): 'straight', ('571542811_02', '571542107_01'): 'left', ('-571542809_01', '571542809_01'): 'left', ('571542107_02', '571542809_01'): 'left'}
i3
{('571540304_02', '571556450_01'): 'straight', ('571556450_02', '571540304_01'): 'straight', ('571556450_02', '571500475_01'): 'left', ('571540304_02', '571540303_01'): 'left', ('571540303_02.21', '571556450_01'): 'left', ('

In [None]:

# 헤드엣지가 포함된 행
head_true = cmatch.inc_edge_id==head_edge_id

# 헤드엣지 각도
inc_angle_head = n2inc_edge2angle[parent_id][head_edge_id]
print(inc_angle_head)

# 이격각도
self.angle_separation = 10

# 진입로 각도 목록
inc_angles = cmatch.inc_angle.astype(int).unique()
inc_angles = np.sort(inc_angles)
inc_angles = list(inc_angles - 360) + list(inc_angles) + list(inc_angles + 360)
inc_angles = np.array(inc_angles)
print(inc_angles)

# 보행신호시의 진입로 각도
inc_angles_left = inc_angles[inc_angles >= inc_angle_head + self.angle_separation]
inc_angle_pedes = np.sort(inc_angles_left)[0] % 360
print(inc_angles_left)
print(inc_angle_pedes)

# 보행신호시의 진입로 엣지id
inc_angle2edge = n2inc_angle2edge[parent_id]
inc_edge_id_pedes = inc_angle2edge[inc_angle_pedes]
print(inc_edge_id_pedes)

# 진출로 각도 목록
out_angles = cmatch.out_angle.astype(int).unique()
out_angles = np.sort(out_angles)
out_angles = list(out_angles - 360) + list(out_angles) + list(out_angles + 360)
out_angles = np.array(out_angles)
print(out_angles)

# 보행신호시의 진입로 각도
out_angles_right = out_angles[out_angles <= inc_angle_head - self.angle_separation]
out_angle_pedes = np.sort(out_angles_right)[-1] % 360
print(out_angles_right)

print(out_angle_pedes)

# 보행신호시의 진입로 엣지id
out_angle2edge = n2out_angle2edge[parent_id]
out_edge_id_pedes = out_angle2edge[out_angle_pedes]
print(out_edge_id_pedes)

# 보행신호시 조건
pedes_flag = (cmatch.inc_edge_id==inc_edge_id_pedes) & (cmatch.out_edge_id==out_edge_id_pedes)

# 좌회전시 조건
right_flag = head_true & (cmatch.turn_type=='left')

# 보행신호시/좌회전시 진입/진출 엣지id 배정
cmatch[['inc_edge_id', 'out_edge_id']] = np.nan
if condition == "보행신호시":
    cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
elif condition == "좌회전시":
    cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]

display(cmatch)

In [190]:
i = 1
row = self.u_turn.iloc[i]

parent_id = row.parent_id
child_id = row.child_id
condition = row.condition
inc_edge_id = row.from_edge_id
out_edge_id = row.to_edge_id
head_edge_id = row.head_edge_id

# match5에서 부모노드id에 해당하는 행들을 가져옴 (cmatch)
cmatch = self.match5.copy()[self.match5.node_id==parent_id] # match dataframe for a child node
cmatch = cmatch.dropna(subset=['inc_angle', 'out_angle'])
cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True)
cmatch['node_id'] = child_id
print(parent_id, child_id, condition)
display(cmatch)

# 헤드엣지가 포함된 행
head_true = cmatch.inc_edge_id==head_edge_id

# 헤드엣지 각도
inc_angle_head = n2inc_edge2angle[parent_id][head_edge_id]
print(inc_angle_head)

# 이격각도
self.angle_separation = 10

# 진입로 각도 목록
inc_angles = cmatch.inc_angle.astype(int).unique()
inc_angles = np.sort(inc_angles)
inc_angles = list(inc_angles - 360) + list(inc_angles) + list(inc_angles + 360)
inc_angles = np.array(inc_angles)
print(inc_angles)

# 보행신호시의 진입로 각도
inc_angles_left = inc_angles[inc_angles >= inc_angle_head + self.angle_separation]
inc_angle_pedes = np.sort(inc_angles_left)[0] % 360
print(inc_angles_left)
print(inc_angle_pedes)

# 보행신호시의 진입로 엣지id
inc_angle2edge = n2inc_angle2edge[parent_id]
inc_edge_id_pedes = inc_angle2edge[inc_angle_pedes]
print(inc_edge_id_pedes)

# 진출로 각도 목록
out_angles = cmatch.out_angle.astype(int).unique()
out_angles = np.sort(out_angles)
out_angles = list(out_angles - 360) + list(out_angles) + list(out_angles + 360)
out_angles = np.array(out_angles)
print(out_angles)

# 보행신호시의 진입로 각도
out_angles_right = out_angles[out_angles <= inc_angle_head - self.angle_separation]
out_angle_pedes = np.sort(out_angles_right)[-1] % 360
print(out_angles_right)

print(out_angle_pedes)

# 보행신호시의 진입로 엣지id
out_angle2edge = n2out_angle2edge[parent_id]
out_edge_id_pedes = out_angle2edge[out_angle_pedes]
print(out_edge_id_pedes)

# 보행신호시 조건
pedes_flag = (cmatch.inc_edge_id==inc_edge_id_pedes) & (cmatch.out_edge_id==out_edge_id_pedes)

# 좌회전시 조건
right_flag = head_true & (cmatch.turn_type=='left')

# 보행신호시/좌회전시 진입/진출 엣지id 배정
cmatch[['inc_edge_id', 'out_edge_id']] = np.nan
if condition == "보행신호시":
    cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
elif condition == "좌회전시":
    cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]

display(cmatch)

i2 u20 보행신호시


Unnamed: 0,inter_no,phase_no,ring_type,move_no,inc_dire,out_dire,inc_angle,out_angle,inc_edge_id,out_edge_id,node_id,turn_type
0,177,1,A,8,남,북,179,0,-571542809_01,571542811_01,u20,left
1,177,1,B,4,북,남,0,180,571542811_02,571542809_01,u20,straight
2,177,2,A,7,북,동,0,90,571542811_02,571542107_01,u20,left
3,177,2,B,3,남,서,179,270,-571542809_01,571542809_01,u20,left
4,177,4,A,5,서,북,268,0,-571542809_01,571542811_01,u20,left
5,177,4,B,1,동,남,90,180,571542107_02,571542809_01,u20,left


0
[-360 -270 -181  -92    0   90  179  268  360  450  539  628]
[ 90 179 268 360 450 539 628]
90
571542107_02
[-360 -270 -180  -90    0   90  180  270  360  450  540  630]
[-360 -270 -180  -90]
270
571542809_01


Unnamed: 0,inter_no,phase_no,ring_type,move_no,inc_dire,out_dire,inc_angle,out_angle,inc_edge_id,out_edge_id,node_id,turn_type
0,177,1,A,8,남,북,179,0,,,u20,left
1,177,1,B,4,북,남,0,180,,,u20,straight
2,177,2,A,7,북,동,0,90,,,u20,left
3,177,2,B,3,남,서,179,270,,,u20,left
4,177,4,A,5,서,북,268,0,,,u20,left
5,177,4,B,1,동,남,90,180,571542810_01.51,571542810_02,u20,left


In [None]:
# 각 uturn node에 대하여 (inc_edge_id, out_edge_id) 부여
cmatches = []
for _, row in self.uturn.iterrows():
    child_id = row.child_id
    parent_id = row.parent_id
    dire = row.direction
    condition = row.condition
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id

    print(parent_id, condition)

    # match5에서 부모노드id에 해당하는 행들을 가져옴 (cmatch)
    cmatch = self.match5.copy()[self.match5.node_id==parent_id] # match dataframe for a child node
    cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True)
    cmatch['node_id'] = child_id
    cmatch[['inc_edge_id', 'out_edge_id']] = np.nan
    display(cmatch)

    # 보행신호시/좌회전시 진입/진출방향
    ind = self.dires.index(dire)
    inc_dire_pedes = self.dires[(ind + 2) % len(self.dires)]
    out_dire_pedes = self.dires[(ind - 2) % len(self.dires)]
    inc_dire_right = dire
    out_dire_right = self.dires[(ind + 2) % len(self.dires)]

    if condition == '보행신호시':
        print(inc_dire_pedes, out_dire_pedes)
    else:
        print(inc_dire_right, out_dire_right)

    # 보행신호시/좌회전시 조건
    pedes_exists = (cmatch.inc_dire==inc_dire_pedes) & (cmatch.out_dire==out_dire_pedes)
    right_exists = (cmatch.inc_dire==inc_dire_right) & (cmatch.out_dire==out_dire_right)

    # 보행신호시/좌회전시 진입/진출 엣지id 배정
    ind = self.dires.index(dire)
    if condition == "보행신호시":
        cmatch.loc[pedes_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
    elif condition == "좌회전시":
        cmatch.loc[right_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
    display(cmatch)

    # 신호없음이동류발생시/보행신호이동류발생시 조건
    all_redsigns = cmatch.move_no == 18
    crosswalk_on = cmatch.move_no == 17

    # 만약 어떤 유턴신호도 배정되지 않았다면
    # 좌회전시 → 보행신호시 → 보행신호이동류발생시 → 신호없음이동류발생시 순으로 진입/진출 엣지id 배정
    uturn_not_assigned = cmatch[['inc_edge_id','out_edge_id']].isna().any(axis=1).all()
    if uturn_not_assigned:
        # 좌회전시
        if right_exists.any():
            cmatch.loc[right_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
        # 보행신호시
        elif pedes_exists.any():
            cmatch.loc[pedes_exists, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
        # 보행신호이동류(17) 발생시
        elif crosswalk_on.any():
            cmatch.loc[crosswalk_on & (cmatch.out_dire!=dire), ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
        # 신호없음이동류(18) 발생시
        elif all_redsigns.any():
            cmatch.loc[all_redsigns & (cmatch.out_dire!=dire), ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
    display(cmatch)
cmatches.append(cmatch)


In [None]:
'''Given `inter_no`, `inc_edge_id` and `out_edge_id`, we have `inc_vec = inter2inc2vec[inter_no][inc_edge_id]` and `out_vec = inter2out2vec[inter_no][out_edge_id]`.
Rotate `inc_vec` by 90, 180 and 270 degrees clockwise, to define 
`out_vec_left`, `out_vec_straight` and `out_vec_right`.
Define `out_vecs={'right':out_vec_left, 'straight':out_vec_straight, 'right':out_vec_right}`.
Select the key that maximize the similarity of the corresponding value of the key and `inc_vec`.'''