본문 바로가기
프로그램/파이쎤

[파이쎤] 이기종간의 DB 실시간 동기화

by cbwstar 2022. 6. 21.
728x90
반응형

파이썬 버전 : 3.8.8 로 테스트

파이썬을 설치후

패키지를 다운받아서 설치한다.

pip install contextlib

pip install sqlalchemy

pip install jaydebeapi

pip install pandas

pip install apscheduler

 

1. config.py

python
닫기
TIBERO_DB_CONFIG = { ​​​​'driver': 'com.tmax.tibero.jdbc.TbDriver', ​​​​'url': 'jdbc:tibero:thin:@192.100.1.10:829:tibero', ​​​​'user': 'tiberotest', ​​​​'password': 'test1234', ​​​​'jdbcDriver': 'd:/python_schedule/jdbc/tibero6-jdbc.jar' } PG_DB_CONFIG = { ​​​​'host': '192.100.1.45', ​​​​'user': 'pgtest', ​​​​'password': 'pgtest', ​​​​'db': 'postgre', ​​​​'port': '5432' }

2. dbSync.py

python
닫기
from contextlib import contextmanager import config from sqlalchemy import create_engine import sqlalchemy.types as sql_types import jaydebeapi as jp import pandas as pd import time from apscheduler.schedulers.background import BackgroundScheduler # db접속 설정 class Connection: ​​​​def __init__(self): ​​​​​​​​self.t_driver = config.TIBERO_DB_CONFIG['driver'] ​​​​​​​​self.t_url = config.TIBERO_DB_CONFIG['url'] ​​​​​​​​self.t_user = config.TIBERO_DB_CONFIG['user'] ​​​​​​​​self.t_password = config.TIBERO_DB_CONFIG['password'] ​​​​​​​​self.t_jdbc_driver = config.TIBERO_DB_CONFIG['jdbcDriver'] ​​​​​​​​self.p_host = config.PG_DB_CONFIG['host'] ​​​​​​​​self.p_user = config.PG_DB_CONFIG['user'] ​​​​​​​​self.p_password = config.PG_DB_CONFIG['password'] ​​​​​​​​self.p_db = config.PG_DB_CONFIG['db'] ​​​​​​​​self.p_port = config.PG_DB_CONFIG['port'] ​​​​​​​​self.pg_db = self.__pg_connect() ​​​​​​​​self.tb_db = self.__tb_connect() ​​​​def __pg_connect(self): ​​​​​​​​url = 'postgresql://{}:{}@{}:{}/{}'.format( ​​​​​​​​​​​​self.p_user, self.p_password, self.p_host, self.p_port, self.p_db) ​​​​​​​​engine = create_engine( ​​​​​​​​​​​​url, client_encoding='utf8', executemany_mode='batch') ​​​​​​​​return engine ​​​​def __tb_connect(self): ​​​​​​​​print(self.t_jdbc_driver) ​​​​​​​​conn = jp.connect(self.t_driver, self.t_url, [ ​​​​​​​​​​​​​​​​​​​​​​​​​​self.t_user, self.t_password], jars=[self.t_jdbc_driver]) ​​​​​​​​conn.jconn.setAutoCommit(False) ​​​​​​​​return conn class TableSyncer: ​​​​def __init__(self, db_conn, tbl_name, tbl_keys): ​​​​​​​​self._db_conn = db_conn ​​​​​​​​self._tbl_name = tbl_name ​​​​​​​​self._tbl_keys = tbl_keys ​​​​​​​​self._tbl_vals = '' @property ​​​​def db_conn(self): ​​​​​​​​return self._db_conn @property ​​​​def tbl_name(self): ​​​​​​​​return self._tbl_name @property ​​​​def tbl_keys(self): ​​​​​​​​return self._tbl_keys @property ​​​​def tbl_vals(self): ​​​​​​​​return self._tbl_vals ​​​​def sync_tbl(self): ​​​​​​​​start_time = time.time() ​​​​​​​​start_time2 = time.time() ​​​​​​​​tb_engine = self.db_conn.tb_db # 티베로 접속하여 읽어 온다 ​​​​​​​​print(f'tb_engine: {tb_engine}') ​​​​​​​​ss_qry = Query.SQL['ss_TEMPLATE'].format( ​​​​​​​​​​​​self.tbl_name) ​​​​​​​​for df in pd.read_sql_query(ss_qry, tb_engine, chunksize=10000): ​​​​​​​​​​​# print(df.columns) ​​​​​​​​​​​​print("to_sql read : %s seconds" % (time.time() - start_time)) ​​​​​​​​​​​​df.to_sql(name=self.tbl_name, schema='public', con=self.db_conn.pg_db, if_exists='append', ​​​​​​​​​​​​​​​​​​​​​​chunksize=2000, index=False, method='multi') ​​​​​​​​​​​​print("postgre_sql insert : %s seconds" % ​​​​​​​​​​​​​​​​​​(time.time() - start_time2)) ​​​​​​​​​​​​# 키컬럼에 값을 리스트로 가지고 온다. ​​​​​​​​​​​​rows = list(df[self.tbl_keys].itertuples(index=False)) ​​​​​​​​​​​​sql_param = "and ( " ​​​​​​​​​​​​for row in tuple(rows): ​​​​​​​​​​​​​​​​series = pd.Series(row) ​​​​​​​​​​​​​​​​index = 0 ​​​​​​​​​​​​​​​​sql_param += "(" ​​​​​​​​​​​​​​​​for data in series: ​​​​​​​​​​​​​​​​​​​​sql_param += f" {self.tbl_keys[index]} = {data} " ​​​​​​​​​​​​​​​​​​​​index = index + 1 ​​​​​​​​​​​​​​​​​​​​sql_param += " and" if index < series.size else "" ​​​​​​​​​​​​​​​​sql_param += ") or " ​​​​​​​​​​​​sql_param = sql_param[:-3] ​​​​​​​​​​​​sql_param += ")" ​​​​​​​​​​​​su_qry = Query.SQL['upd_TEMPLATE'].format( ​​​​​​​​​​​​​​​​self.tbl_name, 'sysdate', sql_param) ​​​​​​​​​​​# print(f'su_qry: {su_qry}') ​​​​​​​​​​​​try: ​​​​​​​​​​​​​​​​curs = tb_engine.cursor() ​​​​​​​​​​​​​​​​curs.execute(su_qry) ​​​​​​​​​​​​except Exception as e: ​​​​​​​​​​​​​​​​tb_engine.rollback() ​​​​​​​​​​​​​​​​raise ​​​​​​​​​​​​else: ​​​​​​​​​​​​​​​​tb_engine.commit() ​​​​​​​​​​​​finally: ​​​​​​​​​​​​​​​​curs.close() # 처리할 쿼리 정의 class Query(object): ​​​​SQL = { ​​​​​​​​'upd_test_grp': """ ​​​​​​​​UPDATE tibero.{} SET upd_tm = {} ​​​​​​​​WHERE 1 = 1 ​​​​​​​​​{} ​​​​​​""", ​​​​​​​​'upd_test_jdg_gz': """ ​​​​​​​​UPDATE tibero.{} SET upd_tm = {} ​​​​​​​​WHERE 1 = 1 ​​​​​​​​​{} ​​​​​​""", ​​​​​​​​'upd_TEMPLATE': """ ​​​​​​​​UPDATE tibero.{} SET upd_tm = {} ​​​​​​​​WHERE 1 = 1 ​​​​​​​​{} ​​​​​​​​""", ​​​​​​​​'ss_test_p_grp': """ ​​​​​​​​​​​​SELECT * ​​​​​​​​​​​​FROM tibero.test_prdr_grp ​​​​​​""", ​​​​​​​​'ss_test_ags_or': """ ​​​​​​​​​​​​SELECT * ​​​​​​​​​​​​FROM tibero.test_ags ​​​​​​""", ​​​​​​​​# 템플릿 ​​​​​​​​'ss_TEMPLATE': """ ​​​​​​​​​​​​SELECT * ​​​​​​​​​​​​FROM test.{} ​​​​​​""" ​​​​} # 처리할 테이블에 키컬럼 정의 def qry_condition(x): ​​​​return { ​​​​​​​​'test_prdr_g': ['test_prdr_g', ['CERT_ASGN_NO', 'RCV_SEQ']], ​​​​​​​​'test_jdg_or': ['test_jdg_or', ['CERT_ASGN_NO', 'JDG_SEQ', 'JDGR_SEQ']], ​​​​}[x] def exec_sync(db_conn, tableNm): ​​​​print(f'exec_sync db_conn : {db_conn}, tableNm : {tableNm}') ​​​​params = qry_condition(tableNm) ​​​​print(f'params : {params}') # params[0]:테이블명, params[1]:키컬럼 ​​​​db_util = TableSyncer(db_conn, params[0], params[1]) ​​​​db_util.sync_tbl() if __name__ == '__main__': ​​​​# 티베로,포스트그레스 db접속 ​​​​# Connection() 한개로 생성하면 db에 한개의 커넥션면 연결이 되어서 하나씩 순차적으로 실행된다. ​​​​# 여러개의 테이블을 동시에 처리할려면 해당갯수만큼 DB Connection() 을 생성해서 처리하면 멀티로 처리됨 ​​​​db_conn = Connection() ​​​​#db_conn2 = Connection() ​​​​#print(f'db_conn : {db_conn}') ​​​​scheduler = BackgroundScheduler() ​​​​# 여러개의 테이블을 동기화 할때 필요한 스케줄을 초단위로 시간을 설정하여 스케줄에 등록한다. ​​​​# Connection()객체 한개로 하면 먼저 실행한건이 끝날때가지 나중에 실행하는건은 스케줄 큐에 대기 하였다가 앞에것이 끝나야 처리됨 다중처리 필요시 각각의 ​​​​# 처리건만큼 db커넥션을 생성해야 다중처리 가능 ​​​​scheduler.add_job(exec_sync, 'interval', seconds=30, ​​​​​​​​​​​​​​​​​​​​​​args=[db_conn, 'AGS_PRDR_GRP']) ​​​​scheduler.add_job(exec_sync, 'interval', seconds=30, ​​​​​​​​​​​​​​​​​​​​​​args=[db_conn, 'AGS_JDGR_ORGZ']) ​​​​scheduler.start() ​​​​try: ​​​​​​​​while True: ​​​​​​​​​​​​time.sleep(2) ​​​​except (KeyboardInterrupt, SystemExit): ​​​​​​​​scheduler.shutdown()

 

 

728x90
반응형

'프로그램 > 파이쎤' 카테고리의 다른 글

airflow 설치  (0) 2023.05.10
[파이쎤] 이기종 db 마이그레이션  (0) 2022.06.21


"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다."