본문 바로가기
프로그램/파이쎤

[파이쎤] 이기종 db 마이그레이션

by cbwstar 2022. 6. 21.
728x90
반응형

대용량 데이터를 이관할때 파이쎤을 이용한 벌크인서트와 읽어 올때 chunksize 옵션 처리 기능을 이용해서 메모리 오버플로우를 처리 하였습니다.

메모리가 충분하면 한방에 읽어와서 처리하면 처리가 빠르지만 메모리 용량이 부족할 경우에는 나누어서 읽어서 처리를 해야 메모리 오버플로우가 발생하지 앖습니다.

 

파이썬 버전 : 3.8.8 로 테스트

파이썬을 설치후

패키지를 다운받아서 설치한다.

pip install contextlib

pip install sqlalchemy

pip install jaydebeapi

pip install pandas

pip install apscheduler

 

1. config.py

TIBERO_DB_CONFIG = {
    'driver': 'com.tmax.tibero.jdbc.TbDriver',
    'url': 'jdbc:tibero:thin:@192.100.1.10:829:tibero',
    'user': 'tiberotest',
    'password': 'test1234',
    'jdbcDriver': 'd:/python_schedule/jdbc/tibero6-jdbc.jar'
}

PG_DB_CONFIG = {
    'host': '192.100.1.45',
    'user': 'pgtest',
    'password': 'pgtest',
    'db': 'postgre',
    'port': '5432'
}

2.dbMigration.py

from contextlib import contextmanager
import config
from sqlalchemy import create_engine
import sqlalchemy.types as sql_types
import jaydebeapi as jp
import pandas as pd
import time
from apscheduler.schedulers.background import BackgroundScheduler

# db 티베로,postgresql 접속 설정


class Connection:
    def __init__(self):
        self.t_driver = config.TIBERO_DB_CONFIG['driver']
        self.t_url = config.TIBERO_DB_CONFIG['url']
        self.t_user = config.TIBERO_DB_CONFIG['user']
        self.t_password = config.TIBERO_DB_CONFIG['password']
        self.t_jdbc_driver = config.TIBERO_DB_CONFIG['jdbcDriver']
        self.p_host = config.PG_DB_CONFIG['host']
        self.p_user = config.PG_DB_CONFIG['user']
        self.p_password = config.PG_DB_CONFIG['password']
        self.p_db = config.PG_DB_CONFIG['db']
        self.p_port = config.PG_DB_CONFIG['port']
        self.pg_db = self.__pg_connect()
        self.tb_db = self.__tb_connect()

    def __pg_connect(self):
        url = 'postgresql://{}:{}@{}:{}/{}'.format(
            self.p_user, self.p_password, self.p_host, self.p_port, self.p_db)
        engine = create_engine(
            url, client_encoding='utf8', executemany_mode='batch')
        return engine

    def __tb_connect(self):
        print(self.t_jdbc_driver)
        conn = jp.connect(self.t_driver, self.t_url, [
                          self.t_user, self.t_password], jars=[self.t_jdbc_driver])
        conn.jconn.setAutoCommit(False)
        return conn


class TableSyncer:
    def __init__(self, db_conn, tbl_name):
        self._db_conn = db_conn
        self._tbl_name = tbl_name

    @property
    def db_conn(self):
        return self._db_conn

    @property
    def tbl_name(self):
        return self._tbl_name

    def sync_tbl(self):
        start_time = time.time()
        start_time2 = time.time()
        tb_engine = self.db_conn.tb_db  # 티베로 접속
        pg_engine = self.db_conn.pg_db  # postgresql 접속
        print(f'tb_engine: {tb_engine}')

        # 테이블이 존재하는지 체크
        table_qry = "SELECT COUNT(*) as cnt FROM pg_tables WHERE schemaname = 'public' AND tablename = '{}'".format(
            self.tbl_name.lower())

        print(f'table_qry:{table_qry}')

        df_exit = pd.read_sql_query(table_qry, pg_engine)
       # print(f'df_exit.iloc[0] {df_exit.iloc[0,0]}')
       # for index, row in df_exit.iterrows():
       #     print(f"cnt 값 : {row.cnt} ")

        # 기존에 이관된 데이타가 존재하면 삭제 처리후 이관처리
        del_qry = Query.SQL['del_TEMPLATE'].format(
            self.tbl_name)

        print(f'삭제할 쿼리 : {del_qry}')
        # 테이블이 존재하면 데이터 삭제
        if df_exit.iloc[0, 0] > 0:
            pg_engine.execute(del_qry)

        # 티베로에 데이터를 읽어온다.
        sel_qry = Query.SQL['select_TEMPLATE'].format(
            self.tbl_name)

        print(f'처리할테이블 : {self.tbl_name}')

        for df in pd.read_sql_query(sel_qry, tb_engine, chunksize=10000):
            # 컬럼헤더 재정의 (PostgreSQL에 맞게 소문자 처리)
            df.columns = df.columns.str.lower()
            # print(df.columns)

            print("to_sql read : %s seconds" % (time.time() - start_time))

            # postgresql에 인서트 한다.
            df.to_sql(name=self.tbl_name.lower(), schema='public', con=self.db_conn.pg_db, if_exists='append',
                      chunksize=2000, index=False, method='multi',
                      dtype={
                          'lgn_id': sql_types.VARCHAR(20),
                          'reg_tm': sql_types.TIMESTAMP,
                          "upd_tm": sql_types.TIMESTAMP,
                          "lat_login_dt": sql_types.TIMESTAMP,
                          "confirm_dt": sql_types.TIMESTAMP,
                          "cancel_dt": sql_types.TIMESTAMP,
                          "secsn_dt": sql_types.TIMESTAMP,
                          "info_agree_dt": sql_types.TIMESTAMP,
                          "cd_len": sql_types.NUMERIC,
                          "ara_order": sql_types.NUMERIC,
                          "latitude": sql_types.NUMERIC,
                          "longitude": sql_types.NUMERIC,
                          "reg_dttm": sql_types.TIMESTAMP,
                          "upd_dttm": sql_types.TIMESTAMP,
                          "hldy_dt": sql_types.TIMESTAMP
            })

            print("postgre_sql insert : %s seconds" %
                  (time.time() - start_time2))

# 이관 처리할 쿼리 정의


class Query(object):
    SQL = {
        # 삭제처리
        'del_TEMPLATE': """
        DELETE FROM public.{} 
        """,
        # 데이터 읽어오기 템플릿
        'select_TEMPLATE': """
            SELECT *
            FROM tibero.{}
      """
    }


def exec_sync(db_conn, tableNm):
    print(f'exec_sync db_conn : {db_conn}, tableNm : {tableNm}')
    db_util = TableSyncer(db_conn, tableNm)
    db_util.sync_tbl()


# 이관할 테이블 배열 정의..
tables = ['test_table', 'test_table2']

if __name__ == '__main__':
    # 티베로,포스트그레스 db접속
    db_conn = Connection()
    for index, tableNm in enumerate(tables):
        exec_sync(db_conn, tableNm)
728x90
반응형

댓글



"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다."

loading