|
# (siggen) PS C:\Github\snits_siggen> python .\Scripts\generate_signals.py
|
|
import pandas as pd
|
|
import numpy as np
|
|
import os, sys
|
|
import json
|
|
import copy
|
|
from tqdm import tqdm
|
|
import sumolib, traci
|
|
from datetime import datetime
|
|
|
|
class SignalGenerator():
|
|
def __init__(self):
|
|
# 루트폴더 지정
|
|
self.path_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
with open(os.path.join(self.path_root, 'Scripts', 'config.json'), 'r') as config_file:
|
|
config = json.load(config_file)
|
|
# 주요 폴더 경로 지정
|
|
self.paths = config['paths']
|
|
self.path_data = os.path.join(self.path_root, *self.paths['data'])
|
|
self.path_intermediates = os.path.join(self.path_root, *self.paths['intermediates'])
|
|
self.path_results = os.path.join(self.path_root, *self.paths['results'])
|
|
self.path_tables = os.path.join(self.path_root, *self.paths['tables'])
|
|
self.path_networks = os.path.join(self.path_root, *self.paths['networks'])
|
|
self.path_scripts = os.path.join(self.path_root, *self.paths['scripts'])
|
|
# 이슈사항 목록
|
|
self.issues = []
|
|
|
|
self.midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())
|
|
self.next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())
|
|
self.fsecs = range(self.midnight, self.next_day, 5) # fsecs : unix time by Five SECondS
|
|
self.fmins = range(self.midnight, self.next_day, 300) # fmins : unix time by Five MINuteS
|
|
|
|
self.present_time = datetime.now().replace(month=1, day=5).timestamp()
|
|
print(self.present_time)
|
|
self.present_time = max([fmin for fmin in list(self.fmins) if fmin <= self.present_time])
|
|
|
|
self.adder = 600
|
|
|
|
# 1. 데이터 준비
|
|
def prepare_data(self):
|
|
print("1. 데이터를 준비합니다.")
|
|
self.load_networks()
|
|
self.load_tables()
|
|
self.check_networks()
|
|
self.check_tables()
|
|
self.prepare_auxiliaries()
|
|
|
|
# 1-1. 네트워크 불러오기
|
|
def load_networks(self):
|
|
self.net = sumolib.net.readNet(os.path.join(self.path_networks, 'sn.net.xml'))
|
|
print("1-1. 네트워크가 로드되었습니다.")
|
|
|
|
# 1-2. 테이블 불러오기
|
|
def load_tables(self):
|
|
# 모든 컬럼에 대하여 데이터타입 지정
|
|
loading_dtype = {
|
|
'inter_no':'int', 'start_hour':'int', 'start_minute':'int', 'cycle':'int','offset':'int',
|
|
'node_id':'str', 'inter_type':'str', 'parent_id':'str','child_id':'str',
|
|
'direction':'str', 'condition':'str', 'inc_edge':'str', 'out_edge':'str',
|
|
'end_unix':'int', 'inter_name':'str', 'inter_lat':'float', 'inter_lon':'float',
|
|
'group_no':'int', 'main_phase_no':'int', 'phase_no':'int','ring_type':'str'
|
|
}
|
|
for alph in ['A', 'B']:
|
|
for j in range(1,9):
|
|
loading_dtype[f'angle_{alph}{j}'] = 'str'
|
|
loading_dtype[f'dura_{alph}{j}'] = 'int'
|
|
|
|
|
|
# 테이블 불러오기
|
|
self.inter_info = pd.read_csv(os.path.join(self.path_tables, 'inter_info.csv'), dtype=loading_dtype)
|
|
self.plan = pd.read_csv(os.path.join(self.path_tables, 'plan.csv'), dtype=loading_dtype)
|
|
self.history = pd.read_csv(os.path.join(self.path_tables, 'history.csv'), dtype=loading_dtype)
|
|
self.inter_node = pd.read_csv(os.path.join(self.path_tables, 'inter_node.csv'), dtype=loading_dtype)
|
|
self.matching = pd.read_csv(os.path.join(self.path_intermediates, 'matching.csv'), dtype=loading_dtype)
|
|
self.match1 = pd.read_csv(os.path.join(self.path_intermediates, 'match1.csv'), dtype=loading_dtype)
|
|
self.match6 = pd.read_csv(os.path.join(self.path_intermediates, 'match6.csv'), dtype=loading_dtype)
|
|
self.match6 = self.match6[['node_id', 'phase_no', 'ring_type', 'inc_edge', 'out_edge']].reset_index(drop=True)
|
|
|
|
# 교차로목록 정의
|
|
self.inter_nos = sorted(self.inter_info.inter_no.unique())
|
|
print("1-2. 테이블들이 로드되었습니다.")
|
|
|
|
# 1-3. 네트워크 무결성 검사
|
|
def check_networks(self):
|
|
# https://sumo.dlr.de/docs/Netedit/neteditUsageExamples.html#simplify_tls_program_state_after_changing_connections
|
|
if 'SUMO_HOME' in os.environ:
|
|
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
|
|
if tools not in sys.path:
|
|
sys.path.append(tools)
|
|
else:
|
|
raise EnvironmentError("please declare environment variable 'SUMO_HOME'")
|
|
traci.start([sumolib.checkBinary('sumo'), "-n", os.path.join(self.path_networks, 'sn.net.xml')])
|
|
nodes = [node for node in self.net.getNodes() if node.getType()=='traffic_light']
|
|
for node in nodes:
|
|
node_id = node.getID()
|
|
from_xml = len([c for c in node.getConnections() if c.getTLLinkIndex() >= 0])
|
|
from_traci = len(traci.trafficlight.getRedYellowGreenState(node_id))
|
|
if from_xml != from_traci:
|
|
sub = {'id': node_id, 'type': 'node', 'note': '유효하지 않은 연결이있음. netedit에서 clean states 필요.'}
|
|
self.issues.append(sub)
|
|
traci.close()
|
|
print("1-3. 네트워크의 모든 clean state requirement들을 체크했습니다.")
|
|
|
|
# 1-4. 테이블 무결성 검사
|
|
def check_tables(self):
|
|
self.check_history()
|
|
# 교차로정보, 방위각정보, 신호계획에 대해서는 preprocess_daily.py에서
|
|
# 무결성검사를 완료했으므로 여기에서는 따로 검사하지 않음.
|
|
# self.check_moves() # 이동류번호에 대한 무결성검사 필요하나 아직 작성하지 않음. (24. 2. 5 화)
|
|
print("1-4. 테이블들의 무결성 검사를 완료했습니다.")
|
|
|
|
# 1-4-1. 신호이력(history) 검사
|
|
def check_history(self):
|
|
# 1-4-1-1. inter_no 검사
|
|
# self.history.loc[0, 'inter_no'] = '4' # 에러 발생을 위한 코드
|
|
missing_inter_nos = set(self.history.inter_no) - set(self.inter_nos)
|
|
if missing_inter_nos:
|
|
msg = f"1-4-1-1. history의 inter_no 중 교차로 목록(inter_nos)에 포함되지 않는 항목이 있습니다: {missing_inter_nos}"
|
|
self.issues.append(msg)
|
|
|
|
# 1-4-1-2. 종료유닉스 검사
|
|
# self.history.loc[0, 'end_unix'] = 38.0 # 에러 발생을 위한 코드
|
|
self.min_unix, self.max_unix = int(datetime(2020, 1, 1).timestamp()), int(datetime(2038, 1, 1).timestamp())
|
|
for _, row in self.history.iterrows():
|
|
unixbool = self.min_unix <= row['end_unix'] <= self.max_unix
|
|
if not unixbool:
|
|
msg = f"1-4-1-2. 적정 범위를 벗어난 유닉스시각(end_unix)이 존재합니다 : inter_no : {row['inter_no']}"
|
|
self.issues.append(msg)
|
|
|
|
# 1-4-1-3. 현시시간 검사
|
|
# self.history.loc[0, 'dura_A1'] = -2 # 에러 발생을 위한 코드
|
|
durations = self.history[[f'dura_{alph}{j}' for alph in ['A','B'] for j in range(1, 9)]]
|
|
valid_indices = ((durations >= 0) & (durations <= 200)).all(axis=1)
|
|
invalid_inter_nos = sorted(self.history[~ valid_indices].inter_no.unique())
|
|
if invalid_inter_nos:
|
|
msg = f"1-4-1-3. 음수이거나 200보다 큰 현시시간이 존재합니다. : {invalid_inter_nos}"
|
|
|
|
# 1-5. 보조 딕셔너리, 데이터프레임, 리스트 등 만들기
|
|
def prepare_auxiliaries(self):
|
|
# inter2node : a dictionary that maps inter_no to the node_id
|
|
inter_node_p = self.inter_node[self.inter_node.inter_type=='parent']
|
|
self.inter2node = dict(zip(inter_node_p['inter_no'], inter_node_p['node_id']))
|
|
self.node2inter = dict(zip(self.inter_node['node_id'], self.inter_node['inter_no']))
|
|
|
|
# hours : 정각에 해당하는 시각들 목록
|
|
self.hours = np.array(range(self.midnight - 7200, self.next_day + 1, 3600))
|
|
|
|
# split, isplit : A,B 분리 혹은 통합시 사용될 수 있는 딕셔너리
|
|
self.splits = {} # splits maps (inter_no, start_hour, start_minute) to split
|
|
for i, row in self.plan.iterrows():
|
|
inter_no = row.inter_no
|
|
start_hour = row.start_hour
|
|
start_minute = row.start_minute
|
|
cycle = row.cycle
|
|
cums_A = row[[f'dura_A{j}' for j in range(1,9)]].cumsum()
|
|
cums_B = row[[f'dura_B{j}' for j in range(1,9)]].cumsum()
|
|
self.splits[(inter_no, start_hour, start_minute)] = {} # split maps (phas_A, phas_B) to k
|
|
k = 0
|
|
for t in range(cycle):
|
|
new_phas_A = len(cums_A[cums_A < t]) + 1
|
|
new_phas_B = len(cums_B[cums_B < t]) + 1
|
|
if k == 0 or ((new_phas_A, new_phas_B) != (phas_A, phas_B)):
|
|
k += 1
|
|
phas_A = new_phas_A
|
|
phas_B = new_phas_B
|
|
self.splits[(inter_no, start_hour, start_minute)][(phas_A, phas_B)] = k
|
|
self.isplits = {} # the inverse of splits
|
|
for i in self.splits:
|
|
self.isplits[i] = {self.splits[i][k]:k for k in self.splits[i]} # isplit maps k to (phas_A, phas_B)
|
|
|
|
# timetable : 교차로별 프로그램 시작시각
|
|
self.timetable = self.plan[['start_hour', 'start_minute']].drop_duplicates()
|
|
self.timetable['start_seconds'] = self.midnight + self.timetable['start_hour'] * 3600 + self.timetable['start_minute'] * 60
|
|
|
|
# A dictionary that maps parent_id to a list of child_ids
|
|
self.pa2ch = {'i0':['u00'], 'i1':[], 'i2':['u20'], 'i3':['c30', 'u30', 'u31', 'u32'], 'i6':['u60'], 'i7':[], 'i8':[], 'i9':[]}
|
|
self.node_ids = sorted(self.inter_node.node_id.unique())
|
|
self.parent_ids = sorted(self.inter_node[self.inter_node.inter_type=='parent'].node_id.unique())
|
|
self.nodes = [self.net.getNode(node_id) for node_id in self.node_ids]
|
|
|
|
# node2num_cycles : A dictionary that maps a node_id to the number of cycles
|
|
with open(os.path.join(self.path_intermediates, 'node2num_cycles.json'), 'r') as file:
|
|
# json.load() 함수를 사용해 파일 내용을 Python 딕셔너리로 불러옵니다.
|
|
self.node2num_cycles = json.load(file)
|
|
|
|
# 2. 신호이력 전처리
|
|
def process_history(self):
|
|
print("2. 신호이력 테이블을 변환합니다.")
|
|
self.make_rhistory()
|
|
self.make_rhists()
|
|
self.make_hrhists()
|
|
|
|
# 2-1. rhistory
|
|
def make_rhistory(self):
|
|
# 1. 조회시점의 유닉스 타임 이전의 신호이력 수집
|
|
self.rhistory = self.history.copy() # recent history
|
|
self.rhistory = self.rhistory[(self.rhistory.end_unix <= self.present_time) & (self.rhistory.end_unix > self.present_time - 9000)] # 두 시간 반 전부터 현재까지의 신호이력을 가져옴. 9000 = 3600 * 2.5
|
|
|
|
# rhistory에 모든 교차로번호가 존재하지 않으면 해당 교차로번호에 대한 신호이력을 추가함 (at 최근 프로그램 시작시각)
|
|
whole_inter_nos = sorted(self.history.inter_no.unique())
|
|
recent_inter_nos = sorted(self.rhistory.inter_no.unique())
|
|
if not whole_inter_nos==recent_inter_nos:
|
|
for inter_no in set(whole_inter_nos) - set(recent_inter_nos):
|
|
program_start, prow = self.load_prow(inter_no, self.present_time - 9000)
|
|
cycle = prow.cycle.iloc[0]
|
|
row1 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()
|
|
row2 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()
|
|
# prow에서 필요한 부분을 rhistory에 추가
|
|
row1['end_unix'] = program_start
|
|
row2['end_unix'] = program_start + cycle
|
|
self.rhistory = pd.concat([self.rhistory, row1, row2]).reset_index(drop=True)
|
|
# present_time + adder 의 시각에 한 주기의 신호 추가
|
|
for inter_no in set(whole_inter_nos):
|
|
program_start, prow = self.load_prow(inter_no, self.present_time)
|
|
cycle = prow.cycle.iloc[0]
|
|
row3 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()
|
|
# prow에서 필요한 부분을 rhistory에 추가
|
|
row3['end_unix'] = self.present_time + self.adder
|
|
self.rhistory = pd.concat([self.rhistory, row3]).reset_index(drop=True)
|
|
|
|
# 2. 시작 유닉스 타임컬럼 생성 후 종류 유닉스 타임에서 현시별 현시기간 컬럼의 합을 뺀 값으로 입력
|
|
# - 현시시간의 합을 뺀 시간의 +- 10초 이내에 이전 주기정보가 존재하면 그 유닉스 시간을 시작 유닉스시간 값으로 하고, 존재하지 않으면 현시시간의 합을 뺀 유닉스 시간을 시작 유닉스 시간으로 지정
|
|
for i, row in self.rhistory.iterrows():
|
|
inter_no = row.inter_no
|
|
end_unix = row.end_unix
|
|
elapsed_time = row[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]].sum() // 2 # 현시시간 합
|
|
# 이전 유닉스 존재하지 않음 : 현시시간 합의 차
|
|
start_unix = end_unix - elapsed_time
|
|
pre_rows = self.history[:i] # previous rows
|
|
if inter_no in pre_rows.inter_no.unique(): # 이전 유닉스 존재
|
|
pre_unix = pre_rows[pre_rows.inter_no == inter_no]['end_unix'].iloc[-1] # previous unix time
|
|
# 이전 유닉스 존재, abs < 10 : 이전 유닉스
|
|
if abs(pre_unix - start_unix) < 10:
|
|
start_unix = pre_unix
|
|
# 이전 유닉스 존재, abs >=10 : 현시시간 합의 차
|
|
else:
|
|
pass
|
|
self.rhistory.loc[i, 'start_unix'] = start_unix
|
|
self.rhistory[self.rhistory.isna()] = 0
|
|
self.rhistory['start_unix'] = self.rhistory['start_unix'].astype(int)
|
|
self.rhistory = self.rhistory[['inter_no', 'start_unix'] + [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)] + ['cycle']]
|
|
|
|
def load_prow(self, inter_no, time):
|
|
'''
|
|
load planned row
|
|
'''
|
|
# 프로그램 시작시각
|
|
program_starts = np.array(self.timetable.start_seconds)
|
|
idx = (program_starts <= time).sum() - 1
|
|
program_start = program_starts[idx]
|
|
|
|
# 최근 프로그램 시작시각에 대한 신호계획
|
|
start_hour = self.timetable.iloc[idx].start_hour
|
|
start_minute = self.timetable.iloc[idx].start_minute
|
|
prow = self.plan[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row
|
|
return program_start, prow
|
|
|
|
# 2-2. rhists
|
|
def make_rhists(self):
|
|
self.rhists = []
|
|
for inter_no in sorted(self.rhistory.inter_no.unique()):
|
|
self.rhist = self.rhistory.copy()[self.rhistory.inter_no==inter_no]
|
|
self.rhist = self.rhist.drop_duplicates(subset=['start_unix']).reset_index(drop=True)
|
|
|
|
# D_n 및 S_n 값 정의
|
|
self.rhist['D_n'] = 0 # D_n : 시간차이
|
|
self.rhist['S_n'] = 0 # S_n : 현시시간합
|
|
for n in range(len(self.rhist)):
|
|
curr_unix = self.rhist.iloc[n].start_unix # current start_unix
|
|
self.rhist.loc[n, ['D_n', 'S_n']] = self.calculate_DS(self.rhist, curr_unix)
|
|
|
|
# 이전시각, 현재시각
|
|
prev_unix = self.rhist.loc[0, 'start_unix'] # previous start_unix
|
|
curr_unix = self.rhist.loc[1, 'start_unix'] # current start_unix
|
|
|
|
# rhist의 마지막 행에 도달할 때까지 반복
|
|
while True:
|
|
n = self.rhist[self.rhist.start_unix==curr_unix].index[0]
|
|
cycle = self.rhist.loc[n, 'cycle']
|
|
D_n = self.rhist.loc[n, 'D_n']
|
|
S_n = self.rhist.loc[n, 'S_n']
|
|
# 참값인 경우
|
|
if (abs(D_n - S_n) <= 5):
|
|
pass
|
|
# 참값이 아닌 경우
|
|
else:
|
|
# 2-1-1. 결측치 처리 : 인접한 두 start_unix의 차이가 계획된 주기의 두 배보다 크면 결측이 일어났다고 판단, 신호계획의 현시시간으로 "대체"
|
|
if curr_unix - prev_unix >= 2 * cycle:
|
|
# prev_unix를 계획된 주기만큼 늘려가면서 한 행씩 채워나간다.
|
|
# (curr_unix와의 차이가 계획된 주기보다 작거나 같아질 때까지)
|
|
while curr_unix - prev_unix > cycle:
|
|
prev_unix += cycle
|
|
# 신호 계획(prow) 불러오기
|
|
start_seconds = np.array(self.timetable.start_seconds)
|
|
idx = (start_seconds <= prev_unix).sum() - 1
|
|
start_hour = self.timetable.iloc[idx].start_hour
|
|
start_minute = self.timetable.iloc[idx].start_minute
|
|
prow = self.plan.copy()[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row
|
|
# prow에서 필요한 부분을 rhist에 추가
|
|
prow['start_unix'] = prev_unix
|
|
prow = prow.drop(['start_hour', 'start_minute', 'offset'], axis=1)
|
|
cycle = prow.iloc[0].cycle
|
|
self.rhist = pd.concat([self.rhist, prow])
|
|
self.rhist = self.rhist.sort_values(by='start_unix').reset_index(drop=True)
|
|
n += 1
|
|
|
|
# 2-1-2. 이상치 처리 : 비율에 따라 해당 행을 "삭제"(R_n <= 0.5) 또는 "조정"(R_n > 0.5)한다
|
|
R_n = (curr_unix - prev_unix) / cycle # R_n : 비율
|
|
# R_n이 0.5보다 작거나 같으면 해당 행을 삭제
|
|
if R_n <= 0.5:
|
|
self.rhist = self.rhist.drop(index=n).reset_index(drop=True)
|
|
if n >= self.rhist.index[-1]:
|
|
break
|
|
# 행삭제에 따른 curr_unix, R_n 재정의
|
|
curr_unix = self.rhist.loc[n, 'start_unix']
|
|
R_n = (curr_unix - prev_unix) / cycle # R_n : 비율
|
|
|
|
# R_n이 0.5보다 크면 해당 행 조정 (비율을 유지한 채로 현시시간 대체)
|
|
if R_n > 0.5:
|
|
# 신호 계획(prow) 불러오기
|
|
start_seconds = np.array(self.timetable.start_seconds)
|
|
idx = (start_seconds <= curr_unix).sum() - 1
|
|
start_hour = self.timetable.iloc[idx].start_hour
|
|
start_minute = self.timetable.iloc[idx].start_minute
|
|
prow = self.plan[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row
|
|
# 조정된 현시시간 (prow에 R_n을 곱하고 정수로 바꿈)
|
|
adjusted_dur = prow.copy()[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] * R_n
|
|
int_parts = adjusted_dur.iloc[0].apply(lambda x: int(x))
|
|
frac_parts = adjusted_dur.iloc[0] - int_parts
|
|
difference = round(adjusted_dur.iloc[0].sum()) - int_parts.sum()
|
|
for _ in range(difference): # 소수 부분이 가장 큰 상위 'difference'개의 값에 대해 올림 처리
|
|
max_frac_index = frac_parts.idxmax()
|
|
int_parts[max_frac_index] += 1
|
|
frac_parts[max_frac_index] = 0 # 이미 처리된 항목은 0으로 설정
|
|
# rhist에 조정된 현시시간을 반영
|
|
self.rhist.loc[n, [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] = int_parts.values
|
|
self.rhist.loc[n, 'cycle'] = int_parts.sum().sum() // 2
|
|
|
|
if n >= self.rhist.index[-1]:
|
|
break
|
|
prev_unix = curr_unix
|
|
curr_unix = self.rhist.loc[n+1, 'start_unix']
|
|
|
|
# 생략해도 무방할 코드
|
|
self.rhist = self.rhist.reset_index(drop=True)
|
|
self.rhist = self.rhist.sort_values(by=['start_unix'])
|
|
|
|
# D_n 및 S_n 값 재정의
|
|
for n in range(len(self.rhist)):
|
|
curr_unix = self.rhist.iloc[n].start_unix # current start_unix
|
|
self.rhist.loc[n, ['D_n', 'S_n']] = self.calculate_DS(self.rhist, curr_unix)
|
|
self.rhists.append(self.rhist)
|
|
self.rhists = pd.concat(self.rhists).sort_values(by=['start_unix','inter_no'])
|
|
self.rhists = self.rhists[self.rhists.start_unix >= self.present_time - 3600]
|
|
self.rhists = self.rhists.drop(columns=['D_n', 'S_n'])
|
|
|
|
def calculate_DS(self, rhist, curr_unix):
|
|
program_starts = np.array(self.timetable.start_seconds)
|
|
idx = (program_starts <= self.present_time).sum() - 1
|
|
program_start = program_starts[idx]
|
|
if list(self.hours[self.hours <= curr_unix]):
|
|
ghour_lt_curr_unix = self.hours[self.hours <= curr_unix].max() # the greatest hour less than or equal to curr_unix
|
|
else:
|
|
ghour_lt_curr_unix = program_start
|
|
start_unixes = rhist.start_unix.unique()
|
|
start_unixes_lt_ghour = np.sort(start_unixes[start_unixes < ghour_lt_curr_unix]) # start unixes less than ghour_lt_curr_unix
|
|
# 기준유닉스(base_unix) : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 start_unix
|
|
if len(start_unixes_lt_ghour) > 5:
|
|
base_unix = start_unixes_lt_ghour[-5]
|
|
# start_unixes_lt_ghour의 길이가 5 미만일 경우에는 맨 앞 start_unix로 base_unix를 지정
|
|
else:
|
|
base_unix = rhist.start_unix.min()
|
|
D_n = curr_unix - base_unix
|
|
S_n_durs = rhist[(rhist.start_unix > base_unix) & (rhist.start_unix <= curr_unix)] \
|
|
[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]]
|
|
S_n = S_n_durs.values.sum() // 2
|
|
return D_n, S_n
|
|
|
|
# 2-2. hrhists
|
|
def make_hrhists(self):
|
|
# 계층화된 형태로 변환
|
|
self.hrhists = [] # hierarchied recent history
|
|
for i, row in self.rhists.iterrows():
|
|
inter_no = row.inter_no
|
|
start_unix = row.start_unix
|
|
|
|
ind = (self.timetable['start_seconds'] <= row.start_unix).sum() - 1
|
|
start_hour = self.timetable.iloc[ind].start_hour
|
|
start_minute = self.timetable.iloc[ind].start_minute
|
|
self.isplit = self.isplits[(inter_no, start_hour, start_minute)]
|
|
phas_As = [self.isplit[j][0] for j in self.isplit.keys()]
|
|
phas_Bs = [self.isplit[j][1] for j in self.isplit.keys()]
|
|
durs_A = row[[f'dura_A{j}' for j in range(1,9)]]
|
|
durs_B = row[[f'dura_B{j}' for j in range(1,9)]]
|
|
durations = []
|
|
for j in range(1, len(self.isplit)+1):
|
|
ja = self.isplit[j][0]
|
|
jb = self.isplit[j][1]
|
|
if ja == jb:
|
|
durations.append(min(durs_A[ja-1], durs_B[jb-1]))
|
|
else:
|
|
durations.append(abs(durs_A[ja-1] - durs_B[ja-1]))
|
|
new_rows = pd.DataFrame({'inter_no':[inter_no] * len(durations), 'start_unix':[start_unix] * len(durations),
|
|
'phas_A':phas_As, 'phas_B':phas_Bs, 'duration':durations})
|
|
self.hrhists.append(new_rows)
|
|
self.hrhists = pd.concat(self.hrhists)
|
|
self.hrhists = self.hrhists.sort_values(by = ['start_unix', 'inter_no', 'phas_A', 'phas_B']).reset_index(drop=True)
|
|
|
|
# 3. 이동류정보 전처리
|
|
def process_movement(self):
|
|
print("3. 이동류정보 테이블을 변환합니다.")
|
|
self.make_movement()
|
|
self.update_movement()
|
|
|
|
# 3-1. movement
|
|
def make_movement(self):
|
|
# # - 아래 절차를 5초마다 반복
|
|
# for fsec in range(self.midnight, self.present_time + 1, 5): # fsec : unix time by Five SECond
|
|
# # 1. 상태 테이블 조회해서 전체 데이터중 필요데이터(교차로번호, A링 현시번호, A링 이동류번호, B링 현시번호, B링 이동류번호)만 수집 : A
|
|
# # move = time2move[fsec]
|
|
# move = pd.read_csv(f'../Data/tables/move/move_{fsec}.csv', index_col=0)
|
|
# # 2. 이력 테이블 조회해서 교차로별로 유닉스시간 최대인 데이터(교차로변호, 종료유닉스타임)만 수집 : B
|
|
# recent_histories = [group.iloc[-1:] for _, group in self.history[self.history['end_unix'] < fsec].groupby('inter_no')] # 교차로별로 유닉스시간이 최대인 행들
|
|
# if not recent_histories:
|
|
# rhistory = pd.DataFrame({'inter_no':[], 'end_unix':[]}) # recent history
|
|
# else:
|
|
# rhistory = pd.concat(recent_histories)
|
|
# recent_unix = rhistory[['inter_no', 'end_unix']]
|
|
# # 3. 상태 테이블 조회정보(A)와 이력 테이블 조회정보(B) 조인(키값 : 교차로번호) : C
|
|
# move = pd.merge(move, recent_unix, how='left', on='inter_no')
|
|
# move['end_unix'] = move['end_unix'].fillna(0).astype(int)
|
|
# move = move.drop_duplicates()
|
|
# # 4. C데이터 프레임에 신규 컬럼(시작 유닉스타임) 생성 후 종료유닉스 타임 값 입력, 종료 유닉스 타임 컬럼 제거
|
|
# move = move.rename(columns = {'end_unix':'start_unix'})
|
|
# # 5. 이동류 이력정보 READ
|
|
# # - CSV 파일로 서버에 저장된 이동류정보를 읽어옴(파일이 없는 경우에는 데이터가 없는 프레임 D 생성)
|
|
# try:
|
|
# if isinstance(self.movement, pd.DataFrame): # movement가 존재할 경우 그걸 그대로 씀.
|
|
# pass
|
|
# else:
|
|
# self.movement = pd.DataFrame()
|
|
# except NameError: # movement가 존재하지 않는 경우 생성
|
|
# self.movement = pd.DataFrame()
|
|
# # 6. 이동류 이력정보 데이터테이블(D)에 C데이터 add
|
|
# self.movement = pd.concat([self.movement, move])
|
|
# # 7. D데이터 프레임에서 중복데이터 제거(교차로번호, 시작 유닉스타임, A링 현시번호, B링 현시번호 같은 행은 제거)
|
|
# self.movement = self.movement.drop_duplicates(['inter_no','phas_A','phas_B','start_unix'])
|
|
# # 8. D데이터 보관 시간 기준시간을 시작 유닉스 타임의 최대값 - 3600을 값으로 산출하고, 보관 시간 기준시간보다 작은 시작 유닉스 타임을 가진 행은 모두 제거(1시간 데이터만 보관)
|
|
# self.movement = self.movement[self.movement.start_unix > fsec - 3600]
|
|
# self.movement = self.movement.sort_values(by=['start_unix','inter_no','phas_A','phas_B']).reset_index(drop=True)
|
|
self.movement = pd.read_csv(os.path.join(self.path_intermediates, 'movement', f'movement_{self.present_time}.csv'), index_col=0)
|
|
|
|
# 3-2. movement_updated
|
|
def update_movement(self):
|
|
# 중복을 제거하고 (inter_no, start_unix) 쌍을 만듭니다.
|
|
hrhists_inter_unix = set(self.hrhists[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))
|
|
movement_inter_unix = set(self.movement[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))
|
|
|
|
# hrhists에는 있지만 movement에는 없는 (inter_no, start_unix) 쌍을 찾습니다.
|
|
missing_in_movement = hrhists_inter_unix - movement_inter_unix
|
|
|
|
# 새로운 행들을 생성합니다.
|
|
new_rows = []
|
|
if missing_in_movement:
|
|
for inter_no, start_unix in missing_in_movement:
|
|
# match1에서 해당 inter_no의 데이터를 찾습니다.
|
|
new_row = self.match1[self.match1['inter_no'] == inter_no].copy()
|
|
# start_unix 값을 설정합니다.
|
|
new_row['start_unix'] = start_unix
|
|
new_rows.append(new_row)
|
|
|
|
# 새로운 데이터프레임을 생성하고 기존 movement 데이터프레임과 합칩니다.
|
|
new_movement = pd.concat(new_rows, ignore_index=True)
|
|
self.movement_updated = pd.concat([self.movement, new_movement], ignore_index=True)
|
|
else:
|
|
self.movement_updated = self.movement
|
|
|
|
# 4. 통합테이블 생성
|
|
def make_histids(self):
|
|
print("4. 통합 테이블을 생성합니다.")
|
|
self.merge_dfs()
|
|
self.attach_children()
|
|
|
|
# 4-1. histid
|
|
def merge_dfs(self):
|
|
# movements and durations
|
|
movedur = pd.merge(self.hrhists, self.movement_updated, how='inner', on=['inter_no', 'start_unix', 'phas_A', 'phas_B'])
|
|
movedur = movedur.sort_values(by=['start_unix', 'inter_no', 'phas_A','phas_B'])
|
|
movedur = movedur[['inter_no', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration']]
|
|
|
|
# 이동류 매칭 테이블에서 진입id, 진출id를 가져와서 붙임.
|
|
for i, row in movedur.iterrows():
|
|
inter_no = row.inter_no
|
|
start_unix = row.start_unix
|
|
# incoming and outgoing edges A
|
|
move_A = row.move_A
|
|
if move_A in [17, 18]:
|
|
inc_edge_A = np.nan
|
|
outhedge_A = np.nan
|
|
else:
|
|
match_A = self.matching[(self.matching.inter_no == inter_no) & (self.matching.move_no == move_A)].iloc[0]
|
|
inc_edge_A = match_A.inc_edge
|
|
out_edge_A = match_A.out_edge
|
|
movedur.loc[i, ['inc_edge_A', 'out_edge_A']] = [inc_edge_A, out_edge_A]
|
|
# incoming and outgoing edges B
|
|
move_B = row.move_B
|
|
if move_B in [17, 18]:
|
|
inc_edge_B = np.nan
|
|
out_edge_B = np.nan
|
|
else:
|
|
match_B = self.matching[(self.matching.inter_no == inter_no) & (self.matching.move_no == move_B)].iloc[0]
|
|
inc_edge_B = match_B.inc_edge
|
|
out_edge_B = match_B.out_edge
|
|
movedur.loc[i, ['inc_edge_B', 'out_edge_B']] = [inc_edge_B, out_edge_B]
|
|
|
|
# 이동류 컬럼 제거
|
|
movedur = movedur.drop(['move_A', 'move_B'], axis=1)
|
|
|
|
self.histid = movedur.copy() # history with edge ids (incoming and outgoing edge ids)
|
|
self.histid['node_id'] = self.histid['inter_no'].map(self.inter2node)
|
|
self.histid = self.histid[['inter_no', 'node_id', 'start_unix', 'phas_A', 'phas_B', 'duration', 'inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']]
|
|
histid_start = self.present_time - 600
|
|
self.histid = self.histid[self.histid.start_unix > histid_start]
|
|
|
|
# 4-2. histids
|
|
def attach_children(self):
|
|
'''
|
|
자식교차로에 대한 진입·진출 엣지 정보를 붙여주는 함수
|
|
|
|
input :
|
|
(1) histid
|
|
- 각 교차로에 대한 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
|
|
- 부모교차로(주교차로)에 대해서만 값이 지정되어 있음
|
|
(2) match6
|
|
- (현시, 링)별 진입·진출엣지
|
|
- 자식교차로(유턴 및 연동교차로)에 대해서도 값이 지정되어 있음
|
|
(3) parent_ids : 부모교차로 목록
|
|
(4) pa2ch : 각 부모교차로id를 부모교차로가 포함하고 있는 자식교차로들의 id들의 리스트로 대응시키는 딕셔너리
|
|
|
|
output : histids
|
|
- 모든(부모 및 자식) 교차로에 대한 시작유닉스 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
|
|
'''
|
|
new_histids = []
|
|
for parent_id in self.parent_ids:
|
|
for child_id in self.pa2ch[parent_id]:
|
|
new_histid = self.histid.copy()[self.histid.node_id==parent_id]
|
|
new_histid[['inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']] = np.nan
|
|
for i, row in new_histid.iterrows():
|
|
phas_A = row.phas_A
|
|
phas_B = row.phas_B
|
|
new_match = self.match6[self.match6.node_id==child_id]
|
|
Arow = new_match[(new_match.phase_no==phas_A) & (new_match.ring_type=='A')]
|
|
if ~ Arow[['inc_edge', 'out_edge']].isna().all().all():
|
|
inc_edge = Arow.iloc[0].inc_edge
|
|
out_edge = Arow.iloc[0].out_edge
|
|
new_histid.loc[i, ['inc_edge_A', 'out_edge_A']] = [inc_edge, out_edge]
|
|
Brow = new_match[(new_match.phase_no==phas_B) & (new_match.ring_type=='B')]
|
|
if ~ Brow[['inc_edge', 'out_edge']].isna().all().all():
|
|
inc_edge = Brow.iloc[0].inc_edge
|
|
out_edge = Brow.iloc[0].out_edge
|
|
new_histid.loc[i, ['inc_edge_B', 'out_edge_B']] = [inc_edge, out_edge]
|
|
new_histid.loc[i, 'node_id'] = child_id
|
|
new_histids.append(new_histid)
|
|
new_histids = pd.concat(new_histids)
|
|
self.histids = pd.concat([self.histid.copy(), new_histids])
|
|
self.histids = self.histids.sort_values(by=['start_unix', 'node_id', 'phas_A', 'phas_B']).reset_index(drop=True)
|
|
|
|
# 5. 신호 생성
|
|
print("5. 신호를 생성합니다.")
|
|
def get_signals(self):
|
|
self.initialize_states()
|
|
self.assign_signals()
|
|
self.set_timepoints()
|
|
self.assign_red_yellow()
|
|
self.make_tl_file()
|
|
|
|
# 5-1. 신호초기화
|
|
def initialize_states(self):
|
|
'''
|
|
신호 초기화
|
|
|
|
input :
|
|
(1) net : 네트워크
|
|
(2) nodes : 노드 목록
|
|
(3) histids : 모든 교차로에 대한 시작유닉스 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
|
|
|
|
output : node2init
|
|
- 각 노드를 초기화된 신호로 맵핑하는 딕셔너리
|
|
- 초기화된 신호란, 우회전을 g로 나머지는 r로 지정한 신호를 말함.
|
|
'''
|
|
self.node2init = {}
|
|
for node in self.nodes:
|
|
node_id = node.getID()
|
|
conns = [(c.getJunctionIndex(), c) for c in node.getConnections()]
|
|
conns = [c for c in conns if c[0] >= 0]
|
|
conns = sorted(conns, key=lambda x: x[0])
|
|
state = []
|
|
for i, ci in conns:
|
|
if ci.getTLLinkIndex() < 0:
|
|
continue
|
|
are_foes = False
|
|
for j, cj in conns:
|
|
if ci.getTo() == cj.getTo():
|
|
continue
|
|
if node.areFoes(i, j):
|
|
are_foes = True
|
|
break
|
|
state.append('r' if are_foes else 'g')
|
|
self.node2init[node_id] = state
|
|
|
|
# 어떤 연결과도 상충이 일어나지는 않지만, 신호가 부여되어 있는 경우에는 r을 부여
|
|
for _, row in self.histids.iterrows():
|
|
node_id = row['node_id']
|
|
inc_edge_A = row.inc_edge_A
|
|
inc_edge_B = row.inc_edge_B
|
|
out_edge_A = row.out_edge_A
|
|
out_edge_B = row.out_edge_B
|
|
|
|
if pd.isna(inc_edge_A) or pd.isna(out_edge_A):
|
|
pass
|
|
else:
|
|
inc_edge_A = self.net.getEdge(inc_edge_A)
|
|
out_edge_A = self.net.getEdge(out_edge_A)
|
|
for conn in inc_edge_A.getConnections(out_edge_A):
|
|
index = conn.getTLLinkIndex()
|
|
if index >= 0:
|
|
self.node2init[node_id][index] = 'r'
|
|
|
|
if pd.isna(inc_edge_B) or pd.isna(out_edge_B):
|
|
pass
|
|
else:
|
|
inc_edge_B = self.net.getEdge(inc_edge_B)
|
|
out_edge_B = self.net.getEdge(out_edge_B)
|
|
for conn in inc_edge_B.getConnections(out_edge_B):
|
|
index = conn.getTLLinkIndex()
|
|
if index >= 0:
|
|
self.node2init[node_id][index] = 'r'
|
|
|
|
# 5-2. 녹색신호 부여
|
|
def assign_signals(self):
|
|
'''
|
|
진입·진출엣지를 신호문자열로 배정
|
|
|
|
input :
|
|
(1) histids : 모든 교차로에 대한 (시작유닉스, A현시, B현시)별 현시시간, 진입·진출엣지
|
|
(2) node2init : 각 노드를 초기화된 신호로 맵핑하는 딕셔너리
|
|
(3) net : 네트워크
|
|
|
|
output : sigtable
|
|
- 모든 교차로에 대한 (시작유닉스, A현시, B현시)별 현시시간, 신호문자열
|
|
- 황색 및 적색신호는 아직 반영되지 않았음.
|
|
'''
|
|
self.sigtable = self.histids.copy()
|
|
self.sigtable['init_state'] = self.sigtable['node_id'].map(self.node2init)
|
|
self.sigtable['state'] = self.sigtable['init_state'].map(lambda x:''.join(x))
|
|
for i, row in self.sigtable.iterrows():
|
|
node_id = row.node_id
|
|
inc_edge_A = row.inc_edge_A
|
|
inc_edge_B = row.inc_edge_B
|
|
out_edge_A = row.out_edge_A
|
|
out_edge_B = row.out_edge_B
|
|
state = copy.deepcopy(self.node2init)[node_id]
|
|
if pd.isna(inc_edge_A) or pd.isna(out_edge_A):
|
|
pass
|
|
else:
|
|
inc_edge_A = self.net.getEdge(inc_edge_A)
|
|
out_edge_A = self.net.getEdge(out_edge_A)
|
|
for conn in inc_edge_A.getConnections(out_edge_A):
|
|
index = conn.getTLLinkIndex()
|
|
if index >= 0:
|
|
state[index] = 'G'
|
|
self.sigtable.at[i, 'state'] = ''.join(state)
|
|
|
|
if pd.isna(inc_edge_B) or pd.isna(out_edge_B):
|
|
pass
|
|
else:
|
|
inc_edge_B = self.net.getEdge(inc_edge_B)
|
|
out_edge_B = self.net.getEdge(out_edge_B)
|
|
for conn in inc_edge_B.getConnections(out_edge_B):
|
|
index = conn.getTLLinkIndex()
|
|
if index >= 0:
|
|
state[index] = 'G'
|
|
self.sigtable.at[i, 'state'] = ''.join(state)
|
|
self.sigtable = self.sigtable.dropna(subset='state')
|
|
self.sigtable = self.sigtable.reset_index(drop=True)
|
|
self.sigtable['phase_sumo'] = self.sigtable.groupby(['node_id', 'start_unix']).cumcount()
|
|
self.sigtable = self.sigtable[['node_id', 'start_unix', 'phase_sumo', 'duration', 'state']]
|
|
self.sigtable = self.sigtable.sort_values(by=['start_unix', 'node_id'])
|
|
self.sigtable['start_dt'] = self.sigtable['start_unix'].apply(lambda x:datetime.fromtimestamp(x))
|
|
|
|
# 5-3. 신호 파일의 시작 및 종료시각 설정
|
|
def set_timepoints(self):
|
|
self.offsets = {}
|
|
self.Sigtable = []
|
|
sim_start = self.present_time - 300
|
|
for node_id, group in self.sigtable.groupby('node_id'):
|
|
lsbs = group[group['start_unix'] < sim_start]['start_unix'].max() # the last start_unix before sim_start
|
|
self.offsets[node_id] = lsbs - sim_start
|
|
group = group[group.start_unix >= lsbs]
|
|
start_unixes = np.array(group.start_unix)
|
|
start_unixes = np.sort(np.unique(start_unixes))[:self.node2num_cycles[node_id]]
|
|
group = group[group.start_unix.isin(start_unixes)]
|
|
self.Sigtable.append(group)
|
|
self.Sigtable = pd.concat(self.Sigtable)
|
|
|
|
# 5-4. 적색 및 황색신호 부여
|
|
def assign_red_yellow(self):
|
|
'''
|
|
적색, 황색신호를 반영한 신호문자열 배정
|
|
|
|
input : Sigtable
|
|
- 모든 교차로에 대한 (시작유닉스, 세부현시번호)별 현시시간, 신호문자열, 진입·진출엣지
|
|
* 세부현시란 오버랩을 반영한 현시번호를 뜻함.
|
|
|
|
output : SIGTABLE
|
|
- 모든 교차로에 대한 (시작유닉스, 녹황적세부현시번호)별 현시시간, (황·적색신호가 포함된) 신호문자열
|
|
* 녹황적세부현시번호란 세부현시번호에 r, g, y 옵션까지 포함된 현시번호를 뜻함.
|
|
'''
|
|
self.SIGTABLE = []
|
|
for node_id, group in self.Sigtable.groupby('node_id'):
|
|
new_rows_list = []
|
|
for i in range(1, len(group)):
|
|
prev_row = group.iloc[i-1:i].copy()
|
|
next_row = group.iloc[i:i+1].copy()
|
|
new_rows = pd.concat([prev_row, prev_row, next_row]).reset_index(drop=True)
|
|
new_rows.loc[0, 'phase_sumo'] = str(prev_row.phase_sumo.iloc[0]) + '_g'
|
|
new_rows.loc[0, 'duration'] = new_rows.loc[0, 'duration'] - 5
|
|
new_rows.loc[1, 'phase_sumo'] = str(prev_row.phase_sumo.iloc[0]) + '_y'
|
|
new_rows.loc[1, 'duration'] = 4
|
|
yellow_state = ''
|
|
red_state = ''
|
|
for a, b in zip(prev_row.state.iloc[0], next_row.state.iloc[0]):
|
|
if a == 'G' and b == 'r':
|
|
yellow_state += 'y'
|
|
red_state += 'r'
|
|
else:
|
|
yellow_state += a
|
|
red_state += a
|
|
new_rows.loc[2, 'phase_sumo'] = str(next_row.phase_sumo.iloc[0]) + '__r'
|
|
new_rows.loc[2, 'duration'] = 1
|
|
new_rows.loc[1, 'state'] = yellow_state
|
|
new_rows.loc[2, 'state'] = red_state
|
|
new_rows_list.append(new_rows)
|
|
next_row['phase_sumo'] = str(next_row.phase_sumo.iloc[0]) + '_g'
|
|
next_row['duration'] -= 5
|
|
# next_row.loc['duration'] -= 5
|
|
new_rows_list.append(next_row)
|
|
new_rows = pd.concat(new_rows_list)
|
|
self.SIGTABLE.append(new_rows)
|
|
self.SIGTABLE = pd.concat(self.SIGTABLE).sort_values(by=['node_id', 'start_unix', 'phase_sumo']).reset_index(drop=True)
|
|
|
|
# 5-5. 신호파일 생성
|
|
def make_tl_file(self):
|
|
strings = ['<additional>\n']
|
|
for node_id, group in self.SIGTABLE.groupby('node_id'):
|
|
strings.append(f' <tlLogic id="{node_id}" type="static" programID="{node_id}_prog" offset="{self.offsets[node_id]}">\n')
|
|
for i, row in group.iterrows():
|
|
duration = row.duration
|
|
state = row.state
|
|
strings.append(f' <phase duration="{duration}" state="{state}"/>\n')
|
|
strings.append(' </tlLogic>\n')
|
|
strings.append('</additional>')
|
|
strings = ''.join(strings)
|
|
# 저장
|
|
self.path_output = os.path.join(self.path_results, f'sn_{self.present_time}.add.xml')
|
|
with open(self.path_output, 'w') as f:
|
|
f.write(strings)
|
|
|
|
# 6. 이슈사항 저장
|
|
def write_issues(self):
|
|
print('6. 이슈사항을 저장합니다.')
|
|
path_issues = os.path.join(self.path_results, "issues_generate_signals.txt")
|
|
with open(path_issues, "w", encoding="utf-8") as file:
|
|
for item in self.issues:
|
|
file.write(item + "\n")
|
|
if self.issues:
|
|
print("데이터 처리 중 발생한 특이사항은 다음과 같습니다. :")
|
|
for review in self.issues:
|
|
print(review)
|
|
|
|
def main(self):
|
|
# 1. 데이터 준비
|
|
self.prepare_data()
|
|
# 2. 신호이력 전처리
|
|
self.process_history()
|
|
# 3. 이동류정보 전처리
|
|
self.process_movement()
|
|
# 4. 통합테이블 생성
|
|
self.make_histids()
|
|
# 5. 신호 생성
|
|
self.get_signals()
|
|
# 6. 이슈사항 저장
|
|
self.write_issues()
|
|
|
|
if __name__ == '__main__':
|
|
self = SignalGenerator()
|
|
self.main()
|