728x90
반응형
파이썬 버전 : 3.8.8 로 테스트
파이썬을 설치후
패키지를 다운받아서 설치한다.
pip install contextlib
pip install sqlalchemy
pip install jaydebeapi
pip install pandas
pip install apscheduler
1. config.py
TIBERO_DB_CONFIG = {
'driver': 'com.tmax.tibero.jdbc.TbDriver',
'url': 'jdbc:tibero:thin:@192.100.1.10:829:tibero',
'user': 'tiberotest',
'password': 'test1234',
'jdbcDriver': 'd:/python_schedule/jdbc/tibero6-jdbc.jar'
}
PG_DB_CONFIG = {
'host': '192.100.1.45',
'user': 'pgtest',
'password': 'pgtest',
'db': 'postgre',
'port': '5432'
}
2. dbSync.py
from contextlib import contextmanager
import config
from sqlalchemy import create_engine
import sqlalchemy.types as sql_types
import jaydebeapi as jp
import pandas as pd
import time
from apscheduler.schedulers.background import BackgroundScheduler
# db접속 설정
class Connection:
def __init__(self):
self.t_driver = config.TIBERO_DB_CONFIG['driver']
self.t_url = config.TIBERO_DB_CONFIG['url']
self.t_user = config.TIBERO_DB_CONFIG['user']
self.t_password = config.TIBERO_DB_CONFIG['password']
self.t_jdbc_driver = config.TIBERO_DB_CONFIG['jdbcDriver']
self.p_host = config.PG_DB_CONFIG['host']
self.p_user = config.PG_DB_CONFIG['user']
self.p_password = config.PG_DB_CONFIG['password']
self.p_db = config.PG_DB_CONFIG['db']
self.p_port = config.PG_DB_CONFIG['port']
self.pg_db = self.__pg_connect()
self.tb_db = self.__tb_connect()
def __pg_connect(self):
url = 'postgresql://{}:{}@{}:{}/{}'.format(
self.p_user, self.p_password, self.p_host, self.p_port, self.p_db)
engine = create_engine(
url, client_encoding='utf8', executemany_mode='batch')
return engine
def __tb_connect(self):
print(self.t_jdbc_driver)
conn = jp.connect(self.t_driver, self.t_url, [
self.t_user, self.t_password], jars=[self.t_jdbc_driver])
conn.jconn.setAutoCommit(False)
return conn
class TableSyncer:
def __init__(self, db_conn, tbl_name, tbl_keys):
self._db_conn = db_conn
self._tbl_name = tbl_name
self._tbl_keys = tbl_keys
self._tbl_vals = ''
@property
def db_conn(self):
return self._db_conn
@property
def tbl_name(self):
return self._tbl_name
@property
def tbl_keys(self):
return self._tbl_keys
@property
def tbl_vals(self):
return self._tbl_vals
def sync_tbl(self):
start_time = time.time()
start_time2 = time.time()
tb_engine = self.db_conn.tb_db # 티베로 접속하여 읽어 온다
print(f'tb_engine: {tb_engine}')
ss_qry = Query.SQL['ss_TEMPLATE'].format(
self.tbl_name)
for df in pd.read_sql_query(ss_qry, tb_engine, chunksize=10000):
# print(df.columns)
print("to_sql read : %s seconds" % (time.time() - start_time))
df.to_sql(name=self.tbl_name, schema='public', con=self.db_conn.pg_db, if_exists='append',
chunksize=2000, index=False, method='multi')
print("postgre_sql insert : %s seconds" %
(time.time() - start_time2))
# 키컬럼에 값을 리스트로 가지고 온다.
rows = list(df[self.tbl_keys].itertuples(index=False))
sql_param = "and ( "
for row in tuple(rows):
series = pd.Series(row)
index = 0
sql_param += "("
for data in series:
sql_param += f" {self.tbl_keys[index]} = {data} "
index = index + 1
sql_param += " and" if index < series.size else ""
sql_param += ") or "
sql_param = sql_param[:-3]
sql_param += ")"
su_qry = Query.SQL['upd_TEMPLATE'].format(
self.tbl_name, 'sysdate', sql_param)
# print(f'su_qry: {su_qry}')
try:
curs = tb_engine.cursor()
curs.execute(su_qry)
except Exception as e:
tb_engine.rollback()
raise
else:
tb_engine.commit()
finally:
curs.close()
# 처리할 쿼리 정의
class Query(object):
SQL = {
'upd_test_grp': """
UPDATE tibero.{} SET upd_tm = {}
WHERE 1 = 1
{}
""",
'upd_test_jdg_gz': """
UPDATE tibero.{} SET upd_tm = {}
WHERE 1 = 1
{}
""",
'upd_TEMPLATE': """
UPDATE tibero.{} SET upd_tm = {}
WHERE 1 = 1
{}
""",
'ss_test_p_grp': """
SELECT *
FROM tibero.test_prdr_grp
""",
'ss_test_ags_or': """
SELECT *
FROM tibero.test_ags
""",
# 템플릿
'ss_TEMPLATE': """
SELECT *
FROM test.{}
"""
}
# 처리할 테이블에 키컬럼 정의
def qry_condition(x):
return {
'test_prdr_g': ['test_prdr_g', ['CERT_ASGN_NO', 'RCV_SEQ']],
'test_jdg_or': ['test_jdg_or', ['CERT_ASGN_NO', 'JDG_SEQ', 'JDGR_SEQ']],
}[x]
def exec_sync(db_conn, tableNm):
print(f'exec_sync db_conn : {db_conn}, tableNm : {tableNm}')
params = qry_condition(tableNm)
print(f'params : {params}') # params[0]:테이블명, params[1]:키컬럼
db_util = TableSyncer(db_conn, params[0], params[1])
db_util.sync_tbl()
if __name__ == '__main__':
# 티베로,포스트그레스 db접속
# Connection() 한개로 생성하면 db에 한개의 커넥션면 연결이 되어서 하나씩 순차적으로 실행된다.
# 여러개의 테이블을 동시에 처리할려면 해당갯수만큼 DB Connection() 을 생성해서 처리하면 멀티로 처리됨
db_conn = Connection()
#db_conn2 = Connection()
#print(f'db_conn : {db_conn}')
scheduler = BackgroundScheduler()
# 여러개의 테이블을 동기화 할때 필요한 스케줄을 초단위로 시간을 설정하여 스케줄에 등록한다.
# Connection()객체 한개로 하면 먼저 실행한건이 끝날때가지 나중에 실행하는건은 스케줄 큐에 대기 하였다가 앞에것이 끝나야 처리됨 다중처리 필요시 각각의
# 처리건만큼 db커넥션을 생성해야 다중처리 가능
scheduler.add_job(exec_sync, 'interval', seconds=30,
args=[db_conn, 'AGS_PRDR_GRP'])
scheduler.add_job(exec_sync, 'interval', seconds=30,
args=[db_conn, 'AGS_JDGR_ORGZ'])
scheduler.start()
try:
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
728x90
반응형
'프로그램 > 파이쎤' 카테고리의 다른 글
airflow 설치 (0) | 2023.05.10 |
---|---|
[파이쎤] 이기종 db 마이그레이션 (0) | 2022.06.21 |
댓글