Spring Batch 시리즈를 포스팅하고 있습니다. 기준이 되는 버전은 5.x 버전입니다.
1. Spring Batch 소개
2. Job과 Step의 구조 및 생성
3. Job과 Step의 실행 과정
4. Job의 흐름 제어 - Flow
5. FlowJob의 실행 과정
6. 청크 기반 프로세싱의 구조 및 생성
7. 청크 기반 프로세싱의 진행 과정
8. Flat File, JSON 형식 파일 읽기 및 쓰기
9. DB 데이터 읽기 및 쓰기
10. Step의 예외 제어하기(스킵, 재시도)
11. 배치 작업 확장하기 - 현재 포스팅
서버 개발자라면 스케일 업, 스케일 아웃이라는 용어를 들어봤을 것이다. 방식의 차이가 있지만, 공통적으로 시스템의 처리량과 성능을 향상하기 확장(스케일링) 방법론을 지칭하는 용어들이다. 배치 작업 역시 하나의 시스템이라고 본다면, 마찬가지로 데이터의 처리량과 처리 성능을 향상시키기 위해 확장을 고려해볼 수 있다.
이를 위해 Spring Batch에서는 멀티 프로세스, 멀티 스레드를 활용하여 확장할 수 있도록 Step을 구성하는 다양한 방법을 제공하고 있다.
- Multi-threaded Step (single-process)
- Parallel Steps (single-process)
- Remote Chunking of Step (multi-process)
- Partitioning a Step (single or multi-process)
공식 문서에서는 총 4가지 방법을 소개하고 있다.
이번 포스팅에서는 사실상 구성할 일이 정말 드문 멀티 프로세스로 확장하는 방식(Remote Chunking, Remote Partitioning)은 제외하고, 나머지 싱글 프로세스로 진행하지만 멀티 스레드를 사용하는 방법에 대해서만 다룰 것이다.
Multi-threaded Step
이 방식은 `Step`에 정의된 `Tasklet`을 멀티 스레드로 처리하는 방식이다. 따로 `Step`구현체가 있는 건 아니고, `Tasklet`의 반복 수행을 담당하는 `RepeatOperations`의 구현체 `TaskExecutorRepeatTemplate`을 사용하여 `Tasklet`을 멀티 스레드 환경에서 실행하는 방식이다.
기존의 `TaskletStep`의 구조를 다시 한번 상기시켜보자.
`TaskletStep`에서는 `Tasklet`을 실행시키도록 `RepeatCallback` 구현체를 만들어 이를 `RepeatOperations`의 `iterate` 메서드의 인자로 넘겨주었고, `RepeatOperations` 구현체인 `RepeatTemplate`은 `getNextResult`를 호출하여 `RepeatCallback` 구현을 반복 실행하는 방식으로 구성되어 있었다.
이 과정에서 멀티 스레드 방식으로 확장하기 위해서는 `RepeatOperations` 구현체를 `TaskExecutorRepeatTemplate`로 변경하면 된다는 것이다. `TaskExecutorRepeatTemplate`은 `RepeatCallback`을 실행 가능한 `Runnable` 타입으로 만들어, `TaskExecutor`를 통해 `Runnable`을 실행시키도록 `getNextResult`가 재정의 되어있는데, `TaskExecutor` 구현체를 멀티 스레드를 사용하여 `Runnable`을 실행시키는 구현체(대표적으로 `ThreadPoolTaskExecutor`)로 초기화 한다면, 멀티 스레드 방식으로 `Tasklet`을 실행시킬 수 있도록 확장이 가능해지는 것이다.
간단하게 흐름만 그림으로 표현하면 위와 같다.
한번 코드 레벨에서 간단하게 `TaskExecutorRepeatTemplate`를 살펴보자.
TaskExecutorRepeatTemplate
`TaskExecutorRepeatTemplate`은 `RepeatCallback`을 실행 가능한 `Runnable` 타입으로 만들어, `TaskExecutor`를 통해 이를 실행시킨다.
@Override
protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
throws Throwable {
ExecutingRunnable runnable;
ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
do {
runnable = new ExecutingRunnable(callback, context, queue);
runnable.expect();
taskExecutor.execute(runnable);
update(context);
}
while (queue.isEmpty() && !isComplete(context));
ResultHolder result = queue.take();
if (result.getError() != null) {
throw result.getError();
}
return result.getResult();
}
`TaskExecutorRepeatTemplate`에서 오버라이딩된 `getNextResult` 메서드를 살펴보면, 인자로 넘어온 `RepeatCallback`을 `ExecutingRunnable`로 만들어, `taskExecutor.execute`를 통해 실행시키고 있다.
즉, 원래 `RepeatTemplate`에서 `getNextResult`를 호출하게 되면 단순히 `RepeatCallback`의 `doInIteration`을 실행했었는데, `TaskExecutorRepeatTemplate`에서 이를 `TaskExecutor`를 사용하여 실행하도록 추가 구현이 되어있는 것이다.
`TaskExecutor`는 `Runnable` 인터페이스를 다양한 전략으로 실행시키는 역할의 정의된 인터페이스인데, 여기서는 스레드 풀을 사용하여 멀티 스레드로 작업을 수행하는 `ThreadPoolTaskExecutor`를 활용하여 멀티 스레드에서 `RepeatCallback`을 수행하도록 해볼 것이다.
Multi-Threaded Step 예제
ThreadPoolTaskExecutor 생성
@Bean
fun taskExecutor(): ThreadPoolTaskExecutor {
val taskExecutor = ThreadPoolTaskExecutor()
taskExecutor.corePoolSize = 5
taskExecutor.maxPoolSize = 20
taskExecutor.queueCapacity = 20
taskExecutor.setThreadNamePrefix("batch-thread-")
taskExecutor.initialize()
return taskExecutor
}
우선 `ThreadPool`을 사용하여 스레드를 할당하고 실행시키는 `ThreadPoolTaskExecutor` 인스턴스를 생성하여 스프링 빈으로 등록해주었다.
`ThreadPoolTaskExecutor`는 내부적으로 `Java`의 `ThreadPoolExecutor`에게 `Runnable`의 실행을 위임시킨다. 따라서 실제 `Runnable` 실행 전략은 `ThreadPoolExecutor`의 실행 전략을 사용한다고 보면 된다.
실행 전략을 이해하기 위해 `ThreadPoolExecutor`를 정말 간단하게만 살펴보도록 하자. `Java`의 멀티스레드 프로그래밍에 대해서 살펴보는 시간은 아니기 때문에, 더 자세한 내용은 직접 알아보길 바란다.
`corePoolSize`는 동시에 실행 가능한 스레드의 수를 의미한다.
`maxPoolSize`는 대기중인 작업이 저장되는 큐의 용량이 초과된 경우, 최대로 수행 가능한 스레드의 수를 의미한다.
`queueCapacity`는 `corePoolSize`만큼 이미 스레드가 실행중일 경우, 작업을 대기 상태로 넣는 큐의 용량을 의미한다.
예를 들어 `corePoolSize`, `queueCapacity`를 5로, `maxPoolSize`를 10으로 설정했고, 15개의 작업을 실행한다고 가정한다면, 처음에 `corePoolSize`만큼 5개의 작업이 스레드에 할당되어 실행되고, 이외 5개의 작업은 대기 상태로 큐에 들어가게 된다. 하지만 큐의 용량이 5임으로, 큐의 용량도 다 찼기 때문에 `maxPoolSize`만큼 스레드를 더 생성하여 작업을 수행하게 된다.
즉, `maxPoolSize`만큼 스레드를 생성하는 것은 대기 큐가 다 찬 이후라는 점을 기억하면 좋을 거 같다. 만약 간헐적으로 갑자기 많은 작업이 몰려드는 일의 경우라면, 큐의 용량을 작게 하고 `maxPoolSize`를 높게 조정하여 일시적으로 작업이 몰리더라도 병목 없이 작업이 진행될 수 있도록 구성할 수도 있을 것이다.
아무튼 상황에 맞게 `ThreadPoolTaskExecutor`를 위와 같이 생성하여 빈으로 등록해주면 된다.
TaskExecutorRepeatTemplate 생성
@Bean
fun repeatOperations(): TaskExecutorRepeatTemplate {
val repeatTemplate = TaskExecutorRepeatTemplate()
repeatTemplate.setTaskExecutor(taskExecutor())
return repeatTemplate
}
`TaskExecutorRepeatTemplate` 인스턴스를 생성한 후, 위에서 빈으로 등록한 `ThreadPoolTaskExecutor`를 `taskExecutor`에 초기화 시켜주면 된다.
`taskExecutor`를 초기화 시키지 않는다면, 기본 값으로는 `SyncTaskExecutor`가 `taskExecutor`에 초기화 되어있기 때문에 동기적으로 메인스레드만 사용해서 작업을 수행하게 된다.
Step 생성
/** 어느 쓰레드에서 작업된건지 확인하기 위해 만든 예제 엔티티 **/
@Entity
class SampleItem() {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
val id = 0
var readingThread: String? = null
var processingThread: String? = null
var writingThread: String? = null
}
/** 아이템을 읽는 쓰레드를 초기화 하기 위한 Listener **/
class ReadListener() : ItemReadListener<SampleItem> {
private val log = LoggerFactory.getLogger(ReadListener::class.java)
override fun afterRead(item: SampleItem) {
log.info("READ ITEM: ${item.id}")
item.readingThread = Thread.currentThread().name
Thread.sleep(50)
}
}
/** 아이템을 쓰는 쓰레드를 초기화 하기 위한 Listener **/
class WriteListener() : ItemWriteListener<SampleItem> {
private val log = LoggerFactory.getLogger(WriteListener::class.java)
override fun beforeWrite(items: Chunk<out SampleItem>) {
log.info("WRITE ITEM: ${items.joinToString(", ") { it.id.toString() }}")
items.forEach {
it.writingThread = Thread.currentThread().name
}
Thread.sleep(50)
}
}
/** Step 시작 전 엔티티를 초기화 하기 위한 Listener **/
class InitListener(
private val entityManagerFactory: EntityManagerFactory,
) : StepExecutionListener {
override fun beforeStep(stepExecution: StepExecution) {
val em = entityManagerFactory.createEntityManager()
em.transaction.begin()
repeat(500) {
val sampleItem = SampleItem()
em.persist(sampleItem)
}
em.transaction.commit()
em.clear()
em.close()
}
}
@Bean
fun step(): Step {
return StepBuilder("multi-step", jobRepository)
.chunk<SampleItem, SampleItem>(100, transactionManager)
.reader(jpaPagingItemReader())
.processor {
it.processingThread = Thread.currentThread().name
it
}
.writer(jpaItemWriter())
.stepOperations(repeatOperations()) // TaskExecutorRepeatTemplate 구현체로 설정
.listener(initListener())
.listener(writeListener())
.listener(readListener())
.build()
}
간단하게 설명하자면, 아이템을 읽고, 처리하고, 쓸 때 각각 어떤 스레드에서 처리했는지 파악하기 위해 `SampleItem`를 만들었고, 리스너와 프로세서에서는 각각 어느 쓰레드에서 처리했는지 값을 초기화 하도록 했다. 그리고 `Step`이 시작되기 전에, 500개의 `SampleItem` 엔티티를 영속하도록 리스너를 만들어 설정해두었다.
`Step`의 `stepOperations`에 위에서 빈으로 등록한 `TaskExecutorRepeatTemplate`을 초기화 하여, 멀티 스레드 방식으로 작업이 수행되도록 설정하면 된다.
한번 배치 작업을 실행시켜보고 결과를 살펴보자.
보면 아이템 별로 각기 다른 스레드에서 처리된 것을 볼 수 있고, 또한 하나의 아이템이 읽히고, 처리되고, 쓰이는 스레드는 모두 같은 것을 확인할 수 있다.
진행 과정과 주의점
위에서 `Step`을 만들어보고 실행까지 시켜보았는데, 여러 스레드에 걸쳐서 작업이 이루어지는 이유는 알겠는데, 읽고, 처리하고, 쓰는 스레드는 모두 같은 것이 조금 의문일 수도 있을 것이다. 한번 간단하게 다이어그램으로 왜 그런지 살펴보자.
`ThreadPoolTaskExecutor`가 `Runnable`을 실행하는 부분에서 다른 쓰레드가 사용되기 시작하는데, `Tasklet`을 실행시키는 `RepeatCallback` 구현은 `TaskletStep`에 구현되어 있고, 여기서 `Tasklet`은 `TaskletStep`의 멤버변수이기 때문에 메인 스레드의 힙 메모리 영역에 저장된 객체이다.
따라서 다른 쓰레드에서 `RepeatCallback`이 실행되더라도, 실제 `Tasklet`은 메인 스레드의 `ChunkOrientedTasklet` 객체의 `execute`를 호출하는 것이기 때문에, 그 하위의 `ChunkProvider`, `ChunkProcessor`, `ItemReader`, `ItemProcessor`, `ItemWriter` 모두 메인 스레드의 힙 메모리 영역에 할당되어 있는 객체를 사용한다.
이런 구조이기 때문에 특정 스레드에서 `ChunkOrientedTasklet`을 실행하게 되면, 그 이후의 과정들은 해당 스레드에서 모두 쌍으로 진행하게 되기 때문에 읽고, 처리하고, 작성하는 모든 과정은 `ChunkOrientedTasklet`을 실행시킨 그 스레드에서 진행이 된다.
그리고 가장 주의해야 할 점은 바로 멀티 스레드 프로그래밍에서 항상 나오는 동시성/동기화 문제이다.
지금 구조를 보면 모든 스레드가 하나의 `ChunkOrientedTasklet`을 공유하고 있기 때문에, 스택에 저장되는 지역 변수(`Chunk`는 지역변수로 선언되어 각 쓰레드의 스택 메모리에 할당된다)가 아닌 이상 내부 멤버 변수는 모든 스레드에서 공유한다. 따라서, `ItemReader`, `ItemProcessor`, `ItemWriter`는 스레드 안전하게 구현되어야 동시성/동기화 문제가 발생하지 않는다.
이전에 페이징 방식의 `ItemReader`와, 커서 방식의 `ItemReader`를 알아보았을 때 이 둘의 차이점 중 하나가 페이징 방식의 `ItemReader`는 스레드에 안전하다는 점이었다. `AbstractPagingItemReader`는 페이징과 관련된 멤버 변수를 모두 `volatile` 키워드로 선언하여 메인 스레드의 메모리에서 읽을 수 있게 설계해 놓았고, 읽는 시점에 `Lock`을 통해 잠금을 걸어 페이징 관련 멤버 변수를 변경할 때 동시성 문제가 발생하지 않도록, 쓰레드 안전하게 설계되어 있다.
반면 커서 기반의 `ItemReader`는 그 어떠한 처리도 안되어 있기 때문에, 동시에 여러 스레드가 내부 커서에 접근할 경우 아이템이 중복으로 읽혀 중복으로 처리된던지, 예외가 발생한다던지, 작업이 정상적으로 처리되지 않을 가능성이 높다.
결론적으로 멀티 스레드 방식으로 `Step`을 확장할 때는 언제나 관련 구현체들이 스레드 안전하게 설계되어 있는지 확인하고 구성해야한다는 점을 주의해야 한다.
Parallel Steps
이 방식은 `SplitState`를 사용하여 `Flow`를 여러 스레드에서 병렬로 실행하는 방법이다.
이전에 `Flow`에 대해서 다뤘을 때, 상태를 나타내는 `State`와, 상태 전이를 나타내는 `StateTransition`을 통해 `Step`의 흐름을 제어할 수 있다고 했었다. 간단하게 설명하자면, `State`의 `handle`을 실행시켜 `FlowExecutionStatus`를 반환받으면, 이에 따라 전이되어야 하는 상태를 `StateTransition`을 통해 찾아 다음 `State`를 실행시키는 구조였다.
따라서 `SplitState`의 `handle` 메서드를 한번 살펴보며, 어떻게 `Flow`를 병렬로 실행하는지, 병렬로 실행된 `Flow`의 상태를 어떻게 합치는지에 대해서 한번 살펴보자.
SplitState
@Override
public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception {
Collection<Future<FlowExecution>> tasks = new ArrayList<>();
for (final Flow flow : flows) {
final FutureTask<FlowExecution> task = new FutureTask<>(() -> flow.start(executor));
tasks.add(task);
try {
taskExecutor.execute(task);
}
catch (TaskRejectedException e) {
throw new FlowExecutionException("TaskExecutor rejected task for flow=" + flow.getName());
}
}
FlowExecutionStatus parentSplitStatus = parentSplit == null ? null : parentSplit.handle(executor);
Collection<FlowExecution> results = new ArrayList<>();
for (Future<FlowExecution> task : tasks) {
try {
results.add(task.get());
}
catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
}
else {
throw e;
}
}
}
FlowExecutionStatus flowExecutionStatus = doAggregation(results, executor);
if (parentSplitStatus != null) {
return Collections.max(Arrays.asList(flowExecutionStatus, parentSplitStatus));
}
return flowExecutionStatus;
}
protected FlowExecutionStatus doAggregation(Collection<FlowExecution> results, FlowExecutor executor) {
return aggregator.aggregate(results);
}
우선 `FutureTask`를 담는 리스트를 만든다. 이 `FutureTask`는 `Java`에서 제공하는 API로, `Runnable`과 `Future` 인터페이스를 다중 구현한 `RunnableFuture`의 구현체이다. 따라서 `run`을 통해 실행 가능한 클래스이며, 결과를 `Future` 타입으로 반환하도록 되어 있다.
`Future` 타입은 `get`을 통해서 결과가 완료될 때 까지 대기한 후 실제 결과를 얻을 수 있는데, 바로 여기서 받는 결과가 타입 파라미터의 인자로 들어가 있는 `FlowExecution`이다. 아무튼 추후에 다시 보기로 하고,
`Flow` 리스트인 `flows`를 루프를 돌며, `Flow`를 실행시키도록 `FutureTask` 객체를 생성한 후 리스트에 추가한다. 그리고 `TaskExecutor`를 통해 `FutureTask`를 실행하도록 해놓았다. 이러면 `TaskExecutor`의 전략에 따라 `Flow`를 실행하게 된다. 이번에도 역시 스레드 풀을 사용하여 멀티 스레드 환경에서 작업하는 `ThreadPoolTaskExecutor`를 사용할 것이다.
루프가 끝나면, `FutureTask`를 저장했던 리스트 `tasks`를 루프를 돌며 `get`을 호출한다. 이러면 `Flow`의 작업이 완료될 때 까지 대기했다가 결과를 받아오고, 모든 결과를 모아서 `doAggregation` 메서드를 호출하여 결과를 집계한다.
여기서 `aggregator`는 `FlowExecutionAggregator` 타입으로, `FlowExecution` 리스트를 인자로 받아 하나의 `FlowExecutionStatus`를 반환하는 역할을 수행한다. 기본 구현체로는 `MaxValueFlowExecutionAggregator` 하나만 있으며, 이 클래스는 리스트에 담긴 `FlowExecutionStatus` 요소 중 가장 높은 순위의 값을 반환한다.
Parallel Steps 예제
class LoggerTasklet(
private val prefix: String,
) : Tasklet {
private var index: Int = 0
private val log = LoggerFactory.getLogger(javaClass)
override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
return if (++index <= 3) {
Thread.sleep(100)
log.info("$prefix - $index")
RepeatStatus.CONTINUABLE
} else RepeatStatus.FINISHED
}
}
@Bean
fun flow1(): Flow {
return FlowBuilder<Flow>("flow1")
.start(step1())
.next(step2())
.end()
}
@Bean
fun flow2(): Flow {
return FlowBuilder<Flow>("flow2")
.start(step3())
.end()
}
@Bean
fun step1(): TaskletStep {
return StepBuilder("step1", jobRepository)
.tasklet(LoggerTasklet("step1"), transactionManager)
.build()
}
@Bean
fun step2(): TaskletStep {
return StepBuilder("step2", jobRepository)
.tasklet(LoggerTasklet("step2"), transactionManager)
.build()
}
@Bean
fun step3(): TaskletStep {
return StepBuilder("step3", jobRepository)
.tasklet(LoggerTasklet("step3"), transactionManager)
.build()
}
간단하게 로그를 3번까지 남기는 `Tasklet`과, 이를 실행시키는 `Step`을 3개 만들었다. 그리고 `Flow`는 `step1`을 실행한 후 `step2`를 실행하는 `flow1`과, `step3`을 실행하는 `flow2`를 만들었다.
이렇게 구성한 `flow1`과 `flow2`를 병렬로 실행할 수 있도록 구성해보자.
@Bean
fun job(): Job {
return JobBuilder("flowJob", jobRepository)
.incrementer(RunIdIncrementer())
.start(flow1())
.split(taskExecutor()).add(flow2())
.end()
.build()
}
@Bean
fun taskExecutor(): ThreadPoolTaskExecutor {
val taskExecutor = ThreadPoolTaskExecutor()
taskExecutor.corePoolSize = 5
taskExecutor.maxPoolSize = 10
taskExecutor.queueCapacity = 20
taskExecutor.setThreadNamePrefix("batch-thread-")
taskExecutor.initialize()
return taskExecutor
}
`FlowJob`을 만들 때 처럼 `Job`을 만들어주면 된다. 병렬로 실행할 `Flow`의 경우, 위처럼 `split`을 호출하여 `TaskExecutor` 객체를 인자로 넣어준 다음, `add` 메서드에 병렬로 실행할 `Flow`를 넣어주면 된다.
한번 로그를 살펴보자.
로그를 보면, `flow1`과 `flow2`가 서로 다른 스레드를 사용하여 동시에 실행된 것을 볼 수 있다. 그리고 `flow1`의 경우, `step1`이 끝나면 `step2`를 실행하도록 구성해놨기 때문에 `step1`이 끝나자 `step2`가 실행된 것을 볼 수 있다.
진행과정과 주의점
병렬 처리 방식은 쓰레드가 분리되는 지점이 `SplitState`에서 `TaskExecutor`를 통해 `Flow`를 실행시키는 시점이다. 따라서 동일한 `Flow` 객체를 병렬 실행하는 것이 아닌 이상 각기 다른 메모리 영역에서 실행되기 때문에 동시성 문제가 발생하진 않는다.
하지만 `Step`, `Tasklet` 등이 스프링 빈으로 사용하는 등 싱글톤 객체로 메인 스레드 메모리 영역에 할당된 경우, 여러 스레드에서 동시에 공유 자원에 접근할 수 있기 때문에 이를 주의해야 한다.
같은 `S`tep 객체의 `doExecute`를 실행한다고 하더라도, 마치 `ChunkOrientedTasklet`의 `Chunk` 처럼, `StepExecution` 자체는 지역변수로 선언되어 인자로 전해지기 때문에 `StepExecution`에서 동시성 문제가 발생하진 않는다. 하지만, 이 외에 `Tasklet`의 멤버 변수에 접근하는 등의 공유 자원에 여러 `Flow`가 접근한다면, 당연히 동시성 문제는 발생한다.
한번 문제가 되는 상황을 예시로 만들어보자.
class IncrementTasklet() : Tasklet {
private val log = LoggerFactory.getLogger(javaClass)
private var num = 0
override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
return if (++num <= 6) {
Thread.sleep(100)
log.info("$num")
RepeatStatus.CONTINUABLE
} else RepeatStatus.FINISHED
}
}
@Bean
fun incrementTasklet(): Tasklet {
return IncrementTasklet()
}
@Bean
fun step1(): TaskletStep {
return StepBuilder("step1", jobRepository)
.tasklet(incrementTasklet(), transactionManager)
.listener(CustomStepListener())
.build()
}
@Bean
fun flow1(): Flow {
return FlowBuilder<Flow>("flow1")
.start(step1())
.end()
}
@Bean
fun flow2(): Flow {
return FlowBuilder<Flow>("flow2")
.start(step1())
.end()
}
@Bean
fun job(): Job {
return JobBuilder("flowJob", jobRepository)
.incrementer(RunIdIncrementer())
.start(flow1())
.split(taskExecutor()).add(flow2())
.end()
.build()
}
실행될 때마다 `num`의 값을 1씩 늘려, 6이 되면 중단하는 `IncrementTasklet`을 만들고 이를 스프링 빈으로 등록하여 싱글톤 객체를 만들었다. 그리고 `step1`은 이 `Tasklet`을 실행시키도록 했다. 그리고 `Flow` 두개를 만들어, 두 `Flow` 모두 `step1`을 실행시키도록 했다. 그리고 두 `Flow`를 병렬로 실행시키도록 `Job`을 구성했다.
이러면 코드만 봤을 때 `flow1`에서 총 6번의 로그가 찍히고, 동시에 `flow2`에서 6번의 로그가 찍힐거라고 예상할 수도 있고, 싱글톤 객체라는 점을 인지하고 좀 더 생각해본다면 `flow1`과 `flow2`가 번갈아가면서 1~6 까지 로그를 찍을거라고 예상할 수도 있는데, 이는 맞지 않다.
`num` 멤버 변수에 아무런 처리도 안되어 있기 때문에, 쓰레드가 100ms동안 대기하는 동안 두 `Flow`가 동시에 `IncrementTasklet` 객체에 접근하여 `num`의 값을 올린 후 로그를 남기기 때문에 1이 아니라 2부터 로그가 남게 되고, 이후에도 값이 2씩 증가하여 로그가 찍히는 것을 볼 수 있다. 이는 당연히 `Tasklet` 객체가 메인 스레드 힙 메모리 영역에 할당되어 있고, 동일한 멤버변수에 접근하기 때문이다.
class IncrementTasklet() : Tasklet {
private val log = LoggerFactory.getLogger(javaClass)
private var num = 0
private val lock = ReentrantLock()
override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
lock.lock()
return try {
if (++num <= 6) {
Thread.sleep(100)
log.info("$num")
RepeatStatus.CONTINUABLE
} else RepeatStatus.FINISHED
} finally {
lock.unlock()
}
}
}
`Java`에서 제공하는 `ReentrantLock`을 사용해서 다른 스레드가 사용 중일때 잠그고, 작업이 끝난 경우 잠금을 해제하도록 해보자.
그럼 위와 같이 스레드 안전하게 작업이 이루어진 것을 볼 수 있다.
Partitioning Step
이 방식은 동일한 `Step`을 여러 개로 복제(분리)하여 실행시키는 방식이며, `PartitionStep`을 통해 이 방식으로 확장할 수 있다.
Master-Slave 패턴으로 구성되어 있다고 보면 된다. `PartitionStep`이 `Step`을 분리시키는 Master 역할을 수행하고, 내부의 `Step`은 여러 개의 Slave로 분리되는 방식이다.
Multi-Threaded 방식과 헷갈릴 수 있는데, 주요한 차이점은 Multi-Threaded는 `Step` 내의 `Tasklet`을 여러 스레드에서 동시에 실행하는 방식이고, `Partitioning` 방식은 `Step` 자체를 복제해 여러 스레드에서 실행시킨다는 점이 다르다.
PartitionStep
public class PartitionStep extends AbstractStep {
private StepExecutionSplitter stepExecutionSplitter;
private PartitionHandler partitionHandler;
private StepExecutionAggregator stepExecutionAggregator = new DefaultStepExecutionAggregator();
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
stepExecution.upgradeStatus(BatchStatus.COMPLETED);
stepExecutionAggregator.aggregate(stepExecution, executions);
if (stepExecution.getStatus().isUnsuccessful()) {
throw new JobExecutionException("Partition handler returned an unsuccessful step");
}
}
}
`Step`의 실질적인 시작점인 `doExecute` 메서드가 재정의 되어있는데, `PartitionHandler`를 통해 Slave Step들을 다룬 후, `StepExecutionAggregator`를 통해 결과를 집계하는 구조로 구현되어 있다.
PartitionHandler
public abstract class AbstractPartitionHandler implements PartitionHandler {
protected int gridSize = 1;
protected abstract Set<StepExecution> doHandle(StepExecution managerStepExecution,
Set<StepExecution> partitionStepExecutions) throws Exception;
@Override
public Collection<StepExecution> handle(final StepExecutionSplitter stepSplitter,
final StepExecution managerStepExecution) throws Exception {
final Set<StepExecution> stepExecutions = stepSplitter.split(managerStepExecution, gridSize);
return doHandle(managerStepExecution, stepExecutions);
}
}
Spring Batch의 기본 도메인에서 다뤘었는데, `StepExecution`은 `Step`의 실행을 나타낸다고 했었다. `StepExecution`에는 한 번의 `Step` 실행에 대한 정보들이 담겨있고, `SimpleStepHandler`로부터 `StepExecution`을 인자로 받아 `Step`을 실행시킬 수 있는 구조였다. (`Step`의 `execute` 함수는 인자로 `StepExecution`이 필요하다.)
즉, 간단하게 요약하면 `Step`을 실행하기 위해서는 `StepExecution`이 필요하다. `Partitioning` 방식은 동일 `Step`을 여러개로 분리하여 실행시키기 때문에, 분리되는 만큼 `StepExecution`이 필요하다. 따라서 `PartitionHandler`에서 `PartitionStep`로 부터 받은 `StepExecutionSplitter`로 `gridSize` 만큼 `StepExecution`을 분리한다.
분리된 `StepExecution`은 `doHandle` 메서드를 통해 하위 클래스에서 복제 대상 `Step`을 실행시키게 된다.
`AbstractPartitionHandler`를 상속받는 하위 클래스는 `MessageChannelPartitionHandler`, `TaskExecutorPartitionHandler` 두 개가 있는데, 이번 포스팅에서는 멀티 프로세스 방식의 확장은 다루지 않는다고 했기 때문에 `TaskExecutorPartitionHandler`를 살펴볼 것이다.
참고로 멀티 프로세스 방식으로 확장하는 것은 미들웨어를 통해 다른 시스템과 통신하며 여러 개의 프로세스를 통해 진행하게 되는데, `MessageChannelPartitionHandler`가 이 역할을 수행한다. 메시지 채널을 미들웨어로 사용하여 리모트 시스템과 `Step`을 진행한다.
StepExecutionSplitter
`StepExecution`을 `gridSize`만큼 분리하는 역할을 수행한다. 기본 구현체로 `SImpleStepExecutionSplitter`만 있다.
`SImpleStepExecutionSplitter`는 내부 구현이 좀 복잡해보이긴 하는데, 그냥 간단하게 `Partitioner`를 통해 `StepExecution`을 분리한다고만 알아도 될 거 같다.
`Partitioner`는 특정 기준을 가지고 `StepExecution`을 분리하는 역할을 수행하고, 기본 구현체가 있긴 하지만 실제 개발자가 상황에 맞게 자주 구현해야 할 인터페이스이기도 하다.
Partitioner
@FunctionalInterface
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
`gridSize` 만큼 실제 `ExecutionContext`를 생성해서 유니크한 키를 포함하여 `Map`으로 반환하는 역할을 수행한다. 앞서 말했듯, 실제 요구사항에 맞게 개발자가 직접 구현해야하는 인터페이스이다.
TaskExecutorPartitionHandler
다시 `PartitionHandler`로 돌아와서, 이제 `StepExecution`이 `gridSize`만큼 분리되었을 때 다뤄지는 부분에 대해 살펴보자.
public class TaskExecutorPartitionHandler extends AbstractPartitionHandler implements StepHolder, InitializingBean {
private TaskExecutor taskExecutor = new SyncTaskExecutor();
private Step step;
@Override
protected Set<StepExecution> doHandle(StepExecution managerStepExecution,
Set<StepExecution> partitionStepExecutions) throws Exception {
final Set<Future<StepExecution>> tasks = new HashSet<>(getGridSize());
final Set<StepExecution> result = new HashSet<>();
for (final StepExecution stepExecution : partitionStepExecutions) {
final FutureTask<StepExecution> task = createTask(step, stepExecution);
try {
taskExecutor.execute(task);
tasks.add(task);
}
catch (TaskRejectedException e) {
ExitStatus exitStatus = ExitStatus.FAILED
.addExitDescription("TaskExecutor rejected the task for this step.");
stepExecution.setStatus(BatchStatus.FAILED);
stepExecution.setExitStatus(exitStatus);
result.add(stepExecution);
}
}
for (Future<StepExecution> task : tasks) {
result.add(task.get());
}
return result;
}
protected FutureTask<StepExecution> createTask(final Step step, final StepExecution stepExecution) {
return new FutureTask<>(() -> {
step.execute(stepExecution);
return stepExecution;
});
}
}
구조 자체는 `Parallel` 방식과 비슷하다. 분리된 `StepExecution` 리스트를 루프돌며, `FutureTask`를 만들어 `TaskExecutor`를 통해 실행시킨다. `FutureTask`는 `run`이 호출되면 `step.execute`를 호출하게 구현되어 있다. 그리고 마지막에 각 `FutureTask`의 `get`을 호출하여 결과를 반환받게 된다.
StepExecutionAggregator
다시 `PartitionStep`으로 넘어와서, `PartitionHandler`로 부터 실행된 Slave Step들의 `StepExecution`을 받아오면, Master Step에 모든 Slave Step의 상태를 집계하여 업데이트 하게 된다.
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
stepExecution.upgradeStatus(BatchStatus.COMPLETED);
stepExecutionAggregator.aggregate(stepExecution, executions);
if (stepExecution.getStatus().isUnsuccessful()) {
throw new JobExecutionException("Partition handler returned an unsuccessful step");
}
}
Parallel 방식에서 `Flow`의 실행 결과를 `FlowExecutionAggregator`가 집계했던 것 처럼, 여기서는 `StepExecutionAggregator`가 이 역할을 한다.
public class DefaultStepExecutionAggregator implements StepExecutionAggregator {
@Override
public void aggregate(StepExecution result, Collection<StepExecution> executions) {
Assert.notNull(result, "To aggregate into a result it must be non-null.");
if (executions == null) {
return;
}
for (StepExecution stepExecution : executions) {
BatchStatus status = stepExecution.getStatus();
result.setStatus(BatchStatus.max(result.getStatus(), status));
result.setExitStatus(result.getExitStatus().and(stepExecution.getExitStatus()));
result.setFilterCount(result.getFilterCount() + stepExecution.getFilterCount());
result.setProcessSkipCount(result.getProcessSkipCount() + stepExecution.getProcessSkipCount());
result.setCommitCount(result.getCommitCount() + stepExecution.getCommitCount());
result.setRollbackCount(result.getRollbackCount() + stepExecution.getRollbackCount());
result.setReadCount(result.getReadCount() + stepExecution.getReadCount());
result.setReadSkipCount(result.getReadSkipCount() + stepExecution.getReadSkipCount());
result.setWriteCount(result.getWriteCount() + stepExecution.getWriteCount());
result.setWriteSkipCount(result.getWriteSkipCount() + stepExecution.getWriteSkipCount());
}
}
}
기본 구현으로는 `DefaultStepExecutionAggregator`가 있는데, 모든 Slave Step의 `StepExecution` 상태 값들을 Master Step의 `StepExecution`에 더하도록 구현되어 있다.
Partitioning 예제
`Partitioning`은 `PartitionStep`을 사용하기 때문에, `StepBuilder`를 통해 `PartitionStep`을 빌드하면 된다.
@Bean
fun masterStep(): Step {
return StepBuilder("master-step", jobRepository)
.partitioner("master-step", partitioner())
.step(slaveStep())
.gridSize(5)
.taskExecutor(taskExecutor())
.build()
}
위와 같이 `partitioner`를 통해 사용할 `Partitioner` 객체를 인자로 넣어주고, `step`을 통해 분리할 `Step`을, `gridSize`를 통해 몇 개로 분리할 지를, 그리고 분리된 `Step`을 실행시킬 `TaskExecutor`를 설정하고 빌드하면 된다.
그렇다면 이제 어떻게 분리할지 `Partitioner`를 한번 직접 구현해보자.
class RangePartitioner(
private val totalCount: Long,
) : Partitioner {
override fun partition(gridSize: Int): Map<String, ExecutionContext> {
val sizeToPartitioning = totalCount / gridSize
val contextMap = mutableMapOf<String, ExecutionContext>()
for (i in 0 until gridSize) {
val start = i * sizeToPartitioning + 1
val end = if (i == gridSize - 1) totalCount else (i + 1) * sizeToPartitioning
val context = ExecutionContext()
context.putLong("start", start)
context.putLong("end", end)
contextMap["partition-$i"] = context
}
return contextMap
}
}
멤버 `totalCount`를 `gridSize`만큼 나누어 처리할 수 있도록 `Partitioner`를 구현했다.
위에서 `StepExecutionSplitter`는 `Partitioner`를 통해 여러 개의 Slave Step을 실행시킬 수 있도록 분리된 `ExecutionContext`를 가져온다고 했었다. 그렇기 때문에 `ExecutionContext`를 `gridSize`만큼 만들어 `Map`에 넣어 반환해주면 된다.
`totalCount`가 1000이라면 1~200, 201~400, 401~600, 601~800, 801~1000 으로 나누어 처리할 수 있도록 `start`와 `end` 값을 `ExecutionContext`에 넣어 유일한 키를 가지고 `Map`에 넣어 반환한다. 이렇게 반환된 `ExecutionContext` 만큼, Slave Step이 분리되어 실행될 것이다.
@Bean
fun slaveStep(): Step {
return StepBuilder("slave-step", jobRepository)
.chunk<Int, Int>(10, transactionManager)
.reader(itemReader(null, null))
.writer{ chunk ->
log.info("WRITE: [${chunk.items.joinToString(", ")}]")
}
.build()
}
@Bean
@StepScope
fun itemReader(
@Value("#{stepExecutionContext['start']}") start: Int? = null,
@Value("#{stepExecutionContext['end']}") end: Int? = null,
): ItemReader<Int> {
return ListItemReader((start!!..end!!).toList())
}
@Bean
fun partitioner(): Partitioner {
return RangePartitioner(100)
}
그리고 위와 같이 분리될 `Step`을 빈으로 등록했다. 간단하게 로그를 찍는 `Step`이다.
`ItemReader`를 잘 보면, 이전에 `Partitioner`에서 `StepExecutionContext`에 넣어준 `start`와 `end`값을 빈 생성 시점에 인자로 받고 있다. 이를 받아 `ListItemReader`에서 반환하는 `List`의 시작과 끝으로 설정하였다.
참고로 `ItemReader`에서 `StepExecutionContext` 안의 값을 가져오기 위해서는, `@StepScope` 어노테이션을 붙여야 한다. 이는 빈의 스코프를 `Step` 스코프로 지정한다는 뜻이다.
Spring의 빈 스코프에 대해서 알고 있다면 이해하기 쉬운데, Spring 빈의 생명주기를 `Step`의 실행과 끝으로 두는 것이다. `Step`에서 해당 `ItemReader`를 사용할 시점에 빈으로 생성하고, `Step`이 끝나면 더 이상 관리하지 않게 된다. `ExecutionContext` 안의 `start`, `end` 값은 `Job`이 실행되어 `Partitioner`에서 넣어주기 전에는 가져올 수 없는 값이기 때문에, `StepScope`를 통해 빈의 생성 시점을 `Step`에서 호출하는 시점으로 변경해주어야만 `ExecutionContext`의 값을 가져올 수 있게 되는 것이다.
`RangePartitioner`의 `totalCount` 값을 100으로, 위에서 `PartitionStep`을 생성할 때 `gridSize`를 5로 주었기 때문에, 총 1~20, 21~40, 41~60, 61~80, 81~100 으로 분리되어 `Step`이 실행될 것이며, Slave Step의 `chunkSize`는 10이기 때문에 이 안에서도 10씩 청크 단위로 나누어 처리되게 될 것이다.
한번 로그를 살펴보자.
보면 main 스레드에서 master-step을 실행시킨 것을 볼 수 있고, 이후에는 batch-thread-1 부터 batch-thread-5 까지 각각 다른 스레드로 분리되어 아이템이 처리된 것을 볼 수 있다.
`chunkSize`를 10으로 설정했기 때문에, 아이템은 10개씩 쓰여지게 되며, batch-thread-1 에서 처리된 아이템을 보면 61~70 까지 한 청크, 71~80까지 한 청크, 총 2청크가 처리된 것을 볼 수 있다.
즉, 청크 프로세싱은 2번 진행되었으며, `gridSize`만큼 61~80까지는 batch-thread-1에 분리되어 실행된 것이다.
진행과정과 주의점
`Partitioning` 방식은 `PartitionHander`에서 `Step`의 `execute`를 호출하는 `FutureTask`를 만든 후 `TaskExecutor`를 통해 실행시킨다. `TaskExecutor` 구현체에 따라 실행 방식이 달라지는데, `SimpleAsyncTaskExecutor`나 `ThreadPoolTaskExecutor`처럼 멀티 스레드를 사용하는 `TaskExecutor`를 사용한다면, 위와 같이 메인 쓰레드에 있는 `Step`을 각각 다른 `StepExecution` 객체를 가지고 다른 스레드에서 실행하게 될 것이다.
앞서 멀티 스레드를 사용하는 방식에서도 얘기했지만, 언제나 스레드 안전하도록 관련 구현체들이 설계되어 있어야 한다. 위에서 만든 예제의 경우, `Step`은 스프링 빈으로 등록했기 때문에 싱글톤 객체임으로 모든 스레드에서 이를 사용하게 된다.
하지만 `ItemReader`의 경우, `@StepScope`를 통해 호출 시점에 빈으로 등록하기 때문에, 각 스레드별로 `ItemReader` 객체가 생성이 되어 동시성 문제는 발생하지 않는다. 이후 `ItemProcessor`, `ItemWriter` 역시 지역 변수로 선언된 `chunk`를 참조하기 때문에 싱글톤 객체라도 동시성 문제가 발생하진 않는다.
정리
이번 포스팅에서는 배치 작업을 여러 방식으로 확장하는 방법에 대해서 살펴보았다.
Multi-Threaded Step의 경우는 `Step`의 `Tasklet`을 확장해 실행하는 방식이고, Parallel Steps의 경우에는 `Flow`를 병렬로 실행 시켜 확장하는 방식, `Partitioning`은 하나의 동일한 `Step`을 여러 개로 나누어 실행해 확장한다는 방식이라는 차이점을 알고, 상황에 따라 적절하게 활용하면 될 거 같다.
또한 이러한 방식들은 거의 대부분 멀티 스레드를 사용하는 `TaskExecutor`를 사용하기 때문에, 그 방식과 설계에 따라서 공유되는 자원들에 여러 스레드가 접근했을 때 문제가 없도록, 스레드 안전하게 설계되어야 한다는 점도 꼭 주의하여 구현을 해야 할 것이다.
이 방식에서는 어떤 부분에서 스레드 안전해야 한다 라고 무조건 정리하기 보다, 내가 설계한 `Step`이 어느 부분에서 다른 스레드로 확장되어 실행되며, 이 과정에서 여러 스레드에서 공유하는 객체들이 뭔지 파악한 후 해당 객체들이 스레드 안전한 지 확인하는 것이 좋을 거 같다.