# (rts) PS C:\Github\siggen> python .\Scripts\fetch_tables.py
|
|
import pandas as pd
|
|
import pyodbc
|
|
import os, json, csv
|
|
from tqdm import tqdm
|
|
from datetime import datetime
|
|
|
|
starting_time = datetime.now()
|
|
|
|
# 루트폴더 지정
|
|
path_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
with open(os.path.join(path_root, 'Scripts', 'config.json'), 'r') as config_file:
|
|
config = json.load(config_file)
|
|
|
|
# 주요 폴더 경로 지정
|
|
paths = config['paths']
|
|
path_tables = os.path.join(path_root, *paths['tables'])
|
|
path_results = os.path.join(path_root, *paths['results'])
|
|
|
|
# 이슈사항 목록
|
|
issues = []
|
|
|
|
# DB 접속정보
|
|
connection_info = config['connection_info']
|
|
DSNNAME = connection_info["DSNNAME"]
|
|
DBUSER = connection_info["DBUSER"]
|
|
DBPWD = connection_info["DBPWD"]
|
|
|
|
# 오류 발생을 위한 코드
|
|
# DSNNAME += 'a'
|
|
# DBUSER += 'a'
|
|
# DBPWD += 'a'
|
|
|
|
# 데이터베이스 연결
|
|
try:
|
|
cnxn = pyodbc.connect(f'DSN={DSNNAME};UID={DBUSER};PWD={DBPWD};charset=utf-8')
|
|
cursor = cnxn.cursor()
|
|
print("데이터베이스 연결에 성공했습니다.")
|
|
except pyodbc.InterfaceError:
|
|
print("데이터베이스 연결 실패: 데이터 원본 이름을 확인하거나 기본 드라이버를 지정하세요.")
|
|
# 여기서 오류 처리 로직을 추가할 수 있습니다.
|
|
except pyodbc.OperationalError as e:
|
|
if "Login failed" in str(e):
|
|
print("로그인 실패: 사용자 이름 또는 비밀번호가 유효하지 않습니다.")
|
|
else:
|
|
print("연결 실패: 운영 체제 레벨에서 오류가 발생했습니다.")
|
|
# 여기서 오류 처리 로직을 추가할 수 있습니다.
|
|
except Exception as e:
|
|
print(f"예기치 않은 오류가 발생했습니다: {e}")
|
|
# 여기서 오류 처리 로직을 추가할 수 있습니다.
|
|
|
|
|
|
schema = 'SNITS_INT'
|
|
tables = ['S_INT_CONFIG', # 교차로 제어기
|
|
'S_INT_PHASE_CONFIG', # 교차로 현시구성
|
|
'S_INT_TPLAN', # 교차로 시간계획
|
|
'S_SA_CYCLE_PLAN',
|
|
'S_SA_DPLAN', # 그룹 일계획
|
|
'S_SA_WPLAN', # 그룹 주간계획
|
|
'S_TOD_HIS'] # 신호 TOD 이력
|
|
|
|
# 폴더 Data\tables\yyyymmdd_hhmmss 생성
|
|
timestamp = starting_time.strftime('%Y%m%d_%H%M%S')
|
|
# base_dir = os.path.join(path_tables, timestamp)
|
|
os.makedirs(os.path.join(path_tables, timestamp), exist_ok=True)
|
|
|
|
def fetch_table(table, condition=""):
|
|
try:
|
|
query = f"SELECT * FROM {schema}.{table} {condition}"
|
|
cursor.execute(query)
|
|
csv_file_path = os.path.join(path_tables, timestamp, f"{table}.csv")
|
|
with open(csv_file_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
|
|
csv_writer = csv.writer(csvfile)
|
|
columns = [column[0] for column in cursor.description]
|
|
csv_writer.writerow(columns)
|
|
for row in cursor.fetchall():
|
|
csv_writer.writerow(row)
|
|
except pyodbc.ProgrammingError as e:
|
|
if '42S02' in str(e):
|
|
print(f"오류: '{table}' 테이블이 스키마에 존재하지 않습니다.")
|
|
else:
|
|
print(f"SQL 실행 오류: {e}")
|
|
except Exception as e:
|
|
print(f"예기치 않은 오류가 발생했습니다: {e}")
|
|
|
|
fetch_table('S_INT_CONFIG')
|
|
fetch_table('S_INT_PHASE_CONFIG')
|
|
fetch_table('S_INT_TPLAN')
|
|
fetch_table('S_SA_CYCLE_PLAN')
|
|
fetch_table('S_SA_DPLAN')
|
|
fetch_table('S_SA_WPLAN')
|
|
fetch_table('S_TOD_HIS', condition="WHERE INT_CREATE_DATE >= TO_TIMESTAMP('2023-10-17 23:15:00.0') ORDER BY INT_NO ASC, INT_CREATE_DATE DESC;")
|
|
# 오류 발생을 위한 코드
|
|
fetch_table('foo')
|
|
print("테이블을 모두 불러왔습니다.")
|
|
|
|
cnxn.close()
|
|
|
|
inter_info = pd.read_csv(os.path.join(path_tables, timestamp, 'S_INT_CONFIG.csv'))
|
|
plan = pd.read_csv(os.path.join(path_tables, timestamp, 'S_INT_TPLAN.csv'))
|
|
history = pd.read_csv(os.path.join(path_tables, timestamp, 'S_TOD_HIS.csv'))
|
|
print(inter_info)
|
|
print(plan)
|
|
print(history)
|
|
|
|
# 1-4-2. 교차로정보(inter_info) 검사
|
|
def check_inter_info():
|
|
# 1-4-2-1. inter_lat, inter_lon 적절성 검사
|
|
inter_info.loc[0, 'INT_LAT'] = 38.0 # 에러 발생을 위한 코드
|
|
max_lon, min_lon = 127.3, 127.0
|
|
max_lat, min_lat = 37.5, 37.2
|
|
for _, row in inter_info.iterrows():
|
|
latbool = min_lat <= row['INT_LAT'] <= max_lat
|
|
lonbool = min_lon <= row['INT_LNG'] <= max_lon
|
|
if not(latbool and lonbool):
|
|
msg = f"1-4-2-1. 위도 또는 경도가 범위를 벗어난 교차로가 있습니다: INT_NO : {row['INT_NO']}"
|
|
issues.append(msg)
|
|
|
|
def write_issues():
|
|
print('3. 이슈사항을 저장합니다.')
|
|
path_issues = os.path.join(path_results, "issues_fetch_tables.txt")
|
|
with open(path_issues, "w", encoding="utf-8") as file:
|
|
for item in issues:
|
|
file.write(item + "\n")
|
|
if issues:
|
|
print("데이터 처리 중 발생한 특이사항은 다음과 같습니다. :")
|
|
for review in issues:
|
|
print(review)
|
|
check_inter_info()
|
|
write_issues()
|
|
|
|
print("elapsed time :", datetime.now() - starting_time)
|