신호생성 repo (24. 1. 5 ~).
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

94 lines
3.6 KiB

import os
import sys
if '__file__' in globals():
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import pandas as pd
import sumolib
from sumolib.net import readNet
def read_tllogic(path):
df_tllogic = []
for value in sumolib.xml.parse(path, "tlLogic"):
tllogic_id = value.id
program_id = value.programID
for i, phase in enumerate(value.phase):
row = {
'tllogic_id': tllogic_id,
'program_id': program_id,
'index': i,
'duration': float(phase.duration),
'state': phase.state,
}
df_tllogic.append(row)
df_tllogic = pd.DataFrame(df_tllogic)
return df_tllogic
def check_always_red(states):
'''Checks whether all characters across multiple state strings at each position are 'r'.'''
colors = ['r'] * len(states[0])
for s in states:
for i, char in enumerate(s.lower()):
if char != 'r':
colors[i] = 'G'
return ''.join(colors)
def check_state_transitions(states):
"""Check state transitions for missing yellow signals and other specific transitions."""
notes = []
for i, state in enumerate(states[:-1]):
next_state = states[i + 1]
for j, (p0, p1) in enumerate(zip(state.lower(), next_state.lower())):
if p0 == 'g' and p1 == 'r':
notes.append(f'녹색 -> 적색, 황색신호 누락. {i}, {i + 1} 사이의 state에서 {j}번째 문자를 확인하세요.')
if p0 == 'r' and p1 == 'y':
notes.append(f'적색 -> 황색. {i}, {i + 1} 사이의 state에서 {j}번째 문자를 확인하세요.')
if p0 == 'y' and p1 == 'g':
notes.append(f'황색 -> 녹색. {i}, {i + 1} 사이의 state에서 {j}번째 문자를 확인하세요.')
return notes
def main(path_tll_xml, path_network_xml, interval):
df_tllogic = read_tllogic(path_tll_xml)
net = readNet(path_network_xml, withInternal=True, withPrograms=True)
review_needed = []
# duration의 합계 비교
df_tmp = df_tllogic \
.groupby(['tllogic_id', 'program_id']) \
.agg(sum_duration=('duration', 'sum')) \
.reset_index() \
.query('sum_duration < @interval')
if not df_tmp.empty:
for tup in df_tmp.itertuples():
sub = {'note': f'duration의 합계가 {interval}보다 작음. tllogic_id: {tup.tllogic_id}, program_id: {tup.program_id}'}
review_needed.append(sub)
for (tllogic_id, program_id), group in df_tllogic.groupby(['tllogic_id', 'program_id']):
group_sorted = group.sort_values(by='index')
states = list(group_sorted['state'])
# 항상 적색인 경우
merged_state = check_always_red(states)
if 'r' in merged_state:
sub = {'note': f'There is always a red signal. state: {merged_state}. tllogic_id: {tllogic_id}, program_id: {program_id}'}
review_needed.append(sub)
# 신호 색상 순서 확인
for note in check_state_transitions(states):
note = note + f' tllogic_id: {tllogic_id}, program_id: {program_id}'
review_needed.append({'note': note})
review_needed = pd.DataFrame(review_needed)
return review_needed
if __name__ == '__main__':
file_names = os.listdir(os.path.dirname(os.path.abspath(__file__)))
path_tll_xml = file_names[3]
path_network_xml = file_names[1]
print(path_tll_xml, path_network_xml)
interval = 300
review_needed = main(path_tll_xml, path_network_xml, interval)
review_needed.to_csv('review_needed.csv', index=False, encoding='cp949')