신호생성 repo (24. 1. 5 ~).
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

786 lines
42 KiB

import pandas as pd
import numpy as np
import os, sys
import json
import copy
from tqdm import tqdm
import sumolib, traci
from datetime import datetime
class SignalGenerator():
def __init__(self):
self.path_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.issues = []
self.midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())
self.next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())
self.fsecs = range(self.midnight, self.next_day, 5) # fsecs : unix time by Five SECondS
self.fmins = range(self.midnight, self.next_day, 300) # fmins : unix time by Five MINuteS
self.present_time = datetime.now().replace(month=1, day=5).timestamp()
print(self.present_time)
self.present_time = max([fmin for fmin in list(self.fmins) if fmin <= self.present_time])
self.adder = 600
# 1. 데이터 준비
def prepare_data(self):
print("1. 데이터를 준비합니다.")
self.load_networks()
self.load_tables()
self.check_networks()
self.check_tables()
self.prepare_auxiliaries()
# 1-1. 네트워크 불러오기
def load_networks(self):
self.net = sumolib.net.readNet(os.path.join(self.path_root, 'Data', 'networks', 'sn.net.xml'))
print("1-1. 네트워크가 로드되었습니다.")
# 1-2. 테이블 불러오기
def load_tables(self):
# 모든 컬럼에 대하여 데이터타입 지정
loading_dtype = {
'inter_no':'int', 'start_hour':'int', 'start_minute':'int', 'cycle':'int','offset':'int',
'node_id':'str', 'inter_type':'str', 'parent_id':'str','child_id':'str',
'direction':'str', 'condition':'str', 'inc_edge':'str', 'out_edge':'str',
'end_unix':'int', 'inter_name':'str', 'inter_lat':'float', 'inter_lon':'float',
'group_no':'int', 'main_phase_no':'int', 'phase_no':'int','ring_type':'str'
}
for alph in ['A', 'B']:
for j in range(1,9):
loading_dtype[f'angle_{alph}{j}'] = 'str'
loading_dtype[f'dura_{alph}{j}'] = 'int'
self.path_table = os.path.join(self.path_root, 'Data', 'tables')
# 테이블 불러오기
self.inter_info = pd.read_csv(os.path.join(self.path_table, 'inter_info.csv'), dtype=loading_dtype)
self.plan = pd.read_csv(os.path.join(self.path_table, 'plan.csv'), dtype=loading_dtype)
self.history = pd.read_csv(os.path.join(self.path_table, 'history.csv'), dtype=loading_dtype)
self.inter_node = pd.read_csv(os.path.join(self.path_table, 'inter_node.csv'), dtype=loading_dtype)
self.matching = pd.read_csv(os.path.join(self.path_root, 'Intermediates', 'matching.csv'), dtype=loading_dtype)
self.movements = pd.read_csv(os.path.join(self.path_root, 'Intermediates', 'movements.csv'), dtype=loading_dtype)
self.match6 = pd.read_csv(os.path.join(self.path_root, 'Intermediates', 'match6.csv'), dtype=loading_dtype)
self.match6 = self.match6[['node_id', 'phase_no', 'ring_type', 'inc_edge', 'out_edge']].reset_index(drop=True)
# 교차로목록 정의
self.inter_nos = sorted(self.inter_info.inter_no.unique())
print("1-2. 테이블들이 로드되었습니다.")
# 1-3. 네트워크 무결성 검사
def check_networks(self):
# https://sumo.dlr.de/docs/Netedit/neteditUsageExamples.html#simplify_tls_program_state_after_changing_connections
if 'SUMO_HOME' in os.environ:
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
if tools not in sys.path:
sys.path.append(tools)
else:
raise EnvironmentError("please declare environment variable 'SUMO_HOME'")
traci.start([sumolib.checkBinary('sumo'), "-n", os.path.join(self.path_root, 'Data', 'networks', 'sn.net.xml')])
nodes = [node for node in self.net.getNodes() if node.getType()=='traffic_light']
for node in nodes:
node_id = node.getID()
from_xml = len([c for c in node.getConnections() if c.getTLLinkIndex() >= 0])
from_traci = len(traci.trafficlight.getRedYellowGreenState(node_id))
if from_xml != from_traci:
sub = {'id': node_id, 'type': 'node', 'note': '유효하지 않은 연결이있음. netedit에서 clean states 필요.'}
self.issues.append(sub)
traci.close()
print("1-3. 네트워크의 모든 clean state requirement들을 체크했습니다.")
# 1-4. 테이블 무결성 검사
def check_tables(self):
self.check_history()
# 교차로정보, 방위각정보, 신호계획에 대해서는 preprocess_daily.py에서
# 무결성검사를 완료했으므로 여기에서는 따로 검사하지 않음.
# self.check_moves() # 이동류번호에 대한 무결성검사 필요하나 아직 작성하지 않음. (24. 2. 5 화)
print("1-4. 테이블들의 무결성 검사를 완료했습니다.")
# 1-4-1. 신호이력(history) 검사
def check_history(self):
# 1-4-1-1. inter_no 검사
# self.history.loc[0, 'inter_no'] = '4' # 에러 발생을 위한 코드
missing_inter_nos = set(self.history.inter_no) - set(self.inter_nos)
if missing_inter_nos:
msg = f"1-4-1-1. history의 inter_no 중 교차로 목록(inter_nos)에 포함되지 않는 항목이 있습니다: {missing_inter_nos}"
self.issues.append(msg)
# 1-4-1-2. 종료유닉스 검사
# self.history.loc[0, 'end_unix'] = 38.0 # 에러 발생을 위한 코드
self.min_unix, self.max_unix = int(datetime(2020, 1, 1).timestamp()), int(datetime(2038, 1, 1).timestamp())
for _, row in self.history.iterrows():
unixbool = self.min_unix <= row['end_unix'] <= self.max_unix
if not unixbool:
msg = f"1-4-1-2. 적정 범위를 벗어난 유닉스시각(end_unix)이 존재합니다 : inter_no : {row['inter_no']}"
self.issues.append(msg)
# 1-4-1-3. 현시시간 검사
# self.history.loc[0, 'dura_A1'] = -2 # 에러 발생을 위한 코드
durations = self.history[[f'dura_{alph}{j}' for alph in ['A','B'] for j in range(1, 9)]]
valid_indices = ((durations >= 0) & (durations <= 200)).all(axis=1)
invalid_inter_nos = sorted(self.history[~ valid_indices].inter_no.unique())
if invalid_inter_nos:
msg = f"1-4-1-3. 음수이거나 200보다 큰 현시시간이 존재합니다. : {invalid_inter_nos}"
# 1-5. 보조 딕셔너리, 데이터프레임, 리스트 등 만들기
def prepare_auxiliaries(self):
# inter2node : a dictionary that maps inter_no to the node_id
inter_node_p = self.inter_node[self.inter_node.inter_type=='parent']
self.inter2node = dict(zip(inter_node_p['inter_no'], inter_node_p['node_id']))
self.node2inter = dict(zip(self.inter_node['node_id'], self.inter_node['inter_no']))
# hours : 정각에 해당하는 시각들 목록
self.hours = np.array(range(self.midnight - 7200, self.next_day + 1, 3600))
# split, isplit : A,B 분리 혹은 통합시 사용될 수 있는 딕셔너리
self.splits = {} # splits maps (inter_no, start_hour, start_minute) to split
for i, row in self.plan.iterrows():
inter_no = row.inter_no
start_hour = row.start_hour
start_minute = row.start_minute
cycle = row.cycle
cums_A = row[[f'dura_A{j}' for j in range(1,9)]].cumsum()
cums_B = row[[f'dura_B{j}' for j in range(1,9)]].cumsum()
self.splits[(inter_no, start_hour, start_minute)] = {} # split maps (phas_A, phas_B) to k
k = 0
for t in range(cycle):
new_phas_A = len(cums_A[cums_A < t]) + 1
new_phas_B = len(cums_B[cums_B < t]) + 1
if k == 0 or ((new_phas_A, new_phas_B) != (phas_A, phas_B)):
k += 1
phas_A = new_phas_A
phas_B = new_phas_B
self.splits[(inter_no, start_hour, start_minute)][(phas_A, phas_B)] = k
self.isplits = {} # the inverse of splits
for i in self.splits:
self.isplits[i] = {self.splits[i][k]:k for k in self.splits[i]} # isplit maps k to (phas_A, phas_B)
# timetable : 교차로별 프로그램 시작시각
self.timetable = self.plan[['start_hour', 'start_minute']].drop_duplicates()
self.timetable['start_seconds'] = self.midnight + self.timetable['start_hour'] * 3600 + self.timetable['start_minute'] * 60
# A dictionary that maps parent_id to a list of child_ids
self.pa2ch = {'i0':['u00'], 'i1':[], 'i2':['u20'], 'i3':['c30', 'u30', 'u31', 'u32'], 'i6':['u60'], 'i7':[], 'i8':[], 'i9':[]}
self.node_ids = sorted(self.inter_node.node_id.unique())
self.parent_ids = sorted(self.inter_node[self.inter_node.inter_type=='parent'].node_id.unique())
self.nodes = [self.net.getNode(node_id) for node_id in self.node_ids]
# node2num_cycles : A dictionary that maps a node_id to the number of cycles
with open(os.path.join(self.path_root, 'Intermediates', 'node2num_cycles.json'), 'r') as file:
# json.load() 함수를 사용해 파일 내용을 Python 딕셔너리로 불러옵니다.
self.node2num_cycles = json.load(file)
# 2. 신호이력 전처리
def process_history(self):
print("2. 신호이력 테이블을 변환합니다.")
self.make_rhistory()
self.make_rhists()
self.make_hrhists()
# 2-1. rhistory
def make_rhistory(self):
# 1. 조회시점의 유닉스 타임 이전의 신호이력 수집
self.rhistory = self.history.copy() # recent history
self.rhistory = self.rhistory[(self.rhistory.end_unix <= self.present_time) & (self.rhistory.end_unix > self.present_time - 9000)] # 두 시간 반 전부터 현재까지의 신호이력을 가져옴. 9000 = 3600 * 2.5
# rhistory에 모든 교차로번호가 존재하지 않으면 해당 교차로번호에 대한 신호이력을 추가함 (at 최근 프로그램 시작시각)
whole_inter_nos = sorted(self.history.inter_no.unique())
recent_inter_nos = sorted(self.rhistory.inter_no.unique())
if not whole_inter_nos==recent_inter_nos:
for inter_no in set(whole_inter_nos) - set(recent_inter_nos):
program_start, prow = self.load_prow(inter_no, self.present_time - 9000)
cycle = prow.cycle.iloc[0]
row1 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()
row2 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()
# prow에서 필요한 부분을 rhistory에 추가
row1['end_unix'] = program_start
row2['end_unix'] = program_start + cycle
self.rhistory = pd.concat([self.rhistory, row1, row2]).reset_index(drop=True)
# present_time + adder 의 시각에 한 주기의 신호 추가
for inter_no in set(whole_inter_nos):
program_start, prow = self.load_prow(inter_no, self.present_time)
cycle = prow.cycle.iloc[0]
row3 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()
# prow에서 필요한 부분을 rhistory에 추가
row3['end_unix'] = self.present_time + self.adder
self.rhistory = pd.concat([self.rhistory, row3]).reset_index(drop=True)
# 2. 시작 유닉스 타임컬럼 생성 후 종류 유닉스 타임에서 현시별 현시기간 컬럼의 합을 뺀 값으로 입력
# - 현시시간의 합을 뺀 시간의 +- 10초 이내에 이전 주기정보가 존재하면 그 유닉스 시간을 시작 유닉스시간 값으로 하고, 존재하지 않으면 현시시간의 합을 뺀 유닉스 시간을 시작 유닉스 시간으로 지정
for i, row in self.rhistory.iterrows():
inter_no = row.inter_no
end_unix = row.end_unix
elapsed_time = row[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]].sum() // 2 # 현시시간 합
# 이전 유닉스 존재하지 않음 : 현시시간 합의 차
start_unix = end_unix - elapsed_time
pre_rows = self.history[:i] # previous rows
if inter_no in pre_rows.inter_no.unique(): # 이전 유닉스 존재
pre_unix = pre_rows[pre_rows.inter_no == inter_no]['end_unix'].iloc[-1] # previous unix time
# 이전 유닉스 존재, abs < 10 : 이전 유닉스
if abs(pre_unix - start_unix) < 10:
start_unix = pre_unix
# 이전 유닉스 존재, abs >=10 : 현시시간 합의 차
else:
pass
self.rhistory.loc[i, 'start_unix'] = start_unix
self.rhistory[self.rhistory.isna()] = 0
self.rhistory['start_unix'] = self.rhistory['start_unix'].astype(int)
self.rhistory = self.rhistory[['inter_no', 'start_unix'] + [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)] + ['cycle']]
def load_prow(self, inter_no, time):
'''
load planned row
'''
# 프로그램 시작시각
program_starts = np.array(self.timetable.start_seconds)
idx = (program_starts <= time).sum() - 1
program_start = program_starts[idx]
# 최근 프로그램 시작시각에 대한 신호계획
start_hour = self.timetable.iloc[idx].start_hour
start_minute = self.timetable.iloc[idx].start_minute
prow = self.plan[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row
return program_start, prow
# 2-2. rhists
def make_rhists(self):
self.rhists = []
for inter_no in sorted(self.rhistory.inter_no.unique()):
self.rhist = self.rhistory.copy()[self.rhistory.inter_no==inter_no]
self.rhist = self.rhist.drop_duplicates(subset=['start_unix']).reset_index(drop=True)
# D_n 및 S_n 값 정의
self.rhist['D_n'] = 0 # D_n : 시간차이
self.rhist['S_n'] = 0 # S_n : 현시시간합
for n in range(len(self.rhist)):
curr_unix = self.rhist.iloc[n].start_unix # current start_unix
self.rhist.loc[n, ['D_n', 'S_n']] = self.calculate_DS(self.rhist, curr_unix)
# 이전시각, 현재시각
prev_unix = self.rhist.loc[0, 'start_unix'] # previous start_unix
curr_unix = self.rhist.loc[1, 'start_unix'] # current start_unix
# rhist의 마지막 행에 도달할 때까지 반복
while True:
n = self.rhist[self.rhist.start_unix==curr_unix].index[0]
cycle = self.rhist.loc[n, 'cycle']
D_n = self.rhist.loc[n, 'D_n']
S_n = self.rhist.loc[n, 'S_n']
# 참값인 경우
if (abs(D_n - S_n) <= 5):
pass
# 참값이 아닌 경우
else:
# 2-1-1. 결측치 처리 : 인접한 두 start_unix의 차이가 계획된 주기의 두 배보다 크면 결측이 일어났다고 판단, 신호계획의 현시시간으로 "대체"
if curr_unix - prev_unix >= 2 * cycle:
# prev_unix를 계획된 주기만큼 늘려가면서 한 행씩 채워나간다.
# (curr_unix와의 차이가 계획된 주기보다 작거나 같아질 때까지)
while curr_unix - prev_unix > cycle:
prev_unix += cycle
# 신호 계획(prow) 불러오기
start_seconds = np.array(self.timetable.start_seconds)
idx = (start_seconds <= prev_unix).sum() - 1
start_hour = self.timetable.iloc[idx].start_hour
start_minute = self.timetable.iloc[idx].start_minute
prow = self.plan.copy()[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row
# prow에서 필요한 부분을 rhist에 추가
prow['start_unix'] = prev_unix
prow = prow.drop(['start_hour', 'start_minute', 'offset'], axis=1)
cycle = prow.iloc[0].cycle
self.rhist = pd.concat([self.rhist, prow])
self.rhist = self.rhist.sort_values(by='start_unix').reset_index(drop=True)
n += 1
# 2-1-2. 이상치 처리 : 비율에 따라 해당 행을 "삭제"(R_n <= 0.5) 또는 "조정"(R_n > 0.5)한다
R_n = (curr_unix - prev_unix) / cycle # R_n : 비율
# R_n이 0.5보다 작거나 같으면 해당 행을 삭제
if R_n <= 0.5:
self.rhist = self.rhist.drop(index=n).reset_index(drop=True)
if n >= self.rhist.index[-1]:
break
# 행삭제에 따른 curr_unix, R_n 재정의
curr_unix = self.rhist.loc[n, 'start_unix']
R_n = (curr_unix - prev_unix) / cycle # R_n : 비율
# R_n이 0.5보다 크면 해당 행 조정 (비율을 유지한 채로 현시시간 대체)
if R_n > 0.5:
# 신호 계획(prow) 불러오기
start_seconds = np.array(self.timetable.start_seconds)
idx = (start_seconds <= curr_unix).sum() - 1
start_hour = self.timetable.iloc[idx].start_hour
start_minute = self.timetable.iloc[idx].start_minute
prow = self.plan[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row
# 조정된 현시시간 (prow에 R_n을 곱하고 정수로 바꿈)
adjusted_dur = prow.copy()[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] * R_n
int_parts = adjusted_dur.iloc[0].apply(lambda x: int(x))
frac_parts = adjusted_dur.iloc[0] - int_parts
difference = round(adjusted_dur.iloc[0].sum()) - int_parts.sum()
for _ in range(difference): # 소수 부분이 가장 큰 상위 'difference'개의 값에 대해 올림 처리
max_frac_index = frac_parts.idxmax()
int_parts[max_frac_index] += 1
frac_parts[max_frac_index] = 0 # 이미 처리된 항목은 0으로 설정
# rhist에 조정된 현시시간을 반영
self.rhist.loc[n, [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] = int_parts.values
self.rhist.loc[n, 'cycle'] = int_parts.sum().sum() // 2
if n >= self.rhist.index[-1]:
break
prev_unix = curr_unix
curr_unix = self.rhist.loc[n+1, 'start_unix']
# 생략해도 무방할 코드
self.rhist = self.rhist.reset_index(drop=True)
self.rhist = self.rhist.sort_values(by=['start_unix'])
# D_n 및 S_n 값 재정의
for n in range(len(self.rhist)):
curr_unix = self.rhist.iloc[n].start_unix # current start_unix
self.rhist.loc[n, ['D_n', 'S_n']] = self.calculate_DS(self.rhist, curr_unix)
self.rhists.append(self.rhist)
self.rhists = pd.concat(self.rhists).sort_values(by=['start_unix','inter_no'])
self.rhists = self.rhists[self.rhists.start_unix >= self.present_time - 3600]
self.rhists = self.rhists.drop(columns=['D_n', 'S_n'])
def calculate_DS(self, rhist, curr_unix):
program_starts = np.array(self.timetable.start_seconds)
idx = (program_starts <= self.present_time).sum() - 1
program_start = program_starts[idx]
if list(self.hours[self.hours <= curr_unix]):
ghour_lt_curr_unix = self.hours[self.hours <= curr_unix].max() # the greatest hour less than or equal to curr_unix
else:
ghour_lt_curr_unix = program_start
start_unixes = rhist.start_unix.unique()
start_unixes_lt_ghour = np.sort(start_unixes[start_unixes < ghour_lt_curr_unix]) # start unixes less than ghour_lt_curr_unix
# 기준유닉스(base_unix) : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 start_unix
if len(start_unixes_lt_ghour) > 5:
base_unix = start_unixes_lt_ghour[-5]
# start_unixes_lt_ghour의 길이가 5 미만일 경우에는 맨 앞 start_unix로 base_unix를 지정
else:
base_unix = rhist.start_unix.min()
D_n = curr_unix - base_unix
S_n_durs = rhist[(rhist.start_unix > base_unix) & (rhist.start_unix <= curr_unix)] \
[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]]
S_n = S_n_durs.values.sum() // 2
return D_n, S_n
# 2-2. hrhists
def make_hrhists(self):
# 계층화된 형태로 변환
self.hrhists = [] # hierarchied recent history
for i, row in self.rhists.iterrows():
inter_no = row.inter_no
start_unix = row.start_unix
ind = (self.timetable['start_seconds'] <= row.start_unix).sum() - 1
start_hour = self.timetable.iloc[ind].start_hour
start_minute = self.timetable.iloc[ind].start_minute
self.isplit = self.isplits[(inter_no, start_hour, start_minute)]
phas_As = [self.isplit[j][0] for j in self.isplit.keys()]
phas_Bs = [self.isplit[j][1] for j in self.isplit.keys()]
durs_A = row[[f'dura_A{j}' for j in range(1,9)]]
durs_B = row[[f'dura_B{j}' for j in range(1,9)]]
durations = []
for j in range(1, len(self.isplit)+1):
ja = self.isplit[j][0]
jb = self.isplit[j][1]
if ja == jb:
durations.append(min(durs_A[ja-1], durs_B[jb-1]))
else:
durations.append(abs(durs_A[ja-1] - durs_B[ja-1]))
new_rows = pd.DataFrame({'inter_no':[inter_no] * len(durations), 'start_unix':[start_unix] * len(durations),
'phas_A':phas_As, 'phas_B':phas_Bs, 'duration':durations})
self.hrhists.append(new_rows)
self.hrhists = pd.concat(self.hrhists)
self.hrhists = self.hrhists.sort_values(by = ['start_unix', 'inter_no', 'phas_A', 'phas_B']).reset_index(drop=True)
# 3. 이동류정보 전처리
def process_movement(self):
print("3. 이동류정보 테이블을 변환합니다.")
self.make_movement()
self.update_movement()
# 3-1. movement
def make_movement(self):
# # - 아래 절차를 5초마다 반복
# for fsec in range(self.midnight, self.present_time + 1, 5): # fsec : unix time by Five SECond
# # 1. 상태 테이블 조회해서 전체 데이터중 필요데이터(교차로번호, A링 현시번호, A링 이동류번호, B링 현시번호, B링 이동류번호)만 수집 : A
# # move = time2move[fsec]
# move = pd.read_csv(f'../Data/tables/move/move_{fsec}.csv', index_col=0)
# # 2. 이력 테이블 조회해서 교차로별로 유닉스시간 최대인 데이터(교차로변호, 종료유닉스타임)만 수집 : B
# recent_histories = [group.iloc[-1:] for _, group in self.history[self.history['end_unix'] < fsec].groupby('inter_no')] # 교차로별로 유닉스시간이 최대인 행들
# if not recent_histories:
# rhistory = pd.DataFrame({'inter_no':[], 'end_unix':[]}) # recent history
# else:
# rhistory = pd.concat(recent_histories)
# recent_unix = rhistory[['inter_no', 'end_unix']]
# # 3. 상태 테이블 조회정보(A)와 이력 테이블 조회정보(B) 조인(키값 : 교차로번호) : C
# move = pd.merge(move, recent_unix, how='left', on='inter_no')
# move['end_unix'] = move['end_unix'].fillna(0).astype(int)
# move = move.drop_duplicates()
# # 4. C데이터 프레임에 신규 컬럼(시작 유닉스타임) 생성 후 종료유닉스 타임 값 입력, 종료 유닉스 타임 컬럼 제거
# move = move.rename(columns = {'end_unix':'start_unix'})
# # 5. 이동류 이력정보 READ
# # - CSV 파일로 서버에 저장된 이동류정보를 읽어옴(파일이 없는 경우에는 데이터가 없는 프레임 D 생성)
# try:
# if isinstance(self.movement, pd.DataFrame): # movement가 존재할 경우 그걸 그대로 씀.
# pass
# else:
# self.movement = pd.DataFrame()
# except NameError: # movement가 존재하지 않는 경우 생성
# self.movement = pd.DataFrame()
# # 6. 이동류 이력정보 데이터테이블(D)에 C데이터 add
# self.movement = pd.concat([self.movement, move])
# # 7. D데이터 프레임에서 중복데이터 제거(교차로번호, 시작 유닉스타임, A링 현시번호, B링 현시번호 같은 행은 제거)
# self.movement = self.movement.drop_duplicates(['inter_no','phas_A','phas_B','start_unix'])
# # 8. D데이터 보관 시간 기준시간을 시작 유닉스 타임의 최대값 - 3600을 값으로 산출하고, 보관 시간 기준시간보다 작은 시작 유닉스 타임을 가진 행은 모두 제거(1시간 데이터만 보관)
# self.movement = self.movement[self.movement.start_unix > fsec - 3600]
# self.movement = self.movement.sort_values(by=['start_unix','inter_no','phas_A','phas_B']).reset_index(drop=True)
self.movement = pd.read_csv(os.path.join(self.path_root, 'Intermediates', 'movement', f'movement_{self.present_time}.csv'), index_col=0)
# 3-2. movement_updated
def update_movement(self):
# 중복을 제거하고 (inter_no, start_unix) 쌍을 만듭니다.
hrhists_inter_unix = set(self.hrhists[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))
movement_inter_unix = set(self.movement[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))
# hrhists에는 있지만 movement에는 없는 (inter_no, start_unix) 쌍을 찾습니다.
missing_in_movement = hrhists_inter_unix - movement_inter_unix
# 새로운 행들을 생성합니다.
new_rows = []
if missing_in_movement:
for inter_no, start_unix in missing_in_movement:
# movements에서 해당 inter_no의 데이터를 찾습니다.
new_row = self.movements[self.movements['inter_no'] == inter_no].copy()
# start_unix 값을 설정합니다.
new_row['start_unix'] = start_unix
new_rows.append(new_row)
# 새로운 데이터프레임을 생성하고 기존 movement 데이터프레임과 합칩니다.
new_movement = pd.concat(new_rows, ignore_index=True)
self.movement_updated = pd.concat([self.movement, new_movement], ignore_index=True)
else:
self.movement_updated = self.movement
# 4. 통합테이블 생성
def make_histids(self):
print("4. 통합 테이블을 생성합니다.")
self.merge_dfs()
self.attach_children()
# 4-1. histid
def merge_dfs(self):
# movements and durations
movedur = pd.merge(self.hrhists, self.movement_updated, how='inner', on=['inter_no', 'start_unix', 'phas_A', 'phas_B'])
movedur = movedur.sort_values(by=['start_unix', 'inter_no', 'phas_A','phas_B'])
movedur = movedur[['inter_no', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration']]
# 이동류 매칭 테이블에서 진입id, 진출id를 가져와서 붙임.
for i, row in movedur.iterrows():
inter_no = row.inter_no
start_unix = row.start_unix
# incoming and outgoing edges A
move_A = row.move_A
if move_A in [17, 18]:
inc_edge_A = np.nan
outhedge_A = np.nan
else:
match_A = self.matching[(self.matching.inter_no == inter_no) & (self.matching.move_no == move_A)].iloc[0]
inc_edge_A = match_A.inc_edge
out_edge_A = match_A.out_edge
movedur.loc[i, ['inc_edge_A', 'out_edge_A']] = [inc_edge_A, out_edge_A]
# incoming and outgoing edges B
move_B = row.move_B
if move_B in [17, 18]:
inc_edge_B = np.nan
out_edge_B = np.nan
else:
match_B = self.matching[(self.matching.inter_no == inter_no) & (self.matching.move_no == move_B)].iloc[0]
inc_edge_B = match_B.inc_edge
out_edge_B = match_B.out_edge
movedur.loc[i, ['inc_edge_B', 'out_edge_B']] = [inc_edge_B, out_edge_B]
# 이동류 컬럼 제거
movedur = movedur.drop(['move_A', 'move_B'], axis=1)
self.histid = movedur.copy() # history with edge ids (incoming and outgoing edge ids)
self.histid['node_id'] = self.histid['inter_no'].map(self.inter2node)
self.histid = self.histid[['inter_no', 'node_id', 'start_unix', 'phas_A', 'phas_B', 'duration', 'inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']]
histid_start = self.present_time - 600
self.histid = self.histid[self.histid.start_unix > histid_start]
# 4-2. histids
def attach_children(self):
'''
자식교차로에 대한 진입·진출 엣지 정보를 붙여주는 함수
input :
(1) histid
- 각 교차로에 대한 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
- 부모교차로(주교차로)에 대해서만 값이 지정되어 있음
(2) match6
- (현시, 링)별 진입·진출엣지
- 자식교차로(유턴 및 연동교차로)에 대해서도 값이 지정되어 있음
(3) parent_ids : 부모교차로 목록
(4) pa2ch : 각 부모교차로id를 부모교차로가 포함하고 있는 자식교차로들의 id들의 리스트로 대응시키는 딕셔너리
output : histids
- 모든(부모 및 자식) 교차로에 대한 시작유닉스 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
'''
new_histids = []
for parent_id in self.parent_ids:
for child_id in self.pa2ch[parent_id]:
new_histid = self.histid.copy()[self.histid.node_id==parent_id]
new_histid[['inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']] = np.nan
for i, row in new_histid.iterrows():
phas_A = row.phas_A
phas_B = row.phas_B
new_match = self.match6[self.match6.node_id==child_id]
Arow = new_match[(new_match.phase_no==phas_A) & (new_match.ring_type=='A')]
if ~ Arow[['inc_edge', 'out_edge']].isna().all().all():
inc_edge = Arow.iloc[0].inc_edge
out_edge = Arow.iloc[0].out_edge
new_histid.loc[i, ['inc_edge_A', 'out_edge_A']] = [inc_edge, out_edge]
Brow = new_match[(new_match.phase_no==phas_B) & (new_match.ring_type=='B')]
if ~ Brow[['inc_edge', 'out_edge']].isna().all().all():
inc_edge = Brow.iloc[0].inc_edge
out_edge = Brow.iloc[0].out_edge
new_histid.loc[i, ['inc_edge_B', 'out_edge_B']] = [inc_edge, out_edge]
new_histid.loc[i, 'node_id'] = child_id
new_histids.append(new_histid)
new_histids = pd.concat(new_histids)
self.histids = pd.concat([self.histid.copy(), new_histids])
self.histids = self.histids.sort_values(by=['start_unix', 'node_id', 'phas_A', 'phas_B']).reset_index(drop=True)
# 5. 신호 생성
print("5. 신호를 생성합니다.")
def get_signals(self):
self.initialize_states()
self.assign_signals()
self.set_timepoints()
self.assign_red_yellow()
self.make_tl_file()
# 5-1. 신호초기화
def initialize_states(self):
'''
신호 초기화
input :
(1) net : 네트워크
(2) nodes : 노드 목록
(3) histids : 모든 교차로에 대한 시작유닉스 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
output : node2init
- 각 노드를 초기화된 신호로 맵핑하는 딕셔너리
- 초기화된 신호란, 우회전을 g로 나머지는 r로 지정한 신호를 말함.
'''
self.node2init = {}
for node in self.nodes:
node_id = node.getID()
conns = [(c.getJunctionIndex(), c) for c in node.getConnections()]
conns = [c for c in conns if c[0] >= 0]
conns = sorted(conns, key=lambda x: x[0])
state = []
for i, ci in conns:
if ci.getTLLinkIndex() < 0:
continue
are_foes = False
for j, cj in conns:
if ci.getTo() == cj.getTo():
continue
if node.areFoes(i, j):
are_foes = True
break
state.append('r' if are_foes else 'g')
self.node2init[node_id] = state
# 어떤 연결과도 상충이 일어나지는 않지만, 신호가 부여되어 있는 경우에는 r을 부여
for _, row in self.histids.iterrows():
node_id = row['node_id']
inc_edge_A = row.inc_edge_A
inc_edge_B = row.inc_edge_B
out_edge_A = row.out_edge_A
out_edge_B = row.out_edge_B
if pd.isna(inc_edge_A) or pd.isna(out_edge_A):
pass
else:
inc_edge_A = self.net.getEdge(inc_edge_A)
out_edge_A = self.net.getEdge(out_edge_A)
for conn in inc_edge_A.getConnections(out_edge_A):
index = conn.getTLLinkIndex()
if index >= 0:
self.node2init[node_id][index] = 'r'
if pd.isna(inc_edge_B) or pd.isna(out_edge_B):
pass
else:
inc_edge_B = self.net.getEdge(inc_edge_B)
out_edge_B = self.net.getEdge(out_edge_B)
for conn in inc_edge_B.getConnections(out_edge_B):
index = conn.getTLLinkIndex()
if index >= 0:
self.node2init[node_id][index] = 'r'
# 5-2. 녹색신호 부여
def assign_signals(self):
'''
진입·진출엣지를 신호문자열로 배정
input :
(1) histids : 모든 교차로에 대한 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
(2) node2init : 각 노드를 초기화된 신호로 맵핑하는 딕셔너리
(3) net : 네트워크
output : sigtable
- 모든 교차로에 대한 (시작유닉스, A현시, B현시)별 현시시간, 신호문자열
- 황색 및 적색신호는 아직 반영되지 않았음.
'''
self.sigtable = self.histids.copy()
self.sigtable['init_state'] = self.sigtable['node_id'].map(self.node2init)
self.sigtable['state'] = self.sigtable['init_state'].map(lambda x:''.join(x))
for i, row in self.sigtable.iterrows():
node_id = row.node_id
inc_edge_A = row.inc_edge_A
inc_edge_B = row.inc_edge_B
out_edge_A = row.out_edge_A
out_edge_B = row.out_edge_B
state = copy.deepcopy(self.node2init)[node_id]
if pd.isna(inc_edge_A) or pd.isna(out_edge_A):
pass
else:
inc_edge_A = self.net.getEdge(inc_edge_A)
out_edge_A = self.net.getEdge(out_edge_A)
for conn in inc_edge_A.getConnections(out_edge_A):
index = conn.getTLLinkIndex()
if index >= 0:
state[index] = 'G'
self.sigtable.at[i, 'state'] = ''.join(state)
if pd.isna(inc_edge_B) or pd.isna(out_edge_B):
pass
else:
inc_edge_B = self.net.getEdge(inc_edge_B)
out_edge_B = self.net.getEdge(out_edge_B)
for conn in inc_edge_B.getConnections(out_edge_B):
index = conn.getTLLinkIndex()
if index >= 0:
state[index] = 'G'
self.sigtable.at[i, 'state'] = ''.join(state)
self.sigtable = self.sigtable.dropna(subset='state')
self.sigtable = self.sigtable.reset_index(drop=True)
self.sigtable['phase_sumo'] = self.sigtable.groupby(['node_id', 'start_unix']).cumcount()
self.sigtable = self.sigtable[['node_id', 'start_unix', 'phase_sumo', 'duration', 'state']]
self.sigtable = self.sigtable.sort_values(by=['start_unix', 'node_id'])
self.sigtable['start_dt'] = self.sigtable['start_unix'].apply(lambda x:datetime.fromtimestamp(x))
# 5-3. 신호 파일의 시작 및 종료시각 설정
def set_timepoints(self):
self.offsets = {}
self.Sigtable = []
sim_start = self.present_time - 300
for node_id, group in self.sigtable.groupby('node_id'):
lsbs = group[group['start_unix'] < sim_start]['start_unix'].max() # the last start_unix before sim_start
self.offsets[node_id] = lsbs - sim_start
group = group[group.start_unix >= lsbs]
start_unixes = np.array(group.start_unix)
start_unixes = np.sort(np.unique(start_unixes))[:self.node2num_cycles[node_id]]
group = group[group.start_unix.isin(start_unixes)]
self.Sigtable.append(group)
self.Sigtable = pd.concat(self.Sigtable)
# 5-4. 적색 및 황색신호 부여
def assign_red_yellow(self):
'''
적색, 황색신호를 반영한 신호문자열 배정
input : Sigtable
- 모든 교차로에 대한 (시작유닉스, 세부현시번호)별 현시시간, 신호문자열, 진입·진출엣지
* 세부현시란 오버랩을 반영한 현시번호를 뜻함.
output : SIGTABLE
- 모든 교차로에 대한 (시작유닉스, 녹황적세부현시번호)별 현시시간, (황·적색신호가 포함된) 신호문자열
* 녹황적세부현시번호란 세부현시번호에 r, g, y 옵션까지 포함된 현시번호를 뜻함.
'''
self.SIGTABLE = []
for node_id, group in self.Sigtable.groupby('node_id'):
new_rows_list = []
for i in range(1, len(group)):
prev_row = group.iloc[i-1:i].copy()
next_row = group.iloc[i:i+1].copy()
new_rows = pd.concat([prev_row, prev_row, next_row]).reset_index(drop=True)
new_rows.loc[0, 'phase_sumo'] = str(prev_row.phase_sumo.iloc[0]) + '_g'
new_rows.loc[0, 'duration'] = new_rows.loc[0, 'duration'] - 5
new_rows.loc[1, 'phase_sumo'] = str(prev_row.phase_sumo.iloc[0]) + '_y'
new_rows.loc[1, 'duration'] = 4
yellow_state = ''
red_state = ''
for a, b in zip(prev_row.state.iloc[0], next_row.state.iloc[0]):
if a == 'G' and b == 'r':
yellow_state += 'y'
red_state += 'r'
else:
yellow_state += a
red_state += a
new_rows.loc[2, 'phase_sumo'] = str(next_row.phase_sumo.iloc[0]) + '__r'
new_rows.loc[2, 'duration'] = 1
new_rows.loc[1, 'state'] = yellow_state
new_rows.loc[2, 'state'] = red_state
new_rows_list.append(new_rows)
next_row['phase_sumo'] = str(next_row.phase_sumo.iloc[0]) + '_g'
next_row['duration'] -= 5
# next_row.loc['duration'] -= 5
new_rows_list.append(next_row)
new_rows = pd.concat(new_rows_list)
self.SIGTABLE.append(new_rows)
self.SIGTABLE = pd.concat(self.SIGTABLE).sort_values(by=['node_id', 'start_unix', 'phase_sumo']).reset_index(drop=True)
# 5-5. 신호파일 생성
def make_tl_file(self):
strings = ['<additional>\n']
for node_id, group in self.SIGTABLE.groupby('node_id'):
strings.append(f' <tlLogic id="{node_id}" type="static" programID="{node_id}_prog" offset="{self.offsets[node_id]}">\n')
for i, row in group.iterrows():
duration = row.duration
state = row.state
strings.append(f' <phase duration="{duration}" state="{state}"/>\n')
strings.append(' </tlLogic>\n')
strings.append('</additional>')
strings = ''.join(strings)
# 저장
self.path_output = os.path.join(self.path_root, 'Results', f'sn_{self.present_time}.add.xml')
with open(self.path_output, 'w') as f:
f.write(strings)
# 6. 이슈사항 저장
def write_issues(self):
print('6. 이슈사항을 저장합니다.')
path_issues = os.path.join(self.path_root, "Results", "issues_generate_signals.txt")
with open(path_issues, "w", encoding="utf-8") as file:
for item in self.issues:
file.write(item + "\n")
if self.issues:
print("데이터 처리 중 발생한 특이사항은 다음과 같습니다. :")
for review in self.issues:
print(review)
def main(self):
# 1. 데이터 준비
self.prepare_data()
# 2. 신호이력 전처리
self.process_history()
# 3. 이동류정보 전처리
self.process_movement()
# 4. 통합테이블 생성
self.make_histids()
# 5. 신호 생성
self.get_signals()
# 6. 이슈사항 저장
self.write_issues()
if __name__ == '__main__':
self = SignalGenerator()
self.main()
# self.histid.to_csv(os.path.join('Intermediates', 'histid', f'histid_{self.present_time}_.csv'))