-
[Batch]Springboot 2020. 6. 15. 15:58
- 대용량데이터처리 최적화
- 로깅,통계,트랜잭션관리 지원
- 자동화
- 예외상황 방어기능
- 스케줄링지원x -> 쿼츠 프레임워크
=================================================
JobLuncher : 배치실행. JobExecution 리턴Job : 배치 단위. 일감 1:M(Step)
Step : 배치 내 작업 단위. 1:1- ItemReader : Step에서 쓸 데이터 읽음
- ItemProcessor<I,O> : Step에서 쓸 데이터 가공
- ItemWriter : 변환 데이터 저장
=================================================
[JobRepository]
- JobInstance,JobExecution 생성 조회
- StepExecution 추가 수정 삭제
[JobLuncher].run(Job, JobParameters)
[JobParameters]*
- Job lunch시 필요한 파라미터. Map형태
- String, Long, Date, Double
new JobParametersBuilder()
.addDate("파람K", Date)
.toJobParameters()@Value("#{jobParameters[파람K]}") Date now
[JobBuilderFactory].get("Job이름")
- JobRepository 셋팅 후 JobBuilder 리턴
[JobBuilder]
- .listener(JobListner)
- .preventRestart() : 잡 중복실행방지(잡 실패해도)
- .start(Step) : 해당 Step 먼저 실행
- .start(Flow).end() : 해당 Flow 실행
- .build() : Job 리턴
[JobInstance] - Job 실행객체, 실행시 생성
[JobExecution] - Job 실행결과 객체- Job 수행실패시 손실x 재실행o (즉, 여러결과O)
[StepBuilderFactory].get("Step이름")
[StepBuilder]
- .<I, O> chunk(Int) : read write시 타입,사이즈지정
- .reader(R)
- .processor(P)
- .writer(W)
- .listener(StepListner | ChunkListner )
- .tasklet(Tasklet)
- .taskExecutor(taskExecutor) : 멀티스레드 실행
- .throttleLimit(*4) : 동시사용 스레드 수
- .build() : Step 리턴
[StepExecution] - 배치작업 결과 객체
==================================================
[ItemReader]- T객체 1개씩 DB select 요청해 비효율적. 수정必 *
- 모든 데이터를 메모리에 올려 대용량에 부적절
- T read() : 읽고 제거
** 구현체
[ListItemReader] - 모두메모리에저장
[JdbcPagingItemReader] - Paging 사용
[JpaPagingItemReader] - Paging- setQueryString("Str")
- setParameterValues(Map)
- setEntityManagerFactory : 트잭관리 매니저지정
- setPageSize(1000) : 크기만큼 읽어옴
- .saveState(*true) : 실패지점 기록.재실행
[JpaPagingItemReaderBuilder] - offset 0 지정불가
[ItemProcessor<I,O>]
- O process(I item)
private ItemProcessor<I, O> processor() {
return O::setInactive();
return O:: I가 실행할 메소드;
}
[ItemWriter]
- write(List<? extends T> items)
- 청크(트랜잭션 커밋된 수)단위로 작업처리함
- [JpaItemWriter] - JPA 쓸 때 사용*
==================================================
[Tasklet] - 비청크지향. 단일작업 처리Tasklet 구현한 클래스 빈등록 (@Component)
Step 빈에 등록(@JobScope 없어야 주입가능)
execute(StepContribution, ChunkContext)
작업 계속시 RepeatStatus.CONTINUABLE 리턴
작업 종료시 RepeatStatus.FINISHED 리턴
==================================================
==================================================
@EnableBatchProcessing- 스프링 배치 스타터에 정의된 설정을 실행시킴!
- Job/StepBuilder,Repository,Luncher.자동주입
- 메인 클래스에 추가
:: Scope
- Spring Bean은 기본 싱글톤이다
- xxScope 지정시 xx실행시 빈이 생성. 종료시 삭제
- 생성할 때 JobParameters 주입 가능
@JobScope
- Step 선언문에 사용. Job사용시 Step 빈 생성
@StepScope
Tasklet, ItemReader, ItemWriter, ItemProcessor
Step 실행시에 Bean 생성.
프록시 모드를 반환하는 클래스를 참조함
반드시 구현된 반환타입으로 지정해야함ItemReader x . XXXItemReader o
==================================================
==================================================
:: Listener - Job, Step, 청크단위 전후 처리 제어- 구현 or @ 메소드 작성 후 빈등록 (@Component)
- Job[Step[Chunk-Chunk]Step]Job
[JobExecutionListener] -- 구현해 사용
[StepExecutionListener]
[ItemReaderListner]
[ItemProcessListner]
[ItemWriterListner]
[ChunkListner]
[SkipListner] -- 처리중 skip 발생시@BeforeXXX
@AfterXXX==================================================
:: Flow - Step 복수실행. 실행 조건 설정.- JobExecutionDecider 구현 클래스 : 조건 작성
- FlowBuilder로 Flow 빈 등록
- Job <- .start(Flow)..end().build();
[FlowBuilder] ** Factory 없음 new 생성자 이용
- FlowBuilder f = new FlowBuilder<>("이름")
- .start(JobExecutionDecider)
- .on(FES.FAILED.getName()).end()
- .on(FES.COMPLETED.getName()).to(Step).end()
- ,from(Flow).end()
[JobExecutionDecider]
- .decide(JobExecution, StepExecution)
- FlowExecutionStatus 리턴
- (COMPLETED, FAILED, STOPED,UNKOWN)
** Job-Flow-Step
==================================================
:: 멀티스레드
[TaskExecutor] ** 스레드:청크 1:1로 실행
- Task실행 (각 스레드 독립 실행하도록 작업 할당)
- Reader , Writer가 thread-safe 여야함 (JavaDoc참고)
- 실패시 실패지점 재실행x. 전 재검사o (전체성공보장x)
Reader 생성시 saveState(fasle)로 저장 - 구현체 @Bean 등록 후 Step에 추가
[SimpleAsyncTaskExecutor]
- 청크요청마다 스레드 생성. limit 넘으면 요청을 막음
- new SimpleAsyncTaskExecutor("스레드 이름")
- Task이름(Thread)은 뒤에 1,2,3 .. 붙는다
[ThreadPoolTaskExecutor]
- 지정된 개수의 스레드를 재사용하며 실행
multiFlow
- Flow 안에 Flow들을 넣어 작업 수행
- 기간별로 나눠서 각 스레드별 처리
파티셔닝
- 파티셔너로 구분한 각 Step에 스레드 할당해 멀티작업
- 슬레이브 : 각 Step. 병렬작업 처리
- 마스터 : 합쳐진 Step. 모든 Step완료 후 실행
- 마스터 - 1:N - 슬레이브 (둘 다 Step이다)
[SimplePartitioner] -- 파티셔너구현체
[MultiResourcePartitioner]==================================================
==================================================
batch:
job:
enabled: false #어플리케이션구동시자동실행xPaging
- offset 0 limit 10 를 이용
- 페이지단위로 나눠 데이터를 읽기-처리-저장 반복
- 조회 누락* 막는 페이지 인덱스 0 으로 고정
new JpaPagingItemReader() {
@Override
public int getPage() {return 0;}
};EntityManager
- 영속성(Entity 영구저장환경) 관리를 위해 지정
조회누락
- 첫조회. offset 0 limit 10
- 1p (1
10) - 2p(1120) -- 1p 조회(변경저장) - 재조회. offset 10 limit 10
- 1p(11
20) - 2p(2130) -- 2p 조회. 1p 누락!
청크지향 프로세싱
- 청크 단위로 작업처리 <-> 테스크킷단위
- 에러 발생시 전체롤백 아닌 해당 청크 롤백
테스트 빌드
@AutoConfigureTestDatabase
(connection = EmbeddedDatabaseConnection.H2)
JobExecution= JobLauncherTestUtils.launchJob();@Bean - Job (JobFatory, Listner, Step)
@Barn - Step (StepFactory)
필요에 따라 item 처리 빈 생성class QReader implements ItemReader {
private Queue<T> queue; public QReader(List<T> data) { this.queue = new LinkedList<>(data); } @Override public T read() throws ...{ return this.queue.poll(); }
}
return ItemProcessor<User, AfterUser>
::: return new ItemProcessor<User, AfterUser>(){
@Override
public User process(User item) throws E {
return item.setInactive();
}
};
::: return item -> {
item.setInactive();
return item;
};::: return AfterUser::setInactive;
'Springboot' 카테고리의 다른 글
[Lombok] 테스트에서 롬복쓰기 (0) 2020.07.27 [H2] (0) 2019.12.20 [REST] (0) 2019.12.06 curl Unexpected character (''' (code 39)) (0) 2019.12.06 [Data] gradle+mysql+jpa (0) 2019.12.05