"""Review traffic-light programs (``tlLogic`` elements) and list suspicious ones.

Usage as a script::

    python <this_file> [TLL_XML NETWORK_XML]

The findings are written to ``review_needed.csv`` in the current directory.
"""
import os
import sys

if '__file__' in globals():
    # Make modules that live next to this file importable.
    sys.path.append(os.path.dirname(os.path.abspath(__file__)))

import pandas as pd

# NOTE: ``sumolib`` is imported inside the functions that need it
# (``read_tllogic`` and ``main``) so that the pure string checks below can be
# imported and used without a SUMO installation.

# Column layout of the frame produced by ``read_tllogic``.
_TLLOGIC_COLUMNS = ['tllogic_id', 'program_id', 'index', 'duration', 'state']

# (previous char, next char) -> message prefix for a suspicious transition.
_TRANSITION_LABELS = {
    ('g', 'r'): '녹색 -> 적색, 황색신호 누락.',
    ('r', 'y'): '적색 -> 황색.',
    ('y', 'g'): '황색 -> 녹색.',
}


def read_tllogic(path):
    """Read every phase of every ``tlLogic`` element in *path* into a DataFrame.

    Args:
        path: Path of the XML file to parse.

    Returns:
        A DataFrame with one row per phase and the columns ``tllogic_id``,
        ``program_id``, ``index`` (position of the phase inside its program),
        ``duration`` (float) and ``state``.  The columns are present even when
        the file contains no phases.
    """
    import sumolib

    rows = []
    for tllogic in sumolib.xml.parse(path, "tlLogic"):
        # NOTE(review): sumolib appears to return None for a missing child
        # element; ``or []`` keeps a tlLogic without phases from raising.
        for i, phase in enumerate(tllogic.phase or []):
            rows.append({
                'tllogic_id': tllogic.id,
                'program_id': tllogic.programID,
                'index': i,
                'duration': float(phase.duration),
                'state': phase.state,
            })
    # Explicit columns keep the schema stable for an empty result.
    return pd.DataFrame(rows, columns=_TLLOGIC_COLUMNS)


def check_always_red(states):
    """Mark the positions that are ``'r'`` in every state string.

    The comparison is case-insensitive.  The result has one character per
    position of the longest state: ``'r'`` when every state that has this
    position shows ``'r'`` there, ``'G'`` otherwise.

    Args:
        states: Sequence of state strings of one program.

    Returns:
        The merged string; ``''`` when *states* is empty.
    """
    if not states:
        return ''
    lowered = [state.lower() for state in states]
    width = max(len(state) for state in lowered)
    return ''.join(
        'r' if all(state[pos] == 'r' for state in lowered if pos < len(state)) else 'G'
        for pos in range(width)
    )


def check_state_transitions(states):
    """Report suspicious per-character transitions between consecutive states.

    Flagged (case-insensitively): green -> red (yellow missing), red -> yellow
    and yellow -> green.  Only neighbouring states are compared; the step from
    the last state back to the first one is not checked.

    Args:
        states: Sequence of state strings in phase order.

    Returns:
        A list of note strings, one per flagged character position.
    """
    notes = []
    for i, (state, next_state) in enumerate(zip(states, states[1:])):
        for j, pair in enumerate(zip(state.lower(), next_state.lower())):
            label = _TRANSITION_LABELS.get(pair)
            if label is not None:
                notes.append(f'{label} {i}, {i + 1} 사이의 state에서 {j}번째 문자를 확인하세요.')
    return notes


def review_tllogic(df_tllogic, interval):
    """Run all checks on a phase table as produced by ``read_tllogic``.

    Args:
        df_tllogic: DataFrame with the columns ``tllogic_id``, ``program_id``,
            ``index``, ``duration`` and ``state``.
        interval: Minimum total duration expected of each program.

    Returns:
        A DataFrame with a single ``note`` column, one row per finding.  All
        duration findings come first, followed by the always-red and
        transition findings of each program.
    """
    notes = []
    if df_tllogic.empty:
        return pd.DataFrame({'note': notes})

    grouped = df_tllogic.groupby(['tllogic_id', 'program_id'])

    # Compare the summed phase durations of each program with the interval.
    for (tllogic_id, program_id), total in grouped['duration'].sum().items():
        if total < interval:
            notes.append(
                f'duration의 합계가 {interval}보다 작음. tllogic_id: {tllogic_id}, program_id: {program_id}'
            )

    for (tllogic_id, program_id), group in grouped:
        states = group.sort_values(by='index')['state'].tolist()

        # Positions that never leave red.
        merged_state = check_always_red(states)
        if 'r' in merged_state:
            notes.append(
                f'There is always a red signal. state: {merged_state}. tllogic_id: {tllogic_id}, program_id: {program_id}'
            )

        # Order of the signal colors.
        for note in check_state_transitions(states):
            notes.append(note + f' tllogic_id: {tllogic_id}, program_id: {program_id}')

    return pd.DataFrame({'note': notes})


def main(path_tll_xml, path_network_xml, interval):
    """Load the input files and return the review findings.

    Args:
        path_tll_xml: XML file containing the ``tlLogic`` definitions.
        path_network_xml: Network XML file.
        interval: Minimum total duration expected of each program.

    Returns:
        A DataFrame with a ``note`` column, one row per finding.
    """
    from sumolib.net import readNet

    df_tllogic = read_tllogic(path_tll_xml)
    # NOTE(review): none of the checks use the parsed network.  The call is
    # kept so that an unreadable network file still fails as before.
    readNet(path_network_xml, withInternal=True, withPrograms=True)
    return review_tllogic(df_tllogic, interval)


if __name__ == '__main__':
    script_dir = os.path.dirname(os.path.abspath(__file__))
    usage = f'usage: {os.path.basename(__file__)} [TLL_XML NETWORK_XML]'
    cli_paths = sys.argv[1:]

    if len(cli_paths) == 2:
        path_tll_xml, path_network_xml = cli_paths
    elif not cli_paths:
        # Legacy fallback: pick the inputs by position in the listing of the
        # script directory.  os.listdir() returns entries in arbitrary order,
        # so prefer passing both paths explicitly.
        file_names = os.listdir(script_dir)
        if len(file_names) < 4:
            sys.exit(usage)
        path_tll_xml = os.path.join(script_dir, file_names[3])
        path_network_xml = os.path.join(script_dir, file_names[1])
    else:
        sys.exit(usage)

    print(path_tll_xml, path_network_xml)
    interval = 300
    review_needed = main(path_tll_xml, path_network_xml, interval)
    review_needed.to_csv('review_needed.csv', index=False, encoding='cp949')