신호생성 repo (24. 1. 5 ~).
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

1048 lines
55 KiB

# python .\scripts\preprocess_daily.py
import pandas as pd
import numpy as np
import os, sys, copy, argparse, json, pickle
import sumolib, traci
from tqdm import tqdm
from datetime import datetime
class DailyPreprocessor():
def __init__(self, config_name='test_0721', file_net = 'sn.net.xml'):
self.config_name = config_name
self.file_net = file_net
# 루트폴더 지정
self.path_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
with open(os.path.join(self.path_root, 'configs', f'config_{self.config_name}.json'), 'r') as config_file:
config = json.load(config_file)
# 주요 폴더 경로 지정
self.paths = config['paths']
self.path_data = os.path.join(self.path_root, *self.paths['data'])
self.path_intermediates = os.path.join(self.path_root, *self.paths['intermediates'])
self.path_results = os.path.join(self.path_root, *self.paths['results'])
self.path_tables = os.path.join(self.path_root, *self.paths['tables'])
self.path_networks = os.path.join(self.path_root, *self.paths['networks'])
self.path_scripts = os.path.join(self.path_root, *self.paths['scripts'])
# 이슈사항 목록
self.issues = []
# 1. 데이터 불러오기
def load_data(self):
print('1. 데이터를 로드합니다.')
self.load_networks()
self.load_tables()
self.check_networks()
# self.check_tables()
self.standardize()
self.store_objects()
# 1-1. 네트워크 불러오기
def load_networks(self):
self.net = sumolib.net.readNet(os.path.join(self.path_networks, self.file_net))
print("1-1. 네트워크가 로드되었습니다.")
# 1-2. 테이블 불러오기
def load_tables(self):
if self.config_name == 'draft':
# 모든 컬럼에 대하여 데이터타입 지정
loading_dtype = {
'inter_no':'int', 'start_hour':'int', 'start_minute':'int', 'cycle':'int', 'offset':'int',
'node_id':'str', 'inter_type':'str', 'parent_id':'str','child_id':'str',
'direction':'str', 'condition':'str', 'inc_edge_id':'str', 'out_edge_id':'str',
'end_unix':'int', 'inter_name':'str', 'inter_lat':'float', 'inter_lon':'float',
'group_no':'int', 'main_phase_no':'int', 'phase_no':'int','ring_type':'str',
'angle_code':'str', 'turn_type':'str'}
for alph in ['A', 'B']:
for j in range(1,9):
# loading_dtype[f'angle_{alph}{j}'] = 'str'
loading_dtype[f'dura_{alph}{j}'] = 'int'
# 테이블 불러오기
self.inter_node = pd.read_csv(os.path.join(self.path_tables, 'inter_node.csv'), dtype=loading_dtype)
self.turn_type = pd.read_csv(os.path.join(self.path_tables, 'turn_type.csv'), dtype=loading_dtype)
self.uturn = pd.read_csv(os.path.join(self.path_tables, 'uturn.csv'), dtype=loading_dtype)
self.u_condition= pd.read_csv(os.path.join(self.path_tables, 'u_condition.csv'), dtype=loading_dtype)
self.coord = pd.read_csv(os.path.join(self.path_tables, 'coord.csv'), dtype=loading_dtype)
self.plan = pd.read_csv(os.path.join(self.path_tables, 'plan.csv'), dtype=loading_dtype)
self.inter_info = pd.read_csv(os.path.join(self.path_tables, 'inter_info.csv'), dtype=loading_dtype)
self.angle = pd.read_csv(os.path.join(self.path_tables, 'angle.csv'), dtype=loading_dtype)
self.nema = pd.read_csv(os.path.join(self.path_tables, 'nema.csv'), encoding='cp949', dtype=loading_dtype)
elif self.config_name == 'test_0721':
self.alphs = ['A', 'B']
loading_dtype_1 = {'CRSRD_ID':int, 'CRSRD_NM':str, 'CRSRD_TYPE':int,
'CTRLER_TYPE':str, 'CYCL':int, 'DAY':int,
'FLOW_NO':str, 'FRI_PLAN_NO':int, 'FRST_REG_DT':str,
'GRP_NO':int, 'HOUR':int, 'LAST_MDFCN_DT':str,
'LGTD':float, 'LOS_YN':int, 'LTTD':float,
'MAIN_CRSRD_ID':str, 'MAIN_PHASE':int, 'MIN':int,
'MNTH':int, 'MON_PLAN_NO':int, 'NODE_ID':str,
'OFFSET':int, 'PHASE':int, 'PHASE_DT':str,
'PLAN_NO':int, 'PPC_TYPE':int, 'RING':str,
'RINGA_FLOW':int, 'RINGA_MIN_SEC':int, 'RINGA_RED_SEC':int,
'RINGA_YELLO_SEC':int, 'RINGB_FLOW':int, 'RINGB_MIN_SEC':int,
'RINGB_RED_SEC':int, 'RINGB_YELLO_SEC':int,'SAT_PLAN_NO':int,
'SIGL_ANGLE':str, 'STOS_NO':int, 'SUN_PLAN_NO':int,
'THU_PLAN_NO':int, 'TRFLIG_TYPE':int, 'TUE_PLAN_NO':int,
'USE_YN':int, 'WED_PLAN_NO':int, 'adj_inc_edge_id':str,
'adj_out_edge_id':str, 'child_id':str, 'condition':str,
'inc_edge_id':str, 'inter_no':int, 'inter_type':str,
'node_id':str, 'out_edge_id':str, 'parent_id':str,
'phase_no':int, 'ring_type':str, 'turn_type':str}
loading_dtype_2 = {f'RING{alph}_PHASE{i}':int for alph in self.alphs for i in range(1,9)}
loading_dtype = {**loading_dtype_1, **loading_dtype_2}
# 수작업으로 만든 테이블 불러오기
self.inter_node = pd.read_csv(os.path.join(self.path_tables, 'inter_node.csv'), dtype=loading_dtype)
self.turn_type = pd.read_csv(os.path.join(self.path_tables, 'turn_type.csv'), dtype=loading_dtype)
self.uturn = pd.read_csv(os.path.join(self.path_tables, 'uturn.csv'), dtype=loading_dtype)
self.u_condition= pd.read_csv(os.path.join(self.path_tables, 'u_condition.csv'), dtype=loading_dtype)
self.coord = pd.read_csv(os.path.join(self.path_tables, 'coord.csv'), dtype=loading_dtype)
self.nema = pd.read_csv(os.path.join(self.path_tables, 'nema.csv'), encoding='cp949', dtype=loading_dtype)
# DB에서 fetch하는 테이블 불러오기
self.dayplan = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_DAY_PLAN.csv'), dtype=loading_dtype)
self.holyplan = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_HOLIDAY_PLAN.csv'), dtype=loading_dtype)
self.weekplan = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_WEEK_PLAN.csv'), dtype=loading_dtype)
self.red_yel = pd.read_csv(os.path.join(self.path_tables, 'TC_IF_TOD_RED_YELLO.csv'), dtype=loading_dtype)
self.inter_info = pd.read_csv(os.path.join(self.path_tables, 'TM_FA_CRSRD.csv'), dtype=loading_dtype)
self.angle = pd.read_csv(os.path.join(self.path_tables, 'TN_IF_SIGL_FLOW.csv'), dtype=loading_dtype)
# # 교차로목록, 노드목록 정의
if self.config_name == 'draft':
self.inter_nos = [int(x) for x in sorted(self.inter_info.inter_no.unique())]
elif self.config_name == 'test_0721':
self.inter_nos = [int(x) for x in sorted(self.inter_info.CRSRD_ID.unique())]
self.node_ids = sorted(self.inter_node.node_id.unique())
print("1-2. 테이블들이 로드되었습니다.")
# 1-3. 네트워크 무결성 검사
def check_networks(self):
# https://sumo.dlr.de/docs/Netedit/neteditUsageExamples.html#simplify_tls_program_state_after_changing_connections
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
if tools not in sys.path:
sys.path.append(tools)
else:
raise EnvironmentError("please declare environment variable 'SUMO_HOME'")
traci.start([sumolib.checkBinary('sumo'), "-n", os.path.join(self.path_networks, self.file_net)])
nodes = [node for node in self.net.getNodes() if node.getType() in ['traffic_light', 'traffic_light_right_on_red']]
for node in nodes:
try:
node_id = node.getID()
from_xml = len([c for c in node.getConnections() if c.getTLLinkIndex() >= 0])
from_traci = len(traci.trafficlight.getRedYellowGreenState(node_id))
if from_xml != from_traci:
sub = {'id': node_id, 'type': 'node', 'note': '유효하지 않은 연결이있음. netedit에서 clean states 필요.'}
self.issues.append(sub)
except Exception as e:
print(f'Error: {e}')
traci.close()
print("1-3. 네트워크의 모든 clean state requirement들을 체크했습니다.")
# 1-4. 테이블 무결성 검사
def check_tables(self):
self.check_plan()
self.check_inter_info()
self.check_angle()
print("1-4. 테이블들의 무결성 검사를 완료했습니다.")
# 1-4-1. 신호계획(plan) 검사
def check_plan(self):
# 1-4-1-1. inter_no 검사
# self.plan.loc[0, 'inter_no'] = '4' # 에러 발생을 위한 코드
missing_inter_nos = set(self.plan.inter_no) - set(self.inter_nos)
if missing_inter_nos:
msg = f"1-4-1-1. plan의 inter_no 중 교차로 목록(inter_nos)에 포함되지 않는 항목이 있습니다: {missing_inter_nos}"
self.issues.append(msg)
# 1-4-1-2. 시작시각 검사
# self.plan.loc[0, 'start_hour'] = 27 # 에러 발생을 위한 코드
for _, row in self.plan.iterrows():
start_hour = row.start_hour
start_minute = row.start_minute
if not (0 <= start_hour <= 23) or not (0 <= start_minute <= 59):
msg = f"1-4-1-2. plan에 잘못된 형식의 start_time이 존재합니다: {start_hour, start_minute}"
self.issues.append(msg)
# 1-4-1-3. 현시시간 검사
# self.plan.loc[0, 'dura_A1'] = -2 # 에러 발생을 위한 코드
durations = self.plan[[f'dura_{alph}{j}' for alph in ['A','B'] for j in range(1, 9)]]
valid_indices = ((durations >= 0) & (durations <= 200)).all(axis=1)
invalid_inter_nos = sorted(self.plan[~ valid_indices].inter_no.unique())
if invalid_inter_nos:
msg = f"1-4-1-3. plan에 음수이거나 200보다 큰 현시시간이 존재합니다. : {invalid_inter_nos}"
self.issues.append(msg)
# 1-4-1-4. 주기 일관성 검사
# self.plan.loc[0, 'cycle'] = 50 # 에러 발생을 위한 코드
inconsistent_cycle = self.plan.groupby(['inter_no', 'start_hour', 'start_minute'])['cycle'].nunique().gt(1)
if inconsistent_cycle.any():
inc_inter_no, start_hour, start_minute = inconsistent_cycle[inconsistent_cycle].index[0]
msg = f"1-4-1-4. 한 프로그램에 서로 다른 주기가 존재합니다. inter_no:{inc_inter_no}, start_hour:{start_minute}, start_hour:{start_minute}일 때, cycle이 유일하게 결정되지 않습니다."
self.issues.append(msg)
# 1-4-1-5. 현시시간 / 주기 검사
# self.plan.loc[0, 'duration'] = 10 # 에러 발생을 위한 코드
right_duration = True
for (inter_no, start_hour, start_minute), group in self.plan.groupby(['inter_no', 'start_hour', 'start_minute']):
A_sum = group[[f'dura_A{j}' for j in range(1, 9)]].iloc[0].sum()
B_sum = group[[f'dura_B{j}' for j in range(1, 9)]].iloc[0].sum()
# A_sum = group[group['ring_type']=='A']['duration'].sum()
# B_sum = group[group['ring_type']=='B']['duration'].sum()
cycle = group['cycle'].unique()[0]
if not (A_sum == B_sum == cycle):
right_duration = False
inc_inter_no = inter_no
if not right_duration:
msg = f"1-4-1-5. inter_no:{inc_inter_no}, A링현시시간의 합과 B링현시시간의 합이 일치하지 않거나, 현시시간의 합과 주기가 일치하지 않습니다."
self.issues.append(msg)
# 1-4-2. 교차로정보(inter_info) 검사
def check_inter_info(self):
# 1-4-2-1. inter_lat, inter_lon 적절성 검사
# self.inter_info.loc[0, 'inter_lat'] = 38.0 # 에러 발생을 위한 코드
self.max_lon, self.min_lon = 127.3, 127.0
self.max_lat, self.min_lat = 37.5, 37.2
for _, row in self.inter_info.iterrows():
latbool = self.min_lat <= row['inter_lat'] <= self.max_lat
lonbool = self.min_lon <= row['inter_lon'] <= self.max_lon
if not(latbool and lonbool):
msg = f"1-4-2-1. 위도 또는 경도가 범위를 벗어난 교차로가 있습니다: inter_no : {row['inter_no']}"
self.issues.append(msg)
# 1-4-3. 방위각정보(inter_info) 검사
def check_angle(self):
# 1-4-3-1. inter_no 검사
# self.angle.loc[0, 'inter_no'] = '4' # 에러 발생을 위한 코드
missing_inter_nos = set(self.angle.inter_no) - set(self.inter_nos)
if missing_inter_nos:
msg = f"1-4-3-1. angle의 inter_no 중 교차로 목록(inter_nos)에 포함되지 않는 항목이 있습니다: {missing_inter_nos}"
self.issues.append(msg)
# 1-4-3-2. 각도 코드 검사
angle_codes = np.array(self.angle.angle_code)
# angle_codes = [code for code in angle_codes if not pd.isna(code) and code != 'stop']
of_length_6 = [len(code)==6 for code in angle_codes]
if not all(of_length_6):
msg = f"1-4-3-2. 여섯자리가 아닌 각도코드가 존재합니다."
self.issues.append(msg)
angle_codes = [[code[:3],code[3:]] for code in angle_codes]
angle_codes = [int(item) for sublist in angle_codes for item in sublist]
angle_codes = [0<=code<360 for code in angle_codes]
if not all(angle_codes):
msg = f"1-4-3-2. 0과 359 사이의 값을 벗어나는 방위각이 존재합니다."
self.issues.append(msg)
# 1-5. 테이블 표준화
def standardize(self):
if self.config_name == 'draft':
pass
elif self.config_name == 'test_0721':
# 컬럼명 변경
rename_cname_1 = {'CRSRD_ID':'inter_no', 'CRSRD_NM':'inter_name', 'CRSRD_TYPE':'inter_type',
'CYCL':'cycle', 'DAY':'DD', 'FLOW_NO':'move_no',
'GRP_NO':'group_no', 'HOUR':'hh', 'LGTD':'inter_lon',
'LTTD':'inter_lat', 'MAIN_CRSRD_ID':'parent_id', 'MAIN_PHASE':'main_phase',
'MIN':'mm', 'MNTH':'MM', 'NODE_ID':'node_id',
'OFFSET':'offset', 'PHASE':'phase_no', 'RING':'ring_type',
'RINGA_RED_SEC':'red_A', 'RINGA_YELLO_SEC':'yel_A', 'RINGB_RED_SEC':'red_B',
'RINGB_YELLO_SEC':'yel_B', 'SIGL_ANGLE':'angle_code', 'PLAN_NO':'plan_no'}
rename_cname_2 = {f'RING{alph}_PHASE{i}':f'dura_{alph}{i}' for alph in self.alphs for i in range(1,9)}
rename_cname = {**rename_cname_1, **rename_cname_2}
# 컬럼명 변경 적용
self.dayplan = self.dayplan.rename(columns=rename_cname)
self.holyplan = self.holyplan.rename(columns=rename_cname)
self.weekplan = self.weekplan.rename(columns=rename_cname)
self.red_yel = self.red_yel.rename(columns=rename_cname)
self.inter_info = self.inter_info.rename(columns=rename_cname)
self.angle = self.angle.rename(columns=rename_cname)
# 불필요 컬럼 제거
self.dayplan = self.dayplan.drop(columns='LAST_MDFCN_DT')
self.holyplan = self.holyplan.drop(columns='LAST_MDFCN_DT')
self.weekplan = self.weekplan.drop(columns='LAST_MDFCN_DT')
self.red_yel = self.red_yel.drop(columns=['RINGA_FLOW', 'RINGA_MIN_SEC',
'RINGB_FLOW', 'RINGB_MIN_SEC', 'LAST_MDFCN_DT'])
self.inter_info = self.inter_info.drop(columns=['CTRLER_TYPE', 'TRFLIG_TYPE',
'PPC_TYPE', 'LOS_YN', 'USE_YN', 'FRST_REG_DT', 'LAST_MDFCN_DT'])
# 날짜정보 추출
now = datetime.now().replace(month=1, day=5, hour=10)
MM, DD = now.month, now.day # 월, 일
hplan = self.holyplan[(self.holyplan.MM==MM) & (self.holyplan.DD==DD)]
dow_number = now.weekday() # 요일
dows = [dow for dow in self.weekplan.columns if dow.endswith('PLAN_NO')]
dows = dows[1:] + dows[0:1]
dow = dows[dow_number]
# (신호교차로, 신호계획번호) 목록
if len(hplan):
inter_pnos = list(zip(hplan['inter_no'], hplan['plan_no']))
else:
inter_pnos = list(zip(self.weekplan.inter_no, self.weekplan[dow]))
# 신호테이블 통합
self.plan = self.dayplan.copy()
self.plan['inter_pno'] = list(zip(self.plan['inter_no'], self.plan['plan_no']))
self.plan = self.plan[(self.plan.inter_pno.isin(inter_pnos))]
self.plan = self.plan.drop(columns='inter_pno')
for j in range(1,9):
RY = self.red_yel[self.red_yel.phase_no==j].iloc[0]
red_A = RY.red_A
red_B = RY.red_B
yel_A = RY.yel_A
yel_B = RY.yel_B
self.plan[f'red_A{j}'] = red_A
self.plan[f'red_B{j}'] = red_B
self.plan[f'yellow_A{j}'] = yel_A
self.plan[f'yellow_B{j}'] = yel_B
print("1-5. 테이블을 표준화했습니다.")
# 1-6. 주요 객체 (리스트, 딕셔너리) 저장
def store_objects(self):
self.parent_ids = sorted(self.inter_node[self.inter_node.inter_type=='parent'].node_id.unique())
self.child_ids = sorted(self.inter_node[self.inter_node.inter_type=='child'].node_id.unique())
self.uturn_ids = sorted(self.uturn.child_id.unique())
self.coord_ids = sorted(self.coord.child_id.unique())
# node2inter : node_id to inter_no
self.node2inter = dict(zip(self.inter_node['node_id'], self.inter_node['inter_no']))
# inter2node : inter_no to node_id
inter_node1 = self.inter_node[self.inter_node.inter_type == 'parent'].drop('inter_type', axis=1)
inter_info1 = self.inter_info[['inter_no', 'inter_lat', 'inter_lon']]
inter = pd.merge(inter_node1, inter_info1, how='left', left_on=['inter_no'],
right_on=['inter_no']).drop_duplicates()
self.inter2node = dict(zip(inter['inter_no'], inter['node_id']))
# ch2pa : child to parent
self.ch2pa = {}
for child_id in self.child_ids:
parent_no = self.inter_node[self.inter_node.node_id==child_id].inter_no.iloc[0]
sub_inter_node = self.inter_node[self.inter_node.inter_no==parent_no]
self.ch2pa[child_id] = sub_inter_node[sub_inter_node.inter_type=='parent'].iloc[0].node_id
self.dires = ['', '북동', '', '남동', '', '남서', '', '북서'] # 정북기준 시계방향으로 8방향
# ids
self.ids = {'node_ids' : self.node_ids,
'parent_ids': self.parent_ids,
'child_ids' : self.child_ids,
'uturn_ids' : self.uturn_ids,
'coord_ids' : self.coord_ids,
'inter_nos' : self.inter_nos}
with open(os.path.join(self.path_intermediates, 'ids.json'), 'w') as file:
json.dump(self.ids, file)
print("1-6. 주요 객체 (리스트, 딕셔너리)들을 저장했습니다.")
# 2. 중간산출물 만들기
def get_intermediates(self):
print('2. 중간산출물을 생성합니다.')
self.get_matches()
self.initialize_state()
self.assign_indices()
self.assign_signals()
self.save_intermediates()
# 2-1 매칭테이블 생성
def get_matches(self):
self.make_match1()
self.make_match2()
self.make_match3()
self.make_match4()
self.make_match5()
self.make_match6() #
self.make_matching()
print('2-1. 매칭 테이블들을 생성했습니다.')
# 2-1-1
def make_match1(self, fetch_all:bool=False):
'''
모든 inter_no(교차로번호)에 대한 A, B링 현시별 이동류정보
컬럼 : inter_no, phase_A, phas_B, move_A, move_B
match1을 만드는 데 시간이 소요되므로 한 번 만들어서 저장해두고 저장해둔 것을 쓴다.
'''
if fetch_all:
# [이동류번호] 불러오기 (약 1분의 소요시간)
path_move = os.path.join(self.path_tables, 'move')
csv_moves = os.listdir(path_move)
moves = [pd.read_csv(os.path.join(path_move, csv_move), index_col=0) for csv_move in tqdm(csv_moves, desc='이동류정보 불러오는 중')]
df = pd.concat(moves).reset_index(drop=True)
self.match1 = []
for i, group in df.groupby(['inter_no', 'phas_A', 'phas_B']):
inter_no, phas_A, phas_B = i
pairs_array = np.array(group[['move_A', 'move_B']])
unique_pairs, counts = np.unique(pairs_array, axis=0, return_counts=True)
frequent_pair = unique_pairs[np.argmax(counts)]
self.match1.append(pd.DataFrame({'inter_no':[inter_no], 'phas_A':[phas_A], 'phas_B':[phas_B],
'move_A':[frequent_pair[0]], 'move_B':[frequent_pair[1]]}))
self.match1 = pd.concat(self.match1).reset_index(drop=True)
self.match1.to_csv(os.path.join(self.path_intermediates, 'match1.csv'), index=0)
else:
self.match1 = pd.read_csv(os.path.join(self.path_intermediates, 'match1.csv'))
# 2-1-2
def make_match2(self):
'''
match1을 계층화한 테이블
컬럼 : inter_no, phase_no, ring_type, move_no
'''
# 계층화 (inter_no, phas_A, phas_B, move_A, move_B) -> ('inter_no', 'phase_no', 'ring_type', 'move_no')
matchA = self.match1[['inter_no', 'phas_A', 'move_A']].copy()
matchA.columns = ['inter_no', 'phase_no', 'move_no']
matchA['ring_type'] = 'A'
matchB = self.match1[['inter_no', 'phas_B', 'move_B']].copy()
matchB.columns = ['inter_no', 'phase_no', 'move_no']
matchB['ring_type'] = 'B'
self.match2 = pd.concat([matchA, matchB]).drop_duplicates()
self.match2 = self.match2[['inter_no', 'phase_no', 'ring_type', 'move_no']]
self.match2 = self.match2.sort_values(by=list(self.match2.columns))
# 2-1-3
def make_match3(self):
'''
match2의 각 이동류번호에 진입방향, 진출방향을 매칭시킨 테이블
컬럼명 : inter_no, phase_no, ring_type, move_no, inc_dire, out_dire
'''
# nema 정보 불러오기 및 병합
self.match3 = pd.merge(self.match2, self.nema, how='left', on='move_no').drop_duplicates()
# 2-1-4
def make_match4(self):
'''
match3의 각 이동류번호에 진입/진출 방위각을 매칭시킨 테이블
컬럼명 : inter_no, phase_no, ring_type, move_no, inc_dire, out_dire, inc_angle, out_angle
'''
# helper dictionaries
im2inc_angle = dict() # a dictionary that maps (inter_no, move_no) to inc_angle
im2out_angle = dict() # a dictionary that maps (inter_no, move_no) to out_angle
self.angle = self.angle.dropna(subset=['move_no'])
self.angle['move_no'] = self.angle['move_no'].astype(int)
for row in self.angle.itertuples():
inter_no = row.inter_no
move_no = row.move_no
angle_code = row.angle_code
if isinstance(angle_code, str) and len(angle_code) == 6:
im2inc_angle[(inter_no, move_no)] = angle_code[:3]
im2out_angle[(inter_no, move_no)] = angle_code[3:]
else:
im2inc_angle[(inter_no, move_no)] = np.nan
im2out_angle[(inter_no, move_no)] = np.nan
for inter_no in self.inter_nos:
im2inc_angle[(inter_no, 17)] = np.nan
im2out_angle[(inter_no, 17)] = np.nan
im2inc_angle[(inter_no, 18)] = np.nan
im2out_angle[(inter_no, 18)] = np.nan
# 진입, 진출 방위각 매칭
self.match4 = self.match3.copy()
for i, row in self.match4.iterrows():
inter_no = row.inter_no
move_no = row.move_no
self.match4.at[i, 'inc_angle'] = im2inc_angle[(inter_no, move_no)]
self.match4.at[i, 'out_angle'] = im2out_angle[(inter_no, move_no)]
self.match4.head()
# 2-1-5
def make_match5(self):
'''
match4의 각 행에 진입엣지id, 진출엣지id 노드id 추가한 테이블
* 진입, 진출엣지id를 얻은 방법 : cosine similarity
컬럼명 : inter_no, phase_no, ring_type, move_no, inc_dire, out_dire, inc_angle, out_angle, inc_edge_id, out_edge_id, node_id
'''
self.match5 = self.match4.copy()
# 진입진출ID 매칭
for index, row in self.match5.iterrows():
node_id = self.inter2node[row.inter_no]
node = self.net.getNode(node_id)
# 교차로의 모든 (from / to) edges
inc_edges = [edge for edge in node.getIncoming() if edge.getFunction() == ''] # incoming edges
out_edges = [edge for edge in node.getOutgoing() if edge.getFunction() == ''] # outgoing edges
# 교차로의 모든 (from / to) unit vector
inc_vecs = []
for inc_edge in inc_edges:
start = inc_edge.getShape()[-1]
end = inc_edge.getShape()[-2]
inc_vec = np.array(end) - np.array(start)
inc_vec = inc_vec / (inc_vec ** 2).sum() ** 0.5
inc_vecs.append(inc_vec)
out_vecs = []
for out_edge in out_edges:
start = out_edge.getShape()[0]
end = out_edge.getShape()[1]
out_vec = np.array(end) - np.array(start)
out_vec = out_vec / (out_vec ** 2).sum() ** 0.5
out_vecs.append(out_vec)
# 진입각, 진출각 불러오기
if not pd.isna(row.inc_angle):
inc_angle = int(row.inc_angle)
out_angle = int(row.out_angle)
# 방위각을 일반각으로 가공, 라디안 변환, 단위벡터로 변환
inc_angle = (90 - inc_angle) % 360
inc_angle = inc_angle * np.pi / 180.
inc_vec_true = np.array([np.cos(inc_angle), np.sin(inc_angle)])
out_angle = (90 - out_angle) % 360
out_angle = out_angle * np.pi / 180.
out_vec_true = np.array([np.cos(out_angle), np.sin(out_angle)])
# 매칭 엣지 반환
inc_index = np.array([np.dot(inc_vec, inc_vec_true) for inc_vec in inc_vecs]).argmax()
out_index = np.array([np.dot(out_vec, out_vec_true) for out_vec in out_vecs]).argmax()
inc_edge_id = inc_edges[inc_index].getID()
out_edge_id = out_edges[out_index].getID()
self.match5.at[index, 'inc_edge_id'] = inc_edge_id
self.match5.at[index, 'out_edge_id'] = out_edge_id
self.match5['node_id'] = self.match5['inter_no'].map(self.inter2node)
self.match5['node_type'] = 'normal'
self.match5 = self.match5.sort_values(by=['inter_no','phase_no','ring_type']).reset_index(drop=True)
# n2io2turn : dictionary that maps node_id to io2turn
self.n2io2turn = dict()
for node_id in self.parent_ids:
turn = self.turn_type[self.turn_type.node_id==node_id]
io = list(zip(turn.inc_edge_id, turn.out_edge_id))
# io2turn : dictionary that maps (inc_edge_id, out_edge_id) to turn_type
io2turn = dict(zip(io, turn.turn_type))
self.n2io2turn[node_id] = io2turn
# turn_type 지정
for i, row in self.match5.iterrows():
node_id = row.node_id
inc_edge_id = row.inc_edge_id
out_edge_id = row.out_edge_id
if not (pd.isna(inc_edge_id) and pd.isna(out_edge_id)):
turn_type = self.n2io2turn[node_id][(inc_edge_id, out_edge_id)]
self.match5.at[i, 'turn_type'] = turn_type
# 2-1-6
def make_match6(self):
'''
match5에서 부교차로(유턴교차로, 연동교차로)에 대한 행들을 추가함
컬럼명 : inter_no, phase_no, ring_type, move_no, inc_dire, out_dire, inc_angle, out_angle, inc_edge_id, out_edge_id, node_id
'''
self.uturn = pd.merge(self.uturn, self.u_condition, on='child_id')
# p2inc_edge2angle : node_id to inc_edge2angle
p2inc_edge2angle = dict()
# p2out_edge2angle : node_id to out_edge2angle
p2out_edge2angle = dict()
# p2inc_angle2edge : node_id to inc_angle2edge
p2inc_angle2edge = dict()
# p2out_angle2edge : node_id to out_angle2edge
p2out_angle2edge = dict()
for node_id in self.parent_ids:
m5 = self.match5[self.match5.node_id==node_id]
m5 = m5.dropna(subset=['inc_edge_id', 'out_edge_id'])
# inc_edge2angle : inc_edge_id to inc_angle
inc_edge2angle = dict(zip(m5.inc_edge_id, m5.inc_angle.astype(int)))
p2inc_edge2angle[node_id] = inc_edge2angle
# out_edge2angle : out_edge_id to out_angle
out_edge2angle = dict(zip(m5.out_edge_id, m5.out_angle.astype(int)))
p2out_edge2angle[node_id] = out_edge2angle
# inc_angle2edge : inc_angle to inc_edge_id
inc_angle2edge = dict(zip(m5.inc_angle.astype(int), m5.inc_edge_id))
p2inc_angle2edge[node_id] = inc_angle2edge
# out_angle2edge : out_angle to out_edge_id
out_angle2edge = dict(zip(m5.out_angle.astype(int), m5.out_edge_id))
p2out_angle2edge[node_id] = out_angle2edge
# 각 uturn node에 대하여 (inc_edge_id, out_edge_id) 부여
cmatches = []
for row in self.uturn.itertuples():
parent_id = row.parent_id
child_id = row.child_id
condition = row.condition
inc_edge_id = row.inc_edge_id
out_edge_id = row.out_edge_id
adj_inc_edge_id = row.adj_inc_edge_id
adj_out_edge_id = row.adj_out_edge_id
# match5에서 부모노드id에 해당하는 행들을 가져옴 (cmatch)
cmatch = self.match5.copy()[self.match5.node_id==parent_id] # match dataframe for a child node
cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True)
cmatch['node_id'] = child_id
cmatch['node_type'] = 'u_turn'
# 진입엣지 각도
inc_angle = p2inc_edge2angle[parent_id][adj_inc_edge_id]
# 이격각도
self.angle_separation = 10
# 진입로 각도 목록
inc_angles = cmatch.dropna(subset=['inc_angle', 'out_angle']).inc_angle.astype(int).unique()
inc_angles = np.sort(inc_angles)
inc_angles = list(inc_angles - 360) + list(inc_angles) + list(inc_angles + 360)
inc_angles = np.array(inc_angles)
# 보행신호시의 진입로 각도
inc_angles_left = inc_angles[inc_angles >= inc_angle + self.angle_separation]
inc_angle_pedes = np.sort(inc_angles_left)[0] % 360
# 보행신호시의 진입로 엣지id
inc_angle2edge = p2inc_angle2edge[parent_id]
inc_edge_id_pedes = inc_angle2edge[inc_angle_pedes]
# 진출로 각도 목록
out_angles = cmatch.dropna(subset=['inc_angle', 'out_angle']).out_angle.astype(int).unique()
out_angles = np.sort(out_angles)
out_angles = list(out_angles - 360) + list(out_angles) + list(out_angles + 360)
out_angles = np.array(out_angles)
# 보행신호시의 진출로 각도
out_angles_right = out_angles[out_angles <= inc_angle - self.angle_separation]
out_angle_pedes = np.sort(out_angles_right)[-1] % 360
# 보행신호시의 진출로 엣지id
out_angle2edge = p2out_angle2edge[parent_id]
out_edge_id_pedes = out_angle2edge[out_angle_pedes]
# 진입엣지/진출엣지 포함 조건
inc_true = (cmatch.inc_edge_id==adj_inc_edge_id)
out_true = (cmatch.out_edge_id==adj_out_edge_id)
# 보행신호시 조건
pedes_flag = (cmatch.inc_edge_id==inc_edge_id_pedes) & (cmatch.out_edge_id==out_edge_id_pedes)
# 좌회전시 조건
right_flag = inc_true & (cmatch.turn_type=='left')
# 보행신호이동류(17) 조건
crosswalk_on = (cmatch.move_no==17) & ~ out_true
# 신호없음이동류(18) 조건
all_redsigns = (cmatch.move_no==18) & ~ out_true
# 보행신호시/좌회전시 진입/진출 엣지id 배정
cmatch[['inc_edge_id', 'out_edge_id']] = np.nan
if condition == "보행신호시":
cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
elif condition == "좌회전시":
cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
uturn_not_assigned = cmatch[['inc_edge_id','out_edge_id']].isna().any(axis=1).all()
if uturn_not_assigned:
# 좌회전시
if right_flag.any():
cmatch.loc[right_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 보행신호시
elif pedes_flag.any():
cmatch.loc[pedes_flag, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 보행신호이동류(17) 발생시
elif crosswalk_on.any():
cmatch.loc[crosswalk_on, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 신호없음이동류(18) 발생시
elif all_redsigns.any():
cmatch.loc[all_redsigns, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
# 진출엣지 미포함시
elif out_true.any():
cmatch.loc[~ out_true, ['inc_edge_id', 'out_edge_id']] = [inc_edge_id, out_edge_id]
cmatches.append(cmatch)
# 각 연등교차로(coordination node)에 대하여 (inc_edge_id, out_edge_id) 부여
self.coord['inter_no'] = self.coord['parent_id'].map(self.node2inter)
self.coord = self.coord.rename(columns={'child_id':'node_id'})
self.coord[['inc_dire', 'out_dire', 'inc_angle','out_angle']] = np.nan
self.coord['move_no'] = 20
self.coord = self.coord[['inter_no', 'phase_no', 'ring_type', 'move_no', 'inc_dire', 'out_dire', 'inc_angle','out_angle', 'inc_edge_id', 'out_edge_id', 'node_id']]
self.coord['node_type'] = 'coord'
cmatches = pd.concat(cmatches)
self.match6 = pd.concat([self.match5, cmatches, self.coord]).drop_duplicates().sort_values(by=['inter_no', 'node_id', 'phase_no', 'ring_type'])
self.match6 = self.match6.reset_index(drop=True)
# 2-1-7
def make_matching(self):
'''
각 교차로에 대하여, 가능한 모든 이동류 (1~16)에 대한 진입·진출엣지ID를 지정한 테이블
* 시차제에 대비한 테이블
컬럼명 : inter_no, move_no, inc_dire, out_dire, inc_edge_id, out_edge_id, node_id
'''
self.match7 = self.match6.copy()
self.match7 = self.match7[['inter_no', 'node_id', 'move_no', 'inc_angle', 'out_angle', 'inc_dire', 'out_dire', 'inc_edge_id', 'out_edge_id', 'node_type', 'turn_type']]
# (1) 각 교차로별 방향 목록 : pdires (possible directions)
p2dires = {} # parent_id to directions
for parent_id in self.parent_ids:
dires = self.match7[self.match7.node_id == parent_id][['inc_dire','out_dire']].values.flatten()
dires = {dire for dire in dires if type(dire)==str}
p2dires[parent_id] = dires
# (2) 각 (교차로, 진입방향) 별 진입id 목록 : inc2id (incoming direction to incoming edge_id)
inc2id = {}
for parent_id in self.parent_ids:
for inc_dire in p2dires[parent_id]:
df = self.match7[(self.match7.node_id==parent_id) & (self.match7.inc_dire==inc_dire)]
inc2id[(parent_id, inc_dire)] = df.inc_edge_id.iloc[0]
# (3) 각 (교차로, 진출방향) 별 진출id 목록 : out2id (outgoing direction to outgoing edge_id)
out2id = {}
for parent_id in self.parent_ids:
for out_dire in p2dires[parent_id]:
df = self.match7[(self.match7.node_id==parent_id) & (self.match7.out_dire==out_dire)]
out2id[(parent_id, out_dire)] = df.out_edge_id.iloc[0]
# (4) 각 parent_id별 이동류번호 목록
p2move = dict() # parent id to a list of aligned movement numbers
for parent_id in self.parent_ids:
pnema = self.nema[self.nema.inc_dire.isin(p2dires[parent_id]) & self.nema.out_dire.isin(p2dires[parent_id])]
p2move[parent_id] = list(pnema.move_no)
# (5) 방위별 방향벡터
dire2vec = dict() # direction to unit vector
theta = np.pi/2
for dire in self.dires:
dire2vec[dire] = np.array([np.cos(theta), np.sin(theta)])
theta -= np.pi/4
# (6) 각 parent_id별 : 각 이동류별 진입/진출 엣지 id
p2move2inc_edge_id = dict() # parent id to move2inc_edge_id
p2move2out_edge_id = dict() # parent id to move2out_edge_id
for parent_id in self.parent_ids:
move2inc_edge_id = dict() # plain movement to incoming edge id
move2out_edge_id = dict() # plain movement to outgoing edge id
for move_no in range(1,17):
row = self.nema[self.nema.move_no==move_no].iloc[0]
inc_dire = row.inc_dire
out_dire = row.out_dire
inc_vec_true = dire2vec[inc_dire]
out_vec_true = dire2vec[out_dire]
node = self.net.getNode(parent_id)
# 교차로의 모든 (from / to) edges
inc_edges = [edge for edge in node.getIncoming() if edge.getFunction() == ''] # incoming edges
out_edges = [edge for edge in node.getOutgoing() if edge.getFunction() == ''] # outgoing edges
# 교차로의 모든 (from / to) unit vector
inc_vecs = []
for inc_edge in inc_edges:
start = inc_edge.getShape()[-1]
end = inc_edge.getShape()[-2]
inc_vec = np.array(end) - np.array(start)
inc_vec = inc_vec / (inc_vec ** 2).sum() ** 0.5
inc_vecs.append(inc_vec)
out_vecs = []
for out_edge in out_edges:
start = out_edge.getShape()[0]
end = out_edge.getShape()[1]
out_vec = np.array(end) - np.array(start)
out_vec = out_vec / (out_vec ** 2).sum() ** 0.5
out_vecs.append(out_vec)
# 매칭 엣지 반환
inc_index = np.array([np.dot(inc_vec, inc_vec_true) for inc_vec in inc_vecs]).argmax()
out_index = np.array([np.dot(out_vec, out_vec_true) for out_vec in out_vecs]).argmax()
inc_edge_id = inc_edges[inc_index].getID()
out_edge_id = out_edges[out_index].getID()
move2inc_edge_id[move_no] = inc_edge_id
move2out_edge_id[move_no] = out_edge_id
p2move2inc_edge_id[parent_id] = move2inc_edge_id
p2move2out_edge_id[parent_id] = move2out_edge_id
# (7) 각 이동류별 진입/진출 방위
m2inc_dire = dict()
m2out_dire = dict()
for move_no in range(1,17):
row = self.nema[self.nema.move_no==move_no].iloc[0]
m2inc_dire[move_no] = row.inc_dire
m2out_dire[move_no] = row.out_dire
# (8) 가능한 모든 이동류에 대하여 진입id, 진출id 배정 : matching
self.matching = []
for parent_id in self.parent_ids:
inter_no = self.node2inter[parent_id]
# 좌회전과 직진(1 ~ 16)
for move_no in range(1,17):
inc_dire = m2inc_dire[move_no]
out_dire = m2out_dire[move_no]
if move_no in p2move[parent_id]:
inc_edge_id = inc2id[(parent_id, inc_dire)]
out_edge_id = out2id[(parent_id, out_dire)]
else:
inc_edge_id = p2move2inc_edge_id[parent_id][move_no]
out_edge_id = p2move2out_edge_id[parent_id][move_no]
if (inc_edge_id, out_edge_id) in self.n2io2turn[parent_id]:
turn_type = self.n2io2turn[parent_id][inc_edge_id, out_edge_id]
else:
turn_type = 'left' if move_no % 2 else 'straight'
new_row = pd.DataFrame({'inter_no':[inter_no], 'node_id':[parent_id], 'move_no':[move_no],
'inc_dire':[inc_dire], 'out_dire':[out_dire],
'inc_edge_id':[inc_edge_id], 'out_edge_id':[out_edge_id],
'turn_type': turn_type})
self.matching.append(new_row)
child_matching = self.match7[self.match7.node_id.isin(self.child_ids)]
child_matching = child_matching.drop(columns=['inc_angle', 'out_angle'])
self.matching = pd.concat(self.matching)
self.matching = self.matching.dropna(subset=['inc_edge_id', 'out_edge_id'])\
.sort_values(by=['inter_no', 'node_id', 'move_no']).reset_index(drop=True)
self.matching['move_no'] = self.matching['move_no'].astype(int)
# 2-2 신호 초기화
def initialize_state(self):
'''
비보호우회전신호 (g) 배정
input :
(1) net : 네트워크
(2) nodes : 노드 목록
(3) histids : 모든 교차로에 대한 시작유닉스 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
output : node2init
- 각 노드를 초기화된 신호로 맵핑하는 딕셔너리
- 초기화된 신호란, 우회전을 g로 나머지는 r로 지정한 신호를 말함.
'''
self.nodes = [self.net.getNode(node_id) for node_id in self.node_ids]
self.node2init = {}
# 유턴노드를 제외한 모든 노드 (우회전, 삼지교차로직진 : g, 그외 : r)
for node_id in sorted(set(self.node_ids) - set(self.uturn_ids)):
node = self.net.getNode(node_id)
conns = [(c.getJunctionIndex(), c) for c in node.getConnections()]
conns = [c for c in conns if c[0] >= 0]
conns = sorted(conns, key=lambda x: x[0])
state = []
for i, ci in conns: # i번째 connection : ci
if ci.getTLLinkIndex() < 0:
continue
are_foes = False
# 합류지점이 다르면서 상충되는 cj가 존재하면 r, 그외에는 g
for j, cj in conns: # j번째 connection : cj
# ci, cj의 합류지점이 같으면 통과
if ci.getTo() == cj.getTo():
continue
# ci, cj가 상충되면 are_foes를 True로 지정.
if node.areFoes(i, j):
are_foes = True
break
state.append('r' if are_foes else 'g')
self.node2init[node_id] = state
# 유턴노드 (유턴x : G, 유턴 : G)
for node_id in self.uturn_ids:
node = self.net.getNode(node_id)
conns = [(c.getJunctionIndex(), c) for c in node.getConnections()]
conns = [c for c in conns if c[0] >= 0]
conns = sorted(conns, key=lambda x: x[0])
state = []
for i, ci in conns:
if ci.getTLLinkIndex() < 0:
continue
state.append('G')
self.node2init[node_id] = state
# 신호가 부여되어 있는 경우에는 r을 부여 (우회전 : g, 그외 : r / 유턴x : G, 유턴 : r)
for _, row in self.match6.dropna(subset=['inc_edge_id', 'out_edge_id']).iterrows():
node_id = row.node_id
inc_edge_id = row.inc_edge_id
out_edge_id = row.out_edge_id
inc_edge = self.net.getEdge(inc_edge_id)
out_edge = self.net.getEdge(out_edge_id)
for conn in inc_edge.getConnections(out_edge):
index = conn.getTLLinkIndex()
if index >= 0:
self.node2init[node_id][index] = 'r'
# json 파일로 저장
with open(os.path.join(self.path_intermediates, 'node2init.json'), 'w') as file:
json.dump(self.node2init, file)
print('2-2. 초기화 신호가 지정되었습니다. (우회전 : g)')
# 2-3 유턴 인덱스 / 비보호좌회전 인덱스 지정
def assign_indices(self):
self.u2uindex = dict() # uturn node id to uturn index
for row in self.uturn.itertuples():
child_id = row.child_id
inc_edge_id = row.inc_edge_id
out_edge_id = row.out_edge_id
# self.u2uindex 지정
inc_edge = self.net.getEdge(inc_edge_id)
out_edge = self.net.getEdge(out_edge_id)
uturn_conn = inc_edge.getConnections(out_edge)[0]
self.u2uindex[child_id] = uturn_conn.getTLLinkIndex()
self.p2UPLindices2inc_edge_ids = dict() # parent id to unprotected left index to incoming_edge_ids
for parent_id in self.parent_ids:
init_state = self.node2init[parent_id]
# 우회전 이동류 인덱스
indices_right = [i for i in range(len(init_state)) if init_state[i]=='g']
# from-to가 지정된 이동류 인덱스
indices_assigned = []
m5 = self.match5[(self.match5.node_id==parent_id)].dropna(subset=['inc_edge_id', 'out_edge_id'])
for row in m5.itertuples():
inc_edge = self.net.getEdge(row.inc_edge_id)
out_edge = self.net.getEdge(row.out_edge_id)
conns = inc_edge.getConnections(out_edge)
indices = [conn for conn in conns if conn.getTLLinkIndex()>=0]
indices = [conn for conn in conns if conn.getJunctionIndex()>=0]
indices = [conn.getTLLinkIndex() for conn in conns]
indices_assigned.extend(indices)
# 좌회전 이동류 인덱스
indices_left = []
for row in self.turn_type[self.turn_type.turn_type=='left'].itertuples():
inc_edge = self.net.getEdge(row.inc_edge_id)
out_edge = self.net.getEdge(row.out_edge_id)
conns = inc_edge.getConnections(out_edge)
indices = [conn for conn in conns if conn.getTLLinkIndex()>=0]
indices = [conn for conn in conns if conn.getJunctionIndex()>=0]
indices = [conn.getTLLinkIndex() for conn in conns]
indices_left.extend(indices)
# 비보호좌회전 인덱스 (unprotected left index)
UPLindices2inc_edge_ids = list((set(range(len(init_state))) - set(indices_right) - set(indices_assigned)).intersection(indices_left))
self.p2UPLindices2inc_edge_ids[parent_id] = dict()
for UPLindex in UPLindices2inc_edge_ids:
node = self.net.getNode(parent_id)
conns = node.getConnections()
conns = [conn for conn in conns if conn.getTLLinkIndex() == UPLindex]
inc_edge_ids = [conn.getFrom().getID() for conn in conns]
self.p2UPLindices2inc_edge_ids[parent_id][UPLindex] = inc_edge_ids
print('2-3. 유턴 인덱스 / 비보호좌회전 인덱스를 지정했습니다.')
# 2-4 신호배정
def assign_signals(self):
# matching : 신호 배정
for i, row in self.matching.iterrows():
node_id = row.node_id
move_no = row.move_no
inc_edge_id = row.inc_edge_id
out_edge_id = row.out_edge_id
state_list = copy.deepcopy(self.node2init[node_id])
self.matching.at[i, 'state'] = ''.join(state_list)
if (pd.isna(inc_edge_id)) or (pd.isna(out_edge_id)):
continue
inc_edge = self.net.getEdge(inc_edge_id)
out_edge = self.net.getEdge(out_edge_id)
# 신호가 부여되어 있으면 (from, to가 존재하면) G 부여 (우회전 : g, 신호 : G, 그외 : r)
for conn in inc_edge.getConnections(out_edge):
index = conn.getTLLinkIndex()
if index >= 0:
state_list[index] = 'G'
self.matching.at[i, 'state'] = ''.join(state_list)
self.matching = self.matching.dropna(subset='state')
self.matching = self.matching.reset_index(drop=True)
self.matching = self.matching[['inter_no', 'node_id', 'move_no', 'inc_edge_id', 'out_edge_id', 'state', 'turn_type']]
# match6 : 신호 배정
for i, row in self.match6.iterrows():
node_id = row.node_id
move_no = row.move_no
inc_edge_id = row.inc_edge_id
out_edge_id = row.out_edge_id
state_list = copy.deepcopy(self.node2init[node_id])
self.match6.at[i, 'state'] = ''.join(state_list)
if (pd.isna(inc_edge_id)) or (pd.isna(out_edge_id)):
continue
inc_edge = self.net.getEdge(inc_edge_id)
out_edge = self.net.getEdge(out_edge_id)
# 신호가 부여되어 있으면 (from, to가 존재하면) G 부여 (우회전 : g, 신호 : G, 그외 : r)
for conn in inc_edge.getConnections(out_edge):
index = conn.getTLLinkIndex()
if index >= 0:
state_list[index] = 'G'
self.match6.at[i, 'state'] = ''.join(state_list)
# match6 : 비보호좌회전 신호 배정
for i, row in self.match6[self.match6.node_id.isin(self.parent_ids)].iterrows():
parent_id = row.node_id
state = row.state
UPLindices2inc_edge_ids = self.p2UPLindices2inc_edge_ids[parent_id]
for UPLindex in UPLindices2inc_edge_ids:
# 비보호좌회전 이동류에 대한 진입엣지에 신호가 부여되어 있으면
inc_edge_ids = UPLindices2inc_edge_ids[UPLindex]
if inc_edge_ids:
if inc_edge_id in inc_edge_ids:
# 해당 비보호좌회전 인덱스(UPLindex)에, 해당 진입엣지의 직진신호가 있을 때 g를 부여
state = state[:UPLindex] + 'g' + state[UPLindex+1:]
self.match6.at[i, 'state'] = state
else: # 직진신호가 없는 비보호좌회전 발생시 멈춤 및 오류메시지 출력
raise Exception(
f"비보호좌회전 신호를 부여할 수 없습니다. \
신호가 부여되어 있지 않은 직진 또는 좌회전 연결이 존재하는데\
(node_id : {parent_id}, index : {UPLindex})\
이 연결의 진입엣지(inc_edge_id : {inc_edge_id})에 부여된 신호가 없습니다.")
# match6 : 유턴 신호가 한번도 배정되지 않은 경우에 대해서는 유턴이동류의 신호를 항상 g로 배정
for node_id in self.uturn_ids:
m6 = self.match6[self.match6.node_id==node_id]
if not len(m6):
continue
state_list = copy.deepcopy(self.node2init[node_id])
state = ''.join(state_list)
uindex = self.u2uindex[node_id]
values_at_uindex = [state[uindex] for state in m6.state]
# 유턴신호가 한번도 배정되지 않았으면
uturn_assigned = ('G' in values_at_uindex)
if not uturn_assigned:
# 해당 유턴 인덱스(uindex)에 g를 항상 부여
state = state[:uindex] + 'g' + state[uindex+1:]
self.match6.loc[self.match6.node_id==node_id, 'state'] = state
self.match6 = self.match6.dropna(subset='state')
self.match6 = self.match6.reset_index(drop=True)
self.match6 = self.match6[['inter_no', 'node_id', 'phase_no', 'ring_type', 'move_no', 'inc_edge_id', 'out_edge_id', 'state', 'turn_type']]
self.match6.to_csv(os.path.join(self.path_intermediates, 'match6.csv'))
self.matching.to_csv(os.path.join(self.path_intermediates, 'matching.csv'), index=0)
print('2-4. 직진 및 좌회전(G)을 배정했습니다.')
# 2-5 기반파일 저장
def save_intermediates(self):
self.plan.to_csv(os.path.join(self.path_tables, 'plan.csv'), index=0)
Aplan = self.plan.copy()[['inter_no'] + [f'dura_A{j}' for j in range(1,9)] + ['cycle']]
grouped = Aplan.groupby('inter_no')
df = grouped.agg({'cycle': 'min'}).reset_index()
df = df.rename(columns={'cycle': 'min_cycle'})
df['num_cycle'] = 300 // df['min_cycle'] + 2
inter2num_cycles = dict(zip(df['inter_no'], df['num_cycle']))
node2num_cycles = {node_id : inter2num_cycles[self.node2inter[node_id]] for node_id in self.node_ids}
with open(os.path.join(self.path_intermediates,'node2num_cycles.json'), 'w') as file:
json.dump(node2num_cycles, file, indent=4)
print("2-5. node2num_cycles.json를 저장했습니다.")
# 3. 이슈사항 저장
def write_issues(self):
print('3. 이슈사항을 저장합니다.')
path_issues = os.path.join(self.path_results, "issues_preprocess_daily.txt")
with open(path_issues, "w", encoding="utf-8") as file:
for item in self.issues:
file.write(item + "\n")
if self.issues:
print("데이터 처리 중 발생한 특이사항은 다음과 같습니다. :")
for review in self.issues:
print(review)
def main(self):
# 1. 데이터 불러오기
self.load_data()
# 2. 중간산출물 만들기
self.get_intermediates()
# 3. 이슈사항 저장
self.write_issues()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-c','--config_name', dest='config_name', type=str, default='revised')
parser.add_argument('-n','--file_net', dest='file_net', type=str, default='sn.net.xml')
args = parser.parse_args()
config_name = args.config_name
file_net = args.file_net
self = DailyPreprocessor(config_name=config_name, file_net=file_net)
self.main()