def make_rhistory(m:int): ''' input : m - m ranges from 0 to 287, but 0 makes an error where 288 = 86400//300 - present_time = fmins[m] : 현재시점 + fmins[m-2] : 시뮬레이션 시작시점 + fmins[m-1] : 시뮬레이션 종료시점 output : rhistory - recent history - 현재시각(present_time) 이전 1시간 동안의 신호이력에 대하여 결측치 및 이상치를 처리한 결과 - 교차로번호(inter_no), 종료유닉스(end_unix), 현시시간(dur_Aj, dur_Bj), 주기(cycle), 옵셋(offset) ''' fmins = range(midnight, next_day, 300) # fmins : unix time by Five MINuteS present_time = fmins[m] # 현재시점 print(datetime.fromtimestamp(present_time)) Rhists = [] # Recent history (1시간 이내) for inter_no in history.inter_no.unique(): # - 5분마다 신호이력 데이터 수집해서 통합테이블 생성할때 # 1. 조회시점의 유닉스 타임을 기준으로 신호이력의 유닉스 타임이 1시간 이내인(Rhist) 데이터 수집 rhistory = history.copy() # recent history rhistory = rhistory[(rhistory.end_unix < present_time)] hours = np.array(range(midnight, next_day + 1, 3600)) rhist = rhistory.copy()[rhistory.inter_no == inter_no] # 특정한 inter_no rhist = rhist.reset_index(drop=True) new_rows = [] # 1-1. 결측치 처리 : 인접한 두 end_unix의 차이가 계획된 주기의 두 배보다 크면 결측이 일어났다고 판단 for n in range(len(rhist) - 1): curr_unix = rhist.iloc[n].end_unix # current end_unix next_unix = rhist.iloc[n+1].end_unix # next end_unix cycle = rhist.iloc[n].cycle if next_unix - curr_unix >= 2 * cycle: # 현재 unix를 계획된 주기만큼 늘려가면서 한 행씩 채워나간다. #(다음 unix와의 차이가 계획된 주기보다 작거나 같아질 때까지) while next_unix - curr_unix > cycle: curr_unix += cycle start_seconds = np.array(timetable.start_seconds) idx = (start_seconds <= curr_unix).sum() - 1 start_hour = timetable.iloc[idx].start_hour start_minute = timetable.iloc[idx].start_minute prow = plan[(plan.inter_no==inter_no) & (plan.start_hour==start_hour) & (plan.start_minute==start_minute)] # planned row prow = prow.drop(['start_hour', 'start_minute'], axis=1) prow['end_unix'] = curr_unix cycle = prow.iloc[0].cycle new_rows.append(prow) rhist = pd.concat([rhist] + new_rows).sort_values(['end_unix']) rhist = rhist.reset_index(drop=True) # 1-2. 이상치 처리 : 기준유닉스로부터의 시간차이와 현시시간합이 11 이상 차이나면 이상치가 발생했다고 판단 Rhist = rhist.copy() # recent history 1704393231 Rhist = Rhist[(Rhist.end_unix >= present_time - 3600)] # Recent history (1시간 이내) Rhist = Rhist.reset_index(drop=True) Rhist['D_n'] = 0 Rhist['S_n'] = 0 for n in range(len(Rhist)): curr_unix = Rhist.iloc[n].end_unix # current end_unix cycle = Rhist.iloc[n].cycle ghour_lt_curr_unix = hours[hours < curr_unix].max() # the greatest hour less than curr_unix end_unixes = rhist.end_unix.unique() end_unixes_lt_ghour = np.sort(end_unixes[end_unixes < ghour_lt_curr_unix]) # end unixes less than ghour_lt_end_unix base_unix = end_unixes_lt_ghour[-5] # 기준유닉스 : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 end_unix # D_n : 시간차이 D_n = curr_unix - base_unix ddurations = rhist[(rhist.end_unix > base_unix) & (rhist.end_unix <= curr_unix)][[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] # S_n : 현시시간합 S_n = ddurations.values.sum() // 2 Rhist.loc[n, ['D_n', 'S_n']] = [D_n, S_n] n = 1 while n < len(Rhist): prev_unix = Rhist[Rhist.index==n-1]['end_unix'].iloc[0] # previous end_unix curr_unix = Rhist[Rhist.index==n]['end_unix'].iloc[0] # current end_unix R_n = (curr_unix - prev_unix) / cycle ghour_lt_curr_unix = hours[hours < curr_unix].max() # the greatest hour less than curr_unix end_unixes = rhist.end_unix.unique() end_unixes_lt_ghour = np.sort(end_unixes[end_unixes < ghour_lt_curr_unix]) # end unixes less than ghour_lt_end_unix base_unix = end_unixes_lt_ghour[-5] # 기준유닉스 : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 end_unix # D_n : 시간차이 D_n = curr_unix - base_unix # S_n : 현시시간합 ddurations = rhist[(rhist.end_unix > base_unix) & (rhist.end_unix <= curr_unix)][[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] S_n = ddurations.values.sum() // 2 # 비율이 0.5보다 작거나 같으면 해당 행을 삭제 if (abs(D_n - S_n) > 10) & (R_n <= 0.5): # print("lt", inter_no, curr_unix, round(R_n,2), D_n, S_n) # display(Rhist.iloc[n]) Rhist = Rhist.drop(index=n) n += 1 # 행삭제에 따른 curr_unix, D_n, S_n 등 재정의 if not Rhist[Rhist.index==n]['end_unix'].empty: # 마지막 행을 삭제하여 뒤의 행이 없을 때를 대비 curr_unix = Rhist[Rhist.index==n]['end_unix'].iloc[0] # current end_unix R_n = (curr_unix - prev_unix) / cycle ghour_lt_curr_unix = hours[hours < curr_unix].max() # the greatest hour less than curr_unix end_unixes = rhist.end_unix.unique() end_unixes_lt_ghour = np.sort(end_unixes[end_unixes < ghour_lt_curr_unix]) # end unixes less than ghour_lt_end_unix base_unix = end_unixes_lt_ghour[-5] # 기준유닉스 : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 end_unix # D_n : 시간차이 D_n = curr_unix - base_unix # S_n : 현시시간합 ddurations = rhist[(rhist.end_unix > base_unix) & (rhist.end_unix <= curr_unix)][[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] S_n = ddurations.values.sum() // 2 # 비율이 0.5보다 크면 해당 행 조정 (비율을 유지한 채로 현시시간 대체) if (abs(D_n - S_n) > 10) & (R_n > 0.5): start_seconds = np.array(timetable.start_seconds) idx = (start_seconds <= curr_unix).sum() - 1 start_hour = timetable.iloc[idx].start_hour start_minute = timetable.iloc[idx].start_minute prow = plan[(plan.inter_no==inter_no) & (plan.start_hour==start_hour) & (plan.start_minute==start_minute)].copy().reset_index(drop=True).iloc[0] # planned row adjusted_dur = prow[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] * R_n # 조정된 현시시간을 정수로 바꿈 int_parts = adjusted_dur.apply(lambda x: int(x)) frac_parts = adjusted_dur - int_parts difference = int(round(adjusted_dur.sum())) - int_parts.sum() # 소수 부분이 가장 큰 상위 'difference'개의 값에 대해 올림 처리 for _ in range(difference): max_frac_index = frac_parts.idxmax() int_parts[max_frac_index] += 1 frac_parts[max_frac_index] = 0 # 이미 처리된 항목은 0으로 설정 Rhist.loc[n, [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] = int_parts.values Rhist.loc[n, 'cycle'] = int_parts.sum() // 2 # print("gt", inter_no, curr_unix, round(R_n,2), D_n, S_n) n += 1 Rhist = Rhist.drop(columns=['offset', 'D_n', 'S_n']) Rhists.append(Rhist) Rhists = pd.concat(Rhists) Rhists = Rhists.sort_values(by=['end_unix', 'inter_no']).reset_index(drop=True) return Rhists def make_histid(m): fmins = range(midnight, next_day, 300) # fmins : unix time by Five MINuteS present_time = fmins[m] rhistory = make_rhistory(m) # 2. 시작 유닉스 타임컬럼 생성 후 종류 유닉스 타임에서 현시별 현시기간 컬럼의 합을 뺀 값으로 입력 # - 현시시간의 합을 뺀 시간의 +- 10초 이내에 이전 주기정보가 존재하면 그 유닉스 시간을 시작 유닉스시간 값으로 하고, 존재하지 않으면 현시시간의 합을 뺀 유닉스 시간을 시작 유닉스 시간으로 지정 for i, row in rhistory.iterrows(): # 이전 유닉스 존재하지 않음 => 현시시간 합의 차 # 이전 유닉스 존재, abs < 10 => 이전 유닉스 # 이전 유닉스 존재, abs >=10 => 현시시간 합의 차 inter_no = row.inter_no end_unix = row.end_unix elapsed_time = row[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]].sum() // 2 # 현시시간 합 start_unix = end_unix - elapsed_time pre_rows = history[:i] # previous rows if inter_no in pre_rows.inter_no.unique(): # 이전 유닉스 존재 pre_unix = pre_rows[pre_rows.inter_no == inter_no]['end_unix'].iloc[-1] # previous unix time if abs(pre_unix - start_unix) < 10: # abs < 10 start_unix = pre_unix else: # abs >= 10 pass rhistory.loc[i, 'start_unix'] = start_unix rhistory[rhistory.isna()] = 0 rhistory['start_unix'] = rhistory['start_unix'].astype(int) # # with pd.option_context('display.max_rows', None, 'display.max_columns', None): # # display(rhistory) rhistory = rhistory[['inter_no', 'start_unix'] + [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)] + ['cycle']] # 계층화된 형태로 변환 hrhistory = [] # hierarchied recent history for i, row in rhistory.iterrows(): inter_no = row.inter_no start_unix = row.start_unix ind = (timetable['start_seconds'] <= row.start_unix).sum() - 1 start_hour = timetable.iloc[ind].start_hour start_minute = timetable.iloc[ind].start_minute isplit = isplits[(inter_no, start_hour, start_minute)] phas_As = [isplit[j][0] for j in isplit.keys()] phas_Bs = [isplit[j][1] for j in isplit.keys()] durs_A = row[[f'dura_A{j}' for j in range(1,9)]] durs_B = row[[f'dura_B{j}' for j in range(1,9)]] durations = [] for j in range(1, len(isplit)+1): ja = isplit[j][0] jb = isplit[j][1] if ja == jb: durations.append(min(durs_A[ja-1], durs_B[jb-1])) else: durations.append(abs(durs_A[ja-1] - durs_B[ja-1])) new_rows = pd.DataFrame({'inter_no':[inter_no] * len(durations), 'start_unix':[start_unix] * len(durations), 'phas_A':phas_As, 'phas_B':phas_Bs, 'duration':durations}) hrhistory.append(new_rows) hrhistory = pd.concat(hrhistory) hrhistory = hrhistory.sort_values(by = ['start_unix', 'inter_no', 'phas_A', 'phas_B']).reset_index(drop=True) # 3. D 데이터프레임 CSV을 읽어옮 # 4. 시작 유닉스 타임을 키값으로 D데이터프레임을 이력 데이터프레임에 조인함 # 5초단위로 수집한 이동류정보(time2movement[present_time])와 최근 1시간 신호이력(hrhistory)을 병합 movedur = pd.merge(time2movement[present_time], hrhistory, how='inner', on=['inter_no', 'start_unix', 'phas_A', 'phas_B']) # movements and durations movedur = movedur.sort_values(by=['start_unix', 'inter_no', 'phas_A','phas_B']) movedur = movedur[['inter_no', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration']] # 이동류 매칭 테이블에서 진입id, 진출id를 가져와서 붙임. for i, row in movedur.iterrows(): inter_no = row.inter_no start_unix = row.start_unix # incoming and outgoing edges A move_A = row.move_A if move_A in [17, 18]: inc_edge_A = np.nan out_edge_A = np.nan else: match_A = matching[(matching.inter_no == inter_no) & (matching.move_no == move_A)].iloc[0] inc_edge_A = match_A.inc_edge out_edge_A = match_A.out_edge movedur.loc[i, ['inc_edge_A', 'out_edge_A']] = [inc_edge_A, out_edge_A] # incoming and outgoing edges B move_B = row.move_B if move_B in [17, 18]: inc_edge_B = np.nan out_edge_B = np.nan else: match_B = matching[(matching.inter_no == inter_no) & (matching.move_no == move_B)].iloc[0] inc_edge_B = match_B.inc_edge out_edge_B = match_B.out_edge movedur.loc[i, ['inc_edge_B', 'out_edge_B']] = [inc_edge_B, out_edge_B] # 이동류 컬럼 제거 movedur = movedur.drop(['move_A', 'move_B'], axis=1) histid = movedur.copy() # history with edge ids (incoming and outgoing edge ids) histid['node_id'] = histid['inter_no'].map(inter2node) histid = histid[['inter_no', 'node_id', 'start_unix', 'phas_A', 'phas_B', 'duration', 'inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']] histid = histid[histid.start_unix > present_time - 3600] # 시뮬레이션 시작시각 : 현재시각 - 600 # 시뮬레이션 종료시각 : 현재시각 - 300 # 현재시각 : present_time, PT # PT-900 ... PT-600 ... PT-300 ... PT return histid