728x90
반응형
대용량 데이터를 이관할때 파이쎤을 이용한 벌크인서트와 읽어 올때 chunksize 옵션 처리 기능을 이용해서 메모리 오버플로우를 처리 하였습니다.
메모리가 충분하면 한방에 읽어와서 처리하면 처리가 빠르지만 메모리 용량이 부족할 경우에는 나누어서 읽어서 처리를 해야 메모리 오버플로우가 발생하지 앖습니다.
파이썬 버전 : 3.8.8 로 테스트
파이썬을 설치후
패키지를 다운받아서 설치한다.
pip install contextlib
pip install sqlalchemy
pip install jaydebeapi
pip install pandas
pip install apscheduler
1. config.py
TIBERO_DB_CONFIG = {
'driver': 'com.tmax.tibero.jdbc.TbDriver',
'url': 'jdbc:tibero:thin:@192.100.1.10:829:tibero',
'user': 'tiberotest',
'password': 'test1234',
'jdbcDriver': 'd:/python_schedule/jdbc/tibero6-jdbc.jar'
}
PG_DB_CONFIG = {
'host': '192.100.1.45',
'user': 'pgtest',
'password': 'pgtest',
'db': 'postgre',
'port': '5432'
}
2.dbMigration.py
from contextlib import contextmanager
import config
from sqlalchemy import create_engine
import sqlalchemy.types as sql_types
import jaydebeapi as jp
import pandas as pd
import time
from apscheduler.schedulers.background import BackgroundScheduler
# db 티베로,postgresql 접속 설정
class Connection:
def __init__(self):
self.t_driver = config.TIBERO_DB_CONFIG['driver']
self.t_url = config.TIBERO_DB_CONFIG['url']
self.t_user = config.TIBERO_DB_CONFIG['user']
self.t_password = config.TIBERO_DB_CONFIG['password']
self.t_jdbc_driver = config.TIBERO_DB_CONFIG['jdbcDriver']
self.p_host = config.PG_DB_CONFIG['host']
self.p_user = config.PG_DB_CONFIG['user']
self.p_password = config.PG_DB_CONFIG['password']
self.p_db = config.PG_DB_CONFIG['db']
self.p_port = config.PG_DB_CONFIG['port']
self.pg_db = self.__pg_connect()
self.tb_db = self.__tb_connect()
def __pg_connect(self):
url = 'postgresql://{}:{}@{}:{}/{}'.format(
self.p_user, self.p_password, self.p_host, self.p_port, self.p_db)
engine = create_engine(
url, client_encoding='utf8', executemany_mode='batch')
return engine
def __tb_connect(self):
print(self.t_jdbc_driver)
conn = jp.connect(self.t_driver, self.t_url, [
self.t_user, self.t_password], jars=[self.t_jdbc_driver])
conn.jconn.setAutoCommit(False)
return conn
class TableSyncer:
def __init__(self, db_conn, tbl_name):
self._db_conn = db_conn
self._tbl_name = tbl_name
@property
def db_conn(self):
return self._db_conn
@property
def tbl_name(self):
return self._tbl_name
def sync_tbl(self):
start_time = time.time()
start_time2 = time.time()
tb_engine = self.db_conn.tb_db # 티베로 접속
pg_engine = self.db_conn.pg_db # postgresql 접속
print(f'tb_engine: {tb_engine}')
# 테이블이 존재하는지 체크
table_qry = "SELECT COUNT(*) as cnt FROM pg_tables WHERE schemaname = 'public' AND tablename = '{}'".format(
self.tbl_name.lower())
print(f'table_qry:{table_qry}')
df_exit = pd.read_sql_query(table_qry, pg_engine)
# print(f'df_exit.iloc[0] {df_exit.iloc[0,0]}')
# for index, row in df_exit.iterrows():
# print(f"cnt 값 : {row.cnt} ")
# 기존에 이관된 데이타가 존재하면 삭제 처리후 이관처리
del_qry = Query.SQL['del_TEMPLATE'].format(
self.tbl_name)
print(f'삭제할 쿼리 : {del_qry}')
# 테이블이 존재하면 데이터 삭제
if df_exit.iloc[0, 0] > 0:
pg_engine.execute(del_qry)
# 티베로에 데이터를 읽어온다.
sel_qry = Query.SQL['select_TEMPLATE'].format(
self.tbl_name)
print(f'처리할테이블 : {self.tbl_name}')
for df in pd.read_sql_query(sel_qry, tb_engine, chunksize=10000):
# 컬럼헤더 재정의 (PostgreSQL에 맞게 소문자 처리)
df.columns = df.columns.str.lower()
# print(df.columns)
print("to_sql read : %s seconds" % (time.time() - start_time))
# postgresql에 인서트 한다.
df.to_sql(name=self.tbl_name.lower(), schema='public', con=self.db_conn.pg_db, if_exists='append',
chunksize=2000, index=False, method='multi',
dtype={
'lgn_id': sql_types.VARCHAR(20),
'reg_tm': sql_types.TIMESTAMP,
"upd_tm": sql_types.TIMESTAMP,
"lat_login_dt": sql_types.TIMESTAMP,
"confirm_dt": sql_types.TIMESTAMP,
"cancel_dt": sql_types.TIMESTAMP,
"secsn_dt": sql_types.TIMESTAMP,
"info_agree_dt": sql_types.TIMESTAMP,
"cd_len": sql_types.NUMERIC,
"ara_order": sql_types.NUMERIC,
"latitude": sql_types.NUMERIC,
"longitude": sql_types.NUMERIC,
"reg_dttm": sql_types.TIMESTAMP,
"upd_dttm": sql_types.TIMESTAMP,
"hldy_dt": sql_types.TIMESTAMP
})
print("postgre_sql insert : %s seconds" %
(time.time() - start_time2))
# 이관 처리할 쿼리 정의
class Query(object):
SQL = {
# 삭제처리
'del_TEMPLATE': """
DELETE FROM public.{}
""",
# 데이터 읽어오기 템플릿
'select_TEMPLATE': """
SELECT *
FROM tibero.{}
"""
}
def exec_sync(db_conn, tableNm):
print(f'exec_sync db_conn : {db_conn}, tableNm : {tableNm}')
db_util = TableSyncer(db_conn, tableNm)
db_util.sync_tbl()
# 이관할 테이블 배열 정의..
tables = ['test_table', 'test_table2']
if __name__ == '__main__':
# 티베로,포스트그레스 db접속
db_conn = Connection()
for index, tableNm in enumerate(tables):
exec_sync(db_conn, tableNm)
728x90
반응형
'프로그램 > 파이쎤' 카테고리의 다른 글
airflow 설치 (0) | 2023.05.10 |
---|---|
[파이쎤] 이기종간의 DB 실시간 동기화 (0) | 2022.06.21 |
댓글