In [1]:
import pandas as pd
import numpy as np
import os, sys, copy, argparse, json, pickle
import sumolib, traci
from tqdm import tqdm
from datetime import datetime
path_root = os.path.dirname(os.path.dirname(os.path.abspath('.')))
# path_scr = os.path.join(path_root, 'scripts')
# sys.path.append(path_scr)
# from preprocess_daily import DailyPreprocessor
# from generate_signals import SignalGenerator

In [2]:
class DailyPreprocessor:
    pass
self = DailyPreprocessor()
self.config_name = 'test_0729'
self.file_net = 'new_sungnam_network_internal_target_0721.net.xml'

In [3]:
# 루트폴더 지정
self.path_root = path_root
with open(os.path.join(self.path_root, 'configs', f'config_{self.config_name}.json'), 'r') as config_file:
    config = json.load(config_file)

In [4]:
# 주요 폴더 경로 지정
self.paths = config['paths']
self.path_data = os.path.join(self.path_root, *self.paths['data'])
self.path_intermediates = os.path.join(self.path_root, *self.paths['intermediates'])
self.path_results = os.path.join(self.path_root, *self.paths['results'])
self.path_tables = os.path.join(self.path_root, *self.paths['tables'])
self.path_networks = os.path.join(self.path_root, *self.paths['networks'])
self.path_scripts = os.path.join(self.path_root, *self.paths['scripts'])

# 이슈사항 목록
self.issues = []

In [5]:
# 1-1. 네트워크 불러오기
self.net = sumolib.net.readNet(os.path.join(self.path_networks, self.file_net))

In [6]:
os.listdir(self.path_tables)

['angle.csv',
 'coord.csv',
 'inter_info.csv',
 'inter_node.csv',
 'nema.csv',
 'plan.csv',
 'TC_IF_TOD_DAY_PLAN.csv',
 'TC_IF_TOD_HOLIDAY_PLAN.csv',
 'TC_IF_TOD_RED_YELLO.csv',
 'TC_IF_TOD_WEEK_PLAN.csv',
 'TL_IF_SIGL_CYCL.csv',
 'turn_type.csv',
 'turn_type_modified.csv',
 'uturn.csv',
 'u_condition.csv']

In [7]:
# cnames = []
# csv_names = list(os.listdir(self.path_tables))
# for csv_name in csv_names:
#     print(csv_name)
#     try:
#         df = pd.read_csv(os.path.join(self.path_tables, csv_name))
#     except UnicodeDecodeError as e:
#         df = pd.read_csv(os.path.join(self.path_tables, csv_name), encoding='cp949')
#     cnames.extend(list(df.columns))
#     print(list(df.columns))

In [8]:
# 1-2. 테이블 불러오기
self.alphs = ['A', 'B']

loading_dtype_1 = {'CRSRD_ID':int,          'CYCL':int,             'DAY':int,
                   'FRI_PLAN_NO':int,       'HOUR':int,             'LAST_MDFCN_DT':str,
                   'MIN':int,               'MNTH':int,             'MON_PLAN_NO':int,
                   'OFFSET':int,            'PHASE':int,            'PLAN_NO':int,
                   'RINGA_FLOW':int,        'RINGA_MIN_SEC':int,    'RINGA_RED_SEC':int,
                   'RINGA_YELLO_SEC':int,   'RINGB_FLOW':int,       'RINGB_MIN_SEC':int,
                   'RINGB_RED_SEC':int,     'RINGB_YELLO_SEC':int,  'SAT_PLAN_NO':int,
                   'SUN_PLAN_NO':int,       'THU_PLAN_NO':int,      'TUE_PLAN_NO':int,
                   'WED_PLAN_NO':int,       'adj_inc_edge_id':str,  'adj_out_edge_id':str,
                   'angle_code':str,        'child_id':str,         'condition':str,
                   'group_no':str,          'inc_dire':str,         'inc_edge_id':str,
                   'inter_lat':float,       'inter_lon':float,      'inter_name':str,
                   'inter_no':int,          'inter_type':str,       'main_phase_no':str,
                   'move_no':int,           'node_id':str,          'out_dire':str,
                   'out_edge_id':str,       'parent_id':str,        'phase_no':int,
                   'ring_type':str,         'turn_type':str}

loading_dtype_2 = {f'RING{alph}_PHASE{i}':int for alph in self.alphs for i in range(1,9)}
loading_dtype = {**loading_dtype_1, **loading_dtype_2}

# 수작업으로 만든 테이블 불러오기
self.inter_node = pd.read_csv(os.path.join(self.path_tables, 'inter_node.csv'), dtype=loading_dtype)
self.inter_info = pd.read_csv(os.path.join(self.path_tables, 'inter_info.csv'), dtype=loading_dtype)
self.turn_type  = pd.read_csv(os.path.join(self.path_tables, 'turn_type.csv'), dtype=loading_dtype)
self.uturn      = pd.read_csv(os.path.join(self.path_tables, 'uturn.csv'), dtype=loading_dtype)
self.u_condition= pd.read_csv(os.path.join(self.path_tables, 'u_condition.csv'), dtype=loading_dtype)
self.coord      = pd.read_csv(os.path.join(self.path_tables, 'coord.csv'), dtype=loading_dtype)
self.nema       = pd.read_csv(os.path.join(self.path_tables, 'nema.csv'), dtype=loading_dtype, encoding='cp949')
self.angle      = pd.read_csv(os.path.join(self.path_tables, 'angle.csv'), dtype=loading_dtype)

# DB에서 fetch하는 테이블 불러오기
self.dayplan    = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_DAY_PLAN.csv'), dtype=loading_dtype)
self.holyplan   = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_HOLIDAY_PLAN.csv'), dtype=loading_dtype)
self.weekplan   = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_WEEK_PLAN.csv'), dtype=loading_dtype)
self.red_yel    = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_RED_YELLO.csv'), dtype=loading_dtype)

In [9]:
# 교차로목록, 노드목록 정의
self.inter_nos = [int(x) for x in sorted(self.inter_info.inter_no.unique())]
self.node_ids = sorted(self.inter_node.node_id.unique())

In [10]:
# cnames = []
# csv_names = list(os.listdir(self.path_tables))
# for csv_name in csv_names:
#     print(csv_name)
#     try:
#         df = pd.read_csv(os.path.join(self.path_tables, csv_name))
#     except UnicodeDecodeError as e:
#         df = pd.read_csv(os.path.join(self.path_tables, csv_name), encoding='cp949')
#     cnames.extend(list(df.columns))
#     print(list(df.columns))

In [11]:
cnames = []
tods = [self.dayplan, self.holyplan, self.weekplan, self.red_yel]
for tod in tods:
    cnames.extend(list(tod.columns))
    print(list(tod.columns))
cnames = sorted(set(cnames))

['CRSRD_ID', 'PLAN_NO', 'HOUR', 'MIN', 'CYCL', 'OFFSET', 'RINGA_PHASE1', 'RINGA_PHASE2', 'RINGA_PHASE3', 'RINGA_PHASE4', 'RINGA_PHASE5', 'RINGA_PHASE6', 'RINGA_PHASE7', 'RINGA_PHASE8', 'RINGB_PHASE1', 'RINGB_PHASE2', 'RINGB_PHASE3', 'RINGB_PHASE4', 'RINGB_PHASE5', 'RINGB_PHASE6', 'RINGB_PHASE7', 'RINGB_PHASE8', 'LAST_MDFCN_DT']
['CRSRD_ID', 'MNTH', 'DAY', 'PLAN_NO', 'LAST_MDFCN_DT']
['CRSRD_ID', 'SUN_PLAN_NO', 'MON_PLAN_NO', 'TUE_PLAN_NO', 'WED_PLAN_NO', 'THU_PLAN_NO', 'FRI_PLAN_NO', 'SAT_PLAN_NO', 'LAST_MDFCN_DT']
['CRSRD_ID', 'PLAN_NO', 'PHASE', 'RINGA_FLOW', 'RINGA_RED_SEC', 'RINGA_YELLO_SEC', 'RINGA_MIN_SEC', 'RINGB_FLOW', 'RINGB_RED_SEC', 'RINGB_YELLO_SEC', 'RINGB_MIN_SEC', 'LAST_MDFCN_DT']


In [12]:
# 1-5. 테이블 표준화

# 컬럼명 변경
rename_cname_1 = {'CRSRD_ID':'inter_no',        'CYCL':'cycle',             'DAY':'DD',
                  'HOUR':'hh',                  'MIN':'mm',                 'MNTH':'MM',
                  'OFFSET':'offset',            'PHASE':'phase_no',         'PLAN_NO':'plan_no',
                  'RINGA_RED_SEC':'red_A',      'RINGA_YELLO_SEC':'yel_A',  'RINGB_RED_SEC':'red_B',
                  'RINGB_YELLO_SEC':'yel_B'}

rename_cname_2 = {f'RING{alph}_PHASE{i}':f'dura_{alph}{i}' for alph in self.alphs for i in range(1,9)}
rename_cname = {**rename_cname_1, **rename_cname_2}

# 컬럼명 변경 적용
self.dayplan  = self.dayplan.rename(columns=rename_cname)
self.holyplan = self.holyplan.rename(columns=rename_cname)
self.weekplan = self.weekplan.rename(columns=rename_cname)
self.red_yel  = self.red_yel.rename(columns=rename_cname)

# 날짜 지정
DT = datetime(2024, 7, 25)
MM, DD = DT.month, DT.day # 월, 일
hplan = self.holyplan[(self.holyplan.MM==MM) & (self.holyplan.DD==DD)]
dow_number = DT.weekday() # 요일
dows = [dow for dow in self.weekplan.columns if dow.endswith('PLAN_NO')]
dows = dows[1:] + dows[0:1]
dow = dows[dow_number]

# (신호교차로, 신호계획번호) 목록
if len(hplan):
    inter_pnos = list(zip(hplan['inter_no'], hplan['plan_no']))
else:
    inter_pnos = list(zip(self.weekplan.inter_no, self.weekplan[dow]))

In [13]:
# 신호테이블 통합
self.plan = self.dayplan.copy()
self.plan['inter_pno'] = list(zip(self.plan['inter_no'], self.plan['plan_no']))
self.plan = self.plan[(self.plan.inter_pno.isin(inter_pnos))]
self.plan = self.plan.drop(columns='inter_pno')
max_phase_no = int(self.red_yel.phase_no.max())
for j in range(1,max_phase_no+1):
    RY = self.red_yel[self.red_yel.phase_no==j].iloc[0]
    red_A = RY.red_A
    red_B = RY.red_B
    yel_A = RY.yel_A
    yel_B = RY.yel_B
    self.plan[f'red_A{j}'] = red_A
    self.plan[f'red_B{j}'] = red_B
    self.plan[f'yellow_A{j}'] = yel_A
    self.plan[f'yellow_B{j}'] = yel_B

In [14]:
# 1-6. 주요 객체 (리스트, 딕셔너리) 저장
self.parent_ids = sorted(self.inter_node[self.inter_node.inter_type=='parent'].node_id.unique())
self.child_ids = sorted(self.inter_node[self.inter_node.inter_type=='child'].node_id.unique())
self.uturn_ids = sorted(self.uturn.child_id.unique())
self.coord_ids = sorted(self.coord.child_id.unique())

# node2inter : node_id to inter_no
self.node2inter = dict(zip(self.inter_node['node_id'], self.inter_node['inter_no']))

# inter2node : inter_no to node_id
inter_node1 = self.inter_node[self.inter_node.inter_type == 'parent'].drop('inter_type', axis=1)
inter_info1 = self.inter_info[['inter_no', 'inter_lat', 'inter_lon']]
inter = pd.merge(inter_node1, inter_info1, how='left', left_on=['inter_no'],
                right_on=['inter_no']).drop_duplicates()

self.inter2node = dict(zip(inter['inter_no'], inter['node_id']))

# ch2pa : child to parent
self.ch2pa = {}
for child_id in self.child_ids:
    parent_no = self.inter_node[self.inter_node.node_id==child_id].inter_no.iloc[0]
    sub_inter_node = self.inter_node[self.inter_node.inter_no==parent_no]
    self.ch2pa[child_id] = sub_inter_node[sub_inter_node.inter_type=='parent'].iloc[0].node_id
self.dires = ['북', '북동', '동', '남동', '남', '남서', '서', '북서'] # 정북기준 시계방향으로 8방향

# ids
self.ids = {'node_ids'  : self.node_ids,
            'parent_ids': self.parent_ids,
            'child_ids' : self.child_ids,
            'uturn_ids' : self.uturn_ids,
            'coord_ids' : self.coord_ids,
            'inter_nos' : self.inter_nos}
with open(os.path.join(self.path_intermediates, 'ids.json'), 'w') as file:
    json.dump(self.ids, file)

In [15]:
# 2. 중간산출물 만들기
# 2-1 매칭테이블 생성
# 2-1-1
self.match1 = pd.read_csv(os.path.join(self.path_intermediates, 'match1.csv'))
self.match1.head()

Unnamed: 0,inter_no,phas_A,phas_B,move_A,move_B
0,436,1,1,5,2
1,436,2,2,8,3
2,436,3,3,7,4
3,436,4,4,6,1
4,437,1,1,6,2


In [16]:
# 2-1-2
matchA = self.match1[['inter_no', 'phas_A', 'move_A']].copy()
matchA.columns = ['inter_no', 'phase_no', 'move_no']
matchA['ring_type'] = 'A'
matchB = self.match1[['inter_no', 'phas_B', 'move_B']].copy()
matchB.columns = ['inter_no', 'phase_no', 'move_no']
matchB['ring_type'] = 'B'
self.match2 = pd.concat([matchA, matchB]).drop_duplicates()
self.match2 = self.match2[['inter_no', 'phase_no', 'ring_type', 'move_no']]
self.match2 = self.match2.sort_values(by=list(self.match2.columns))
self.match2.head()

Unnamed: 0,inter_no,phase_no,ring_type,move_no
0,436,1,A,5
0,436,1,B,2
1,436,2,A,8
1,436,2,B,3
2,436,3,A,7


In [17]:
# 2-1-3
# nema 정보 불러오기 및 병합
self.match3 = pd.merge(self.match2, self.nema, how='left', on='move_no').drop_duplicates()
self.match3.head()

Unnamed: 0,inter_no,phase_no,ring_type,move_no,inc_dire,out_dire
0,436,1,A,5,서,북
1,436,1,B,2,서,동
2,436,2,A,8,남,북
3,436,2,B,3,남,서
4,436,3,A,7,북,동


In [18]:
node_id = '106350'
inter_no = self.node2inter[node_id]
self.angle[self.angle.inter_no==inter_no]

Unnamed: 0,inter_no,move_no,angle_code
16,438,6,100277
17,438,2,276098
18,438,5,274342
19,438,1,101181
20,438,8,183340
21,438,3,182276
22,438,7,347099
23,438,4,348186


In [19]:
# 2-1-4
# helper dictionaries
im2inc_angle = dict() # a dictionary that maps (inter_no, move_no) to inc_angle
im2out_angle = dict() # a dictionary that maps (inter_no, move_no) to out_angle
self.angle = self.angle.dropna(subset=['move_no'])
self.angle['move_no'] = self.angle['move_no'].astype(int)
for row in self.angle.itertuples():
    inter_no = row.inter_no
    move_no = row.move_no
    angle_code = row.angle_code
    if isinstance(angle_code, str) and len(angle_code) == 6:
        im2inc_angle[(inter_no, move_no)] = angle_code[:3]
        im2out_angle[(inter_no, move_no)] = angle_code[3:]
    else:
        im2inc_angle[(inter_no, move_no)] = np.nan
        im2out_angle[(inter_no, move_no)] = np.nan
for inter_no in self.inter_nos:
    im2inc_angle[(inter_no, 17)] = np.nan
    im2out_angle[(inter_no, 17)] = np.nan
    im2inc_angle[(inter_no, 18)] = np.nan
    im2out_angle[(inter_no, 18)] = np.nan

# 진입, 진출 방위각 매칭
self.match4 = self.match3.copy()
for i, row in self.match4.iterrows():
    inter_no = row.inter_no
    move_no = row.move_no
    self.match4.at[i, 'inc_angle'] = im2inc_angle[(inter_no, move_no)]
    self.match4.at[i, 'out_angle'] = im2out_angle[(inter_no, move_no)]
self.match4.head()

Unnamed: 0,inter_no,phase_no,ring_type,move_no,inc_dire,out_dire,inc_angle,out_angle
0,436,1,A,5,서,북,262,358
1,436,1,B,2,서,동,262,74
2,436,2,A,8,남,북,174,355
3,436,2,B,3,남,서,172,263
4,436,3,A,7,북,동,356,74


In [20]:
# 2-1-5
self.match5 = self.match4.copy()
# 진입진출ID 매칭
for index, row in self.match5.iterrows():
    node_id = self.inter2node[row.inter_no]
    node = self.net.getNode(node_id)
    # 교차로의 모든 (from / to) edges
    inc_edges = [edge for edge in node.getIncoming() if edge.getFunction() == ''] # incoming edges
    out_edges = [edge for edge in node.getOutgoing() if edge.getFunction() == ''] # outgoing edges
    # 교차로의 모든 (from / to) unit vector
    inc_vecs = []
    for inc_edge in inc_edges:
        start = inc_edge.getShape()[-1]
        end = inc_edge.getShape()[-2]
        inc_vec = np.array(end) - np.array(start)
        inc_vec = inc_vec / (inc_vec ** 2).sum() ** 0.5
        inc_vecs.append(inc_vec)
    out_vecs = []
    for out_edge in out_edges:
        start = out_edge.getShape()[0]
        end = out_edge.getShape()[1]
        out_vec = np.array(end) - np.array(start)
        out_vec = out_vec / (out_vec ** 2).sum() ** 0.5
        out_vecs.append(out_vec)
    # 진입각, 진출각 불러오기
    if not pd.isna(row.inc_angle):
        inc_angle = int(row.inc_angle)
        out_angle = int(row.out_angle)
        # 방위각을 일반각으로 가공, 라디안 변환, 단위벡터로 변환
        inc_angle = (90 - inc_angle) % 360
        inc_angle = inc_angle * np.pi / 180.
        inc_vec_true = np.array([np.cos(inc_angle), np.sin(inc_angle)])
        out_angle = (90 - out_angle) % 360
        out_angle = out_angle * np.pi / 180.
        out_vec_true = np.array([np.cos(out_angle), np.sin(out_angle)])
        # 매칭 엣지 반환
        inc_index = np.array([np.dot(inc_vec, inc_vec_true) for inc_vec in inc_vecs]).argmax()
        out_index = np.array([np.dot(out_vec, out_vec_true) for out_vec in out_vecs]).argmax()
        inc_edge_id = inc_edges[inc_index].getID()
        out_edge_id = out_edges[out_index].getID()
        self.match5.at[index, 'inc_edge_id'] = inc_edge_id
        self.match5.at[index, 'out_edge_id'] = out_edge_id

self.match5['node_id'] = self.match5['inter_no'].map(self.inter2node)
self.match5['node_type'] = 'normal'
self.match5 = self.match5.sort_values(by=['inter_no','phase_no','ring_type']).reset_index(drop=True)

In [21]:
# n2io2turn : dictionary that maps node_id to io2turn
self.n2io2turn = dict()
for node_id in self.parent_ids:
    turn = self.turn_type[self.turn_type.node_id==node_id]
    io = list(zip(turn.inc_edge_id, turn.out_edge_id))
    # io2turn : dictionary that maps (inc_edge_id, out_edge_id) to turn_type
    io2turn = dict(zip(io, turn.turn_type))
    self.n2io2turn[node_id] = io2turn

In [22]:
# turn_type 지정
for i, row in self.match5.iterrows():
    node_id = row.node_id
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id
    if not (pd.isna(inc_edge_id) and pd.isna(out_edge_id)):
        io2turn = self.n2io2turn[node_id]
        if (inc_edge_id, out_edge_id) in io2turn:
            turn_type = io2turn[(inc_edge_id, out_edge_id)]
            self.match5.at[i, 'turn_type'] = turn_type

In [23]:
# 2-1-6
self.uturn = pd.merge(self.uturn, self.u_condition, on='child_id')

# p2inc_edge2angle : node_id to inc_edge2angle
p2inc_edge2angle = dict()
# p2out_edge2angle : node_id to out_edge2angle
p2out_edge2angle = dict()
# p2inc_angle2edge : node_id to inc_angle2edge
p2inc_angle2edge = dict()
# p2out_angle2edge : node_id to out_angle2edge
p2out_angle2edge = dict()
for node_id in self.parent_ids:
    m5 = self.match5[self.match5.node_id==node_id]
    m5 = m5.dropna(subset=['inc_edge_id', 'out_edge_id'])
    # inc_edge2angle : inc_edge_id to inc_angle
    inc_edge2angle = dict(zip(m5.inc_edge_id, m5.inc_angle.astype(int)))
    p2inc_edge2angle[node_id] = inc_edge2angle
    # out_edge2angle : out_edge_id to out_angle
    out_edge2angle = dict(zip(m5.out_edge_id, m5.out_angle.astype(int)))
    p2out_edge2angle[node_id] = out_edge2angle
    # inc_angle2edge : inc_angle to inc_edge_id
    inc_angle2edge = dict(zip(m5.inc_angle.astype(int), m5.inc_edge_id))
    p2inc_angle2edge[node_id] = inc_angle2edge
    # out_angle2edge : out_angle to out_edge_id
    out_angle2edge = dict(zip(m5.out_angle.astype(int), m5.out_edge_id))
    p2out_angle2edge[node_id] = out_angle2edge

In [24]:
# 각 uturn node에 대하여 (inc_edge_id, out_edge_id) 부여
cmatches = []
for row in self.uturn.itertuples():
    parent_id = row.parent_id
    child_id = row.child_id
    condition = row.condition
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id
    adj_inc_edge_id = row.adj_inc_edge_id
    adj_out_edge_id = row.adj_out_edge_id
    
    # match5에서 부모노드id에 해당하는 행들을 가져옴 (cmatch)
    cmatch = self.match5.copy()[self.match5.node_id==parent_id] # match dataframe for a child node
    cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True)
    cmatch['node_id'] = child_id
    cmatch['node_type'] = 'u_turn'

    # 진입엣지 각도
    inc_angle = p2inc_edge2angle[parent_id][adj_inc_edge_id]

    # 이격각도
    self.angle_separation = 10

    # 진입로 각도 목록
    inc_angles = cmatch.dropna(subset=['inc_angle', 'out_angle']).inc_angle.astype(int).unique()
    inc_angles = np.sort(inc_angles)
    inc_angles = list(inc_angles - 360) + list(inc_angles) + list(inc_angles + 360)
    inc_angles = np.array(inc_angles)

    # 보행신호시의 진입로 각도
    inc_angles_left = inc_angles[inc_angles >= inc_angle + self.angle_separation]
    inc_angle_pedes = np.sort(inc_angles_left)[0] % 360

    # 보행신호시의 진입로 엣지id
    inc_angle2edge = p2inc_angle2edge[parent_id]
    inc_edge_id_pedes = inc_angle2edge[inc_angle_pedes]

    # 진출로 각도 목록
    out_angles = cmatch.dropna(subset=['inc_angle', 'out_angle']).out_angle.astype(int).unique()
    out_angles = np.sort(out_angles)
    out_angles = list(out_angles - 360) + list(out_angles) + list(out_angles + 360)
    out_angles = np.array(out_angles)

    # 보행신호시의 진출로 각도
    out_angles_right = out_angles[out_angles <= inc_angle - self.angle_separation]
    out_angle_pedes = np.sort(out_angles_right)[-1] % 360

    # 보행신호시의 진출로 엣지id
    out_angle2edge = p2out_angle2edge[parent_id]
    out_edge_id_pedes = out_angle2edge[out_angle_pedes]

    # 진입엣지/진출엣지 포함 조건
    inc_true = (cmatch.inc_edge_id==adj_inc_edge_id)
    out_true = (cmatch.out_edge_id==adj_out_edge_id)

    # 보행신호시 조건
    pedes_flag = (cmatch.inc_edge_id==inc_edge_id_pedes) & (cmatch.out_edge_id==out_edge_id_pedes)

    # 좌회전시 조건
    right_flag = inc_true & (cmatch.turn_type=='left')

    # 보행신호이동류(17) 조건
    crosswalk_on = (cmatch.move_no==17) & ~ out_true

    # 신호없음이동류(18) 조건
    all_redsigns = (cmatch.move_no==18) & ~ out_true

    # 보행신호시/좌회전시 진입/진출 엣지id 배정
    cmatch[['inc_edge_id', 'out_edge_id']] = np.nan
    if condition == "보행신호시":
        cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
    elif condition == "좌회전시":
        cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]

    uturn_not_assigned = cmatch[['inc_edge_id','out_edge_id']].isna().any(axis=1).all()

    if uturn_not_assigned:
        # 좌회전시
        if right_flag.any():
            cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
        # 보행신호시
        elif pedes_flag.any():
            cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
        # 보행신호이동류(17) 발생시
        elif crosswalk_on.any():
            cmatch.loc[crosswalk_on, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
        # 신호없음이동류(18) 발생시
        elif all_redsigns.any():
            cmatch.loc[all_redsigns, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
        # 진출엣지 미포함시
        elif out_true.any():
            cmatch.loc[~ out_true, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
    cmatches.append(cmatch)

# 각 연등교차로(coordination node)에 대하여 (inc_edge_id, out_edge_id) 부여
self.coord['inter_no'] = self.coord['parent_id'].map(self.node2inter)
self.coord = self.coord.rename(columns={'child_id':'node_id'})
self.coord[['inc_dire', 'out_dire', 'inc_angle','out_angle']] = np.nan
self.coord['move_no'] = 20
self.coord = self.coord[['inter_no', 'phase_no', 'ring_type', 'move_no', 'inc_dire', 'out_dire', 'inc_angle','out_angle', 'inc_edge_id', 'out_edge_id', 'node_id']]
self.coord['node_type'] = 'coord'

cmatches = pd.concat(cmatches)
self.match6 = pd.concat([self.match5, cmatches, self.coord]).drop_duplicates().sort_values(by=['inter_no', 'node_id', 'phase_no', 'ring_type'])
self.match6 = self.match6.reset_index(drop=True)

  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
  cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id'

In [25]:
self.match6[52:52+60]

Unnamed: 0,inter_no,phase_no,ring_type,move_no,inc_dire,out_dire,inc_angle,out_angle,inc_edge_id,out_edge_id,node_id,node_type,turn_type
52,443,1,A,6,동,서,41.0,225.0,516916.0,513863.0,108769,normal,straight
53,443,1,B,2,서,동,221.0,43.0,513862.0,516917.0,108769,normal,straight
54,443,2,A,5,서,북,222.0,320.0,513862.0,518550.0,108769,normal,left
55,443,2,B,2,서,동,221.0,43.0,513862.0,516917.0,108769,normal,straight
56,443,3,A,7,북,동,321.0,39.0,518549.0,516917.0,108769,normal,left
57,443,3,B,18,,,,,,,108769,normal,
58,443,1,A,6,동,서,41.0,225.0,,,109333,u_turn,straight
59,443,1,B,2,서,동,221.0,43.0,,,109333,u_turn,straight
60,443,2,A,5,서,북,222.0,320.0,519873.0,519874.0,109333,u_turn,left
61,443,2,B,2,서,동,221.0,43.0,,,109333,u_turn,straight


In [26]:
# 2-1-7
self.match7 = self.match6.copy()
self.match7 = self.match7[['inter_no', 'node_id', 'move_no', 'inc_angle', 'out_angle', 'inc_dire', 'out_dire', 'inc_edge_id', 'out_edge_id', 'node_type', 'turn_type']]

# (1) 각 교차로별 방향 목록 : pdires (possible directions)
p2dires = {} # parent_id to directions
for parent_id in self.parent_ids:
    dires = self.match7[self.match7.node_id == parent_id][['inc_dire','out_dire']].values.flatten()
    dires = {dire for dire in dires if type(dire)==str}
    p2dires[parent_id] = dires

# (2) 각 (교차로, 진입방향) 별 진입id 목록 : inc2id (incoming direction to incoming edge_id)
inc2id = {}
for parent_id in self.parent_ids:
    for inc_dire in p2dires[parent_id]:
        df = self.match7[(self.match7.node_id==parent_id) & (self.match7.inc_dire==inc_dire)]
        inc2id[(parent_id, inc_dire)] = df.inc_edge_id.iloc[0]

# (3) 각 (교차로, 진출방향) 별 진출id 목록 : out2id (outgoing direction to outgoing edge_id)
out2id = {}
for parent_id in self.parent_ids:
    for out_dire in p2dires[parent_id]:
        df = self.match7[(self.match7.node_id==parent_id) & (self.match7.out_dire==out_dire)]
        out2id[(parent_id, out_dire)] = df.out_edge_id.iloc[0]

# (4) 각 parent_id별 이동류번호 목록
p2move = dict() # parent id to a list of aligned movement numbers
for parent_id in self.parent_ids:
    pnema = self.nema[self.nema.inc_dire.isin(p2dires[parent_id]) & self.nema.out_dire.isin(p2dires[parent_id])]
    p2move[parent_id] = list(pnema.move_no)

# (5) 방위별 방향벡터
dire2vec = dict() # direction to unit vector
theta = np.pi/2
for dire in self.dires:
    dire2vec[dire] = np.array([np.cos(theta), np.sin(theta)])
    theta -= np.pi/4

# (6) 각 parent_id별 : 각 이동류별 진입/진출 엣지 id
p2move2inc_edge_id = dict() # parent id to move2inc_edge_id
p2move2out_edge_id = dict() # parent id to move2out_edge_id
for parent_id in self.parent_ids:
    move2inc_edge_id = dict() # plain movement to incoming edge id
    move2out_edge_id = dict() # plain movement to outgoing edge id
    for move_no in range(1,17):
        row = self.nema[self.nema.move_no==move_no].iloc[0]
        inc_dire = row.inc_dire
        out_dire = row.out_dire
        inc_vec_true = dire2vec[inc_dire]
        out_vec_true = dire2vec[out_dire]

        node = self.net.getNode(parent_id)
        # 교차로의 모든 (from / to) edges
        inc_edges = [edge for edge in node.getIncoming() if edge.getFunction() == ''] # incoming edges
        out_edges = [edge for edge in node.getOutgoing() if edge.getFunction() == ''] # outgoing edges
        # 교차로의 모든 (from / to) unit vector
        inc_vecs = []
        for inc_edge in inc_edges:
            start = inc_edge.getShape()[-1]
            end = inc_edge.getShape()[-2]
            inc_vec = np.array(end) - np.array(start)
            inc_vec = inc_vec / (inc_vec ** 2).sum() ** 0.5
            inc_vecs.append(inc_vec)
        out_vecs = []
        for out_edge in out_edges:
            start = out_edge.getShape()[0]
            end = out_edge.getShape()[1]
            out_vec = np.array(end) - np.array(start)
            out_vec = out_vec / (out_vec ** 2).sum() ** 0.5
            out_vecs.append(out_vec)
        # 매칭 엣지 반환
        inc_index = np.array([np.dot(inc_vec, inc_vec_true) for inc_vec in inc_vecs]).argmax()
        out_index = np.array([np.dot(out_vec, out_vec_true) for out_vec in out_vecs]).argmax()
        inc_edge_id = inc_edges[inc_index].getID()
        out_edge_id = out_edges[out_index].getID()
        move2inc_edge_id[move_no] = inc_edge_id
        move2out_edge_id[move_no] = out_edge_id
    p2move2inc_edge_id[parent_id] = move2inc_edge_id
    p2move2out_edge_id[parent_id] = move2out_edge_id

# (7) 각 이동류별 진입/진출 방위
m2inc_dire = dict()
m2out_dire = dict()
for move_no in range(1,17):
    row = self.nema[self.nema.move_no==move_no].iloc[0]
    m2inc_dire[move_no] = row.inc_dire
    m2out_dire[move_no] = row.out_dire

# (8) 가능한 모든 이동류에 대하여 진입id, 진출id 배정 : matching
self.matching = []
for parent_id in self.parent_ids:
    inter_no = self.node2inter[parent_id]
    # 좌회전과 직진(1 ~ 16)
    for move_no in range(1,17):
        inc_dire = m2inc_dire[move_no]
        out_dire = m2out_dire[move_no]
        if move_no in p2move[parent_id]:
            inc_edge_id = inc2id[(parent_id, inc_dire)]
            out_edge_id = out2id[(parent_id, out_dire)]
        else:
            inc_edge_id = p2move2inc_edge_id[parent_id][move_no]
            out_edge_id = p2move2out_edge_id[parent_id][move_no]
        if (inc_edge_id, out_edge_id) in self.n2io2turn[parent_id]:
            turn_type = self.n2io2turn[parent_id][inc_edge_id, out_edge_id]
        else:
            turn_type = 'left' if move_no % 2 else 'straight'
        new_row = pd.DataFrame({'inter_no':[inter_no], 'node_id':[parent_id], 'move_no':[move_no],
                                'inc_dire':[inc_dire], 'out_dire':[out_dire],
                                'inc_edge_id':[inc_edge_id], 'out_edge_id':[out_edge_id],
                                'turn_type': turn_type})
        self.matching.append(new_row)
child_matching = self.match7[self.match7.node_id.isin(self.child_ids)]
child_matching = child_matching.drop(columns=['inc_angle', 'out_angle'])
self.matching = pd.concat(self.matching)
self.matching = self.matching.dropna(subset=['inc_edge_id', 'out_edge_id'])\
                .sort_values(by=['inter_no', 'node_id', 'move_no']).reset_index(drop=True)
self.matching['move_no'] = self.matching['move_no'].astype(int)

In [27]:
self.match6

Unnamed: 0,inter_no,phase_no,ring_type,move_no,inc_dire,out_dire,inc_angle,out_angle,inc_edge_id,out_edge_id,node_id,node_type,turn_type
0,436,1,A,5,서,북,262,358,517505,517507,109836,normal,left
1,436,1,B,2,서,동,262,074,517505,517004,109836,normal,straight
2,436,2,A,8,남,북,174,355,517509,517507,109836,normal,straight
3,436,2,B,3,남,서,172,263,517509,517504,109836,normal,left
4,436,3,A,7,북,동,356,074,517002,517004,109836,normal,left
...,...,...,...,...,...,...,...,...,...,...,...,...,...
111,457,5,B,4,북,남,344,164,,,109297,u_turn,straight
112,458,1,A,8,남,북,073,250,513193,513188,106238,normal,straight
113,458,1,B,4,북,남,249,072,513189,513192,106238,normal,straight
114,458,2,A,17,,,,,,,106238,normal,


In [28]:
self.matching

Unnamed: 0,inter_no,node_id,move_no,inc_dire,out_dire,inc_edge_id,out_edge_id,turn_type
0,436,109836,1,동,남,517003,517506,left
1,436,109836,2,서,동,517505,517004,straight
2,436,109836,3,남,서,517509,517504,left
3,436,109836,4,북,남,517002,517506,straight
4,436,109836,5,서,북,517505,517507,left
...,...,...,...,...,...,...,...,...
139,458,106238,12,북서,남동,513195,513190,straight
140,458,106238,13,남서,북서,513189,513194,left
141,458,106238,14,북동,남서,513193,513188,straight
142,458,106238,15,북서,북동,513195,513192,left


In [29]:
node_id = '109901'
node = self.net.getNode(node_id)
conns = node.getConnections()
[(c.getJunctionIndex(), c) for c in node.getConnections()]

[(0, <sumolib.net.connection.Connection at 0x248f3615940>),
 (1, <sumolib.net.connection.Connection at 0x248f36159a0>),
 (2, <sumolib.net.connection.Connection at 0x248f3615a60>),
 (3, <sumolib.net.connection.Connection at 0x248f3615ac0>)]

In [30]:
# 2-2 신호 초기화
self.nodes = [self.net.getNode(node_id) for node_id in self.node_ids]
self.node2init = {}
# 유턴노드를 제외한 모든 노드 (우회전, 삼지교차로직진 : g, 그외 : r)
for node_id in sorted(set(self.node_ids) - set(self.uturn_ids)):
    node = self.net.getNode(node_id)
    conns = [(c.getJunctionIndex(), c) for c in node.getConnections()]
    conns = [c for c in conns if c[0] >= 0]
    conns = sorted(conns, key=lambda x: x[0])
    # print(node_id, len(conns),sep='\n')
    state = []
    for i, ci in conns: # i번째 connection : ci
        # print(ci.getTLLinkIndex())
        if ci.getTLLinkIndex() < 0:
            continue
        are_foes = False
        # 합류지점이 다르면서 상충되는 cj가 존재하면 r, 그외에는 g
        for j, cj in conns: # j번째 connection : cj
            # ci, cj의 합류지점이 같으면 통과
            if ci.getTo() == cj.getTo():
                continue
            # ci, cj가 상충되면 are_foes를 True로 지정.
            if node.areFoes(i, j):
                are_foes = True
                break
        state.append('r' if are_foes else 'g')
    self.node2init[node_id] = state

# 유턴노드 (유턴x : G, 유턴 : G)
for node_id in self.uturn_ids:
    node = self.net.getNode(node_id)
    conns = [(c.getJunctionIndex(), c) for c in node.getConnections()]
    conns = [c for c in conns if c[0] >= 0]
    conns = sorted(conns, key=lambda x: x[0])
    state = []
    for i, ci in conns:
        if ci.getTLLinkIndex() < 0:
            continue
        state.append('G')
    self.node2init[node_id] = state

# 신호가 부여되어 있는 경우에는 r을 부여 (우회전 : g, 그외 : r / 유턴x : G, 유턴 : r)
for _, row in self.match6.dropna(subset=['inc_edge_id', 'out_edge_id']).iterrows():
    node_id = row.node_id
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id
    inc_edge = self.net.getEdge(inc_edge_id)
    out_edge = self.net.getEdge(out_edge_id)
    for conn in inc_edge.getConnections(out_edge):
        index = conn.getTLLinkIndex()
        if index >= 0:
            self.node2init[node_id][index] = 'r'

# json 파일로 저장
with open(os.path.join(self.path_intermediates, 'node2init.json'), 'w') as file:
    json.dump(self.node2init, file, indent=4)
print('2-2. 초기화 신호가 지정되었습니다. (우회전 : g)')

2-2. 초기화 신호가 지정되었습니다. (우회전 : g)


In [31]:
# 2-3 유턴 인덱스 / 비보호좌회전 인덱스 지정
self.u2uindex = dict() # uturn node id to uturn index
for row in self.uturn.itertuples():
    child_id = row.child_id
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id
    # self.u2uindex 지정
    inc_edge = self.net.getEdge(inc_edge_id)
    out_edge = self.net.getEdge(out_edge_id)
    uturn_conn = inc_edge.getConnections(out_edge)[0]
    self.u2uindex[child_id] = uturn_conn.getTLLinkIndex()

self.p2UPLindices2inc_edge_ids = dict() # parent id to unprotected left index to incoming_edge_ids
for parent_id in self.parent_ids:
    init_state = self.node2init[parent_id]
    # 우회전 이동류 인덱스
    indices_right = [i for i in range(len(init_state)) if init_state[i]=='g']
    # from-to가 지정된 이동류 인덱스
    indices_assigned = []
    m5 = self.match5[(self.match5.node_id==parent_id)].dropna(subset=['inc_edge_id', 'out_edge_id'])
    for row in m5.itertuples():
        inc_edge = self.net.getEdge(row.inc_edge_id)
        out_edge = self.net.getEdge(row.out_edge_id)
        conns = inc_edge.getConnections(out_edge)
        indices = [conn for conn in conns if conn.getTLLinkIndex()>=0]
        indices = [conn for conn in conns if conn.getJunctionIndex()>=0]
        indices = [conn.getTLLinkIndex() for conn in conns]
        indices_assigned.extend(indices)
    # 좌회전 이동류 인덱스
    indices_left = []
    for row in self.turn_type[self.turn_type.turn_type=='left'].itertuples():
        inc_edge = self.net.getEdge(row.inc_edge_id)
        out_edge = self.net.getEdge(row.out_edge_id)
        conns = inc_edge.getConnections(out_edge)
        indices = [conn for conn in conns if conn.getTLLinkIndex()>=0]
        indices = [conn for conn in conns if conn.getJunctionIndex()>=0]
        indices = [conn.getTLLinkIndex() for conn in conns]
        indices_left.extend(indices)
    # 비보호좌회전 인덱스 (unprotected left index)
    UPLindices2inc_edge_ids = list((set(range(len(init_state))) - set(indices_right) - set(indices_assigned)).intersection(indices_left))
    self.p2UPLindices2inc_edge_ids[parent_id] = dict()
    for UPLindex in UPLindices2inc_edge_ids:
        node = self.net.getNode(parent_id)
        conns = node.getConnections()
        conns = [conn for conn in conns if conn.getTLLinkIndex() == UPLindex]
        inc_edge_ids = [conn.getFrom().getID() for conn in conns]
        self.p2UPLindices2inc_edge_ids[parent_id][UPLindex] = inc_edge_ids


In [32]:
# 2-4 신호배정
for i, row in self.matching.iterrows():
    node_id = row.node_id
    move_no = row.move_no
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id
    state_list = copy.deepcopy(self.node2init[node_id])
    self.matching.at[i, 'state'] = ''.join(state_list)
    if (pd.isna(inc_edge_id)) or (pd.isna(out_edge_id)):
        continue
    inc_edge = self.net.getEdge(inc_edge_id)
    out_edge = self.net.getEdge(out_edge_id)
    # 신호가 부여되어 있으면 (from, to가 존재하면) G 부여 (우회전 : g, 신호 : G, 그외 : r)
    for conn in inc_edge.getConnections(out_edge):
        index = conn.getTLLinkIndex()
        if index >= 0:
            state_list[index] = 'G'
    self.matching.at[i, 'state'] = ''.join(state_list)
self.matching = self.matching.dropna(subset='state')
self.matching = self.matching.reset_index(drop=True)
self.matching = self.matching[['inter_no', 'node_id', 'move_no', 'inc_edge_id', 'out_edge_id', 'state', 'turn_type']]


In [33]:
# match6 : 신호 배정
for i, row in self.match6.iterrows():
    node_id = row.node_id
    move_no = row.move_no
    inc_edge_id = row.inc_edge_id
    out_edge_id = row.out_edge_id
    state_list = copy.deepcopy(self.node2init[node_id])
    self.match6.at[i, 'state'] = ''.join(state_list)
    if (pd.isna(inc_edge_id)) or (pd.isna(out_edge_id)):
        continue
    inc_edge = self.net.getEdge(inc_edge_id)
    out_edge = self.net.getEdge(out_edge_id)
    # 신호가 부여되어 있으면 (from, to가 존재하면) G 부여 (우회전 : g, 신호 : G, 그외 : r)
    for conn in inc_edge.getConnections(out_edge):
        index = conn.getTLLinkIndex()
        if index >= 0:
            state_list[index] = 'G'
    self.match6.at[i, 'state'] = ''.join(state_list)

# match6 : 비보호좌회전 신호 배정
for i, row in self.match6[self.match6.node_id.isin(self.parent_ids)].iterrows():
    parent_id = row.node_id
    state = row.state
    UPLindices2inc_edge_ids = self.p2UPLindices2inc_edge_ids[parent_id]
    for UPLindex in UPLindices2inc_edge_ids:
        # 비보호좌회전 이동류에 대한 진입엣지에 신호가 부여되어 있으면
        inc_edge_ids = UPLindices2inc_edge_ids[UPLindex]
        if inc_edge_ids:
            if inc_edge_id in inc_edge_ids:
                # 해당 비보호좌회전 인덱스(UPLindex)에, 해당 진입엣지의 직진신호가 있을 때 g를 부여
                state = state[:UPLindex] + 'g' + state[UPLindex+1:]
                self.match6.at[i, 'state'] = state
        else: # 직진신호가 없는 비보호좌회전 발생시 멈춤 및 오류메시지 출력
            raise Exception(
            f"비보호좌회전 신호를 부여할 수 없습니다. \
            신호가 부여되어 있지 않은 직진 또는 좌회전 연결이 존재하는데\
            (node_id : {parent_id}, index : {UPLindex})\
            이 연결의 진입엣지(inc_edge_id : {inc_edge_id})에 부여된 신호가 없습니다.")


# match6 : 유턴 신호가 한번도 배정되지 않은 경우에 대해서는 유턴이동류의 신호를 항상 g로 배정
for node_id in self.uturn_ids:
    m6 = self.match6[self.match6.node_id==node_id]
    if not len(m6):
        continue
    state_list = copy.deepcopy(self.node2init[node_id])
    state = ''.join(state_list)
    uindex = self.u2uindex[node_id]
    values_at_uindex = [state[uindex] for state in m6.state]
    # 유턴신호가 한번도 배정되지 않았으면
    uturn_assigned = ('G' in values_at_uindex)
    if not uturn_assigned:
        # 해당 유턴 인덱스(uindex)에 g를 항상 부여
        state = state[:uindex] + 'g' + state[uindex+1:]
        self.match6.loc[self.match6.node_id==node_id, 'state'] = state

self.match6 = self.match6.dropna(subset='state')
self.match6 = self.match6.reset_index(drop=True)
self.match6 = self.match6[['inter_no', 'node_id', 'phase_no', 'ring_type', 'move_no', 'inc_edge_id', 'out_edge_id', 'state', 'turn_type']]
self.match6.to_csv(os.path.join(self.path_intermediates, 'match6.csv'), index=0)
self.matching.to_csv(os.path.join(self.path_intermediates, 'matching.csv'), index=0)

In [35]:
# 신호계획 저장
self.plan.to_csv(os.path.join(self.path_tables, 'plan.csv'), index=0)

# 노드별 주기 개수 저장
Aplan = self.plan.copy()[['inter_no'] + [f'dura_A{j}' for j in range(1,9)] + ['cycle']]
grouped = Aplan.groupby('inter_no')
df = grouped.agg({'cycle': 'min'}).reset_index()
df = df.rename(columns={'cycle': 'min_cycle'})
df['num_cycle'] = 300 // df['min_cycle'] + 2
inter2num_cycles = dict(zip(df['inter_no'], df['num_cycle']))
node2num_cycles = {node_id : inter2num_cycles[self.node2inter[node_id]] for node_id in self.node_ids}
with open(os.path.join(self.path_intermediates,'node2num_cycles.json'), 'w') as file:
    json.dump(node2num_cycles, file, indent=4)