@ -1,42 +0,0 @@ | |||
import pyodbc, os, json, csv | |||
from datetime import datetime, timedelta | |||
def fetch_history(term_dt:str): | |||
term_DT = datetime.strptime(term_dt, '%Y_%m%d_%H_%M') | |||
init_DT = term_DT - timedelta(minutes=5) | |||
# 루트폴더 지정 | |||
path_root = os.path.dirname(os.path.abspath('__file__')) | |||
with open(os.path.join(path_root, 'configs', 'config_revised.json'), 'r') as config_file: | |||
config = json.load(config_file) | |||
# 세팅 로드 | |||
paths = config['paths'] | |||
connection_info = config['connection_info'] | |||
# 접속 | |||
DSNNAME = connection_info["DSNNAME"] | |||
DBUSER = connection_info["DBUSER"] | |||
DBPWD = connection_info["DBPWD"] | |||
cnxn = pyodbc.connect(f'DSN={DSNNAME};UID={DBUSER};PWD={DBPWD};charset=utf-8') | |||
cursor = cnxn.cursor() | |||
# 스키마명, 테이블명 정의 | |||
schema = 'SNMS' | |||
table = 'TL_IF_SIGL_CYCL' | |||
# 테이블 불러와서 저장 | |||
try: | |||
query = f"SELECT * FROM {schema}.{table} WHERE OCRN_DT >= '{str(init_DT)}' AND OCRN_DT < '{str(term_DT)}'" | |||
cursor.execute(query) | |||
csv_file_path = os.path.join(path_root, *paths['tables'], 'fetched', f"{table}_{term_dt}.csv") | |||
with open(csv_file_path, 'w', newline='', encoding='utf-8-sig') as csvfile: | |||
csv_writer = csv.writer(csvfile) | |||
columns = [column[0] for column in cursor.description] | |||
csv_writer.writerow(columns) | |||
for row in cursor.fetchall(): | |||
csv_writer.writerow(row) | |||
except Exception as e: | |||
print(f"오류 발생: {e}") | |||
fetch_history('2024_0626_16_00') |
@ -1,44 +0,0 @@ | |||
import pyodbc, os, json, csv | |||
from datetime import datetime | |||
def fetch_movement(term_dt:str): | |||
term_DT = datetime.strptime(term_dt, '%Y_%m%d_%H_%M_%S') | |||
# 루트폴더 지정 | |||
path_root = os.path.dirname(os.path.abspath('__file__')) | |||
with open(os.path.join(path_root, 'configs', 'config_revised.json'), 'r') as config_file: | |||
config = json.load(config_file) | |||
# 세팅 로드 | |||
paths = config['paths'] | |||
connection_info = config['connection_info'] | |||
# 접속 | |||
DSNNAME = connection_info["DSNNAME"] | |||
DBUSER = connection_info["DBUSER"] | |||
DBPWD = connection_info["DBPWD"] | |||
cnxn = pyodbc.connect(f'DSN={DSNNAME};UID={DBUSER};PWD={DBPWD};charset=utf-8') | |||
cursor = cnxn.cursor() | |||
# 스키마명, 테이블명 정의 | |||
schema = 'SNMS' | |||
table = 'TL_IF_SIGL' | |||
# 테이블 불러와서 저장 | |||
try: | |||
query = f"SELECT * FROM {schema}.{table} WHERE PHASE_DT = '{str(term_DT)}'" | |||
cursor.execute(query) | |||
csv_file_path = os.path.join(path_root, *paths['tables'], 'fetched', f"{table}_{term_dt}.csv") | |||
row_count = 0 #### | |||
with open(csv_file_path, 'w', newline='', encoding='utf-8-sig') as csvfile: | |||
csv_writer = csv.writer(csvfile) | |||
columns = [column[0] for column in cursor.description] | |||
csv_writer.writerow(columns) | |||
for row in cursor.fetchall(): | |||
csv_writer.writerow(row) | |||
row_count += 1 ### | |||
print(row_count) ### | |||
except Exception as e: | |||
print(f"오류 발생: {e}") | |||
fetch_movement('2024_0624_17_38_24') |