ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Batch]
    Springboot 2020. 6. 15. 15:58
    • 대용량데이터처리 최적화
    • 로깅,통계,트랜잭션관리 지원
    • 자동화
    • 예외상황 방어기능
    • 스케줄링지원x -> 쿼츠 프레임워크

    =================================================
    JobLuncher : 배치실행. JobExecution 리턴

    Job : 배치 단위. 일감 1:M(Step)
    Step : 배치 내 작업 단위. 1:1

    • ItemReader : Step에서 쓸 데이터 읽음
    • ItemProcessor<I,O> : Step에서 쓸 데이터 가공
    • ItemWriter : 변환 데이터 저장

    =================================================

    [JobRepository]

    • JobInstance,JobExecution 생성 조회
    • StepExecution 추가 수정 삭제

    [JobLuncher].run(Job, JobParameters)

    [JobParameters]*

    • Job lunch시 필요한 파라미터. Map형태
    • String, Long, Date, Double

    new JobParametersBuilder()
    .addDate("파람K", Date)
    .toJobParameters()

    @Value("#{jobParameters[파람K]}") Date now


    [JobBuilderFactory].get("Job이름")

    • JobRepository 셋팅 후 JobBuilder 리턴

    [JobBuilder]

    • .listener(JobListner)
    • .preventRestart() : 잡 중복실행방지(잡 실패해도)
    • .start(Step) : 해당 Step 먼저 실행
    • .start(Flow).end() : 해당 Flow 실행
    • .build() : Job 리턴

    [JobInstance] - Job 실행객체, 실행시 생성
    [JobExecution] - Job 실행결과 객체

    • Job 수행실패시 손실x 재실행o (즉, 여러결과O)

    [StepBuilderFactory].get("Step이름")

    [StepBuilder]

    • .<I, O> chunk(Int) : read write시 타입,사이즈지정
    • .reader(R)
    • .processor(P)
    • .writer(W)
    • .listener(StepListner | ChunkListner )
    • .tasklet(Tasklet)
    • .taskExecutor(taskExecutor) : 멀티스레드 실행
    • .throttleLimit(*4) : 동시사용 스레드 수
    • .build() : Step 리턴

    [StepExecution] - 배치작업 결과 객체

    ==================================================
    [ItemReader]

    • T객체 1개씩 DB select 요청해 비효율적. 수정必 *
    • 모든 데이터를 메모리에 올려 대용량에 부적절
    • T read() : 읽고 제거

    ** 구현체
    [ListItemReader] - 모두메모리에저장
    [JdbcPagingItemReader] - Paging 사용
    [JpaPagingItemReader] - Paging

    • setQueryString("Str")
    • setParameterValues(Map)
    • setEntityManagerFactory : 트잭관리 매니저지정
    • setPageSize(1000) : 크기만큼 읽어옴
    • .saveState(*true) : 실패지점 기록.재실행

    [JpaPagingItemReaderBuilder] - offset 0 지정불가


    [ItemProcessor<I,O>]

    • O process(I item)

    private ItemProcessor<I, O> processor() {
    return O::setInactive();
    return O:: I가 실행할 메소드;
    }


    [ItemWriter]

    • write(List<? extends T> items)
    • 청크(트랜잭션 커밋된 수)단위로 작업처리함
    • [JpaItemWriter] - JPA 쓸 때 사용*

    ==================================================
    [Tasklet] - 비청크지향. 단일작업 처리

    • Tasklet 구현한 클래스 빈등록 (@Component)

    • Step 빈에 등록(@JobScope 없어야 주입가능)

    • execute(StepContribution, ChunkContext)

    • 작업 계속시 RepeatStatus.CONTINUABLE 리턴

    • 작업 종료시 RepeatStatus.FINISHED 리턴

    ==================================================

    ==================================================
    @EnableBatchProcessing

    • 스프링 배치 스타터에 정의된 설정을 실행시킴!
    • Job/StepBuilder,Repository,Luncher.자동주입
    • 메인 클래스에 추가

    :: Scope

    • Spring Bean은 기본 싱글톤이다
    • xxScope 지정시 xx실행시 빈이 생성. 종료시 삭제
    • 생성할 때 JobParameters 주입 가능

    @JobScope

    • Step 선언문에 사용. Job사용시 Step 빈 생성

    @StepScope

    • Tasklet, ItemReader, ItemWriter, ItemProcessor

    • Step 실행시에 Bean 생성.

    • 프록시 모드를 반환하는 클래스를 참조함
      반드시 구현된 반환타입으로 지정해야함

    • ItemReader x . XXXItemReader o

    ==================================================

    ==================================================
    :: Listener - Job, Step, 청크단위 전후 처리 제어

    • 구현 or @ 메소드 작성 후 빈등록 (@Component)
    • Job[Step[Chunk-Chunk]Step]Job

    [JobExecutionListener] -- 구현해 사용
    [StepExecutionListener]
    [ItemReaderListner]
    [ItemProcessListner]
    [ItemWriterListner]
    [ChunkListner]
    [SkipListner] -- 처리중 skip 발생시

    @BeforeXXX
    @AfterXXX

    ==================================================
    :: Flow - Step 복수실행. 실행 조건 설정.

    • JobExecutionDecider 구현 클래스 : 조건 작성
    • FlowBuilder로 Flow 빈 등록
    • Job <- .start(Flow)..end().build();

    [FlowBuilder] ** Factory 없음 new 생성자 이용

    • FlowBuilder f = new FlowBuilder<>("이름")
    • .start(JobExecutionDecider)
    • .on(FES.FAILED.getName()).end()
    • .on(FES.COMPLETED.getName()).to(Step).end()
    • ,from(Flow).end()

    [JobExecutionDecider]

    • .decide(JobExecution, StepExecution)
    • FlowExecutionStatus 리턴
    • (COMPLETED, FAILED, STOPED,UNKOWN)

    ** Job-Flow-Step

    ==================================================
    :: 멀티스레드


    [TaskExecutor] ** 스레드:청크 1:1로 실행

    • Task실행 (각 스레드 독립 실행하도록 작업 할당)
    • Reader , Writer가 thread-safe 여야함 (JavaDoc참고)
    • 실패시 실패지점 재실행x. 전 재검사o (전체성공보장x)
      Reader 생성시 saveState(fasle)로 저장
    • 구현체 @Bean 등록 후 Step에 추가

    [SimpleAsyncTaskExecutor]

    • 청크요청마다 스레드 생성. limit 넘으면 요청을 막음
    • new SimpleAsyncTaskExecutor("스레드 이름")
    • Task이름(Thread)은 뒤에 1,2,3 .. 붙는다

    [ThreadPoolTaskExecutor]

    • 지정된 개수의 스레드를 재사용하며 실행

    multiFlow

    • Flow 안에 Flow들을 넣어 작업 수행
      • 기간별로 나눠서 각 스레드별 처리

    파티셔닝

    • 파티셔너로 구분한 각 Step에 스레드 할당해 멀티작업
    • 슬레이브 : 각 Step. 병렬작업 처리
    • 마스터 : 합쳐진 Step. 모든 Step완료 후 실행
    • 마스터 - 1:N - 슬레이브 (둘 다 Step이다)

    [SimplePartitioner] -- 파티셔너구현체
    [MultiResourcePartitioner]

    ==================================================

    ==================================================

    batch:
    job:
    enabled: false #어플리케이션구동시자동실행x

    Paging

    • offset 0 limit 10 를 이용
    • 페이지단위로 나눠 데이터를 읽기-처리-저장 반복
    • 조회 누락* 막는 페이지 인덱스 0 으로 고정

    new JpaPagingItemReader() {
    @Override
    public int getPage() {return 0;}
    };

    EntityManager

    • 영속성(Entity 영구저장환경) 관리를 위해 지정

    조회누락

    • 첫조회. offset 0 limit 10
    • 1p (110) - 2p(1120) -- 1p 조회(변경저장)
    • 재조회. offset 10 limit 10
    • 1p(1120) - 2p(2130) -- 2p 조회. 1p 누락!

    청크지향 프로세싱

    • 청크 단위로 작업처리 <-> 테스크킷단위
    • 에러 발생시 전체롤백 아닌 해당 청크 롤백

    테스트 빌드

    @AutoConfigureTestDatabase
    (connection = EmbeddedDatabaseConnection.H2)
    JobExecution= JobLauncherTestUtils.launchJob();

    @Bean - Job (JobFatory, Listner, Step)
    @Barn - Step (StepFactory)
    필요에 따라 item 처리 빈 생성

    class QReader implements ItemReader {

      private Queue<T> queue;
    
      public QReader(List<T> data) {
            this.queue = new LinkedList<>(data);
      }
    
      @Override
      public T read() throws ...{
          return this.queue.poll();
      }

    }

    return ItemProcessor<User, AfterUser>

    ::: return new ItemProcessor<User, AfterUser>(){
    @Override
    public User process(User item) throws E {
    return item.setInactive();
    }
    };
    ::: return item -> {
    item.setInactive();
    return item;
    };

    ::: return AfterUser::setInactive;

    'Springboot' 카테고리의 다른 글

    [Lombok] 테스트에서 롬복쓰기  (0) 2020.07.27
    [H2]  (0) 2019.12.20
    [REST]  (0) 2019.12.06
    curl Unexpected character (''' (code 39))  (0) 2019.12.06
    [Data] gradle+mysql+jpa  (0) 2019.12.05
Designed by Tistory.