# python .\Scripts\generate_signals.py import pandas as pd import numpy as np import os, sys, json, argparse import sumolib, traci from tqdm import tqdm from datetime import datetime class SignalGenerator(): def __init__(self, config_name='test_0721', month=7, day=22, hour=9, minute=25): self.config_name = config_name self.month, self.day, self.hour, self.minute = month, day, hour, minute # 루트폴더 지정 self.path_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) with open(os.path.join(self.path_root, 'configs', f'config_{config_name}.json'), 'r') as config_file: config = json.load(config_file) # 주요 폴더 경로 지정 self.paths = config['paths'] self.path_data = os.path.join(self.path_root, *self.paths['data']) self.path_intermediates = os.path.join(self.path_root, *self.paths['intermediates']) self.path_results = os.path.join(self.path_root, *self.paths['results']) self.path_tables = os.path.join(self.path_root, *self.paths['tables']) self.path_networks = os.path.join(self.path_root, *self.paths['networks']) self.path_scripts = os.path.join(self.path_root, *self.paths['scripts']) # 이슈사항 목록 self.issues = [] self.midnight = int(datetime(2024, self.month, self.day, 0, 0, 0).timestamp()) self.next_day = int(datetime(2024, self.month, self.day+1, 0, 0, 0).timestamp()) self.fsecs = range(self.midnight, self.next_day, 5) # fsecs : unix time by Five SECondS self.fmins = range(self.midnight, self.next_day, 300) # fmins : unix time by Five MINuteS self.present_time = int(datetime(2024, self.month, self.day, self.hour, self.minute).timestamp()) self.present_time = max([fmin for fmin in list(self.fmins) if fmin <= self.present_time]) self.sim_timespan = 300 self.adder = 600 # 10분 : '현재시점 + 10분'에 가상신호를 생성하기 위함. self.subtractor = 1800 # 30분 : '현재시점 - 30분'의 신호이력을 가져온다. # 1. 데이터 준비 def prepare_data(self): print("1. 데이터를 준비합니다.") self.load_networks() self.time11 = datetime.now() self.load_tables() self.time12 = datetime.now() # self.check_networks() self.time13 = datetime.now() # self.check_tables() self.time14 = datetime.now() self.prepare_auxiliaries() self.time15 = datetime.now() # 1-1. 네트워크 불러오기 def load_networks(self): self.net = sumolib.net.readNet(os.path.join(self.path_networks, 'sn.net.xml')) print("1-1. 네트워크가 로드되었습니다.") # 1-2. 테이블 불러오기 def load_tables(self): # 모든 컬럼에 대하여 데이터타입 지정 loading_dtype = { 'inter_no':'int', 'start_hour':'int', 'start_minute':'int', 'cycle':'int','offset':'int', 'node_id':'str', 'inter_type':'str', 'parent_id':'str','child_id':'str', 'direction':'str', 'condition':'str', 'inc_edge':'str', 'out_edge':'str', 'end_unix':'int', 'inter_name':'str', 'inter_lat':'float', 'inter_lon':'float', 'group_no':'int', 'main_phase_no':'int', 'phase_no':'int','ring_type':'str' } for alph in ['A', 'B']: for j in range(1,9): loading_dtype[f'angle_{alph}{j}'] = 'str' loading_dtype[f'dura_{alph}{j}'] = 'int' # 테이블 불러오기 self.inter_info = pd.read_csv(os.path.join(self.path_tables, 'inter_info.csv'), dtype=loading_dtype) self.plan = pd.read_csv(os.path.join(self.path_tables, 'plan.csv'), dtype=loading_dtype) self.history = pd.read_csv(os.path.join(self.path_tables, 'history.csv'), dtype=loading_dtype).sort_values(by='end_unix') self.inter_node = pd.read_csv(os.path.join(self.path_tables, 'inter_node.csv'), dtype=loading_dtype) self.matching = pd.read_csv(os.path.join(self.path_intermediates, 'matching.csv'), dtype=loading_dtype) self.match1 = pd.read_csv(os.path.join(self.path_intermediates, 'match1.csv'), dtype=loading_dtype) self.match6 = pd.read_csv(os.path.join(self.path_intermediates, 'match6.csv'), dtype=loading_dtype) if self.config_name == 'draft': pass else: self.alphs = ['A', 'B'] # 컬럼명 변경 rename_cname_1 = {'CRSRD_ID':'inter_no', 'CRSRD_NM':'inter_name', 'CRSRD_TYPE':'inter_type', 'CYCL':'cycle', 'DAY':'DD', 'FLOW_NO':'move_no', 'GRP_NO':'group_no', 'HOUR':'hh', 'LGTD':'inter_lon', 'LTTD':'inter_lat', 'MAIN_CRSRD_ID':'parent_id', 'MAIN_PHASE':'main_phase', 'MIN':'mm', 'MNTH':'MM', 'NODE_ID':'node_id', 'OFFSET':'offset', 'PHASE':'phase_no', 'RING':'ring_type', 'RINGA_RED_SEC':'red_A', 'RINGA_YELLO_SEC':'yel_A', 'RINGB_RED_SEC':'red_B', 'RINGB_YELLO_SEC':'yel_B', 'SIGL_ANGLE':'angle_code', 'PLAN_NO':'plan_no', # 이상 preprocess_daily와 동일 'hh':'start_hour', 'mm':'start_minute'} # generate_signals에만 rename_cname_2 = {f'RING{alph}_PHASE{i}':f'dura_{alph}{i}' for alph in self.alphs for i in range(1,9)} rename_cname = {**rename_cname_1, **rename_cname_2} # 컬럼명 변경 적용 self.inter_info = self.inter_info.rename(columns=rename_cname) self.plan = self.plan.rename(columns=rename_cname) self.history = self.history.rename(columns=rename_cname) self.inter_node = self.inter_node.rename(columns=rename_cname) self.matching = self.matching.rename(columns=rename_cname) self.match1 = self.match1.rename(columns=rename_cname) self.match6 = self.match6.rename(columns=rename_cname) print("1-2. 테이블들이 로드되었습니다.") # 1-3. 네트워크 무결성 검사 def check_networks(self): # https://sumo.dlr.de/docs/Netedit/neteditUsageExamples.html#simplify_tls_program_state_after_changing_connections if 'SUMO_HOME' in os.environ: tools = os.path.join(os.environ['SUMO_HOME'], 'tools') if tools not in sys.path: sys.path.append(tools) else: raise EnvironmentError("please declare environment variable 'SUMO_HOME'") traci.start([sumolib.checkBinary('sumo'), "-n", os.path.join(self.path_networks, 'sn.net.xml')]) nodes = [node for node in self.net.getNodes() if node.getType()=='traffic_light'] for node in nodes: node_id = node.getID() from_xml = len([c for c in node.getConnections() if c.getTLLinkIndex() >= 0]) from_traci = len(traci.trafficlight.getRedYellowGreenState(node_id)) if from_xml != from_traci: sub = {'id': node_id, 'type': 'node', 'note': '유효하지 않은 연결이있음. netedit에서 clean states 필요.'} self.issues.append(sub) traci.close() print("1-3. 네트워크의 모든 clean state requirement들을 체크했습니다.") # 1-4. 테이블 무결성 검사 def check_tables(self): self.check_history() # 교차로정보, 방위각정보, 신호계획에 대해서는 preprocess_daily.py에서 # 무결성검사를 완료했으므로 여기에서는 따로 검사하지 않음. # self.check_moves() # 이동류번호에 대한 무결성검사 필요하나 아직 작성하지 않음. (24. 2. 5 화) print("1-4. 테이블들의 무결성 검사를 완료했습니다.") # 1-4-1. 신호이력(history) 검사 def check_history(self): # 1-4-1-1. inter_no 검사 # self.history.loc[0, 'inter_no'] = '4' # 에러 발생을 위한 코드 missing_inter_nos = set(self.history.inter_no) - set(self.inter_nos) if missing_inter_nos: msg = f"1-4-1-1. history의 inter_no 중 교차로 목록(inter_nos)에 포함되지 않는 항목이 있습니다: {missing_inter_nos}" self.issues.append(msg) # 1-4-1-2. 종료유닉스 검사 # self.history.loc[0, 'end_unix'] = 38.0 # 에러 발생을 위한 코드 self.min_unix, self.max_unix = int(datetime(2020, 1, 1).timestamp()), int(datetime(2038, 1, 1).timestamp()) for row in self.history.itertuples(index=True): unixbool = self.min_unix <= row['end_unix'] <= self.max_unix if not unixbool: msg = f"1-4-1-2. 적정 범위를 벗어난 유닉스시각(end_unix)이 존재합니다 : inter_no : {row['inter_no']}" self.issues.append(msg) # 1-4-1-3. 현시시간 검사 # self.history.loc[0, 'dura_A1'] = -2 # 에러 발생을 위한 코드 durations = self.history[[f'dura_{alph}{j}' for alph in ['A','B'] for j in range(1, 9)]] valid_indices = ((durations >= 0) & (durations <= 200)).all(axis=1) invalid_inter_nos = sorted(self.history[~ valid_indices].inter_no.unique()) if invalid_inter_nos: msg = f"1-4-1-3. 음수이거나 200보다 큰 현시시간이 존재합니다. : {invalid_inter_nos}" # 1-5. 보조 딕셔너리, 데이터프레임, 리스트 등 만들기 def prepare_auxiliaries(self): # inter2node : a dictionary that maps inter_no to the node_id inter_node_p = self.inter_node[self.inter_node.inter_type=='parent'] self.inter2node = dict(zip(inter_node_p['inter_no'], inter_node_p['node_id'])) self.node2inter = dict(zip(self.inter_node['node_id'], self.inter_node['inter_no'])) # split, isplit : A,B 분리 혹은 통합시 사용될 수 있는 딕셔너리 self.splits = {} # splits maps (inter_no, start_hour, start_minute) to split for i, row in self.plan.iterrows(): inter_no = row.inter_no start_hour = row.start_hour start_minute = row.start_minute cycle = row.cycle dura_A = np.array(row[[f'dura_A{j}' for j in range(1, 9)]]) dura_B = np.array(row[[f'dura_B{j}' for j in range(1, 9)]]) cums_A = dura_A.cumsum() cums_B = dura_B.cumsum() combined_row = np.unique(np.concatenate((cums_A,cums_B))) detailed_durations = np.concatenate(([combined_row[0]], np.diff(combined_row))) self.splits[(inter_no, start_hour, start_minute)] = {} # split maps (phas_A, phas_B) to k ja = 0 jb = 0 for k in range(len(detailed_durations)): dura_A[ja] -= detailed_durations[k] dura_B[jb] -= detailed_durations[k] self.splits[(inter_no, start_hour, start_minute)][(ja+1, jb+1)] = k+1 if dura_A[ja] == 0: ja += 1 if dura_B[jb] == 0: jb += 1 self.isplits = {} # the inverse of splits for i in self.splits: self.isplits[i] = {self.splits[i][k]:k for k in self.splits[i]} # isplit maps k to (phas_A, phas_B) # timetable : 교차로별 프로그램 시작시각 self.timetable = self.plan[['start_hour', 'start_minute']].drop_duplicates() self.timetable['start_seconds'] = self.midnight + self.timetable['start_hour'] * 3600 + self.timetable['start_minute'] * 60 with open(os.path.join(self.path_intermediates, 'ids.json'), 'r') as file: ids = json.load(file) self.inter_nos = ids['inter_nos'] self.node_ids = ids['node_ids'] self.parent_ids = ids['parent_ids'] self.child_ids = ids['child_ids'] self.uturn_ids = ids['uturn_ids'] self.coord_ids = ids['coord_ids'] # A dictionary that maps parent_id to a list of child_ids self.pa2ch = {} for parent_id in self.parent_ids: inter_no = self.node2inter[parent_id] self.pa2ch[parent_id] = list(self.inter_node[(self.inter_node.inter_no==inter_no) & (self.inter_node.inter_type=='child')].node_id) # node2num_cycles : A dictionary that maps a node_id to the number of cycles with open(os.path.join(self.path_intermediates, 'node2num_cycles.json'), 'r') as file: # json.load() 함수를 사용해 파일 내용을 Python 딕셔너리로 불러옵니다. self.node2num_cycles = json.load(file) # 초기화신호 불러오기 with open(os.path.join(self.path_intermediates, 'node2init.json'), 'r') as file: self.node2init = json.load(file) self.plan_set = self.plan.set_index(['inter_no','start_hour','start_minute']) print("1-5. 필요한 보조 객체들이 모두 준비되었습니다.") # 2. 신호이력 전처리 def process_history(self): print("2. 신호이력 테이블을 변환합니다.") self.make_rhistory() self.time21 = datetime.now() self.make_rhists() self.time22 = datetime.now() self.make_hrhists() self.time23 = datetime.now() # 2-1. rhistory def make_rhistory(self): # 1. 조회시점의 유닉스 타임 이전의 신호이력 수집 self.rhistory = self.history.copy() # recent history self.rhistory = self.rhistory[(self.rhistory.end_unix <= self.present_time) & (self.rhistory.end_unix > self.present_time - self.subtractor)] # rhistory에 모든 교차로번호가 존재하지 않으면 해당 교차로번호에 대한 신호이력을 추가함 (at 최근 프로그램 시작시각) whole_inter_nos = set(self.history.inter_no.unique()) recent_inter_nos = set(self.rhistory.inter_no.unique()) if not whole_inter_nos==recent_inter_nos: for inter_no in whole_inter_nos - recent_inter_nos: program_start, prow = self.load_prow(inter_no, self.present_time - self.subtractor) cycle = prow.cycle.iloc[0] row1 = prow.copy() row2 = prow.copy() # prow에서 필요한 부분을 rhistory에 추가 row1['end_unix'] = program_start row2['end_unix'] = program_start + cycle self.rhistory = pd.concat([self.rhistory, row1, row2])#.reset_index(drop=True) # present_time + adder 의 시각에 한 주기의 신호 추가 for inter_no in set(whole_inter_nos): program_start, prow = self.load_prow(inter_no, self.present_time) cycle = prow.cycle.iloc[0] row3 = prow.copy() # prow에서 필요한 부분을 rhistory에 추가 row3['end_unix'] = self.present_time + self.adder self.rhistory = pd.concat([self.rhistory, row3])#.reset_index(drop=True) # 2. 시작 유닉스 타임컬럼 생성 후 종류 유닉스 타임에서 현시별 현시기간 컬럼의 합을 뺀 값으로 입력 # - 현시시간의 합을 뺀 시간의 +- 10초 이내에 이전 주기정보가 존재하면 그 유닉스 시간을 시작 유닉스시간 값으로 하고, 존재하지 않으면 현시시간의 합을 뺀 유닉스 시간을 시작 유닉스 시간으로 지정 for i, row in self.rhistory.iterrows(): inter_no = row.inter_no end_unix = row.end_unix elapsed_time = row[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]].sum() // 2 # 현시시간 합 # 이전 유닉스 존재하지 않음 : 현시시간 합의 차 start_unix = end_unix - elapsed_time pre_rows = self.history[:i] # previous rows if inter_no in pre_rows.inter_no.unique(): # 이전 유닉스 존재 pre_unix = pre_rows[pre_rows.inter_no == inter_no]['end_unix'].iloc[-1] # previous unix time # 이전 유닉스 존재, abs < 10 : 이전 유닉스 if abs(pre_unix - start_unix) < 10: start_unix = pre_unix # 이전 유닉스 존재, abs >=10 : 현시시간 합의 차 else: pass self.rhistory.loc[i, 'start_unix'] = start_unix self.rhistory[self.rhistory.isna()] = 0 self.rhistory['start_unix'] = self.rhistory['start_unix'].astype(int) self.rhistory = self.rhistory[['inter_no', 'start_unix'] + [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)] + ['cycle']] def load_prow(self, inter_no, time): ''' load the planned row ''' # 프로그램 시작시각 program_starts = np.array(self.timetable.start_seconds) idx = (program_starts <= time).sum() - 1 program_start = program_starts[idx] # 최근 프로그램 시작시각에 대한 신호계획 start_hour = self.timetable.iloc[idx].start_hour start_minute = self.timetable.iloc[idx].start_minute # prow = self.plan[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row prow = self.plan_set.loc[(inter_no, start_hour, start_minute)] prow = pd.DataFrame([prow],index=[0]) prow['inter_no'] = inter_no return program_start, prow # 2-2. rhists def make_rhists(self): self.rhists = [] for inter_no in self.rhistory.inter_no.unique(): self.rhist = self.rhistory.copy()[self.rhistory.inter_no==inter_no] self.rhist = self.rhist.drop_duplicates(subset=['start_unix']).reset_index(drop=True) # D_n 및 S_n 값 정의 self.rhist['D_n'] = 0 # D_n : 시간차이 self.rhist['S_n'] = 0 # S_n : 현시시간합 for n in range(len(self.rhist)): curr_unix = self.rhist.iloc[n].start_unix # current start_unix self.rhist.loc[n, ['D_n', 'S_n']] = self.calculate_DS(self.rhist, curr_unix) # 이전시각, 현재시각 prev_unix = self.rhist.loc[0, 'start_unix'] # previous start_unix curr_unix = self.rhist.loc[1, 'start_unix'] # current start_unix # rhist의 마지막 행에 도달할 때까지 반복 while True: n = self.rhist[self.rhist.start_unix==curr_unix].index[0] cycle = self.rhist.loc[n, 'cycle'] D_n = self.rhist.loc[n, 'D_n'] S_n = self.rhist.loc[n, 'S_n'] # 참값인 경우 if (abs(D_n - S_n) <= 5): pass # continue 쓰면 indentation을 하나 줄일 수 있을 것 같음 (2024. 04. 18) 실험은 안해봄. 나중에 적용하면 좋을지도. # 참값이 아닌 경우 else: # 2-1-1. 결측치 처리 : 인접한 두 start_unix의 차이가 계획된 주기의 두 배보다 크면 결측이 일어났다고 판단, 신호계획의 현시시간으로 "대체" if curr_unix - prev_unix >= 2 * cycle: # prev_unix를 계획된 주기만큼 늘려가면서 한 행씩 채워나간다. # (curr_unix와의 차이가 계획된 주기보다 작거나 같아질 때까지) while curr_unix - prev_unix > cycle: prev_unix += cycle # 신호 계획(prow) 불러오기 start_seconds = np.array(self.timetable.start_seconds) idx = (start_seconds <= prev_unix).sum() - 1 start_hour = self.timetable.iloc[idx].start_hour start_minute = self.timetable.iloc[idx].start_minute prow = self.plan.copy()[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row # prow에서 필요한 부분을 rhist에 추가 prow['start_unix'] = prev_unix prow = prow.drop(['start_hour', 'start_minute', 'offset'], axis=1) cycle = prow.iloc[0].cycle self.rhist = pd.concat([self.rhist, prow]) self.rhist = self.rhist.sort_values(by='start_unix').reset_index(drop=True) n += 1 # 2-1-2. 이상치 처리 : 비율에 따라 해당 행을 "삭제"(R_n <= 0.5) 또는 "조정"(R_n > 0.5)한다 R_n = (curr_unix - prev_unix) / cycle # R_n : 비율 # R_n이 0.5보다 작거나 같으면 해당 행을 삭제 if R_n <= 0.5: self.rhist = self.rhist.drop(index=n).reset_index(drop=True) if n >= self.rhist.index[-1]: break # 행삭제에 따른 curr_unix, R_n 재정의 curr_unix = self.rhist.loc[n, 'start_unix'] R_n = (curr_unix - prev_unix) / cycle # R_n : 비율 # R_n이 0.5보다 크면 해당 행 조정 (비율을 유지한 채로 현시시간 대체) if R_n > 0.5: # 신호 계획(prow) 불러오기 start_seconds = np.array(self.timetable.start_seconds) idx = (start_seconds <= curr_unix).sum() - 1 start_hour = self.timetable.iloc[idx].start_hour start_minute = self.timetable.iloc[idx].start_minute prow = self.plan[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row # 조정된 현시시간 (prow에 R_n을 곱하고 정수로 바꿈) adjusted_dur = prow.copy()[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] * R_n int_parts = adjusted_dur.iloc[0].apply(lambda x: int(x)) frac_parts = adjusted_dur.iloc[0] - int_parts difference = round(adjusted_dur.iloc[0].sum()) - int_parts.sum() for _ in range(difference): # 소수 부분이 가장 큰 상위 'difference'개의 값에 대해 올림 처리 max_frac_index = frac_parts.idxmax() int_parts[max_frac_index] += 1 frac_parts[max_frac_index] = 0 # 이미 처리된 항목은 0으로 설정 # rhist에 조정된 현시시간을 반영 self.rhist.loc[n, [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] = int_parts.values self.rhist.loc[n, 'cycle'] = int_parts.sum().sum() // 2 if n >= self.rhist.index[-1]: break prev_unix = curr_unix curr_unix = self.rhist.loc[n+1, 'start_unix'] # D_n 및 S_n 값 재정의 # 이 함수의 검증시 필요하나 전체 구동에는 필요없으므로 comment해놓음 # for n in range(len(self.rhist)): # curr_unix = self.rhist.iloc[n].start_unix # current start_unix # self.rhist.loc[n, ['D_n', 'S_n']] = self.calculate_DS(self.rhist, curr_unix) self.rhists.append(self.rhist) self.rhists = pd.concat(self.rhists)#.sort_values(by=['start_unix','inter_no']) self.rhists = self.rhists[self.rhists.start_unix >= self.present_time - self.subtractor // 2] def calculate_DS(self, rhist, curr_unix): # program_starts = np.array(self.timetable.start_seconds) # idx = (program_starts <= self.present_time).sum() - 1 # program_start = program_starts[idx] # if list(self.hours[self.hours <= curr_unix]): # ghour_lt_curr_unix = self.hours[self.hours <= curr_unix].max() # the greatest hour less than or equal to curr_unix # else: # ghour_lt_curr_unix = program_start # start_unixes = rhist.start_unix.unique() # start_unixes_lt_ghour = np.sort(start_unixes[start_unixes < ghour_lt_curr_unix]) # start unixes less than ghour_lt_curr_unix # # 기준유닉스(base_unix) : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 start_unix # if len(start_unixes_lt_ghour) > 5: # base_unix = start_unixes_lt_ghour[-5] # # start_unixes_lt_ghour의 길이가 5 미만일 경우에는 맨 앞 start_unix로 base_unix를 지정 # else: # base_unix = rhist.start_unix.min() base_unix = curr_unix - self.subtractor // 2 abs_diff = (self.rhist['start_unix'] - base_unix).abs() closest_index = abs_diff.idxmin() base_unix = self.rhist.loc[closest_index, 'start_unix'] D_n = curr_unix - base_unix S_n_durs = rhist[(rhist.start_unix > base_unix) & (rhist.start_unix <= curr_unix)] \ [[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] S_n = S_n_durs.values.sum() // 2 return D_n, S_n # 2-3. hrhists def make_hrhists(self): # 계층화된 형태로 변환 self.hrhists = [] # hierarchied recent history for row in self.rhists.itertuples(index=True): inter_no = row.inter_no start_unix = row.start_unix ind = (self.timetable['start_seconds'] <= row.start_unix).sum() - 1 start_hour = self.timetable.iloc[ind].start_hour start_minute = self.timetable.iloc[ind].start_minute self.isplit = self.isplits[(inter_no, start_hour, start_minute)] phas_As = [self.isplit[j][0] for j in self.isplit.keys()] phas_Bs = [self.isplit[j][1] for j in self.isplit.keys()] # durs_A = row[[f'dura_A{j}' for j in range(1,9)]] # durs_B = row[[f'dura_B{j}' for j in range(1,9)]] durs_A = [getattr(row, f'dura_A{j}') for j in range(1, 9)] durs_B = [getattr(row, f'dura_B{j}') for j in range(1, 9)] durations = [] for j in range(1, len(self.isplit)+1): ja = self.isplit[j][0] jb = self.isplit[j][1] if ja == jb: durations.append(min(durs_A[ja-1], durs_B[jb-1])) else: durations.append(abs(durs_A[ja-1] - durs_B[ja-1])) new_rows = pd.DataFrame({'inter_no':[inter_no] * len(durations), 'start_unix':[start_unix] * len(durations), 'phas_A':phas_As, 'phas_B':phas_Bs, 'duration':durations}) self.hrhists.append(new_rows) self.hrhists = pd.concat(self.hrhists) # self.hrhists = self.hrhists.sort_values(by = ['start_unix', 'inter_no', 'phas_A', 'phas_B']).reset_index(drop=True) # 3. 이동류정보 전처리 def process_movement(self): print("3. 이동류정보 테이블을 변환합니다.") self.make_movement() self.time31 = datetime.now() self.update_movement() self.time32 = datetime.now() # 3-1. movement def make_movement(self): # - 아래 절차를 5초마다 반복 for fsec in range(self.present_time - self.sim_timespan, self.present_time + 1, 5): # fsec : unix time by Five SECond # 1. 상태 테이블 조회해서 전체 데이터중 필요데이터(교차로번호, A링 현시번호, A링 이동류번호, B링 현시번호, B링 이동류번호)만 수집 : A move = pd.read_csv(os.path.join(self.path_tables, 'move', f'move_{fsec}.csv'), index_col=0) # 2. 이력 테이블 조회해서 교차로별로 유닉스시간 최대인 데이터(교차로번호, 종료유닉스타임)만 수집 : B recent_histories = [group.iloc[-1:] for _, group in self.history[self.history['end_unix'] < fsec].groupby('inter_no')] # 교차로별로 유닉스시간이 최대인 행들 if not recent_histories: rhistory = pd.DataFrame({'inter_no':[], 'end_unix':[]}) # recent history else: rhistory = pd.concat(recent_histories) recent_unix = rhistory[['inter_no', 'end_unix']] # 3. 상태 테이블 조회정보(A)와 이력 테이블 조회정보(B) 조인(키값 : 교차로번호) : C move = pd.merge(move, recent_unix, how='left', on='inter_no') move['end_unix'] = move['end_unix'].fillna(0).astype(int) # 4. C데이터 프레임에 신규 컬럼(시작 유닉스타임) 생성 후 종료유닉스 타임 값 입력, 종료 유닉스 타임 컬럼 제거 move = move.rename(columns = {'end_unix':'start_unix'}) # 5. 이동류 이력정보 READ # - CSV 파일로 서버에 저장된 이동류정보를 읽어옴(파일이 없는 경우에는 데이터가 없는 프레임 D 생성) try: if isinstance(movement, pd.DataFrame): # movement가 존재할 경우 그걸 그대로 씀. pass else: movement = pd.DataFrame() except NameError: # movement가 존재하지 않는 경우 생성 movement = pd.DataFrame() # 6. 이동류 이력정보 데이터테이블(D)에 C데이터 add movement = pd.concat([movement, move]) # 7. D데이터 프레임에서 중복데이터 제거(교차로번호, 시작 유닉스타임, A링 현시번호, B링 현시번호 같은 행은 제거) movement = movement.drop_duplicates(['inter_no','phas_A','phas_B','start_unix']) # 8. D데이터 보관 시간 기준시간을 시작 유닉스 타임의 최대값 - self.subtractor // 2을 값으로 산출하고, 보관 시간 기준시간보다 작은 시작 유닉스 타임을 가진 행은 모두 제거(1시간 데이터만 보관) movement = movement[movement.start_unix > fsec - self.subtractor // 2] # movement = movement.sort_values(by=['start_unix','inter_no','phas_A','phas_B']).reset_index(drop=True) try: self.movement = pd.read_csv(os.path.join(self.path_intermediates, 'movement', f'movement_{self.present_time}.csv'), index_col=0) except FileNotFoundError: self.movement = movement # 3-2. movement_updated def update_movement(self): # 중복을 제거하고 (inter_no, start_unix) 쌍을 만듭니다. hrhists_inter_unix = set(self.hrhists[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None)) movement_inter_unix = set(self.movement[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None)) # hrhists에는 있지만 movement에는 없는 (inter_no, start_unix) 쌍을 찾습니다. missing_in_movement = hrhists_inter_unix - movement_inter_unix # 새로운 행들을 생성합니다. new_rows = [] if missing_in_movement: for inter_no, start_unix in missing_in_movement: # match1에서 해당 inter_no의 데이터를 찾습니다. new_row = self.match1[self.match1['inter_no'] == inter_no].copy() # start_unix 값을 설정합니다. new_row['start_unix'] = start_unix new_rows.append(new_row) # 새로운 데이터프레임을 생성하고 기존 movement 데이터프레임과 합칩니다. new_movement = pd.concat(new_rows, ignore_index=True) self.movement_updated = pd.concat([self.movement, new_movement], ignore_index=True) else: self.movement_updated = self.movement # 4. 통합테이블 생성 def make_histids(self): print("4. 통합 테이블을 생성합니다.") self.merge_dfs() self.time41 = datetime.now() self.assign_signals() self.time42 = datetime.now() self.attach_children() self.time43 = datetime.now() # 4-1. movedur : movements and durations def merge_dfs(self): self.movedur = pd.merge(self.hrhists, self.movement_updated, how='inner', on=['inter_no', 'start_unix', 'phas_A', 'phas_B']) self.movedur = self.movedur[['inter_no', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration']] # 4-2. histid def assign_signals(self): self.histid = self.movedur.copy() self.histid['node_id'] = self.histid['inter_no'].map(self.inter2node) histid_start = self.present_time - 600 self.histid = self.histid[self.histid.start_unix > histid_start] mapping_dict = self.matching.set_index(['node_id', 'move_no'])['state'].to_dict() # matching : 가능한 모든 (노드id, 이동류번호)에 대한 신호 * 시차제와 연관 有 for i, row in self.histid.iterrows(): node_id = row.node_id move_A = row.move_A move_B = row.move_B # A링의 state 지정 if (node_id, move_A) in mapping_dict: state_A = mapping_dict[(node_id, move_A)] else: state_A = ''.join(self.node2init[node_id]) self.histid.at[i, 'state_A'] = state_A # B링의 state 지정 if (node_id, move_B) in mapping_dict: state_B = mapping_dict[(node_id, move_B)] else: state_B = ''.join(self.node2init[node_id]) self.histid.at[i, 'state_B'] = state_B # 4-3. histids def attach_children(self): new_histids = [] for parent_id in self.parent_ids: for child_id in self.pa2ch[parent_id]: new_histid = self.histid.copy()[self.histid.node_id==parent_id].drop(columns=['state_A', 'state_B']) # new_histid[['inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']] = np.nan for i, row in new_histid.iterrows(): phas_A = row.phas_A phas_B = row.phas_B new_match = self.match6[self.match6.node_id==child_id] # match6 : (노드id, 현시, 링)에 대한 (이동류번호, 신호) Arow = new_match[(new_match.phase_no==phas_A) & (new_match.ring_type=='A')] # A링의 state 지정 state_A = Arow.iloc[0].state new_histid.at[i, 'state_A'] = state_A # B링의 state 지정 Brow = new_match[(new_match.phase_no==phas_B) & (new_match.ring_type=='B')] state_B = Brow.iloc[0].state new_histid.at[i, 'state_B'] = state_B new_histid.at[i, 'node_id'] = child_id new_histids.append(new_histid) new_histids = pd.concat(new_histids) self.histids = pd.concat([self.histid.copy(), new_histids]) self.histids = self.histids.sort_values(by=['start_unix', 'node_id', 'phas_A', 'phas_B']).reset_index(drop=True) self.histids = self.histids[['inter_no', 'node_id', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration', 'state_A', 'state_B']] # 5. 신호 생성 def get_signals(self): print("5. 신호를 생성합니다.") self.set_timepoints() self.time51 = datetime.now() self.assign_red_yellow() self.time52 = datetime.now() self.make_tl_file() self.time53 = datetime.now() # 5-1. 신호 파일의 시작 및 종료시각 설정 def set_timepoints(self): self.offsets = {} self.sigtable = [] sim_start = self.present_time - self.sim_timespan for node_id, group in self.histids.groupby('node_id'): lsbs = group[group['start_unix'] < sim_start]['start_unix'].max() # the last start_unix before sim_start self.offsets[node_id] = lsbs - sim_start group = group[group.start_unix >= lsbs] start_unixes = np.array(group.start_unix) start_unixes = np.sort(np.unique(start_unixes))[:self.node2num_cycles[node_id]] group = group[group.start_unix.isin(start_unixes)] self.sigtable.append(group) self.sigtable = pd.concat(self.sigtable).reset_index(drop=True) self.sigtable['phase_sumo'] = self.sigtable.groupby(['node_id', 'start_unix']).cumcount() # 5-2. 적색 및 황색신호 부여 def assign_red_yellow(self): ''' 적색, 황색신호를 반영한 신호문자열 배정 input : sigtable - 모든 교차로에 대한 (시작유닉스, 세부현시번호)별 현시시간, 신호문자열, 진입·진출엣지 * 세부현시란 오버랩을 반영한 현시번호를 뜻함. output : SIGTABLE - 모든 교차로에 대한 (시작유닉스, 녹황적세부현시번호)별 현시시간, (황·적색신호가 포함된) 신호문자열 * 녹황적세부현시번호란 세부현시번호에 r, g, y 옵션까지 포함된 현시번호를 뜻함. ''' self.SIGTABLE = [] for node_id in self.node_ids: sig = self.sigtable.query('node_id==@node_id') for i, row in sig.iterrows(): inter_no = row.inter_no phas_A = row.phas_A phas_B = row.phas_B start_unix = row.start_unix prow = self.load_prow(inter_no, start_unix)[1].iloc[0] red_A = prow[f'red_A{phas_A}'] yellow_A = prow[f'yellow_A{phas_A}'] red_B = prow[f'red_B{phas_B}'] yellow_B = prow[f'yellow_B{phas_B}'] sig.loc[i, ['red_A', 'red_B', 'yellow_A', 'yellow_B']] = red_A, red_B, yellow_A, yellow_B sig = sig.astype({'red_A': int, 'red_B': int, 'yellow_A': int, 'yellow_B': int, 'phas_A':str, 'phas_B':str}) sig = sig.drop(['move_A','move_B'], axis=1) sig_A = sig[['start_unix', 'phas_A', 'duration', 'state_A', 'red_A', 'yellow_A']].reset_index(drop=True) sig_B = sig[['start_unix', 'phas_B', 'duration', 'state_B', 'red_B', 'yellow_B']].reset_index(drop=True) csig_A = self.cumulate(sig_A, 'A') csig_B = self.cumulate(sig_B, 'B') SIG = pd.merge(csig_A, csig_B, on=['start_time', 'start_unix'], how='outer') SIG = SIG.sort_values(by='start_time').reset_index(drop=True) SIG[['phas_A', 'state_A']] = SIG[['phas_A', 'state_A']].fillna(method='ffill') SIG[['phas_B', 'state_B']] = SIG[['phas_B', 'state_B']].fillna(method='ffill') SIG['phase'] = SIG['phas_A'] + "_" + SIG['phas_B'] SIG['node_id'] = node_id SIG = SIG[['node_id', 'start_unix', 'start_time', 'phase', 'state_A', 'state_B']] SIG['duration'] = SIG['start_time'].shift(-1) - SIG['start_time'] SIG = SIG[:-1] SIG['duration'] = SIG['duration'].astype(int) for row in SIG.itertuples(): state = '' for a, b, in zip(row.state_A, row.state_B): if a == 'r': state += b elif b == 'r': state += a elif a == b: state += a else: raise ValueError(f"예상되지 않은 조합 발생: a={a}, b={b}") SIG.at[row.Index, 'state'] = state SIG = SIG[SIG.duration!=0] SIG = SIG.drop(columns=['start_time', 'state_A', 'state_B']) self.SIGTABLE.append(SIG) self.SIGTABLE = pd.concat(self.SIGTABLE) # 5-2-1 helper function of 5-2 def get_red(self, pre_state:str, cur_state:str): assert len(pre_state) == len(cur_state), "cur_state, nex_state의 길이가 서로 다릅니다." state_r = '' for p, c in zip(pre_state, cur_state): if p == c: state_r += p elif (p == 'r') and (c == 'G'): state_r += 'r' elif (p == 'G') and (c == 'r'): state_r += 'r' else: raise ValueError(f"예상치 못한 신호조합: previous={p}, current={c}") return state_r # 5-2-2 helper function of 5-2 def get_yellow(self, cur_state:str, nex_state:str): assert len(cur_state) == len(nex_state), "cur_state, nex_state의 길이가 서로 다릅니다." state_y = '' for c, n in zip(cur_state, nex_state): if c == n: state_y += c elif (c == 'r') and (n == 'G'): state_y += 'r' elif (c == 'G') and (n == 'r'): state_y += 'y' else: print(c, n) print(cur_state, nex_state) raise ValueError(f"예상치 못한 신호조합: current={c}, next={n}") return state_y # 5-2-3 helper function of 5-2 def cumulate(self, sig, alph): csig = [] # cumulated sig pre = pd.Series({f'phas_{alph}':None}) start_time = 0 elapsed = 0 for i, cur in sig.iterrows(): start_unix = cur.start_unix # pre, nex if i != 0: pre = sig.iloc[i-1] if i != len(sig) - 1: nex = sig.iloc[i+1] # duration if cur[f'phas_{alph}'] == nex[f'phas_{alph}']: continue if cur[f'phas_{alph}'] == pre[f'phas_{alph}']: duration = cur.duration + pre.duration else: duration = cur.duration start_times = [] states = [] phases = [] # red if i != 0: start_time += elapsed start_times.append(start_time) states.append(self.get_red(pre[f'state_{alph}'], cur[f'state_{alph}'])) phases.append(f'{cur[f"phas_{alph}"]}r') elapsed = cur[f'red_{alph}'] # green if i == 0: start_time = 0 else: start_time += elapsed start_times.append(start_time) states.append(cur[f'state_{alph}']) phases.append(f'{cur[f"phas_{alph}"]}g') if i == 0: elapsed = duration - cur[f'yellow_{alph}'] else: elapsed = duration - cur[f'yellow_{alph}'] - cur[f'red_{alph}'] # yellow if i != len(sig) - 1: start_time += elapsed start_times.append(start_time) states.append(self.get_yellow(cur[f'state_{alph}'], nex[f'state_{alph}'])) phases.append(f'{cur[f"phas_{alph}"]}y') elapsed = cur[f'yellow_{alph}'] sig_ = pd.DataFrame({'start_time':start_times, f'phas_{alph}':phases, f'state_{alph}':states}) sig_['start_unix'] = start_unix csig.append(sig_) csig = pd.concat(csig).reset_index(drop=True) return csig # 5-3. 신호파일 생성 def make_tl_file(self): strings = ['\n'] for node_id, group in self.SIGTABLE.groupby('node_id'): strings.append(f' \n') for row in group.itertuples(index=True): duration = row.duration state = row.state strings.append(f' \n') strings.append(' \n') strings.append('') strings = ''.join(strings) # 저장 self.path_output = os.path.join(self.path_results, f'sn_{self.present_time}.add.xml') with open(self.path_output, 'w') as f: f.write(strings) # 6. 이슈사항 저장 def write_issues(self): print('6. 이슈사항을 저장합니다.') path_issues = os.path.join(self.path_results, "issues_generate_signals.txt") with open(path_issues, "w", encoding="utf-8") as file: for item in self.issues: file.write(item + "\n") if self.issues: print("데이터 처리 중 발생한 특이사항은 다음과 같습니다. :") for review in self.issues: print(review) def main(self): self.time0 = datetime.now() # 1. 데이터 준비 self.prepare_data() self.time1 = datetime.now() # 2. 신호이력 전처리 self.process_history() self.time2 = datetime.now() # 3. 이동류정보 전처리 self.process_movement() self.time3 = datetime.now() # 4. 통합테이블 생성 self.make_histids() self.time4 = datetime.now() # 5. 신호 생성 self.get_signals() self.time5 = datetime.now() # 6. 이슈사항 저장 self.write_issues() self.time6 = datetime.now() print('(1)', self.time1 - self.time0) print('(1-1)', self.time11 - self.time0) print('(1-2)', self.time12 - self.time11) print('(1-3)', self.time13 - self.time12) print('(1-4)', self.time14 - self.time13) print('(1-5)', self.time15 - self.time14) print('(2)', self.time2 - self.time1) print('(2-1)', self.time21 - self.time1) print('(2-2)', self.time22 - self.time21) print('(2-3)', self.time23 - self.time22) print('(3)', self.time3 - self.time2) print('(3-1)', self.time31 - self.time2) print('(3-2)', self.time32 - self.time31) print('(4)', self.time4 - self.time3) print('(4-1)', self.time41 - self.time3) print('(4-2)', self.time42 - self.time41) print('(4-2)', self.time43 - self.time42) print('(5)', self.time5 - self.time4) print('(5-1)', self.time51 - self.time4) print('(5-2)', self.time52 - self.time51) print('(5-3)', self.time53 - self.time52) print('(6)', self.time6 - self.time5) print('total time :', self.time6 - self.time0) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('-c','--config_name', dest='config_name', type=str, default='test_0721') parser.add_argument('-M','--month', dest='month', type=int, default=7) parser.add_argument('-D','--day', dest='day', type=int, default=22) parser.add_argument('-H','--hour', dest='hour', type=int, default=9) parser.add_argument('-m','--minute', dest='minute', type=int, default=20) args = parser.parse_args() config_name = args.config_name month = args.month day = args.day hour = args.hour minute = args.minute self = SignalGenerator(config_name=config_name, month=month, day=day, hour=hour, minute=minute) self.main() # self.path_unit = os.path.join(self.path_root, 'Analysis', '0207_unit_test') # self.hrhists.to_csv(os.path.join(self.path_unit, 'hrhists.csv')) # self.histids.to_csv(os.path.join(self.path_unit, 'histids.csv')) # self.sigtable.to_csv(os.path.join(self.path_unit, 'sigtable.csv')) # self.Sigtable.to_csv(os.path.join(self.path_unit, 'ssigtable.csv')) # print("elapsed time :", datetime.now() - starting_time)