본문 바로가기
프로그램/파이쎤

[파이쎤] 이기종간의 DB 실시간 동기화

by cbwstar 2022. 6. 21.
728x90
반응형

파이썬 버전 : 3.8.8 로 테스트

파이썬을 설치후

패키지를 다운받아서 설치한다.

pip install contextlib

pip install sqlalchemy

pip install jaydebeapi

pip install pandas

pip install apscheduler

 

1. config.py

TIBERO_DB_CONFIG = {
    'driver': 'com.tmax.tibero.jdbc.TbDriver',
    'url': 'jdbc:tibero:thin:@192.100.1.10:829:tibero',
    'user': 'tiberotest',
    'password': 'test1234',
    'jdbcDriver': 'd:/python_schedule/jdbc/tibero6-jdbc.jar'
}

PG_DB_CONFIG = {
    'host': '192.100.1.45',
    'user': 'pgtest',
    'password': 'pgtest',
    'db': 'postgre',
    'port': '5432'
}

2. dbSync.py

from contextlib import contextmanager
import config
from sqlalchemy import create_engine
import sqlalchemy.types as sql_types
import jaydebeapi as jp
import pandas as pd
import time
from apscheduler.schedulers.background import BackgroundScheduler

# db접속 설정


class Connection:
    def __init__(self):
        self.t_driver = config.TIBERO_DB_CONFIG['driver']
        self.t_url = config.TIBERO_DB_CONFIG['url']
        self.t_user = config.TIBERO_DB_CONFIG['user']
        self.t_password = config.TIBERO_DB_CONFIG['password']
        self.t_jdbc_driver = config.TIBERO_DB_CONFIG['jdbcDriver']
        self.p_host = config.PG_DB_CONFIG['host']
        self.p_user = config.PG_DB_CONFIG['user']
        self.p_password = config.PG_DB_CONFIG['password']
        self.p_db = config.PG_DB_CONFIG['db']
        self.p_port = config.PG_DB_CONFIG['port']
        self.pg_db = self.__pg_connect()
        self.tb_db = self.__tb_connect()

    def __pg_connect(self):
        url = 'postgresql://{}:{}@{}:{}/{}'.format(
            self.p_user, self.p_password, self.p_host, self.p_port, self.p_db)
        engine = create_engine(
            url, client_encoding='utf8', executemany_mode='batch')
        return engine

    def __tb_connect(self):
        print(self.t_jdbc_driver)
        conn = jp.connect(self.t_driver, self.t_url, [
                          self.t_user, self.t_password], jars=[self.t_jdbc_driver])
        conn.jconn.setAutoCommit(False)
        return conn


class TableSyncer:
    def __init__(self, db_conn, tbl_name, tbl_keys):
        self._db_conn = db_conn
        self._tbl_name = tbl_name
        self._tbl_keys = tbl_keys
        self._tbl_vals = ''

    @property
    def db_conn(self):
        return self._db_conn

    @property
    def tbl_name(self):
        return self._tbl_name

    @property
    def tbl_keys(self):
        return self._tbl_keys

    @property
    def tbl_vals(self):
        return self._tbl_vals

    def sync_tbl(self):
        start_time = time.time()
        start_time2 = time.time()
        tb_engine = self.db_conn.tb_db  # 티베로 접속하여 읽어 온다
        print(f'tb_engine: {tb_engine}')
        ss_qry = Query.SQL['ss_TEMPLATE'].format(
            self.tbl_name)

        for df in pd.read_sql_query(ss_qry, tb_engine, chunksize=10000):
           # print(df.columns)

            print("to_sql read : %s seconds" % (time.time() - start_time))

            df.to_sql(name=self.tbl_name, schema='public', con=self.db_conn.pg_db, if_exists='append',
                      chunksize=2000, index=False, method='multi')

            print("postgre_sql insert : %s seconds" %
                  (time.time() - start_time2))

            # 키컬럼에 값을 리스트로 가지고 온다.
            rows = list(df[self.tbl_keys].itertuples(index=False))
            sql_param = "and ( "
            for row in tuple(rows):
                series = pd.Series(row)
                index = 0
                sql_param += "("
                for data in series:
                    sql_param += f" {self.tbl_keys[index]} = {data} "
                    index = index + 1
                    sql_param += " and" if index < series.size else ""
                sql_param += ") or "
            sql_param = sql_param[:-3]
            sql_param += ")"
            su_qry = Query.SQL['upd_TEMPLATE'].format(
                self.tbl_name, 'sysdate', sql_param)
           # print(f'su_qry: {su_qry}')

            try:
                curs = tb_engine.cursor()
                curs.execute(su_qry)
            except Exception as e:
                tb_engine.rollback()
                raise
            else:
                tb_engine.commit()
            finally:
                curs.close()

# 처리할 쿼리 정의


class Query(object):
    SQL = {
        'upd_test_grp': """
        UPDATE tibero.{} SET upd_tm = {}
        WHERE 1 = 1
         {} 
      """,
        'upd_test_jdg_gz': """
        UPDATE tibero.{} SET upd_tm = {}
        WHERE 1 = 1
         {} 
      """,
        'upd_TEMPLATE': """
        UPDATE tibero.{} SET upd_tm = {}
        WHERE 1 = 1
        {}
        """,
        'ss_test_p_grp': """
            SELECT *
            FROM tibero.test_prdr_grp
      """,
        'ss_test_ags_or': """
            SELECT *
            FROM tibero.test_ags
      """,
        # 템플릿
        'ss_TEMPLATE': """
            SELECT *
            FROM test.{}
      """
    }

# 처리할 테이블에 키컬럼 정의


def qry_condition(x):
    return {
        'test_prdr_g': ['test_prdr_g', ['CERT_ASGN_NO', 'RCV_SEQ']],
        'test_jdg_or': ['test_jdg_or', ['CERT_ASGN_NO', 'JDG_SEQ', 'JDGR_SEQ']],
    }[x]


def exec_sync(db_conn, tableNm):
    print(f'exec_sync db_conn : {db_conn}, tableNm : {tableNm}')
    params = qry_condition(tableNm)
    print(f'params : {params}')  # params[0]:테이블명,  params[1]:키컬럼
    db_util = TableSyncer(db_conn, params[0], params[1])
    db_util.sync_tbl()


if __name__ == '__main__':
    # 티베로,포스트그레스 db접속
    # Connection() 한개로 생성하면 db에 한개의 커넥션면 연결이 되어서 하나씩 순차적으로 실행된다.
    # 여러개의 테이블을 동시에 처리할려면 해당갯수만큼 DB Connection() 을 생성해서 처리하면 멀티로 처리됨
    db_conn = Connection()
    #db_conn2 = Connection()
    #print(f'db_conn : {db_conn}')
    scheduler = BackgroundScheduler()

    # 여러개의 테이블을 동기화 할때 필요한 스케줄을 초단위로 시간을 설정하여 스케줄에 등록한다.
    # Connection()객체 한개로 하면 먼저 실행한건이 끝날때가지 나중에 실행하는건은 스케줄 큐에 대기 하였다가 앞에것이 끝나야 처리됨 다중처리 필요시 각각의
    # 처리건만큼 db커넥션을 생성해야 다중처리 가능
    scheduler.add_job(exec_sync, 'interval', seconds=30,
                      args=[db_conn, 'AGS_PRDR_GRP'])
    scheduler.add_job(exec_sync, 'interval', seconds=30,
                      args=[db_conn, 'AGS_JDGR_ORGZ'])
    scheduler.start()

    try:
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

 

 

728x90
반응형

'프로그램 > 파이쎤' 카테고리의 다른 글

airflow 설치  (0) 2023.05.10
[파이쎤] 이기종 db 마이그레이션  (0) 2022.06.21

댓글



"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다."

loading