{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import os\n", "from tqdm import tqdm\n", "from datetime import datetime" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "datetime.datetime(2024, 1, 5, 11, 55, 13, 99135)" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "now = datetime.now()\n", "now = now.replace(month=1, day=5)\n", "now" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())\n", "next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())\n", "fsecs = range(midnight, next_day, 5) # fsecs : unix time by Five SECondS\n", "fmins = range(midnight, next_day, 300) # fmins : unix time by Five MINuteS" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
inter_nophas_Aphas_Bmove_Amove_Bstart_unix
017711841704408330
117722731704408330
21773317181704408330
317744511704408330
420111831704408330
.....................
70017844611704411830
70120111831704411850
70220144611704411850
70320155741704411850
7042062217181704411880
\n", "

705 rows × 6 columns

\n", "
" ], "text/plain": [ " inter_no phas_A phas_B move_A move_B start_unix\n", "0 177 1 1 8 4 1704408330\n", "1 177 2 2 7 3 1704408330\n", "2 177 3 3 17 18 1704408330\n", "3 177 4 4 5 1 1704408330\n", "4 201 1 1 8 3 1704408330\n", ".. ... ... ... ... ... ...\n", "700 178 4 4 6 1 1704411830\n", "701 201 1 1 8 3 1704411850\n", "702 201 4 4 6 1 1704411850\n", "703 201 5 5 7 4 1704411850\n", "704 206 2 2 17 18 1704411880\n", "\n", "[705 rows x 6 columns]" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "history = pd.read_csv('../Data/tables/history.csv', index_col=0)\n", "m = 105\n", "present_time = fmins[m]\n", "\n", "# - 아래 절차를 5초마다 반복\n", "for fsec in range(midnight, present_time + 1, 5): # fsec : unix time by Five SECond\n", " # 1. 상태 테이블 조회해서 전체 데이터중 필요데이터(교차로번호, A링 현시번호, A링 이동류번호, B링 현시번호, B링 이동류번호)만 수집 : A\n", " # move = time2move[fsec]\n", " move = pd.read_csv(f'../Data/tables/move/move_{fsec}.csv', index_col=0)\n", " # 2. 이력 테이블 조회해서 교차로별로 유닉스시간 최대인 데이터(교차로변호, 종료유닉스타임)만 수집 : B\n", " recent_histories = [group.iloc[-1:] for _, group in history[history['end_unix'] < fsec].groupby('inter_no')] # 교차로별로 유닉스시간이 최대인 행들\n", " if not recent_histories:\n", " rhistory = pd.DataFrame({'inter_no':[], 'end_unix':[]}) # recent history\n", " else:\n", " rhistory = pd.concat(recent_histories)\n", " recent_unix = rhistory[['inter_no', 'end_unix']]\n", " # 3. 상태 테이블 조회정보(A)와 이력 테이블 조회정보(B) 조인(키값 : 교차로번호) : C\n", " move = pd.merge(move, recent_unix, how='left', on='inter_no')\n", " move['end_unix'] = move['end_unix'].fillna(0).astype(int)\n", " move = move.drop_duplicates()\n", " # 4. C데이터 프레임에 신규 컬럼(시작 유닉스타임) 생성 후 종료유닉스 타임 값 입력, 종료 유닉스 타임 컬럼 제거\n", " move = move.rename(columns = {'end_unix':'start_unix'})\n", " # 5. 이동류 이력정보 READ\n", " # - CSV 파일로 서버에 저장된 이동류정보를 읽어옴(파일이 없는 경우에는 데이터가 없는 프레임 D 생성)\n", " try:\n", " if isinstance(movement, pd.DataFrame): # movement가 존재할 경우 그걸 그대로 씀.\n", " pass\n", " else: \n", " movement = pd.DataFrame()\n", " except NameError: # movement가 존재하지 않는 경우 생성\n", " movement = pd.DataFrame()\n", " # 6. 이동류 이력정보 데이터테이블(D)에 C데이터 add\n", " movement = pd.concat([movement, move])\n", " # 7. D데이터 프레임에서 중복데이터 제거(교차로번호, 시작 유닉스타임, A링 현시번호, B링 현시번호 같은 행은 제거)\n", " movement = movement.drop_duplicates(['inter_no','phas_A','phas_B','start_unix'])\n", " # 8. D데이터 보관 시간 기준시간을 시작 유닉스 타임의 최대값 - 3600을 값으로 산출하고, 보관 시간 기준시간보다 작은 시작 유닉스 타임을 가진 행은 모두 제거(1시간 데이터만 보관)\n", " movement = movement[movement.start_unix > fsec - 3600]\n", " movement = movement.sort_values(by=['start_unix','inter_no','phas_A','phas_B']).reset_index(drop=True)\n", "\n", "display(movement)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def make_splits(plan):\n", " # split, isplit : A,B 분리 혹은 통합시 사용될 수 있는 딕셔너리 \n", " splits = {} # splits maps (inter_no, start_hour, start_minute) to split \n", " for i, row in plan.iterrows():\n", " inter_no = row.inter_no\n", " start_hour = row.start_hour\n", " start_minute = row.start_minute\n", " cycle = row.cycle\n", " cums_A = row[[f'dura_A{j}' for j in range(1,9)]].cumsum()\n", " cums_B = row[[f'dura_B{j}' for j in range(1,9)]].cumsum()\n", " splits[(inter_no, start_hour, start_minute)] = {} # split maps (phas_A, phas_B) to k\n", " k = 0\n", " for t in range(cycle):\n", " new_phas_A = len(cums_A[cums_A < t]) + 1\n", " new_phas_B = len(cums_B[cums_B < t]) + 1\n", " if k == 0 or ((new_phas_A, new_phas_B) != (phas_A, phas_B)):\n", " k += 1\n", " phas_A = new_phas_A\n", " phas_B = new_phas_B\n", " splits[(inter_no, start_hour, start_minute)][(phas_A, phas_B)] = k\n", "\n", " isplits = {} # the inverse of splits\n", " for i in splits:\n", " isplits[i] = {splits[i][k]:k for k in splits[i]} # isplit maps k to (phas_A, phas_B)\n", " return splits, isplits\n", "\n", "def make_timetable(plan):\n", " # timetable\n", " timetable = plan[['start_hour', 'start_minute']].drop_duplicates()\n", " timetable['start_seconds'] = midnight + timetable['start_hour'] * 3600 + timetable['start_minute'] * 60\n", " return timetable\n", "\n", "# inter2node\n", "inter_node = pd.read_csv('../Data/tables/inter_node.csv', index_col=0)\n", "inter_node = inter_node[inter_node.inter_type=='parent']\n", "inter2node = dict(zip(inter_node['inter_no'], inter_node['node_id']))\n", "\n", "hours = np.array(range(midnight - 7200, next_day + 1, 3600)) # 정각에 해당하는 시각들 목록" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "def calculate_DS(rhist, curr_unix, hours, timetable):\n", " program_starts = np.array(timetable.start_seconds)\n", " idx = (program_starts <= present_time).sum() - 1\n", " program_start = program_starts[idx]\n", " if list(hours[hours <= curr_unix]):\n", " ghour_lt_curr_unix = hours[hours <= curr_unix].max() # the greatest hour less than or equal to curr_unix\n", " else:\n", " ghour_lt_curr_unix = program_start\n", " start_unixes = rhist.start_unix.unique()\n", " start_unixes_lt_ghour = np.sort(start_unixes[start_unixes < ghour_lt_curr_unix]) # start unixes less than ghour_lt_curr_unix\n", " # 기준유닉스(base_unix) : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 start_unix\n", " if len(start_unixes_lt_ghour) > 5:\n", " base_unix = start_unixes_lt_ghour[-5]\n", " # start_unixes_lt_ghour의 길이가 5 미만일 경우에는 맨 앞 start_unix로 base_unix를 지정\n", " else:\n", " base_unix = rhist.start_unix.min()\n", " D_n = curr_unix - base_unix\n", " S_n_durs = rhist[(rhist.start_unix > base_unix) & (rhist.start_unix <= curr_unix)] \\\n", " [[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]]\n", " S_n = S_n_durs.values.sum() // 2\n", " return D_n, S_n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def load_prow(plan, timetable, inter_no, time):\n", " '''\n", " load planned row\n", " '''\n", " # 프로그램 시작시각\n", " program_starts = np.array(timetable.start_seconds)\n", " idx = (program_starts <= time).sum() - 1\n", " program_start = program_starts[idx]\n", "\n", " # 최근 프로그램 시작시각에 대한 신호계획\n", " start_hour = timetable.iloc[idx].start_hour\n", " start_minute = timetable.iloc[idx].start_minute\n", " prow = plan[(plan.inter_no==inter_no) & (plan.start_hour==start_hour) & (plan.start_minute==start_minute)] # planned row\n", " return program_start, prow" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def make_rhistory(plan, timetable, history, present_time, adder):\n", " # 1. 조회시점의 유닉스 타임 이전의 신호이력 수집\n", " rhistory = history.copy() # recent history\n", " rhistory = rhistory[(rhistory.end_unix <= present_time) & (rhistory.end_unix > present_time - 9000)] # 두 시간 반 전부터 현재까지의 신호이력을 가져옴. 9000 = 3600 * 2.5\n", "\n", " # rhistory에 모든 교차로번호가 존재하지 않으면 해당 교차로번호에 대한 신호이력을 추가함 (at 최근 프로그램 시작시각)\n", " whole_inter_nos = sorted(history.inter_no.unique())\n", " recent_inter_nos = sorted(rhistory.inter_no.unique())\n", " if not whole_inter_nos==recent_inter_nos:\n", " for inter_no in set(whole_inter_nos) - set(recent_inter_nos):\n", " program_start, prow = load_prow(plan, timetable, inter_no, present_time - 9000)\n", " cycle = prow.cycle.iloc[0]\n", " row1 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()\n", " row2 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()\n", " # prow에서 필요한 부분을 rhistory에 추가\n", " row1['end_unix'] = program_start\n", " row2['end_unix'] = program_start + cycle\n", " rhistory = pd.concat([rhistory, row1, row2]).reset_index(drop=True)\n", " # present_time + adder 의 시각에 한 주기의 신호 추가\n", " for inter_no in set(whole_inter_nos):\n", " program_start, prow = load_prow(plan, timetable, inter_no, present_time)\n", " cycle = prow.cycle.iloc[0]\n", " row3 = prow.drop(['start_hour', 'start_minute'], axis=1).copy()\n", " # prow에서 필요한 부분을 rhistory에 추가\n", " row3['end_unix'] = present_time + adder\n", " rhistory = pd.concat([rhistory, row3]).reset_index(drop=True)\n", "\n", " # 2. 시작 유닉스 타임컬럼 생성 후 종류 유닉스 타임에서 현시별 현시기간 컬럼의 합을 뺀 값으로 입력\n", " # - 현시시간의 합을 뺀 시간의 +- 10초 이내에 이전 주기정보가 존재하면 그 유닉스 시간을 시작 유닉스시간 값으로 하고, 존재하지 않으면 현시시간의 합을 뺀 유닉스 시간을 시작 유닉스 시간으로 지정\n", " for i, row in rhistory.iterrows():\n", " inter_no = row.inter_no\n", " end_unix = row.end_unix\n", " elapsed_time = row[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]].sum() // 2 # 현시시간 합\n", " # 이전 유닉스 존재하지 않음 : 현시시간 합의 차\n", " start_unix = end_unix - elapsed_time\n", " pre_rows = history[:i] # previous rows\n", " if inter_no in pre_rows.inter_no.unique(): # 이전 유닉스 존재\n", " pre_unix = pre_rows[pre_rows.inter_no == inter_no]['end_unix'].iloc[-1] # previous unix time\n", " # 이전 유닉스 존재, abs < 10 : 이전 유닉스\n", " if abs(pre_unix - start_unix) < 10:\n", " start_unix = pre_unix\n", " # 이전 유닉스 존재, abs >=10 : 현시시간 합의 차\n", " else:\n", " pass\n", " rhistory.loc[i, 'start_unix'] = start_unix \n", " rhistory[rhistory.isna()] = 0\n", " rhistory['start_unix'] = rhistory['start_unix'].astype(int)\n", " rhistory = rhistory[['inter_no', 'start_unix'] + [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)] + ['cycle']]\n", " return rhistory\n", "adder = 600" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "def processing(plan, rhistory, timetable, hours):\n", " rhists = []\n", " for inter_no in sorted(rhistory.inter_no.unique()):\n", " rhist = rhistory.copy()[rhistory.inter_no==inter_no]\n", " rhist = rhist.drop_duplicates(subset=['start_unix']).reset_index(drop=True)\n", "\n", " # D_n 및 S_n 값 정의\n", " rhist['D_n'] = 0 # D_n : 시간차이\n", " rhist['S_n'] = 0 # S_n : 현시시간합\n", " for n in range(len(rhist)):\n", " curr_unix = rhist.iloc[n].start_unix # current start_unix\n", " rhist.loc[n, ['D_n', 'S_n']] = calculate_DS(rhist, curr_unix, hours, timetable)\n", "\n", " # 이전시각, 현재시각\n", " prev_unix = rhist.loc[0, 'start_unix'] # previous start_unix\n", " curr_unix = rhist.loc[1, 'start_unix'] # current start_unix\n", "\n", " # rhist의 마지막 행에 도달할 때까지 반복\n", " while True:\n", " n = rhist[rhist.start_unix==curr_unix].index[0]\n", " cycle = rhist.loc[n, 'cycle']\n", " D_n = rhist.loc[n, 'D_n']\n", " S_n = rhist.loc[n, 'S_n']\n", " # 참값인 경우\n", " if (abs(D_n - S_n) <= 5):\n", " pass\n", " # 참값이 아닌 경우\n", " else:\n", " # 2-1-1. 결측치 처리 : 인접한 두 start_unix의 차이가 계획된 주기의 두 배보다 크면 결측이 일어났다고 판단, 신호계획의 현시시간으로 \"대체\"\n", " if curr_unix - prev_unix >= 2 * cycle:\n", " # prev_unix를 계획된 주기만큼 늘려가면서 한 행씩 채워나간다.\n", " # (curr_unix와의 차이가 계획된 주기보다 작거나 같아질 때까지)\n", " while curr_unix - prev_unix > cycle:\n", " prev_unix += cycle\n", " # 신호 계획(prow) 불러오기\n", " start_seconds = np.array(timetable.start_seconds)\n", " idx = (start_seconds <= prev_unix).sum() - 1\n", " start_hour = timetable.iloc[idx].start_hour\n", " start_minute = timetable.iloc[idx].start_minute\n", " prow = plan.copy()[(plan.inter_no==inter_no) & (plan.start_hour==start_hour) & (plan.start_minute==start_minute)] # planned row\n", " # prow에서 필요한 부분을 rhist에 추가\n", " prow['start_unix'] = prev_unix\n", " prow = prow.drop(['start_hour', 'start_minute', 'offset'], axis=1)\n", " cycle = prow.iloc[0].cycle\n", " rhist = pd.concat([rhist, prow])\n", " rhist = rhist.sort_values(by='start_unix').reset_index(drop=True)\n", " n += 1\n", "\n", " # 2-1-2. 이상치 처리 : 비율에 따라 해당 행을 \"삭제\"(R_n <= 0.5) 또는 \"조정\"(R_n > 0.5)한다\n", " R_n = (curr_unix - prev_unix) / cycle # R_n : 비율\n", " # R_n이 0.5보다 작거나 같으면 해당 행을 삭제\n", " if R_n <= 0.5:\n", " rhist = rhist.drop(index=n).reset_index(drop=True)\n", " if n >= rhist.index[-1]:\n", " break\n", " # 행삭제에 따른 curr_unix, R_n 재정의\n", " curr_unix = rhist.loc[n, 'start_unix']\n", " R_n = (curr_unix - prev_unix) / cycle # R_n : 비율\n", "\n", " # R_n이 0.5보다 크면 해당 행 조정 (비율을 유지한 채로 현시시간 대체)\n", " if R_n > 0.5:\n", " # 신호 계획(prow) 불러오기\n", " start_seconds = np.array(timetable.start_seconds)\n", " idx = (start_seconds <= curr_unix).sum() - 1\n", " start_hour = timetable.iloc[idx].start_hour\n", " start_minute = timetable.iloc[idx].start_minute\n", " prow = plan[(plan.inter_no==inter_no) & (plan.start_hour==start_hour) & (plan.start_minute==start_minute)] # planned row\n", " # 조정된 현시시간 (prow에 R_n을 곱하고 정수로 바꿈)\n", " adjusted_dur = prow.copy()[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] * R_n\n", " int_parts = adjusted_dur.iloc[0].apply(lambda x: int(x))\n", " frac_parts = adjusted_dur.iloc[0] - int_parts\n", " difference = round(adjusted_dur.iloc[0].sum()) - int_parts.sum()\n", " for _ in range(difference): # 소수 부분이 가장 큰 상위 'difference'개의 값에 대해 올림 처리\n", " max_frac_index = frac_parts.idxmax()\n", " int_parts[max_frac_index] += 1\n", " frac_parts[max_frac_index] = 0 # 이미 처리된 항목은 0으로 설정\n", " # rhist에 조정된 현시시간을 반영\n", " rhist.loc[n, [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] = int_parts.values\n", " rhist.loc[n, 'cycle'] = int_parts.sum().sum() // 2\n", "\n", " if n >= rhist.index[-1]:\n", " break\n", " prev_unix = curr_unix\n", " curr_unix = rhist.loc[n+1, 'start_unix']\n", "\n", " # 생략해도 무방할 코드\n", " rhist = rhist.reset_index(drop=True)\n", " rhist = rhist.sort_values(by=['start_unix'])\n", "\n", " # D_n 및 S_n 값 재정의\n", " for n in range(len(rhist)):\n", " curr_unix = rhist.iloc[n].start_unix # current start_unix\n", " rhist.loc[n, ['D_n', 'S_n']] = calculate_DS(rhist, curr_unix, hours, timetable)\n", " rhists.append(rhist)\n", " rhists = pd.concat(rhists).sort_values(by=['start_unix','inter_no'])\n", " rhists = rhists[rhists.start_unix >= present_time - 3600]\n", " rhists = rhists.drop(columns=['D_n', 'S_n'])\n", " return rhists" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "def make_hrhists(rhists, isplits, timetable):\n", " # 계층화된 형태로 변환\n", " hrhists = [] # hierarchied recent history\n", " for i, row in rhists.iterrows():\n", " inter_no = row.inter_no\n", " start_unix = row.start_unix\n", "\n", " ind = (timetable['start_seconds'] <= row.start_unix).sum() - 1\n", " start_hour = timetable.iloc[ind].start_hour\n", " start_minute = timetable.iloc[ind].start_minute\n", " isplit = isplits[(inter_no, start_hour, start_minute)]\n", " phas_As = [isplit[j][0] for j in isplit.keys()]\n", " phas_Bs = [isplit[j][1] for j in isplit.keys()]\n", " durs_A = row[[f'dura_A{j}' for j in range(1,9)]]\n", " durs_B = row[[f'dura_B{j}' for j in range(1,9)]]\n", " durations = []\n", " for j in range(1, len(isplit)+1):\n", " ja = isplit[j][0]\n", " jb = isplit[j][1]\n", " if ja == jb:\n", " durations.append(min(durs_A[ja-1], durs_B[jb-1]))\n", " else:\n", " durations.append(abs(durs_A[ja-1] - durs_B[ja-1]))\n", " new_rows = pd.DataFrame({'inter_no':[inter_no] * len(durations), 'start_unix':[start_unix] * len(durations),\n", " 'phas_A':phas_As, 'phas_B':phas_Bs, 'duration':durations})\n", " hrhists.append(new_rows)\n", " hrhists = pd.concat(hrhists)\n", " hrhists = hrhists.sort_values(by = ['start_unix', 'inter_no', 'phas_A', 'phas_B']).reset_index(drop=True)\n", " return hrhists" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "def update_movement(hrhists, movement, movements):\n", " # 중복을 제거하고 (inter_no, start_unix) 쌍을 만듭니다.\n", " hrhists_inter_unix = set(hrhists[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))\n", " movement_inter_unix = set(movement[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))\n", "\n", " # hrhists에는 있지만 movement에는 없는 (inter_no, start_unix) 쌍을 찾습니다.\n", " missing_in_movement = hrhists_inter_unix - movement_inter_unix\n", "\n", " # 새로운 행들을 생성합니다.\n", " new_rows = []\n", " if missing_in_movement:\n", " for inter_no, start_unix in missing_in_movement:\n", " # movements에서 해당 inter_no의 데이터를 찾습니다.\n", " new_row = movements[movements['inter_no'] == inter_no].copy()\n", " # start_unix 값을 설정합니다.\n", " new_row['start_unix'] = start_unix\n", " new_rows.append(new_row)\n", "\n", " # 새로운 데이터프레임을 생성하고 기존 movement 데이터프레임과 합칩니다.\n", " new_movement = pd.concat(new_rows, ignore_index=True)\n", " movement_updated = pd.concat([movement, new_movement], ignore_index=True)\n", " else:\n", " movement_updated = movement\n", " return movement_updated" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "def make_histid(present_time, hrhists, movement_updated, inter2node, matching):\n", " # movements and durations\n", " movedur = pd.merge(hrhists, movement_updated, how='inner', on=['inter_no', 'start_unix', 'phas_A', 'phas_B'])\n", " movedur = movedur.sort_values(by=['start_unix', 'inter_no', 'phas_A','phas_B'])\n", " movedur = movedur[['inter_no', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration']]\n", "\n", " # 이동류 매칭 테이블에서 진입id, 진출id를 가져와서 붙임.\n", " for i, row in movedur.iterrows():\n", " inter_no = row.inter_no\n", " start_unix = row.start_unix\n", " # incoming and outgoing edges A\n", " move_A = row.move_A\n", " if move_A in [17, 18]:\n", " inc_edge_A = np.nan\n", " outhedge_A = np.nan\n", " else:\n", " match_A = matching[(matching.inter_no == inter_no) & (matching.move_no == move_A)].iloc[0]\n", " inc_edge_A = match_A.inc_edge\n", " out_edge_A = match_A.out_edge\n", " movedur.loc[i, ['inc_edge_A', 'out_edge_A']] = [inc_edge_A, out_edge_A]\n", " # incoming and outgoing edges B\n", " move_B = row.move_B\n", " if move_B in [17, 18]:\n", " inc_edge_B = np.nan\n", " out_edge_B = np.nan\n", " else:\n", " match_B = matching[(matching.inter_no == inter_no) & (matching.move_no == move_B)].iloc[0]\n", " inc_edge_B = match_B.inc_edge\n", " out_edge_B = match_B.out_edge\n", " movedur.loc[i, ['inc_edge_B', 'out_edge_B']] = [inc_edge_B, out_edge_B]\n", "\n", " # 이동류 컬럼 제거\n", " movedur = movedur.drop(['move_A', 'move_B'], axis=1)\n", "\n", " histid = movedur.copy() # history with edge ids (incoming and outgoing edge ids)\n", " histid['node_id'] = histid['inter_no'].map(inter2node)\n", " histid = histid[['inter_no', 'node_id', 'start_unix', 'phas_A', 'phas_B', 'duration', 'inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']]\n", " histid_start = present_time - 600\n", " histid = histid[histid.start_unix > histid_start]\n", " return histid" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "def preprocess(m):\n", " '''\n", " 통합테이블(histid)를 만드는 함수\n", "\n", " input : m\n", " - m ranges from 0 to 287, but 0 makes an error where 288 = 86400//300\n", " - present_time = fmins[m] : 현재시점\n", "\n", " output : histid (통합테이블, HISTory with edge_IDs)\n", " - 컬럼 : inter_no, node_id, start_unix, phas_A, phas_B, duration, inc_edge_A, out_edge_A, inc_edge_B, out_edge_B\n", "\n", " 주요 데이터, 중간산출물 및 결과물 :\n", " # 데이터\n", " - history : 신호이력 (inter_no, end_unix, dura_Aj, dura_Bj, cycle, offset)\n", " - plan : 신호계획 (inter_no, start_hour, start_minute, dura_Aj, dura_Bj cycle, offset)\n", " # 중간산출물\n", " - rhists (recent history)\n", " - history에서 현재 시각 이전의 데이터를 가져옴.\n", " - end_unix를 start_unix로 변환\n", " - 참값판단 프로세스(결측·이상치 처리)\n", " - 컬럼 : inter_no, start_unix, dura_Aj, dura_Bj, cycle\n", " - hrhists (hierarchized recent history)\n", " - rhists를 계층화\n", " - 컬럼 : inter_no, start_unix, phas_A, phas_B, duration\n", " - movements\n", " - 각 교차로에 대하여 현시별로 이동류를 정해놓음.\n", " - join시 사용하기 위함.\n", " - 한 번 만들어놓고 두고두고 사용함.\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B\n", " - movement\n", " - 현재 시점에서의 이동류정보\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B, start_unix\n", " - movement_updated\n", " - movement와 hrhists를 join하기 전에, movement에는 없지만 hrhists에는 있는 start_unix에 대한 이동류 정보를 가져와 movement에 붙임\n", " - 이동류정보는 앞서 정의한 movements에서 가져옴.\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B, start_unix\n", " - movedur\n", " - hrhists와 movement_updated를 join\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B, start_unix, duration\n", " # 결과 : histid\n", " - 신호생성에 직접적으로 사용되는 데이터프레임\n", " - 컬럼 : inter_no, node_id, start_unix, phas_A, phas_B, duration, inc_edge_A, out_edge_A, inc_edge_B, out_edge_B\n", " - 한글컬럼 : 교차로번호, 노드id, 시작유닉스, A현시번호, B현시번호, 현시시간, 진입엣지(A), 진출엣지(A), 진입엣지(B), 진출엣지(B)\n", " '''\n", " midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())\n", " next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())\n", " fmins = range(midnight, next_day, 300) # fmins : unix time by Five MINuteS\n", " # 현재시각\n", " present_time = fmins[m]\n", " print(datetime.fromtimestamp(present_time))\n", " # 사용할 표준 테이블 목록\n", " plan = pd.read_csv('../Data/tables/plan.csv', index_col=0)\n", " history = pd.read_csv('../Data/tables/history.csv', index_col=0)\n", " matching = pd.read_csv('../Intermediates/matching.csv', index_col=0)\n", " # 참고할 딕셔너리, 데이터프레임, 리스트 등 목록\n", " splits, isplits = make_splits(plan)\n", " timetable = make_timetable(plan)\n", " inter_node = pd.read_csv('../Data/tables/inter_node.csv', index_col=0)\n", " inter_node = inter_node[inter_node.inter_type=='parent']\n", " inter2node = dict(zip(inter_node['inter_no'], inter_node['node_id']))\n", " hours = np.array(range(midnight - 7200, next_day + 1, 3600)) # 정각에 해당하는 시각들 목록\n", " # rhistory, rhists, hrhists\n", " adder = 600\n", " rhistory = make_rhistory(plan, timetable, history, present_time, adder)\n", " rhists = processing(plan, rhistory, timetable, hours)\n", " hrhists = make_hrhists(rhists, isplits, timetable)\n", " # movements, movement, movement_updated\n", " movements = pd.read_csv('../Intermediates/movements.csv')\n", " movement = pd.read_csv(f'../Intermediates/movement/movement_{present_time}.csv', index_col=0)\n", " movement_updated = update_movement(hrhists, movement, movements)\n", " # movedur\n", " movedur = pd.merge(movement_updated, hrhists, how='inner', on=['inter_no', 'start_unix', 'phas_A', 'phas_B']) # movements and durations\n", " movedur = movedur.sort_values(by=['start_unix', 'inter_no', 'phas_A','phas_B'])\n", " movedur = movedur[['inter_no', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration']]\n", " # histid\n", " histid = make_histid(present_time, hrhists, movement_updated, inter2node, matching)\n", " histid.to_csv(f'../Intermediates/histid/histid_{fmins[m]}.csv')\n", " return histid" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2024-01-05 08:45:00\n", "2024-01-05 08:50:00\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
inter_nonode_idstart_unixphas_Aphas_Bdurationinc_edge_Aout_edge_Ainc_edge_Bout_edge_B
655202i917044116101146571510152_02-571510152_01571510152_01571510152_01.65
656202i9170441161022114NaN-571510152_01NaNNaN
657175i017044116291140-571542797_02571500487_01-571500487_01571542797_02
658175i017044116292242-571500487_01571545870_01-571542797_02571510153_01
659175i017044116293329571545870_02571510153_01571545870_02571542797_02
.................................
871201i817044126405517571500583_01571500617_01571500583_01571500569_01
872206i717044126601125-571511538_02571542073_02571542073_01571511538_02
873206i717044126602225NaN571542073_02NaNNaN
874206i717044126603315-571511538_02571542073_02571542073_01571511538_02
875206i717044126604415NaN571542073_02NaNNaN
\n", "

221 rows × 10 columns

\n", "
" ], "text/plain": [ " inter_no node_id start_unix phas_A phas_B duration inc_edge_A \\\n", "655 202 i9 1704411610 1 1 46 571510152_02 \n", "656 202 i9 1704411610 2 2 114 NaN \n", "657 175 i0 1704411629 1 1 40 -571542797_02 \n", "658 175 i0 1704411629 2 2 42 -571500487_01 \n", "659 175 i0 1704411629 3 3 29 571545870_02 \n", ".. ... ... ... ... ... ... ... \n", "871 201 i8 1704412640 5 5 17 571500583_01 \n", "872 206 i7 1704412660 1 1 25 -571511538_02 \n", "873 206 i7 1704412660 2 2 25 NaN \n", "874 206 i7 1704412660 3 3 15 -571511538_02 \n", "875 206 i7 1704412660 4 4 15 NaN \n", "\n", " out_edge_A inc_edge_B out_edge_B \n", "655 -571510152_01 571510152_01 571510152_01.65 \n", "656 -571510152_01 NaN NaN \n", "657 571500487_01 -571500487_01 571542797_02 \n", "658 571545870_01 -571542797_02 571510153_01 \n", "659 571510153_01 571545870_02 571542797_02 \n", ".. ... ... ... \n", "871 571500617_01 571500583_01 571500569_01 \n", "872 571542073_02 571542073_01 571511538_02 \n", "873 571542073_02 NaN NaN \n", "874 571542073_02 571542073_01 571511538_02 \n", "875 571542073_02 NaN NaN \n", "\n", "[221 rows x 10 columns]" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "preprocess(105)\n", "preprocess(106)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "# for m in range(30, 288):\n", "# print(m)\n", "# histid = preprocess(m)" ] } ], "metadata": { "kernelspec": { "display_name": "rts", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 2 }