{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import os\n", "import sumolib\n", "import random\n", "from tqdm import tqdm\n", "from datetime import datetime" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# A. 이동류 매칭" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "def make_match1():\n", " '''\n", " 신호 DB에는 매 초마다 이동류정보가 업데이트 된다. 그리고 이 이동류정보를 매 5초마다 불러와서 사용하게 된다.\n", " '../Data/tables/move/'에는 5초마다의 이동류정보가 저장되어 있다.\n", "\n", " return : 통합된 이동류정보\n", " - 모든 inter_no(교차로번호)에 대한 A, B링 현시별 이동류정보\n", "\n", " match1을 만드는 데 시간이 소요되므로 한 번 만들어서 저장해두고 저장해둔 것을 쓴다.\n", " '''\n", " # [이동류번호] 불러오기 (약 1분의 소요시간)\n", " path_move = '../Data/tables/move/'\n", " csv_moves = os.listdir(path_move)\n", " moves = [pd.read_csv(path_move + csv_move, index_col=0) for csv_move in tqdm(csv_moves)]\n", " match1 = pd.concat(moves).drop_duplicates().sort_values(by=['inter_no','phas_A','phas_B']).reset_index(drop=True)\n", " match1.to_csv('../Intermediates/match1.csv')\n", " return match1" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "def make_match2(match1):\n", " '''\n", " match1을 계층화함.\n", " - match1의 컬럼 : inter_no, phas_A, phas_B, move_A, move_B\n", " - match2의 컬럼 : inter_no, phase_no, ring_type, move_no\n", " '''\n", " # 계층화 (inter_no, phas_A, phas_B, move_A, move_B) -> ('inter_no', 'phase_no', 'ring_type', 'move_no')\n", " matchA = match1[['inter_no', 'phas_A', 'move_A']].copy()\n", " matchA.columns = ['inter_no', 'phase_no', 'move_no']\n", " matchA['ring_type'] = 'A'\n", " matchB = match1[['inter_no', 'phas_B', 'move_B']].copy()\n", " matchB.columns = ['inter_no', 'phase_no', 'move_no']\n", " matchB['ring_type'] = 'B'\n", " match2 = pd.concat([matchA, matchB]).drop_duplicates()\n", " match2 = match2[['inter_no', 'phase_no', 'ring_type', 'move_no']]\n", " match2 = match2.sort_values(by=list(match2.columns))\n", " return match2" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "def make_match3(match2):\n", " '''\n", " 각 movement들에 방향(진입방향, 진출방향)을 매칭시켜 추가함.\n", " - match2의 컬럼 : inter_no, phase_no, ring_type, move_no\n", " - match3의 컬럼 : inter_no, phase_no, ring_type, move_no, inc_dir, out_dir\n", "\n", " nema : \n", " - 컬럼 : move_no, inc_dir, out_dir\n", " - 모든 종류의 이동류번호에 대하여 진입방향과 진출방향을 매칭시키는 테이블\n", " - 이동류번호 : 1 ~ 16, 17, 18, 21\n", " - 진입, 진출방향(8방위) : 동, 서, 남, 북, 북동, 북서, 남동, 남서\n", " '''\n", " # nema 정보 불러오기 및 병합\n", " nema = pd.read_csv('../Data/tables/nema.csv', encoding='cp949')\n", " match3 = pd.merge(match2, nema, how='left', on='move_no').drop_duplicates()\n", " return match3" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "def make_match4(match3):\n", " '''\n", " 방위각 정보를 매칭시켜 추가함.\n", " - match3의 컬럼 : inter_no, phase_no, ring_type, move_no, inc_dir, out_dir\n", " - match4의 컬럼 : inter_no, phase_no, ring_type, move_no, inc_dir, out_dir, inc_angle, out_angle\n", "\n", " angle_original : \n", " - 컬럼 : inter_no, angle_Aj, angle_Bj (j : 1 ~ 8)\n", " - 모든 종류의 이동류번호에 대하여 진입방향과 진출방향을 매칭시키는 테이블\n", " - 이동류번호 : 1 ~ 16, 17, 18, 21\n", " - 진입, 진출방향(8방위) : 동, 서, 남, 북, 북동, 북서, 남동, 남서\n", " '''\n", "\n", " # 방위각 정보 불러오기\n", " dtype_dict = {f'angle_{alph}{j}':'str' for alph in ['A', 'B'] for j in range(1,9)}\n", " angle_original = pd.read_csv('../Data/tables/angle.csv', index_col=0, dtype = dtype_dict)\n", "\n", " # 계층화\n", " angle = []\n", " for i, row in angle_original.iterrows():\n", " angle_codes = row[[f'angle_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]]\n", " new = pd.DataFrame({'inter_no':[row.inter_no] * 16, 'phase_no':list(range(1, 9))*2, 'ring_type':['A'] * 8 + ['B'] * 8, 'angle_code':angle_codes.to_list()})\n", " angle.append(new)\n", " angle = pd.concat(angle)\n", " angle = angle.dropna().reset_index(drop=True)\n", "\n", " # 병합\n", " six_chars = angle.angle_code.apply(lambda x:len(x)==6)\n", " angle.loc[six_chars,'inc_angle'] = angle.angle_code.apply(lambda x:x[:3])\n", " angle.loc[six_chars,'out_angle'] = angle.angle_code.apply(lambda x:x[3:])\n", " angle = angle.drop('angle_code', axis=1)\n", " match4 = pd.merge(match3, angle, how='left', left_on=['inter_no', 'phase_no', 'ring_type'],\n", " right_on=['inter_no', 'phase_no', 'ring_type']).drop_duplicates()\n", " return match4" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "def make_match5(match4):\n", " '''\n", " 진입엣지id, 진출엣지id, 노드id를 추가함 (주교차로).\n", " - match4의 컬럼 : inter_no, phase_no, ring_type, move_no, inc_dir, out_dir, inc_angle, out_angle\n", " - match5의 컬럼 : inter_no, phase_no, ring_type, move_no, inc_dir, out_dir, inc_angle, out_angle, inc_edge, out_edge, node_id\n", " \n", " 사용된 데이터 : \n", " (1) net\n", " - 성남시 정자동 부근의 샘플 네트워크\n", " (2) inter_node\n", " - 교차로번호와 노드id를 매칭시키는 테이블.\n", " - parent/child 정보도 포함되어 있음\n", " - 컬럼 : inter_no, node_id, inter_type\n", " (3) inter_info\n", " - 교차로 정보. 여기에서는 위도와 경도가 쓰임.\n", " - 컬럼 : inter_no, inter_name, inter_lat, inter_lon, group_no, main_phase_no\n", "\n", " 진입엣지id, 진출엣지id를 얻는 과정 :\n", " - match5 = match4.copy()의 각 열을 순회하면서 아래 과정을 반복함.\n", " * 진입에 대해서만 서술하겠지만 진출도 마찬가지로 설명될 수 있음\n", " - 해당 행의 교차로정보로부터 노드ID를 얻어내고, 해당 노드에 대한 모든 진출엣지id를 inc_edges에 저장.\n", " * inc_edge(진입엣지) : incoming edge, out_edge(진출엣지) : outgoing_edge\n", " - inc_edges의 모든 진입엣지에 대하여 진입방향(inc_dires, 2차원 단위벡터)을 얻어냄.\n", " - 해당 행의 진입각으로부터 그에 대응되는 진입각방향(단위벡터)를 얻어냄.\n", " - 주어진 진입각방향에 대하여 내적이 가장 작은 진입방향에 대한 진입엣지를 inc_edge_id로 지정함.\n", " '''\n", "\n", " # 네트워크 불러오기 \n", " net = sumolib.net.readNet('../Data/networks/sn.net.xml')\n", " # 교차로-노드 매칭 정보 불러오기\n", " inter_node = pd.read_csv('../Data/tables/inter_node.csv', index_col=0)\n", " # 교차로정보(위, 경도) 불러오기\n", " inter_info = pd.read_csv('../Data/tables/inter_info.csv', index_col=0)\n", "\n", " # parent node만 가져옴.\n", " inter_node1 = inter_node[inter_node.inter_type == 'parent'].drop('inter_type', axis=1)\n", " inter_info1 = inter_info[['inter_no', 'inter_lat', 'inter_lon']]\n", " inter = pd.merge(inter_node1, inter_info1, how='left', left_on=['inter_no'],\n", " right_on=['inter_no']).drop_duplicates()\n", "\n", " inter2node = dict(zip(inter['inter_no'], inter['node_id']))\n", "\n", " match5 = match4.copy()\n", " # 진입진출ID 매칭\n", " for index, row in match5.iterrows():\n", " node_id = inter2node[row.inter_no]\n", " node = net.getNode(node_id)\n", " # 교차로의 모든 (from / to) edges\n", " inc_edges = [edge for edge in node.getIncoming() if edge.getFunction() == ''] # incoming edges\n", " out_edges = [edge for edge in node.getOutgoing() if edge.getFunction() == ''] # outgoing edges\n", " # 교차로의 모든 (from / to) directions\n", " inc_dirs = []\n", " for inc_edge in inc_edges:\n", " start = inc_edge.getShape()[-2]\n", " end = inc_edge.getShape()[-1]\n", " inc_dir = np.array(end) - np.array(start)\n", " inc_dir = inc_dir / (inc_dir ** 2).sum() ** 0.5\n", " inc_dirs.append(inc_dir)\n", " out_dirs = []\n", " for out_edge in out_edges:\n", " start = out_edge.getShape()[0]\n", " end = out_edge.getShape()[1]\n", " out_dir = np.array(end) - np.array(start)\n", " out_dir = out_dir / (out_dir ** 2).sum() ** 0.5\n", " out_dirs.append(out_dir)\n", " # 진입각, 진출각 불러오기\n", " if not pd.isna(row.inc_angle):\n", " inc_angle = int(row.inc_angle)\n", " out_angle = int(row.out_angle)\n", " # 방위각을 일반각으로 가공, 라디안 변환, 단위벡터로 변환\n", " inc_angle = (-90 - inc_angle) % 360\n", " inc_angle = inc_angle * np.pi / 180.\n", " inc_dir_true = np.array([np.cos(inc_angle), np.sin(inc_angle)])\n", " out_angle = (90 - out_angle) % 360\n", " out_angle = out_angle * np.pi / 180.\n", " out_dir_true = np.array([np.cos(out_angle), np.sin(out_angle)])\n", " # 매칭 엣지 반환\n", " inc_index = np.array([np.dot(inc_dir, inc_dir_true) for inc_dir in inc_dirs]).argmax()\n", " out_index = np.array([np.dot(out_dir, out_dir_true) for out_dir in out_dirs]).argmax()\n", " inc_edge_id = inc_edges[inc_index].getID()\n", " out_edge_id = out_edges[out_index].getID()\n", " match5.at[index, 'inc_edge'] = inc_edge_id\n", " match5.at[index, 'out_edge'] = out_edge_id\n", " match5['node_id'] = match5['inter_no'].map(inter2node)\n", " match5 = match5.sort_values(by=['inter_no','phase_no','ring_type']).reset_index(drop=True)\n", " return match5" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def make_match6(match5):\n", " '''\n", " 진입엣지id, 진출엣지id, 노드id를 추가함 (부교차로).\n", " - match6의 컬럼 : inter_no, phase_no, ring_type, move_no, inc_dir, out_dir, inc_angle, out_angle, inc_edge, out_edge, node_id\n", " \n", " 사용된 데이터 : \n", " (1) inter_node\n", " - 교차로번호와 노드id를 매칭시키는 테이블.\n", " - parent/child 정보도 포함되어 있음\n", " - 컬럼 : inter_no, node_id, inter_type\n", " (2) uturn (유턴정보)\n", " - 컬럼 : parent_id, child_id, direction, condition, inc_edge, out_edge\n", " - parent_id, child_id : 주교차로id, 유턴교차로id\n", " - direction : 주교차로에 대한 유턴노드의 상대적인 위치(방향)\n", " - condition : 좌회전시, 직진시, 직좌시, 보행신호시 중 하나\n", " - inc_edge, out_edge : 유턴에 대한 진입진출엣지\n", " (3) coord (연동교차로정보)\n", " - 컬럼 : parent_id, child_id, phase_no, ring_type, inc_edge, out_edge\n", " - parent_id, child_id : 주교차로id, 연동교차로id\n", " - 나머지 컬럼 : 각 (현시, 링)별 진입진출엣지\n", "\n", " 설명 :\n", " - match5는 주교차로에 대해서만 진입엣지id, 진출엣지id, 노드id를 추가했었음.\n", " 여기에서 uturn, coord를 사용해서 부교차로들(유턴교차로, 연동교차로)에 대해서도 해당 값들을 부여함.\n", " 유턴교차로 :\n", " - directions를 정북기준 시계방향의 8방위로 정함.\n", " - 이를 통해 진입방향이 주어진 경우에 좌회전, 직진, 보행 등에 대한 (진입방향, 진출방향)을 얻어낼 수 있음.\n", " - 예) 진입방향(direction)이 '북'일 때, \n", " - 직진 : (북, 남)\n", " * 남 : directions[(ind + 4) % len(directions)]\n", " - 좌회전 : (북, 동)\n", " * 동 : directions[(ind + 2) % len(directions)]\n", " - 보행 : (서, 동)\n", " * 서 : directions[(ind - 2) % len(directions)]\n", " - uturn의 각 행을 순회하면서 아래 과정을 반복함\n", " - match5에서 parent_id에 해당하는 행들을 가져옴(cmatch).\n", " - condition 별로 진입방향, 진출방향A, 진출방향B 정함.\n", " - 상술한 directions를 활용하여 정함.\n", " - (진입방향, 진출방향A, 진출방향B)을 고려하여 (현시, 링) 별로 진입엣지id, 진출엣지id를 정함.\n", " - ex) cmatch.loc[(cmatch.inc_dir==inc_dire) & (cmatch.out_dir==out_dire_A), ['inc_edge', 'out_edge']] = [inc_edge_id, out_edge_id]\n", " - 순회하면서 만든 cmatch를 cmatchs라는 리스트에 저장함.\n", "\n", " 연동교차로 :\n", " - 연동교차로의 경우 coord에 (현시, 링)별 진입엣지ID, 진출엣지ID가 명시되어 있음.\n", " - 'inc_dir', 'out_dir', 'inc_angle','out_angle'와 같은 열들은 np.nan을 지정해놓음.\n", " - 이 열들은, 사실상 다음 스텝부터는 사용되지 않는 열들이기 때문에 np.nan으로 지정해놓아도 문제없음.\n", "\n", " match6 :\n", " - 이렇게 얻은 match5, cmatchs, coord를 모두 pd.concat하여 match6을 얻어냄.\n", " '''\n", "\n", " inter_node = pd.read_csv('../Data/tables/inter_node.csv', index_col=0)\n", " node2inter = dict(zip(inter_node['node_id'], inter_node['inter_no']))\n", "\n", " uturn = pd.read_csv('../Data/tables/child_uturn.csv')\n", " coord = pd.read_csv('../Data/tables/child_coord.csv')\n", " child_ids = inter_node[inter_node.inter_type=='child'].node_id.unique()\n", " ch2pa = {} # child to parent\n", " for child_id in child_ids:\n", " parent_no = inter_node[inter_node.node_id==child_id].inter_no.iloc[0]\n", " sub_inter_node = inter_node[inter_node.inter_no==parent_no]\n", " ch2pa[child_id] = sub_inter_node[sub_inter_node.inter_type=='parent'].iloc[0].node_id\n", " directions = ['북', '북동', '동', '남동', '남', '남서', '서', '북서'] # 정북기준 시계방향으로 8방향\n", "\n", " # 각 uturn node에 대하여 (inc_edge_id, out_edge_id) 부여\n", " cmatches = []\n", " for _, row in uturn.iterrows():\n", " child_id = row.child_id\n", " parent_id = row.parent_id\n", " direction = row.direction\n", " condition = row.condition\n", " inc_edge_id = row.inc_edge\n", " out_edge_id = row.out_edge\n", " # match5에서 parent_id에 해당하는 행들을 가져옴\n", " cmatch = match5.copy()[match5.node_id==parent_id] # match dataframe for a child node\n", " cmatch = cmatch.sort_values(by=['phase_no', 'ring_type']).reset_index(drop=True)\n", " cmatch['node_id'] = child_id\n", " cmatch[['inc_edge', 'out_edge']] = np.nan\n", "\n", " # condition 별로 inc_dire, out_dire_A, out_dire_B를 정함\n", " ind = directions.index(direction)\n", " if condition == \"좌회전시\":\n", " inc_dire = direction\n", " out_dire_A = out_dire_B = directions[(ind + 2) % len(directions)]\n", " elif condition == \"직진시\":\n", " inc_dire = direction\n", " out_dire_A = out_dire_B = directions[(ind + 4) % len(directions)]\n", " elif condition == \"보행신호시\":\n", " inc_dire = directions[(ind + 2) % len(directions)]\n", " out_dire_A = directions[(ind - 2) % len(directions)]\n", " out_dire_B = directions[(ind - 2) % len(directions)]\n", "\n", " # (inc_dire, out_dire_A, out_dire_B) 별로 inc_edge_id, out_edge_id를 정함\n", " if condition == '보행신호시':\n", " cmatch.loc[(cmatch.inc_dir==inc_dire) & (cmatch.out_dir==out_dire_A), ['inc_edge', 'out_edge']] = [inc_edge_id, out_edge_id]\n", " cmatch.loc[(cmatch.inc_dir==inc_dire) & (cmatch.out_dir==out_dire_B), ['inc_edge', 'out_edge']] = [inc_edge_id, out_edge_id]\n", " # 이동류번호가 17(보행신호)이면서 유턴노드방향으로 가는 신호가 없으면 (inc_edge_id, out_edge_id)를 부여한다.\n", " cmatch.loc[(cmatch.move_no==17) & (cmatch.out_dir!=direction), ['inc_edge', 'out_edge']] = [inc_edge_id, out_edge_id]\n", " else: # '직진시', '좌회전시'\n", " cmatch.loc[(cmatch.inc_dir==inc_dire) & (cmatch.out_dir==out_dire_A), ['inc_edge', 'out_edge']] = [inc_edge_id, out_edge_id]\n", " cmatch.loc[(cmatch.inc_dir==inc_dire) & (cmatch.out_dir==out_dire_B), ['inc_edge', 'out_edge']] = [inc_edge_id, out_edge_id]\n", " # 유턴신호의 이동류번호를 19로 부여한다.\n", " cmatch.loc[(cmatch.inc_dir==inc_dire) & (cmatch.out_dir==out_dire_A), 'move_no'] = 19\n", " cmatch.loc[(cmatch.inc_dir==inc_dire) & (cmatch.out_dir==out_dire_B), 'move_no'] = 19\n", " cmatches.append(cmatch)\n", "\n", " # 각 coordination node에 대하여 (inc_edge_id, out_edge_id) 부여\n", " coord['inter_no'] = coord['parent_id'].map(node2inter)\n", " coord = coord.rename(columns={'child_id':'node_id'})\n", " coord[['inc_dir', 'out_dir', 'inc_angle','out_angle']] = np.nan\n", " coord['move_no'] = 20\n", " coord = coord[['inter_no', 'phase_no', 'ring_type', 'move_no', 'inc_dir', 'out_dir', 'inc_angle','out_angle', 'inc_edge', 'out_edge', 'node_id']]\n", " \n", " # display(coord)\n", " cmatches = pd.concat(cmatches)\n", " match6 = pd.concat([match5, cmatches, coord]).drop_duplicates().sort_values(by=['inter_no', 'node_id', 'phase_no', 'ring_type'])\n", " match6.to_csv('../Intermediates/match6.csv')\n", " return match6" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def make_matching(match6):\n", " '''\n", " 이동류 매칭 : 각 교차로에 대하여, 가능한 모든 이동류 (1~18, 21)에 대한 진입·진출엣지ID를 지정한다.\n", " 모든 이동류에 대해 지정하므로, 시차제시 이전과 다른 이동류가 등장하더라도 항상 진입·진출 엣지 ID를 지정할 수 있다. \n", " - matching의 컬럼 : inter_no, move_no, inc_dir, out_dir, inc_edge, out_edge, node_id\n", " \n", " 설명 : \n", " - 필요한 리스트, 딕셔너리 등을 정의\n", " (1) 가능한 (진입방향, 진출방향) 목록 [리스트]\n", " (2) 각 교차로별 방향 목록 : pdires (possible directions) [딕셔너리]\n", " (3) 각 (교차로, 진입방향) 별 진입id 목록 : inc2id (incoming direction to incoming edge_id) [딕셔너리]\n", " (4) 각 (교차로, 진출방향) 별 진출id 목록 : out2id (outgoing direction to outgoing edge_id) [딕셔너리]\n", " (5) 각 교차로별 가능한 (진입방향, 진출방향) 목록 : pflow (possible flows) [딕셔너리]\n", " - matching은 빈 리스트로 지정.\n", " - 모든 노드id에 대하여 다음 과정을 반복\n", " - 해당 노드id에 대한 모든 가능한 (진입방향, 진출방향)에 대하여 다음 과정을 반복\n", " - (노드id, 진입방향)으로부터 진입엣지id를 얻어냄. 마찬가지로 진출엣지id도 얻어냄\n", " - 얻어낸 정보를 바탕으로 한 행(new_row)을 만들고 이것을 matching에 append\n", " '''\n", "\n", " match7 = match6.copy()\n", " match7 = match7[['inter_no', 'move_no', 'inc_dir', 'out_dir', 'inc_edge', 'out_edge', 'node_id']]\n", " inter_node = pd.read_csv('../Data/tables/inter_node.csv', index_col=0)\n", " nema = pd.read_csv('../Data/tables/nema.csv', encoding='cp949')\n", "\n", " parent_ids = sorted(inter_node[inter_node.inter_type=='parent'].node_id.unique())\n", " child_ids = sorted(inter_node[inter_node.inter_type=='child'].node_id.unique())\n", "\n", " # (1) 가능한 (진입방향, 진출방향) 목록 \n", " flows = nema.dropna().apply(lambda row: (row['inc_dir'], row['out_dir']), axis=1).tolist()\n", " # (2) 각 교차로별 방향 목록 : pdires (possible directions)\n", " pdires = {}\n", " for node_id in parent_ids:\n", " dires = match7[match7.node_id == node_id][['inc_dir','out_dir']].values.flatten()\n", " dires = {dire for dire in dires if type(dire)==str}\n", " pdires[node_id] = dires\n", " # (3) 각 (교차로, 진입방향) 별 진입id 목록 : inc2id (incoming direction to incoming edge_id)\n", " inc2id = {}\n", " for node_id in parent_ids:\n", " for inc_dir in pdires[node_id]:\n", " df = match7[(match7.node_id==node_id) & (match7.inc_dir==inc_dir)]\n", " inc2id[(node_id, inc_dir)] = df.inc_edge.iloc[0]\n", " # (4) 각 (교차로, 진출방향) 별 진출id 목록 : out2id (outgoing direction to outgoing edge_id)\n", " out2id = {}\n", " for node_id in parent_ids:\n", " for out_dir in pdires[node_id]:\n", " df = match7[(match7.node_id==node_id) & (match7.out_dir==out_dir)]\n", " out2id[(node_id, out_dir)] = df.out_edge.iloc[0]\n", " # (5) 각 교차로별 가능한 (진입방향, 진출방향) 목록 : pflow (possible flows)\n", " pflow = {}\n", " for node_id in parent_ids:\n", " pflow[node_id] = [flow for flow in flows if set(flow).issubset(pdires[node_id])]\n", " # (6) 가능한 이동류에 대하여 진입id, 진출id 배정 : matching\n", " node2inter = dict(zip(match7['node_id'], match7['inter_no']))\n", " dires_right = ['북', '서', '남', '동', '북'] # ex (북, 서), (서, 남) 등은 우회전 flow\n", " matching = []\n", " for node_id in parent_ids:\n", " inter_no = node2inter[node_id]\n", " # 좌회전과 직진(1 ~ 16)\n", " for (inc_dir, out_dir) in pflow[node_id]:\n", " move_no = nema[(nema.inc_dir==inc_dir) & (nema.out_dir==out_dir)].move_no.iloc[0]\n", " inc_edge = inc2id[(node_id, inc_dir)]\n", " out_edge = out2id[(node_id, out_dir)]\n", " new_row = pd.DataFrame({'inter_no':[inter_no], 'move_no':[move_no],\n", " 'inc_dir':[inc_dir], 'out_dir':[out_dir],\n", " 'inc_edge':[inc_edge], 'out_edge':[out_edge], 'node_id':[node_id]})\n", " matching.append(new_row)\n", " # 보행신호(17), 전적색(18)\n", " new_row = pd.DataFrame({'inter_no':[inter_no] * 2, 'move_no':[17, 18],\n", " 'inc_dir':[None]*2, 'out_dir':[None]*2,\n", " 'inc_edge':[None]*2, 'out_edge':[None]*2, 'node_id':[node_id]*2})\n", " matching.append(new_row)\n", " # 신호우회전(21)\n", " for d in range(len(dires_right)-1):\n", " inc_dir = dires_right[d]\n", " out_dir = dires_right[d+1]\n", " if {inc_dir, out_dir}.issubset(pdires[node_id]):\n", " inc_edge = inc2id[(node_id, inc_dir)]\n", " out_edge = out2id[(node_id, out_dir)]\n", " new_row = pd.DataFrame({'inter_no':[inter_no], 'move_no':[21],\n", " 'inc_dir':[inc_dir], 'out_dir':[out_dir],\n", " 'inc_edge':[inc_edge], 'out_edge':[out_edge], 'node_id':[node_id]})\n", " matching.append(new_row)\n", " matching.append(match7[match7.node_id.isin(child_ids)])\n", " matching = pd.concat(matching)\n", " matching = matching.dropna().sort_values(by=['inter_no', 'node_id', 'move_no']).reset_index(drop=True)\n", " matching['move_no'] = matching['move_no'].astype(int)\n", " matching.to_csv('../Intermediates/matching.csv')\n", " return matching" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "match1 = pd.read_csv('../Intermediates/match1.csv', index_col=0)\n", "match2 = make_match2(match1)\n", "match3 = make_match3(match2)\n", "match4 = make_match4(match3)\n", "match5 = make_match5(match4)\n", "match6 = make_match6(match5)\n", "matching = make_matching(match6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# B. 5초 간격으로 이동류번호 수집" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "# 5초 단위로 이동류번호 저장 및 신호이력에서 유닉스시각 가져와서 표시, 한시간동안의 데이터만 보관\n", "midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())\n", "next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())\n", "fsecs = range(midnight, next_day, 5) # fsecs : unix time by Five SECondS\n", "fmins = range(midnight, next_day, 300) # fmins : unix time by Five MINuteS" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "def save_movement():\n", " # time2move = dict(zip(fsecs,moves)) # move : 어느 순간의 이동류정보\n", " history = pd.read_csv('../Data/tables/history.csv', index_col=0)\n", "\n", " time2movement = {} # movement : 어느 순간의, 그 순간으로부터 한시간 동안의 (교차로번호 + 현시별이동류번호 + 시작시간)\n", " # - 아래 절차를 5초마다 반복\n", " for fsec in tqdm(fsecs): # fsec : unix time by Five SECond\n", " # 1. 상태 테이블 조회해서 전체 데이터중 필요데이터(교차로번호, A링 현시번호, A링 이동류번호, B링 현시번호, B링 이동류번호)만 수집 : A\n", " # move = time2move[fsec]\n", " move = pd.read_csv(f'../Data/tables/move/move_{fsec}.csv', index_col=0)\n", " # 2. 이력 테이블 조회해서 교차로별로 유닉스시간 최대인 데이터(교차로변호, 종료유닉스타임)만 수집 : B\n", " recent_histories = [group.iloc[-1:] for _, group in history[history['end_unix'] < fsec].groupby('inter_no')] # 교차로별로 유닉스시간이 최대인 행들\n", " if not recent_histories:\n", " rhistory = pd.DataFrame({'inter_no':[], 'end_unix':[]}) # recent history\n", " else:\n", " rhistory = pd.concat(recent_histories)\n", " recent_unix = rhistory[['inter_no', 'end_unix']]\n", " # 3. 상태 테이블 조회정보(A)와 이력 테이블 조회정보(B) 조인(키값 : 교차로번호) : C\n", " move = pd.merge(move, recent_unix, how='left', on='inter_no')\n", " move['end_unix'] = move['end_unix'].fillna(0).astype(int)\n", " move = move.drop_duplicates()\n", " # 4. C데이터 프레임에 신규 컬럼(시작 유닉스타임) 생성 후 종료유닉스 타임 값 입력, 종료 유닉스 타임 컬럼 제거\n", " move = move.rename(columns = {'end_unix':'start_unix'})\n", " # 5. 이동류 이력정보 READ\n", " # - CSV 파일로 서버에 저장된 이동류정보를 읽어옴(파일이 없는 경우에는 데이터가 없는 프레임 D 생성)\n", " try:\n", " if isinstance(movement, pd.DataFrame): # movement가 존재할 경우 그걸 그대로 씀.\n", " pass\n", " else: \n", " movement = pd.DataFrame()\n", " except NameError: # movement가 존재하지 않는 경우 생성\n", " movement = pd.DataFrame()\n", " # 6. 이동류 이력정보 데이터테이블(D)에 C데이터 add\n", " movement = pd.concat([movement, move])\n", " # 7. D데이터 프레임에서 중복데이터 제거(교차로번호, 시작 유닉스타임, A링 현시번호, B링 현시번호 같은 행은 제거)\n", " movement = movement.drop_duplicates(['inter_no','phas_A','phas_B','start_unix'])\n", " # 8. D데이터 보관 시간 기준시간을 시작 유닉스 타임의 최대값 - 3600을 값으로 산출하고, 보관 시간 기준시간보다 작은 시작 유닉스 타임을 가진 행은 모두 제거(1시간 데이터만 보관)\n", " movement = movement[movement.start_unix > fsec - 3600]\n", " movement = movement.sort_values(by=['start_unix','inter_no','phas_A','phas_B']).reset_index(drop=True)\n", "\n", " time2movement[fsec] = movement\n", " movement.to_csv(f'../Intermediates/movement/movement_{fsec}.csv')\n", "# save_movement()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# C. 5분 간격으로 신호이력 수집 및 통합테이블 생성" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())\n", "next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())\n", "fmins = range(midnight, next_day, 300) # fmins : unix time by Five MINuteS\n", "\n", "m = 105\n", "midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())\n", "next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())\n", "fmins = range(midnight, next_day, 300)\n", "\n", "# 현재시각\n", "present_time = fmins[m]" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "def make_splits(plan):\n", " # split, isplit : A,B 분리 혹은 통합시 사용될 수 있는 딕셔너리 \n", " splits = {} # splits maps (inter_no, start_hour, start_minute) to split \n", " for i, row in plan.iterrows():\n", " inter_no = row.inter_no\n", " start_hour = row.start_hour\n", " start_minute = row.start_minute\n", " cycle = row.cycle\n", " cums_A = row[[f'dura_A{j}' for j in range(1,9)]].cumsum()\n", " cums_B = row[[f'dura_B{j}' for j in range(1,9)]].cumsum()\n", " splits[(inter_no, start_hour, start_minute)] = {} # split maps (phas_A, phas_B) to k\n", " k = 0\n", " for t in range(cycle):\n", " new_phas_A = len(cums_A[cums_A < t]) + 1\n", " new_phas_B = len(cums_B[cums_B < t]) + 1\n", " if k == 0 or ((new_phas_A, new_phas_B) != (phas_A, phas_B)):\n", " k += 1\n", " phas_A = new_phas_A\n", " phas_B = new_phas_B\n", " splits[(inter_no, start_hour, start_minute)][(phas_A, phas_B)] = k\n", "\n", " isplits = {} # the inverse of splits\n", " for i in splits:\n", " isplits[i] = {splits[i][k]:k for k in splits[i]} # isplit maps k to (phas_A, phas_B)\n", " return splits, isplits\n", "\n", "def make_timetable(plan):\n", " # timetable\n", " timetable = plan[['start_hour', 'start_minute']].drop_duplicates()\n", " timetable['start_seconds'] = midnight + timetable['start_hour'] * 3600 + timetable['start_minute'] * 60\n", " return timetable\n", "\n", "# inter2node\n", "inter_node = pd.read_csv('../Data/tables/inter_node.csv', index_col=0)\n", "inter_node = inter_node[inter_node.inter_type=='parent']\n", "inter2node = dict(zip(inter_node['inter_no'], inter_node['node_id']))" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [], "source": [ "def make_rhists(present_time, timetable, plan, history):\n", " # 1. 조회시점의 유닉스 타임 이전의 신호이력 수집\n", " rhistory = history.copy() # recent history\n", " rhistory = rhistory[(rhistory.end_unix <= present_time) & (rhistory.end_unix > present_time - 9000)] # 두 시간 반 전부터 현재까지의 신호이력을 가져옴. 9000 = 3600 * 2.5\n", " # 2. 시작 유닉스 타임컬럼 생성 후 종류 유닉스 타임에서 현시별 현시기간 컬럼의 합을 뺀 값으로 입력\n", " # - 현시시간의 합을 뺀 시간의 +- 10초 이내에 이전 주기정보가 존재하면 그 유닉스 시간을 시작 유닉스시간 값으로 하고, 존재하지 않으면 현시시간의 합을 뺀 유닉스 시간을 시작 유닉스 시간으로 지정\n", " for i, row in rhistory.iterrows():\n", " inter_no = row.inter_no\n", " end_unix = row.end_unix\n", " elapsed_time = row[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]].sum() // 2 # 현시시간 합\n", " # 이전 유닉스 존재하지 않음 : 현시시간 합의 차\n", " start_unix = end_unix - elapsed_time\n", " pre_rows = history[:i] # previous rows\n", " if inter_no in pre_rows.inter_no.unique(): # 이전 유닉스 존재\n", " pre_unix = pre_rows[pre_rows.inter_no == inter_no]['end_unix'].iloc[-1] # previous unix time\n", " # 이전 유닉스 존재, abs < 10 : 이전 유닉스\n", " if abs(pre_unix - start_unix) < 10:\n", " start_unix = pre_unix\n", " # 이전 유닉스 존재, abs >=10 : 현시시간 합의 차\n", " else:\n", " pass\n", " rhistory.loc[i, 'start_unix'] = start_unix \n", " rhistory[rhistory.isna()] = 0\n", " rhistory['start_unix'] = rhistory['start_unix'].astype(int)\n", " rhistory = rhistory[['inter_no', 'start_unix'] + [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)] + ['cycle']]\n", "\n", " # 2-1. 참값 판단 프로세스\n", " hours = np.array(range(midnight, next_day + 1, 3600)) # 정각에 해당하는 시각들 목록\n", "\n", " def calculate_DS(rhist, curr_unix):\n", " if list(hours[hours <= curr_unix]):\n", " ghour_lt_curr_unix = hours[hours <= curr_unix].max() # the greatest hour less than or equal to curr_unix\n", " else:\n", " print(datetime.fromtimestamp(curr_unix), 'else')\n", " ghour_lt_curr_unix = midnight\n", " start_unixes = rhist.start_unix.unique()\n", " start_unixes_lt_ghour = np.sort(start_unixes[start_unixes < ghour_lt_curr_unix]) # start unixes less than ghour_lt_curr_unix\n", " # 기준유닉스(base_unix) : curr_unix보다 작은 hour 중에서 가장 큰 값으로부터 다섯 번째로 작은 start_unix\n", " if len(start_unixes_lt_ghour) > 5:\n", " base_unix = start_unixes_lt_ghour[-5]\n", " # start_unixes_lt_ghour의 길이가 5 미만일 경우에는 맨 앞 start_unix로 base_unix를 지정\n", " else:\n", " base_unix = rhist.start_unix.min()\n", " D_n = curr_unix - base_unix\n", " S_n_durs = rhist[(rhist.start_unix > base_unix) & (rhist.start_unix <= curr_unix)] \\\n", " [[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]]\n", " S_n = S_n_durs.values.sum() // 2\n", " return D_n, S_n\n", "\n", " rhists = []\n", " for inter_no in sorted(rhistory.inter_no.unique()):\n", " rhist = rhistory.copy()[rhistory.inter_no==inter_no]\n", " rhist = rhist.drop_duplicates(subset=['start_unix']).reset_index(drop=True)\n", "\n", " # D_n 및 S_n 값 정의\n", " rhist['D_n'] = 0 # D_n : 시간차이\n", " rhist['S_n'] = 0 # S_n : 현시시간합\n", " for n in range(len(rhist)):\n", " curr_unix = rhist.iloc[n].start_unix # current start_unix\n", " rhist.loc[n, ['D_n', 'S_n']] = calculate_DS(rhist, curr_unix)\n", "\n", " # 이전시각, 현재시각\n", " prev_unix = rhist.loc[0, 'start_unix'] # previous start_unix\n", " curr_unix = rhist.loc[1, 'start_unix'] # current start_unix\n", "\n", " # rhist의 마지막 행에 도달할 때까지 반복\n", " while True:\n", " n = rhist[rhist.start_unix==curr_unix].index[0]\n", " cycle = rhist.loc[n, 'cycle']\n", " D_n = rhist.loc[n, 'D_n']\n", " S_n = rhist.loc[n, 'S_n']\n", " # 참값인 경우\n", " if (abs(D_n - S_n) <= 5):\n", " pass\n", " # 참값이 아닌 경우\n", " else:\n", " # 2-1-1. 결측치 처리 : 인접한 두 start_unix의 차이가 계획된 주기의 두 배보다 크면 결측이 일어났다고 판단, 신호계획의 현시시간으로 \"대체\"\n", " if curr_unix - prev_unix >= 2 * cycle:\n", " # prev_unix를 계획된 주기만큼 늘려가면서 한 행씩 채워나간다.\n", " # (curr_unix와의 차이가 계획된 주기보다 작거나 같아질 때까지)\n", " while curr_unix - prev_unix > cycle:\n", " prev_unix += cycle\n", " # 신호 계획(prow) 불러오기\n", " start_seconds = np.array(timetable.start_seconds)\n", " idx = (start_seconds <= prev_unix).sum() - 1\n", " start_hour = timetable.iloc[idx].start_hour\n", " start_minute = timetable.iloc[idx].start_minute\n", " prow = plan.copy()[(plan.inter_no==inter_no) & (plan.start_hour==start_hour) & (plan.start_minute==start_minute)] # planned row\n", " # prow에서 필요한 부분을 rhist에 추가\n", " prow['start_unix'] = prev_unix\n", " prow = prow.drop(['start_hour', 'start_minute', 'offset'], axis=1)\n", " cycle = prow.iloc[0].cycle\n", " rhist = pd.concat([rhist, prow])\n", " rhist = rhist.sort_values(by='start_unix').reset_index(drop=True)\n", " n += 1\n", "\n", " # 2-1-2. 이상치 처리 : 비율에 따라 해당 행을 \"삭제\"(R_n <= 0.5) 또는 \"조정\"(R_n > 0.5)한다\n", " R_n = (curr_unix - prev_unix) / cycle # R_n : 비율\n", " # R_n이 0.5보다 작거나 같으면 해당 행을 삭제\n", " if R_n <= 0.5:\n", " rhist = rhist.drop(index=n).reset_index(drop=True)\n", " if n >= rhist.index[-1]:\n", " break\n", " # 행삭제에 따른 curr_unix, R_n 재정의\n", " curr_unix = rhist.loc[n, 'start_unix']\n", " R_n = (curr_unix - prev_unix) / cycle # R_n : 비율\n", "\n", " # R_n이 0.5보다 크면 해당 행 조정 (비율을 유지한 채로 현시시간 대체)\n", " if R_n > 0.5:\n", " # 신호 계획(prow) 불러오기\n", " start_seconds = np.array(timetable.start_seconds)\n", " idx = (start_seconds <= curr_unix).sum() - 1\n", " start_hour = timetable.iloc[idx].start_hour\n", " start_minute = timetable.iloc[idx].start_minute\n", " prow = plan[(plan.inter_no==inter_no) & (plan.start_hour==start_hour) & (plan.start_minute==start_minute)] # planned row\n", " # 조정된 현시시간 (prow에 R_n을 곱하고 정수로 바꿈)\n", " adjusted_dur = prow.copy()[[f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] * R_n\n", " int_parts = adjusted_dur.iloc[0].apply(lambda x: int(x))\n", " frac_parts = adjusted_dur.iloc[0] - int_parts\n", " difference = round(adjusted_dur.iloc[0].sum()) - int_parts.sum()\n", " for _ in range(difference): # 소수 부분이 가장 큰 상위 'difference'개의 값에 대해 올림 처리\n", " max_frac_index = frac_parts.idxmax()\n", " int_parts[max_frac_index] += 1\n", " frac_parts[max_frac_index] = 0 # 이미 처리된 항목은 0으로 설정\n", " # rhist에 조정된 현시시간을 반영\n", " rhist.loc[n, [f'dura_{alph}{j}' for alph in ['A', 'B'] for j in range(1,9)]] = int_parts.values\n", " rhist.loc[n, 'cycle'] = int_parts.sum().sum() // 2\n", "\n", " if n >= rhist.index[-1]:\n", " break\n", " prev_unix = curr_unix\n", " curr_unix = rhist.loc[n+1, 'start_unix']\n", "\n", " # 생략해도 무방할 코드\n", " rhist = rhist.reset_index(drop=True)\n", " rhist = rhist.sort_values(by=['start_unix'])\n", "\n", " # D_n 및 S_n 값 재정의\n", " for n in range(len(rhist)):\n", " curr_unix = rhist.iloc[n].start_unix # current start_unix\n", " rhist.loc[n, ['D_n', 'S_n']] = calculate_DS(rhist, curr_unix)\n", " rhists.append(rhist)\n", " rhists = pd.concat(rhists).sort_values(by=['start_unix','inter_no'])\n", " rhists = rhists[rhists.start_unix >= present_time - 3600]\n", " rhists = rhists.drop(columns=['D_n', 'S_n'])\n", " return rhists" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "def make_hrhists(rhists, isplits, timetable):\n", " # 계층화된 형태로 변환\n", " hrhists = [] # hierarchied recent history\n", " for i, row in rhists.iterrows():\n", " inter_no = row.inter_no\n", " start_unix = row.start_unix\n", "\n", " ind = (timetable['start_seconds'] <= row.start_unix).sum() - 1\n", " start_hour = timetable.iloc[ind].start_hour\n", " start_minute = timetable.iloc[ind].start_minute\n", " isplit = isplits[(inter_no, start_hour, start_minute)]\n", " phas_As = [isplit[j][0] for j in isplit.keys()]\n", " phas_Bs = [isplit[j][1] for j in isplit.keys()]\n", " durs_A = row[[f'dura_A{j}' for j in range(1,9)]]\n", " durs_B = row[[f'dura_B{j}' for j in range(1,9)]]\n", " durations = []\n", " for j in range(1, len(isplit)+1):\n", " ja = isplit[j][0]\n", " jb = isplit[j][1]\n", " if ja == jb:\n", " durations.append(min(durs_A[ja-1], durs_B[jb-1]))\n", " else:\n", " durations.append(abs(durs_A[ja-1] - durs_B[ja-1]))\n", " new_rows = pd.DataFrame({'inter_no':[inter_no] * len(durations), 'start_unix':[start_unix] * len(durations),\n", " 'phas_A':phas_As, 'phas_B':phas_Bs, 'duration':durations})\n", " hrhists.append(new_rows)\n", " hrhists = pd.concat(hrhists)\n", " hrhists = hrhists.sort_values(by = ['start_unix', 'inter_no', 'phas_A', 'phas_B']).reset_index(drop=True)\n", " return hrhists" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "def make_movements():\n", " movements_path = '../Intermediates/movement/'\n", " movements_list = [pd.read_csv(movements_path + file, index_col=0) for file in tqdm(os.listdir(movements_path))]\n", " movements = pd.concat(movements_list)\n", " movements = movements.drop(columns=['start_unix'])\n", " movements = movements.drop_duplicates()\n", " movements = movements.sort_values(by=['inter_no', 'phas_A', 'phas_B'])\n", " movements = movements.reset_index(drop=True)\n", " movements.to_csv('../Intermediates/movements.csv')\n", " return movements\n", "# make_movements()" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "def update_movement(hrhists, movement, movements):\n", " # 중복을 제거하고 (inter_no, start_unix) 쌍을 만듭니다.\n", " hrhists_inter_unix = set(hrhists[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))\n", " movement_inter_unix = set(movement[['inter_no', 'start_unix']].drop_duplicates().itertuples(index=False, name=None))\n", "\n", " # hrhists에는 있지만 movement에는 없는 (inter_no, start_unix) 쌍을 찾습니다.\n", " missing_in_movement = hrhists_inter_unix - movement_inter_unix\n", "\n", " # 새로운 행들을 생성합니다.\n", " new_rows = []\n", " if missing_in_movement:\n", " for inter_no, start_unix in missing_in_movement:\n", " # movements에서 해당 inter_no의 데이터를 찾습니다.\n", " new_row = movements[movements['inter_no'] == inter_no].copy()\n", " # start_unix 값을 설정합니다.\n", " new_row['start_unix'] = start_unix\n", " new_rows.append(new_row)\n", "\n", " # 새로운 데이터프레임을 생성하고 기존 movement 데이터프레임과 합칩니다.\n", " new_movement = pd.concat(new_rows, ignore_index=True)\n", " movement_updated = pd.concat([movement, new_movement], ignore_index=True)\n", " else:\n", " movement_updated = movement\n", " return movement_updated" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "def make_histid(present_time, movedur, inter2node):\n", " # 이동류 매칭 테이블에서 진입id, 진출id를 가져와서 붙임.\n", " for i, row in movedur.iterrows():\n", " inter_no = row.inter_no\n", " start_unix = row.start_unix\n", " # incoming and outgoing edges A\n", " move_A = row.move_A\n", " if move_A in [17, 18]:\n", " inc_edge_A = np.nan\n", " out_edge_A = np.nan\n", " else:\n", " match_A = matching[(matching.inter_no == inter_no) & (matching.move_no == move_A)].iloc[0]\n", " inc_edge_A = match_A.inc_edge\n", " out_edge_A = match_A.out_edge\n", " movedur.loc[i, ['inc_edge_A', 'out_edge_A']] = [inc_edge_A, out_edge_A]\n", " # incoming and outgoing edges B\n", " move_B = row.move_B\n", " if move_B in [17, 18]:\n", " inc_edge_B = np.nan\n", " out_edge_B = np.nan\n", " else:\n", " match_B = matching[(matching.inter_no == inter_no) & (matching.move_no == move_B)].iloc[0]\n", " inc_edge_B = match_B.inc_edge\n", " out_edge_B = match_B.out_edge\n", " movedur.loc[i, ['inc_edge_B', 'out_edge_B']] = [inc_edge_B, out_edge_B]\n", "\n", " # 이동류 컬럼 제거\n", " movedur = movedur.drop(['move_A', 'move_B'], axis=1)\n", "\n", " histid = movedur.copy() # history with edge ids (incoming and outgoing edge ids)\n", " histid['node_id'] = histid['inter_no'].map(inter2node)\n", " histid = histid[['inter_no', 'node_id', 'start_unix', 'phas_A', 'phas_B', 'duration', 'inc_edge_A', 'out_edge_A', 'inc_edge_B', 'out_edge_B']]\n", " histid_start = present_time - 1200\n", " histid = histid[histid.start_unix > histid_start]\n", " return histid" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "def preprocess(m):\n", " '''\n", " 통합테이블(histid)를 만드는 함수\n", "\n", " input : m\n", " - m ranges from 0 to 287, but 0 makes an error where 288 = 86400//300\n", " - present_time = fmins[m] : 현재시점\n", "\n", " output : histid (통합테이블, HISTory with edge_IDs)\n", " - 컬럼 : inter_no, node_id, start_unix, phas_A, phas_B, duration, inc_edge_A, out_edge_A, inc_edge_B, out_edge_B\n", "\n", " 주요 데이터, 중간산출물 및 결과물 :\n", " # 데이터\n", " - history : 신호이력 (inter_no, end_unix, dura_Aj, dura_Bj, cycle, offset)\n", " - plan : 신호계획 (inter_no, start_hour, start_minute, dura_Aj, dura_Bj cycle, offset)\n", " # 중간산출물\n", " - rhists (recent history)\n", " - history에서 현재 시각 이전의 데이터를 가져옴.\n", " - end_unix를 start_unix로 변환\n", " - 참값판단 프로세스(결측·이상치 처리)\n", " - 컬럼 : inter_no, start_unix, dura_Aj, dura_Bj, cycle\n", " - hrhists (hierarchized recent history)\n", " - rhists를 계층화\n", " - 컬럼 : inter_no, start_unix, phas_A, phas_B, duration\n", " - movements\n", " - 각 교차로에 대하여 현시별로 이동류를 정해놓음.\n", " - join시 사용하기 위함.\n", " - 한 번 만들어놓고 두고두고 사용함.\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B\n", " - movement\n", " - 현재 시점에서의 이동류정보\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B, start_unix\n", " - movement_updated\n", " - movement와 hrhists를 join하기 전에, movement에는 없지만 hrhists에는 있는 start_unix에 대한 이동류 정보를 가져와 movement에 붙임\n", " - 이동류정보는 앞서 정의한 movements에서 가져옴.\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B, start_unix\n", " - movedur\n", " - hrhists와 movement_updated를 join\n", " - 컬럼 : inter_no, phas_A, phas_B, move_A, move_B, start_unix, duration\n", " # 결과\n", "\n", " '''\n", " midnight = int(datetime(2024, 1, 5, 0, 0, 0).timestamp())\n", " next_day = int(datetime(2024, 1, 6, 0, 0, 0).timestamp())\n", " fmins = range(midnight, next_day, 300) # fmins : unix time by Five MINuteS\n", "\n", " # 사용할 표준 테이블 목록\n", " plan = pd.read_csv('../Data/tables/plan.csv', index_col=0)\n", " history = pd.read_csv('../Data/tables/history.csv', index_col=0)\n", "\n", " # 참고할 딕셔너리, 데이터프레임 등 목록\n", " splits, isplits = make_splits(plan)\n", " timetable = make_timetable(plan)\n", "\n", " # 현재시점\n", " present_time = fmins[m]\n", " print(datetime.fromtimestamp(present_time))\n", "\n", " # rhists, hrhists\n", " rhists = make_rhists(present_time, timetable, plan, history)\n", " hrhists = make_hrhists(rhists, isplits, timetable)\n", "\n", " # movements, movement, movement_updated\n", " movements = pd.read_csv('../Intermediates/movements.csv', index_col=0)\n", " movement = pd.read_csv(f'../Intermediates/movement/movement_{present_time}.csv', index_col=0)\n", " movement_updated = update_movement(hrhists, movement, movements)\n", "\n", " # movedur\n", " movedur = pd.merge(movement_updated, hrhists, how='inner', on=['inter_no', 'start_unix', 'phas_A', 'phas_B']) # movements and durations\n", " movedur = movedur.sort_values(by=['start_unix', 'inter_no', 'phas_A','phas_B'])\n", " movedur = movedur[['inter_no', 'start_unix', 'phas_A', 'phas_B', 'move_A', 'move_B', 'duration']]\n", "\n", " # histid\n", " histid = make_histid(present_time, movedur, inter2node)\n", " histid.to_csv(f'../Intermediates/histid/histid_{fmins[m]}.csv')\n", "\n", " return histid" ] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2024-01-05 00:50:00\n", "2024-01-04 23:59:59 else\n", "2024-01-04 23:59:59 else\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
inter_nonode_idstart_unixphas_Aphas_Bdurationinc_edge_Aout_edge_Ainc_edge_Bout_edge_B
235178i317043822201138571540304_02571556450_01571556450_02571540304_01
236178i317043822202239571556450_02571500475_01571540304_02571540303_01
237178i317043822203340571540303_02.21571556450_01571540303_02.21571500475_01
238178i317043822204423-571500475_01571540303_01-571500475_01571540304_01
618201i817043822301124-571500569_01571500583_02-571500569_01571500618_01
.................................
398210i617043832511124-571542115_01571500535_01NaNNaN
399210i617043832511219-571542115_01571500535_01571500535_02.18571542115_01
400210i617043832512229571500535_02.18571511538_01571500535_02.18571542115_01
401210i617043832513356571511538_02.121571542115_01571511538_02.121571500585_01
402210i617043832514422571500585_02571511538_01571500585_02571500535_01
\n", "

218 rows × 10 columns

\n", "
" ], "text/plain": [ " inter_no node_id start_unix phas_A phas_B duration inc_edge_A \\\n", "235 178 i3 1704382220 1 1 38 571540304_02 \n", "236 178 i3 1704382220 2 2 39 571556450_02 \n", "237 178 i3 1704382220 3 3 40 571540303_02.21 \n", "238 178 i3 1704382220 4 4 23 -571500475_01 \n", "618 201 i8 1704382230 1 1 24 -571500569_01 \n", ".. ... ... ... ... ... ... ... \n", "398 210 i6 1704383251 1 1 24 -571542115_01 \n", "399 210 i6 1704383251 1 2 19 -571542115_01 \n", "400 210 i6 1704383251 2 2 29 571500535_02.18 \n", "401 210 i6 1704383251 3 3 56 571511538_02.121 \n", "402 210 i6 1704383251 4 4 22 571500585_02 \n", "\n", " out_edge_A inc_edge_B out_edge_B \n", "235 571556450_01 571556450_02 571540304_01 \n", "236 571500475_01 571540304_02 571540303_01 \n", "237 571556450_01 571540303_02.21 571500475_01 \n", "238 571540303_01 -571500475_01 571540304_01 \n", "618 571500583_02 -571500569_01 571500618_01 \n", ".. ... ... ... \n", "398 571500535_01 NaN NaN \n", "399 571500535_01 571500535_02.18 571542115_01 \n", "400 571511538_01 571500535_02.18 571542115_01 \n", "401 571542115_01 571511538_02.121 571500585_01 \n", "402 571511538_01 571500585_02 571500535_01 \n", "\n", "[218 rows x 10 columns]" ] }, "execution_count": 55, "metadata": {}, "output_type": "execute_result" } ], "source": [ "preprocess(10)" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2024-01-05 08:45:00\n", "2024-01-05 08:50:00\n" ] } ], "source": [ "for m in range(105, 107):\n", " histid = preprocess(m)" ] } ], "metadata": { "kernelspec": { "display_name": "rts", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 2 }