본문 바로가기
프로그램/스프링

스프링 대용량 멀티 배치_스케줄

by cbwstar 2022. 8. 16.
728x90
반응형

여러개 배치를 실행할때 스케줄을 여러개 등록하기 위해서는 job 객체를 이름을 지정해서 생성하여 Qualifier로 객체를 주입하여 스케줄을 여러개 생성해서 기동한다.

스케줄 시간설정은 환경설정 파일을 이용하여 셋팅

Scheduler.java

package kr.go.naq.batch.schedulers;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@EnableScheduling // 스케쥴러 기능 활성화
@RequiredArgsConstructor
@Component
public class Scheduler {

   /* job 1 주입 */
    @Autowired
    @Qualifier("addressJob")
    private final Job job;

   /* job 2 주입 */
    @Autowired
    @Qualifier("addressTestJob")
    private final Job job2;

    private final JobLauncher jobLauncher;
    /* 초(0-59),분(0-59),시간(0-23),일(1-31),월(1-12),요일(0-7) 0과7은 일요일이며,1부터 월요일이고 6일 토요일 */
    //@Scheduled(cron = "0 0/1 * * * *")   //1분마다 실행
    //@Scheduled(cron = "0 40 17 * * *")
    //@Scheduled(fixedDelay = 30000)
    @Scheduled(fixedDelayString = "${fixed.rate.string}")
    public void startJob() {
        try {
            Map<String, JobParameter> jobParametersMap = new HashMap<>();

            log.info("시작");
            log.info("job=======> : " + job);

            log.info("${logging.level.root}");

            SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date time = new Date();

            String time1 = format1.format(time);

            jobParametersMap.put("requestDate", new JobParameter(time1));

            JobParameters parameters = new JobParameters(jobParametersMap);

            JobExecution jobExecution = jobLauncher.run(job, parameters);

            while (jobExecution.isRunning()) {
                log.info("isRunning...");
            }

        } catch (JobExecutionAlreadyRunningException e) {
            e.printStackTrace();
        } catch (JobRestartException e) {
            e.printStackTrace();
        } catch (JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        } catch (JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }


    @Scheduled(fixedDelay = 40000)
    public void startJob2() {
        try {
            Map<String, JobParameter> jobParametersMap = new HashMap<>();

            log.info("시작2");
            log.info("job2=======> : " + job2);

            SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date time = new Date();

            String time1 = format1.format(time);

            jobParametersMap.put("requestDate", new JobParameter(time1));

            JobParameters parameters = new JobParameters(jobParametersMap);

            JobExecution jobExecution = jobLauncher.run(job2, parameters);

            while (jobExecution.isRunning()) {
                log.info("isRunning...");
            }

        } catch (JobExecutionAlreadyRunningException e) {
            e.printStackTrace();
        } catch (JobRestartException e) {
            e.printStackTrace();
        } catch (JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        } catch (JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }
}

BatchApplication.java

package kr.go.naq.batch;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableBatchProcessing  // 배치 기능 활성화
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

}

application.yaml

spring:
  batch:
    job:
      enabled: false
  #  jdbc:
 #     initialize-schema: always
# batch-core script sql을 실행
# batch-core package 안에 초기화 sql 문들이 들어있다.
# 기본적으로 메모리 DB들은 자동적으로 실행될 때 해당 스크립트들을 실행하고 다른 종류에 데이터베이스들은 위와 같은 설정으로 script sql을 실행시킬 수 있다.
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://192.168.x.xx:5432/xxxx
    username: batch
    password: batch2022
#    schema: classpath:/org/springframework/batch/core/schema-postgresql.sql # 스키마 위치 설정

logging:
  level:
    root: info

fixed:
  rate:
    string: 30000

BatchConfig.java

package kr.go.naq.batch.config;

import kr.go.naq.batch.model.NcomCoMstr;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaItemWriterBuilder;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.net.ssl.HttpsURLConnection;
import javax.persistence.EntityManagerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class BatchConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    // 주소를 위도,경도 변환하는 Job 생성
    @Bean(name = "addressJob")
    @Primary
    public Job addressJob() throws Exception {
        log.info("addressJob==========>");
        return jobBuilderFactory.get("addressJob")
                .start(addressStep()).build();
    }

    // 테스트 주소를 위도,경도 변환하는 Job 생성
    @Bean(name = "addressTestJob")
    public Job addressTestJob() throws Exception {
        log.info("addressTestJob==========>");
        return jobBuilderFactory.get("addressTestJob")
                .start(addressStep2()).build();
    }

    // addressStep 생성
    @Bean
    @JobScope
    public Step addressStep() throws Exception {
        log.info(("addressStep=====>"));
        return stepBuilderFactory.get("addressStep")
                .<CoMstr, CoMstr>chunk(1000)
                .reader(reader(null))
                .processor(processor(null))
                .writer(writer(null))
                .build();
    }


    // addressStep2 생성
    @Bean
    @JobScope
    public Step addressStep2() throws Exception {
        log.info(("addressStep2=====>"));
        return stepBuilderFactory.get("addressStep")
                .<NcomCoMstr, NcomCoMstr>chunk(1000)
                .reader(reader(null))
                .processor(processor(null))
                .writer(writer(null))
                .build();
    }


    @Bean
    @StepScope
    public JpaPagingItemReader<NcomCoMstr> reader(@Value("#{jobParameters[requestDate]}")  String requestDate) throws Exception {
        log.info("==> reader value : " + requestDate);

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("adresCnvrYn", "N");

        return new JpaPagingItemReaderBuilder<NcomCoMstr>()
                .pageSize(1000)
                .parameterValues(parameterValues)
                .queryString("SELECT m FROM CoMstr m WHERE coalesce(m.adresCnvrYn,'N') = : adresCnvrYn and m.coCd in ('C003898159')")
            //    .queryString("SELECT m FROM CoMstr m WHERE coalesce(m.adresCnvrYn,'N') = : adresCnvrYn and addr1 is not null and trim(addr1) != ''")
                .entityManagerFactory(entityManagerFactory)
                .name("JpaPagingItemReader")
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<CoMstr, CoMstr> processor(@Value("#{jobParameters[requestDate]}")  String requestDate){
        return new ItemProcessor<CoMstr, CoMstr>() {
            @Override
            public CoMstr process(CoMstr CoMstr) throws Exception {

              log.info("==> processor CoMstr : " + CoMstr);
               log.info("==> processor value : " + requestDate);

                /* 도로명 주소 네이버 api이용하여 위도,경도 변환 */

                String addr = URLEncoder.encode(ncomCoMstr.getAddr1(),"utf-8");

              //  String addr = "전라남도 여수시 무선중앙로 78(선원동)";
                String api = "https://naveropenapi.apigw.ntruss.com/map-geocode/v2/geocode?query="+addr;
                StringBuffer sb = new StringBuffer();

                URL url = new URL(api);
                HttpsURLConnection http = (HttpsURLConnection)url.openConnection();
                int TIMEOUT_VALUE = 3000;   // 3초
                try {

                    http.setRequestProperty("Content-Type", "application/json");
                    http.setRequestProperty("X-NCP-APIGW-API-KEY-ID", "xxxx");
                    http.setRequestProperty("X-NCP-APIGW-API-KEY", "xxxxxxxxx");
                    http.setConnectTimeout(TIMEOUT_VALUE);
                    http.setReadTimeout(TIMEOUT_VALUE);
                    http.setRequestMethod("GET");
                    http.connect();

                    InputStreamReader in = new InputStreamReader(http.getInputStream(),"utf-8");
                    BufferedReader br = new BufferedReader(in);

                    String line;
                    while ((line = br.readLine()) != null) {
                        sb.append(line).append("\n");
                    }

                    log.info("네이버 위경도 : " + sb);

                    JSONParser parser = new JSONParser();
                    JSONObject jsonObject;
                    JSONObject jsonObject2;
                    JSONArray jsonArray;
                    String x = "";
                    String y = "";

                    jsonObject = (JSONObject)parser.parse(sb.toString());


                    String status = (String)jsonObject.get("status");

                    if (status.equals("OK")) {
                        jsonArray = (JSONArray) jsonObject.get("addresses");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            jsonObject2 = (JSONObject) jsonArray.get(i);
                            if (null != jsonObject2.get("x")) {
                                x = (String) jsonObject2.get("x").toString();
                            }
                            if (null != jsonObject2.get("y")) {
                                y = (String) jsonObject2.get("y").toString();
                            }
                        }

                        if (!y.equals("")) {
                            CoMstr.setLa(Double.parseDouble(y));
                        }
                        if (!x.equals("")) {
                            CoMstr.setLo(Double.parseDouble(x));
                        }
                        CoMstr.setAdresCnvrYn("Y");
                    }

                    br.close();
                    in.close();
                    http.disconnect();
                 //   log.info("Latitude >> " + y + "Longitude >> " + x);

                } catch (IOException e) {
                    log.info("에러" + e.getMessage());
                    ncomCoMstr.setAdresCnvrYn("N");
                }
                return CoMstr;
            }
        };
    }

    @Bean
    @StepScope
    public JpaItemWriter<CoMstr> writer(@Value("#{jobParameters[requestDate]}")  String requestDate){
        log.info("==> writer value : " + requestDate);

        return new JpaItemWriterBuilder<CoMstr>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }




}

CoMstr.java

package kr.go.naq.batch.model;

import lombok.Getter;
import lombok.Setter;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Setter
@Getter
@Entity
@Table(name="co_mstr" , schema = "com")
public class CoMstr {
    @Id
    @Column(name = "co_cd")
    private String coCd;  //업체코드

    private String coNm;  //업체명
    private String addr1; //업체주소
    private Double la;      //위도
    private Double lo;      //경도
    private String adresCnvrYn;  //주소변환여부

    @Override
    public String toString() {
        return "CoMstr{" +
                "coCd='" + coCd + '\'' +
                ", coNm='" + coNm + '\'' +
                ", addr1='" + addr1 + '\'' +
                ", la=" + la +
                ", lo=" + lo +
                ", adresCnvrYn='" + adresCnvrYn + '\'' +
                '}';
    }
}
728x90
반응형

댓글



"이 포스팅은 쿠팡 파트너스 활동의 일환으로, 이에 따른 일정액의 수수료를 제공받습니다."

loading