In [1]:
import pandas as pd
import numpy as np
import os, sys, json, argparse
import sumolib, traci
from tqdm import tqdm
from datetime import datetime
path_root = os.path.dirname(os.path.dirname(os.path.abspath('.')))

In [2]:
class SignalGenerator:
    pass
self = SignalGenerator()
self.config_name = 'test_0729'
self.file_net = 'new_sungnam_network_internal_target_0721.net.xml'
self.month = 7
self.day = 29
self.hour = 9
self.minute = 25

In [3]:
# 루트폴더 지정
self.path_root = path_root
with open(os.path.join(self.path_root, 'configs', f'config_{self.config_name}.json'), 'r') as config_file:
    config = json.load(config_file)

In [4]:
# 주요 폴더 경로 지정
self.paths = config['paths']
self.path_data = os.path.join(self.path_root, *self.paths['data'])
self.path_intermediates = os.path.join(self.path_root, *self.paths['intermediates'])
self.path_results = os.path.join(self.path_root, *self.paths['results'])
self.path_tables = os.path.join(self.path_root, *self.paths['tables'])
self.path_networks = os.path.join(self.path_root, *self.paths['networks'])
self.path_scripts = os.path.join(self.path_root, *self.paths['scripts'])

# 이슈사항 목록
self.issues = []

In [5]:
print(self.path_tables)

c:\github\siggen\test_0729\data\tables


In [6]:
self.midnight = int(datetime(2024, self.month, self.day, 0, 0, 0).timestamp())
self.next_day = int(datetime(2024, self.month, self.day+1, 0, 0, 0).timestamp())
self.fsecs = range(self.midnight, self.next_day, 5) # fsecs : unix time by Five SECondS
self.fmins = range(self.midnight, self.next_day, 300) # fmins : unix time by Five MINuteS

self.present_time = int(datetime(2024, self.month, self.day, self.hour, self.minute).timestamp())
self.present_time = max([fmin for fmin in list(self.fmins) if fmin <= self.present_time])

self.sim_timespan = 300
self.adder = 600 # 10분 : '현재시점 + 10분'에 가상신호를 생성하기 위함.
self.subtractor = 1800 # 30분 : '현재시점 - 30분'의 신호이력을 가져온다.

In [7]:
# 1-1. 네트워크 불러오기
self.net = sumolib.net.readNet(os.path.join(self.path_networks, self.file_net))

In [8]:
# cnames = []
# cnames.extend(list(self.inter_info))
# cnames.extend(list(self.plan))
# cnames.extend(list(self.history))
# cnames.extend(list(self.inter_node))
# cnames.extend(list(self.matching))
# cnames.extend(list(self.match1))
# cnames.extend(list(self.match6))
# sorted(set(cnames))

In [9]:
# 1-2. 테이블 불러오기
# 모든 컬럼에 대하여 데이터타입 지정
loading_dtype_1 = {
                   'CRSRD_ID':int,    'FRST_REG_DT':str,  'LAST_MDFCN_DT':str,
                   'OCRN_DT':str,     'cycle':int,        'group_no':str,
                   'hh':int,          'inc_edge_id':str,  'inter_lat':float,
                   'inter_lon':float, 'inter_name':str,   'inter_no':int,
                   'inter_type':str,  'main_phase_no':str,'mm':int,
                   'move_A':int,      'move_B':int,       'move_no':int,
                   'node_id':str,     'offset':int,       'out_edge_id':str,
                   'phas_A':int,      'phas_B':int,       'phase_no':int,
                   'plan_no':int,     'ring_type':str,    'state':str,
                   'turn_type':str
                   }
loading_dtype_2 = dict()
for alph in ['A', 'B']:
    for j in range(1,9):
        loading_dtype_2[f'RING{alph}_PHASE{j}'] = int
        loading_dtype_2[f'dura_{alph}{j}'] = int
        loading_dtype_2[f'red_{alph}{j}'] = int
        loading_dtype_2[f'yellow_{alph}{j}'] = int
loading_dtype = {**loading_dtype_1, **loading_dtype_2}

# 테이블 불러오기
self.inter_info = pd.read_csv(os.path.join(self.path_tables, 'inter_info.csv'), dtype=loading_dtype)
self.plan       = pd.read_csv(os.path.join(self.path_tables, 'plan.csv'), dtype=loading_dtype)
self.history    = pd.read_csv(os.path.join(self.path_tables, 'TL_IF_SIGL_CYCL.csv'), dtype=loading_dtype)
self.inter_node = pd.read_csv(os.path.join(self.path_tables, 'inter_node.csv'), dtype=loading_dtype)
self.matching   = pd.read_csv(os.path.join(self.path_intermediates, 'matching.csv'), dtype=loading_dtype)
self.match1  = pd.read_csv(os.path.join(self.path_intermediates, 'match1.csv'), dtype=loading_dtype)
self.match6     = pd.read_csv(os.path.join(self.path_intermediates, 'match6.csv'), dtype=loading_dtype)

In [10]:
# 컬럼명 변경 적용
self.history = self.history.rename(columns={'OCRN_DT':'end_unix',   'CRSRD_ID':'inter_no'})
self.history = self.history.rename(columns={f'RING{alph}_PHASE{i}':f'dura_{alph}{i}' for alph in ['A', 'B'] for i in range(1,9)})
self.history = self.history.drop(columns='FRST_REG_DT')
self.history['end_unix'] = pd.to_datetime(self.history['end_unix'])
self.history['end_unix'] = self.history['end_unix'].astype(int) // 10**9
self.plan = self.plan.rename(columns={'hh':'start_hour', 'mm':'start_minute'})

In [11]:
# 1-5. 보조 딕셔너리, 데이터프레임, 리스트 등 만들기

# inter2node : a dictionary that maps inter_no to the node_id
inter_node_p = self.inter_node[self.inter_node.inter_type=='parent']
self.inter2node = dict(zip(inter_node_p['inter_no'], inter_node_p['node_id']))
self.node2inter = dict(zip(self.inter_node['node_id'], self.inter_node['inter_no']))

# split, isplit : A,B 분리 혹은 통합시 사용될 수 있는 딕셔너리 
self.splits = {} # splits maps (inter_no, start_hour, start_minute) to split 
for i, row in self.plan.iterrows():
    inter_no = row.inter_no
    start_hour = row.start_hour
    start_minute = row.start_minute
    cycle = row.cycle

    dura_A = np.array(row[[f'dura_A{j}' for j in range(1, 9)]])
    dura_B = np.array(row[[f'dura_B{j}' for j in range(1, 9)]])

    cums_A = dura_A.cumsum()
    cums_B = dura_B.cumsum()
    combined_row = np.unique(np.concatenate((cums_A,cums_B)))
    detailed_durations = np.concatenate(([combined_row[0]], np.diff(combined_row)))

    self.splits[(inter_no, start_hour, start_minute)] = {} # split maps (phas_A, phas_B) to k
    ja = 0
    jb = 0
    for k in range(len(detailed_durations)):
        dura_A[ja] -= detailed_durations[k]
        dura_B[jb] -= detailed_durations[k]
        self.splits[(inter_no, start_hour, start_minute)][(ja+1, jb+1)] = k+1
        if dura_A[ja] == 0:
            ja += 1
        if dura_B[jb] == 0:
            jb += 1

self.isplits = {} # the inverse of splits
for i in self.splits:
    self.isplits[i] = {self.splits[i][k]:k for k in self.splits[i]} # isplit maps k to (phas_A, phas_B)

# timetable : 교차로별 프로그램 시작시각
self.timetable = self.plan[['start_hour', 'start_minute']].drop_duplicates()
self.timetable['start_seconds'] = self.midnight + self.timetable['start_hour'] * 3600 + self.timetable['start_minute'] * 60

with open(os.path.join(self.path_intermediates, 'ids.json'), 'r') as file:
    ids = json.load(file)
self.inter_nos = ids['inter_nos']
self.node_ids = ids['node_ids']
self.parent_ids = ids['parent_ids']
self.child_ids = ids['child_ids']
self.uturn_ids = ids['uturn_ids']
self.coord_ids = ids['coord_ids']

# A dictionary that maps parent_id to a list of child_ids
self.pa2ch = {}
for parent_id in self.parent_ids:
    inter_no = self.node2inter[parent_id]
    self.pa2ch[parent_id] = list(self.inter_node[(self.inter_node.inter_no==inter_no) & (self.inter_node.inter_type=='child')].node_id)

# node2num_cycles : A dictionary that maps a node_id to the number of cycles
with open(os.path.join(self.path_intermediates, 'node2num_cycles.json'), 'r') as file:
    # json.load() 함수를 사용해 파일 내용을 Python 딕셔너리로 불러옵니다.
    self.node2num_cycles = json.load(file)

# 초기화신호 불러오기
with open(os.path.join(self.path_intermediates, 'node2init.json'), 'r') as file:
    self.node2init = json.load(file)        

self.plan_set = self.plan.set_index(['inter_no','start_hour','start_minute'])

In [15]:
def load_prow(inter_no, time):
    '''
    load the planned row
    '''
    # 프로그램 시작시각
    program_starts = np.array(self.timetable.start_seconds)
    idx = (program_starts <= time).sum() - 1
    program_start = program_starts[idx]

    # 최근 프로그램 시작시각에 대한 신호계획
    start_hour = self.timetable.iloc[idx].start_hour
    start_minute = self.timetable.iloc[idx].start_minute
    # prow = self.plan[(self.plan.inter_no==inter_no) & (self.plan.start_hour==start_hour) & (self.plan.start_minute==start_minute)] # planned row
    prow = self.plan_set.loc[(inter_no, start_hour, start_minute)]
    prow = pd.DataFrame([prow],index=[0])
    prow['inter_no'] = inter_no

    return program_start, prow


In [16]:
# 2. 신호이력 전처리
# 2-1. rhistory

# 1. 조회시점의 유닉스 타임 이전의 신호이력 수집
self.rhistory = self.history.copy() # recent history
self.rhistory = self.rhistory[(self.rhistory.end_unix <= self.present_time) & (self.rhistory.end_unix > self.present_time - self.subtractor)]

# rhistory에 모든 교차로번호가 존재하지 않으면 해당 교차로번호에 대한 신호이력을 추가함 (at 최근 프로그램 시작시각)
whole_inter_nos = set(self.history.inter_no.unique())
recent_inter_nos = set(self.rhistory.inter_no.unique())
if not whole_inter_nos==recent_inter_nos:
    for inter_no in whole_inter_nos - recent_inter_nos:
        program_start, prow = load_prow(inter_no, self.present_time - self.subtractor)
        cycle = prow.cycle.iloc[0]
        row1 = prow.copy()
        row2 = prow.copy()
        # prow에서 필요한 부분을 rhistory에 추가
        row1['end_unix'] = program_start
        row2['end_unix'] = program_start + cycle
        self.rhistory = pd.concat([self.rhistory, row1, row2])#.reset_index(drop=True)

# present_time + adder 의 시각에 한 주기의 신호 추가
for inter_no in set(whole_inter_nos):
    program_start, prow = load_prow(inter_no, self.present_time)
    cycle = prow.cycle.iloc[0]
    row3 = prow.copy()
    # prow에서 필요한 부분을 rhistory에 추가
    row3['end_unix'] = self.present_time + self.adder
    self.rhistory = pd.concat([self.rhistory, row3])#.reset_index(drop=True)

# 2. 시작 유닉스 타임컬럼 생성 후 종류 유닉스 타임에서 현시별 현시기간 컬럼의 합을 뺀 값으로 입력
# - 현시시간의 합을 뺀 시간의 +- 10초 이내에 이전 주기정보가 존재하면 그 유닉스 시간을 시작 유닉스시간 값으로 하고, 존재하지 않으면 현시시간의 합을 뺀 유닉스 시간을 시작 유닉스 시간으로 지정
for i, row in self.rhistory.iterrows():
    inter_no = row.inter_no
    end_unix = row.end_unix
    elapsed_time = row[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]].sum() // 2 # 현시시간 합
    # 이전 유닉스 존재하지 않음 : 현시시간 합의 차
    start_unix = end_unix - elapsed_time
    pre_rows = self.history[:i] # previous rows
    if inter_no in pre_rows.inter_no.unique(): # 이전 유닉스 존재
        pre_unix = pre_rows[pre_rows.inter_no == inter_no]['end_unix'].iloc[-1] # previous unix time
        # 이전 유닉스 존재, abs < 10 : 이전 유닉스
        if abs(pre_unix - start_unix) < 10:
            start_unix = pre_unix
        # 이전 유닉스 존재, abs >=10 : 현시시간 합의 차
        else:
            pass
    self.rhistory.loc[i, 'start_unix'] = start_unix
self.rhistory[self.rhistory.isna()] = 0
self.rhistory['start_unix'] = self.rhistory['start_unix'].astype(int)
self.rhistory = self.rhistory[['inter_no', 'start_unix'] + [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)] + ['cycle']]

KeyError: (np.int64(456), np.int64(6), np.int64(0))