Spring Batch 시리즈를 포스팅하고 있습니다. 기준이 되는 버전은 5.x 버전입니다.
1. Spring Batch 소개
2. Job과 Step의 구조 및 생성
3. Job과 Step의 실행 과정
4. Job의 흐름 제어 - Flow
5. FlowJob의 실행 과정(현재 포스팅)
이번 포스팅에서는 `FlowJob`이 어떤 과정으로 실행이 되는지 확인해볼 것이다.
이 과정을 이해하기 위해 상태 전이를 다루는 `State`와 `StateTransition`에 대해서 우선 살펴볼 것이고, `Flow`를 빌드하면 어떻게 `State`와 `StateTransition`이 빌드되는지도 알아볼 것이다.
그리고 `Flow`가 어떤 방식으로 구현이 되어 동작되는 것인지 `SimpleFlow` 구현체를 알아볼 것이고, 이 과정에서 `State`, `StateTransition`이 어떻게 상태 전이를 시키는 지 확인해볼 것이다.
상태와 상태 전이
"조건부 흐름"은 여러 개의 "상태 전이"로 구성되어 있다. 이전 포스팅에서 간단하게 설명했는데, 상태 전이란 기준 상태가 특정 조건과 매칭된다면 다음 상태로 전이하는 것을 의미한다. 이러한 상태 전이가 여러 개 모여서 조건부 흐름을 구성하는 것이다.
위와 같은 조건부 흐름이 있을 때, 상태 전이를 분석해보면 다음과 같다.
상태 전이 | 기준 상태 | 조건 | 다음 상태 |
A가 성공하면 B상태로 전이한다. | A | 성공 | B |
A가 성공하지 못하면 C 상태로 전이한다. | A | 성공이 아님 | C |
즉, 위의 조건부 흐름은 위 두 가지 상태 전이로 구성된 흐름이다.
Spring Batch에서는 "상태"를 `State`, "상태 전이"를 `StateTransition`으로 구현해놓았다. 한번 이 둘을 살펴보자.
State
public interface State {
String getName();
FlowExecutionStatus handle(FlowExecutor executor) throws Exception;
boolean isEndState();
}
Spring Batch에서 "상태"를 표현한 인터페이스이다.
`Flow` 내에서 유일한 상태를 식별하기 위해 getName 메서드를 정의해 놓았고, 해당 상태를 다뤄서 결과를 얻어내는 것을 `handle` 메서드로 정의해 놓았다. 마지막 흐름에 해당되어 `Flow`를 더 이상 실행시키지 않도록 구분해주는 `isEndState` 메서드도 정의되어 있다.
`State`는 이전 포스팅의 다이어그램에서 5개의 구현체가 있다고 했었다. 그 중 `SplitState`는 추후에 병렬 처리에 대해서 다룰 때 알아보도록 하고, 나머지 구현체들에 대해서 알아보자.
이름 | 설명 |
DecisionState | JobExecutionDecider를 실행해야 하는 상태를 나타낸다. JobExecutionDecider의 결정 결과가 반환된다. |
EndState | 작업의 종료 상태를 나타낸다. |
StepState | Step을 실행해야 하는 상태를 의미한다. Step의 실행 결과가 반환된다. |
FlowState | Flow를 실행해야 하는 상태를 의미한다. Flow의 실행 결과가 반환된다. |
`DecisionState`는 `JobExecutionDecider`의 `decide` 메서드를 호출하여 반환된 `FlowExecutionStatus`를 반환한다.
`EndState`는 이전 상태가 성공적으로 끝났던 실패로 끝났던 상관 없이 작업을 끝내야 하는 상태를 말한다.
`StepState`는 `Step`을 실행하여 반환된 `ExitCode`를 `FlowExecutionStatus`로 만들어 반환한다.
`FlowState`는 `Flow`를 실행하여 반환된 `FlowExecutionStatus`를 반환한다. 아직 다룬 적은 없지만, `Flow`내에 `Flow`가 있을 수 있는데 이런 경우에 해당한다.
StateTransition
상태 전이 자체를 나타내는 클래스이다.
public final class StateTransition {
private final State state;
private final String pattern;
private final String next;
public boolean matches(String status) {
return PatternMatcher.match(pattern, status);
}
public boolean isEnd() {
return next == null;
}
}
엄청 간단하게 핵심만 요약한 코드이다. 보면 `state`, `pattern`, `next`라는 멤버를 가지고 있는데 이것이 바로 상태 전이의 기준 상태, 조건, 다음 상태를 표현하는 변수이다.
따라서, A상태가 성공하면 B상태로 전이한다 라는 상태 전이가 있다면, `state`는 A 상태 객체가 초기화 될 것이고, `pattern`은 "성공", `next`에는 전이해야 할 다음 상태의 이름이 초기화 될 것이다.
`matches` 메서드는 `state`의 실행 결과(`FlowExecutionStatus`)가 `pattern`과 일치하는지를 확인하는 함수이다. 이전 포스팅에서, `TransitionBuilder`의 `on`함수에 지정하는 `String` 인자는 패턴을 기준으로 매칭시킨다고 했는데 바로 이 부분이다.
`isEnd` 메서드는 다음으로 전이할 `State`가 없을 경우에 `Job`을 종료시키기 위한 시그널을 반환하는 메서드이다.
StateTransition의 생성
간단한 `Flow`을 하나 만들어보고, 실제 어떻게 `State`와 `StateTransition`이 만들어지는지 확인해보자.
FlowBuilder<Flow>("sample-flow")
.start(stepA())
.on("COMPLETED").to(stepB())
.from(stepA())
.on("*").to(stepC())
.end()
위와 같이 `Flow`를 만들고, 만들어진 `SimpleFlow`를 디버깅 해보면 `stateTransitions`에 총 13개의 `StateTransition`이 들어있는 리스트가 초기화 된 걸 확인할 수 있다.
보면 상태 전이가 얼마 없어보이는 간단한 `Flow`인데, 왜 13개나 나왔을까? 위에서 상태 전이를 분석했을 때는 정말 간단하게 분석한거고, 사실 다음 상태가 없는 것도 일종의 상태 전이 중 하나이기 때문에 그렇다. 한번 다이어그램을 상세하게 그려보자.
각 분기에서 끝나는 경우, 그리고 모든 분기에서 중단된 경우까지 추가한 다이어 그램이다. 이를 기반으로 상태 전이를 분석해보자.
번호 | 상태 전이 | 기준 상태 | 조건 | 다음 상태 |
1 | A가 성공하면 B로 전이한다. | A | 성공 | B |
2 | A가 실패하면 C로 전이한다. | A | 실패 | C |
3 | B가 성공하면 성공 상태로 전이한다. | B | 성공 | SUCCESS |
4 | B가 실패하면 실패 상태로 전이한다. | B | 실패 | FAILURE |
5 | C가 성공하면 성공 상태로 전이한다. | C | 성공 | SUCCESS |
6 | C가 실패하면 실패 상태로 전이한다. | C | 실패 | FAILURE |
7 | B가 성공해서 성공 상태로 전이되면 작업을 종료한다. | SUCCESS | 모두 | null |
8 | B가 실패해서 실패 상태로 전이되면 작업을 종료한다. | FAILURE | 모두 | null |
9 | C가 성공해서 성공 상태로 전이되면 작업을 종료한다. | SUCCESS | 모두 | null |
10 | C가 실패해서 실패 상태로 전이되면 작업을 종료한다. | FAILURE | 모두 | null |
11 | 성공 상태로 전이되면 작업을 종료한다. | SUCCESS | 모두 | null |
12 | 실패 상태로 전이되면 작업을 종료한다. | FAILURE | 모두 | null |
13 | 중단 상태로 전이되면 작업을 종료한다. | STOPPED | 모두 | null |
모든 분기에서 끝나는 경우와, 중단된 경우를 합해서 분석을 하면 위와 같이 총 13개의 상태 전이가 나오게 된다. 따라서 `StateTransition` 객체가 총 13개가 리스트에 담겨 초기화가 된 것이다.
상태 전이가 추가되는, `FlowBuilder`의 `addTransition`메서드를 한번 살펴보자.
private void addTransition(String pattern, State next) {
tos.put(next.getName(), next);
transitions.add(StateTransition.createStateTransition(currentState, pattern, next.getName()));
if (transitions.size() == 1) {
transitions.add(StateTransition.createEndStateTransition(failedState));
transitions.add(StateTransition.createEndStateTransition(completedState));
transitions.add(StateTransition.createEndStateTransition(stoppedState));
}
if (next.isEndState()) {
transitions.add(StateTransition.createEndStateTransition(next));
}
dirty = true;
}
이 메서드는 `FlowBuilder`에서 `on`을 통해 TransitionBuilder를 생성하고, `to`, `end`, `stop`등 과 같은 종결 메서드를 호출하면 호출된다.
보면 주어진 패턴과, 다음 `State`를 인자로 받는데 이걸 그대로 `StateTransition`에 `state`, `pattern`, `next`에 초기화하여 객체를 만든 다음 `transitions`에 추가해준다.
그리고 `transitions`가 만약 처음 추가되는 거라면 성공 상태, 실패 상태, 중단 상태에 대한 상태 전이도 초기에만 추가해주게 된다. 그리고 각 분기의 마지막 상태인 경우에도 `EndState`를 추가해주는 것을 볼 수 있다.
또한 FlowBuilder의 flow 함수가 호출될 때, `addDanglingEndStates` 메서드를 통해 전환할 상태가 정의되지 않은 상태를 식별하여 추가적으로 `addTransition`을 호출한다.
결론적으로 위와 같은 부분에서 `StateTransition` 객체들을 만들어서 정학환 상태 전이를 구현하고 있는 것이다.
이제 `State`, `StateTransition`이 어떻게 상태 전이를 표현했는지 알아보았으니, 실제 `FlowJob`의 실행 과정을 살펴보며 이 과정에서 `State`와 `StateTransition`이 어떻게 작동 되는지도 살펴보자.
FlowJob의 실행 과정
이전에 3번 포스팅에서 `Job`의 실행 과정에 중 AbstractJob에서 execute가 호출되는 부분에 대해 다뤘었다.
여기서 `Job` 구현체의 `doExecute` 메서드를 통해 `Job`이 실질적으로 실행된다는 사실을 알 수 있었고, `SimpleJob`의 `doExecute` 메서드를 살펴봤었다.
이번에는 `FlowJob`의 `doExecute`를 살펴보자.
FlowJob
@Override
protected void doExecute(final JobExecution execution) throws JobExecutionException {
try {
JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
new SimpleStepHandler(getJobRepository()), execution);
executor.updateJobExecutionStatus(flow.start(executor).getStatus());
}
catch (FlowExecutionException e) {
if (e.getCause() instanceof JobExecutionException) {
throw (JobExecutionException) e.getCause();
}
throw new JobExecutionException("Flow execution ended unexpectedly", e);
}
}
`JobFlowExecutor` 객체를 생성한 후, `flow`의 `start`메서드를 호출하는 것을 볼 수 있다.
`start`메서드는 `FlowExecution`을 반환하는데, `StepExecution`, `JobExecution` 처럼 `Flow`의 실행 정보들을 담고 있는 객체라고 보면 된다.
이 객체는 `JobExecution`, `StepExecution`의 `BatchStatus`처럼 `Flow`의 실행 결과를 나타내는 `FlowExecutionStatus` 타입의 `status` 멤버를 가지고 있는데, 이 값을 `JobExecution`의 `ExitStatus`로 갱신해주게 된다.
SimpleFlow
앞서 `FlowJob`에서 `Flow`의 `start` 메서드를 호출하는 것을 확인했으니, 유일한 구현체인 `SimpleFlow`의 `start` 메서드를 살펴보자.
@Override
public FlowExecution start(FlowExecutor executor) throws FlowExecutionException {
initializeTransitionsIfNotInitialized();
State state = startState;
String stateName = state.getName();
return resume(stateName, executor);
}
우선 `initializeTransitionsIfNotInitialized`를 통해 상태 전이를 초기화 시켜준다.
private State startState;
private final Map<String, Set<StateTransition>> transitionMap = new HashMap<>();
private final Map<String, State> stateMap = new HashMap<>();
private List<StateTransition> stateTransitions = new ArrayList<>();
`SimpleFlow`에는 상태 전이와 관련해서 위와 같이 4개의 멤버를 사용하게 된다. `stateTransition`은 위에서 `FlowBuilder`를 통해 `StateTransition`을 빌드할 때 초기화 됐던 멤버이다.
나머지 3개의 멤버는 `initializeTransitionsIfNotInitialized` 메서드를 통해 초기화 되는데, `startState`는 이 `Flow`의 시작 상태를 의미하고, `trainsitionMap`은 `State`의 이름을 `Key`로, 해당 이름을 가지고 있는 `StateTransition`을 `Set`에 담아 `Value`로 담는 `Map`이다. `stateMap`은 `StateStransition`의 모든 `State`에 대해 `State`의 이름을 `Key`로 `State`객체를 `Value`로 한 `Map`이다.
위에서 13개의 `StateTransition`이 생겼던 예시를 그대로 `stateTransition`, `transitionMap`, `stateMap`으로 표현해 보았다.
`stateMap`의 경우는 `State`의 `name` 값을 `key`로 하기 때문에 유일한 `State` 객체를 저장할 수 있고, `transitionMap`의 경우는 같은 `State`에서 분기가 어떻게 나뉘는지를 확인할 수 있게 되는 것이다.
더 자세한 로직은 직접 `initializeTransitionsIfNotInitialized`메서드를 살펴보자.
다시 계속해서, 위 멤버들을 초기화 한 이후에는 시작 상태의 이름을 `resume` 메서드를 통해 `Flow`을 계속하게 된다. 한번 살펴보자.
@Override
public FlowExecution resume(String stateName, FlowExecutor executor) throws FlowExecutionException {
FlowExecutionStatus status = FlowExecutionStatus.UNKNOWN;
State state = stateMap.get(stateName);
StepExecution stepExecution = null;
while (isFlowContinued(state, status, stepExecution)) {
stateName = state.getName();
try {
status = state.handle(executor);
stepExecution = executor.getStepExecution();
}
catch (FlowExecutionException e) {
executor.close(new FlowExecution(stateName, status));
throw e;
}
catch (Exception e) {
executor.close(new FlowExecution(stateName, status));
}
state = nextState(stateName, status, stepExecution);
}
FlowExecution result = new FlowExecution(stateName, status);
executor.close(result);
return result;
}
우선 `stateMap`으로 부터 인자로 넘어온 `stateName`에 해당하는 `State`를 꺼낸다.
그리고 `isFlowContinued`메서드를 통해 현재 `Flow`를 계속해야 하는지 확인한다.
protected boolean isFlowContinued(State state, FlowExecutionStatus status, StepExecution stepExecution) {
boolean continued = true;
continued = state != null && status != FlowExecutionStatus.STOPPED;
if (stepExecution != null) {
Boolean reRun = (Boolean) stepExecution.getExecutionContext().get("batch.restart");
Boolean executed = (Boolean) stepExecution.getExecutionContext().get("batch.executed");
if ((executed == null || !executed) && reRun != null && reRun && status == FlowExecutionStatus.STOPPED
&& !state.getName().endsWith(stepExecution.getStepName())) {
continued = true;
}
}
return continued;
}
`isFlowContinued` 메서드에서는 `state`가 `null`이 아니고, 현재 `Flow`의 `FlowExecutionStatus`가 `STOPPED`가 아닌지에 따라 `continued`를 `false`로 변경한다.
그리고 다음 `if`문에서 `FlowExecutionStatus`가 `STOPPED`인 경우임에도 계속 되어야 하는 경우를 따져 `continued`를 `true`로 변경해주고 반환한다.
다시 `resume` 메서드로 넘어오면, `state`의 `handle`을 호출하여 실질적으로 `State`를 실행시키게 된다. `State`는 5개의 구현체가 있는걸 보았는데, 이 중 `StepState`라면 `Step`이 실행되게 되는 것이다. 한번 살펴보자.
@Override
public FlowExecutionStatus handle(FlowExecutor executor) throws Exception {
executor.abandonStepExecution();
return new FlowExecutionStatus(executor.executeStep(step));
}
`executor`의 `executeStep`을 통해 `Step`을 실행시킨다.
@Override
public String executeStep(Step step)
throws JobInterruptedException, JobRestartException, StartLimitExceededException {
boolean isRerun = isStepRestart(step);
StepExecution stepExecution = stepHandler.handleStep(step, execution);
stepExecutionHolder.set(stepExecution);
if (stepExecution == null) {
return ExitStatus.COMPLETED.getExitCode();
}
if (stepExecution.isTerminateOnly()) {
throw new JobInterruptedException("Step requested termination: " + stepExecution,
stepExecution.getStatus());
}
if (isRerun) {
stepExecution.getExecutionContext().put("batch.restart", true);
}
return stepExecution.getExitStatus().getExitCode();
}
`stepHandler`의 `handleStep`을 통해 `Step`을 실행시키는 것을 볼 수 있다. `stepHandler`는 `FlowJob`에서 `JobFlowExecutor`를 생성할 때 `SimpleStepHandler` 객체를 만들어서 생성자의 인자로 넣어주었는데, 이 객체의 `handleStep`을 호출하게 되는 것이다.
`SimpleStepHandler`의 `handleStep`은 이전 포스팅에서 다뤘으니 이후 과정은 넘어가겠다.
다시 돌아와서, `SimpleFlow`의 `resume` 메서드에서 `State`의 `handle`함수를 호출하면 `FlowExecutionStatus`를 반환하게 된다. 그 다음에 반환된 `FlowExecutionStatus`를 가지고 `nextState` 메서드를 호출하여 다음에 실행되어야 할 `State`를 가져오게 된다.
protected State nextState(String stateName, FlowExecutionStatus status, StepExecution stepExecution)
throws FlowExecutionException {
Set<StateTransition> set = transitionMap.get(stateName);
if (set == null) {
throw new FlowExecutionException(
String.format("No transitions found in flow=%s for state=%s", getName(), stateName));
}
String next = null;
String exitCode = status.getName();
for (StateTransition stateTransition : set) {
if (stateTransition.matches(exitCode)
|| (exitCode.equals("PENDING") && stateTransition.matches("STOPPED"))) {
if (stateTransition.isEnd()) {
// End of job
return null;
}
next = stateTransition.getNext();
break;
}
}
if (next == null) {
throw new FlowExecutionException(
String.format("Next state not found in flow=%s for state=%s with exit status=%s", getName(),
stateName, status.getName()));
}
if (!stateMap.containsKey(next)) {
throw new FlowExecutionException(
String.format("Next state not specified in flow=%s for next=%s", getName(), next));
}
return stateMap.get(next);
}
보면 현재 진행했던 `State`의 `name`이 인자로 넘어왔었는데, 이걸 가지고 `transitionMap`에서 `Set<StateTransition>`을 가지고 온다.
이걸 루프를 돌며, `stateTransition`의 `matches` 메서드를 호출해 현재 `FlowExecutionStatus`와 `StateTransition` 내의 `pattern`과 매핑되는지를 확인한다.
맞는 경우, 다음에 진행해야할 `State`의 이름을 `next` 변수에 초기화하고, 최종적으로 해당 이름을 `Key`로 가지고 있는 `State`를 반환하게 된다.
다시 `resume`함수로 돌아와보면, `nextState`에서 반환된 `State`를 `state` 지역 변수에 초기화 한 후, `while`문의 조건부로 다시 올라가서 `isFlowContinued`를 통해 `Flow`가 계속되어야 하는지 확인한 후 다음 `State`를 실행시키게 되는 것이다.
정리
간단하게 흐름만 순서대로 한번 정리해보자.
1. `State`를 `handle`한다.
2. `status` 값을 받는다.
3. `transitionMap`으로 부터 루프를 돌며 `StateTransition`의 `pattern`과 `status`를 매핑시킨다.
4. 매핑되는 `StateTransition`의 `State`를 다음에 다룰 `State`로 설정한다.
5. 1번부터 반복한다.
6. 더 이상 `Flow`를 진행시키지 않는 경우 마지막 `State`의 `status`를 반환한다.
JobExecutionDecider
지금까지 봤던 `StepState`의 경우, 전이되어야 할 조건은 `Step`을 실제 실행시킨 다음 종료되면 반환되는 `ExitStatus`의 `exitCode` 값과 매칭됐었다. 하지만 `JobExecutionDecider`를 통하면 `Step`의 실행 없이, `FlowExecutionStatus`를 반환하여 이 값과 매핑을 시킬수가 있다.
@FunctionalInterface
public interface JobExecutionDecider {
FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution);
}
`JobExecutionDecider`는 `decide` 메서드 하나만 정의되어 있는 SAM 인터페이스로, 파라미터로 `JobExecution`과 `StepExecution`을 받아 현재 `Job`의 실행 정보와, 최근 `Step`의 실행 정보만을 가지고 `FlowExecutionStatus`를 생성하여 반환한다. 반환된 `FlowExecutionStatus`과 `StateTransition`의 `pattern`과 일치하는 경우 다음 상태로 전이하게 되는 것이다.
한번 예제를 만들어보자.
@Bean
fun deciderJob(): Job {
val decider = CustomDecider()
return JobBuilder("deciderJob", jobRepository)
.start(initStep())
.next(decider)
.on("EVEN").to(stepA())
.from(decider)
.on("ODD").to(stepB())
.end()
.build()
}
class CustomDecider(): JobExecutionDecider {
override fun decide(jobExecution: JobExecution, stepExecution: StepExecution?): FlowExecutionStatus {
val randomValue = stepExecution?.executionContext?.get("random") as Int
return if (randomValue % 2 == 0) FlowExecutionStatus("EVEN")
else FlowExecutionStatus("ODD")
}
}
@Bean
fun initStep(): Step {
return StepBuilder("init", jobRepository)
.tasklet(transactionManager) { contribution, chunkContext ->
val randomValue = Random.Default.nextInt(10, 1000)
println("Random Value: $randomValue")
contribution.stepExecution.executionContext.put("random", randomValue)
RepeatStatus.FINISHED
}
.build()
}
@Bean
fun stepA(): Step {
return StepBuilder("stepA", jobRepository)
.tasklet(transactionManager) { contribution, chunkContext ->
println("stepA")
RepeatStatus.FINISHED
}
.build()
}
@Bean
fun stepB(): Step {
return StepBuilder("stepB", jobRepository)
.tasklet(transactionManager) { contribution, chunkContext ->
println("stepB")
RepeatStatus.FINISHED
}
.build()
}
`initStep`에서 통해 해당 `Step`의 `ExecutionContext`에 랜덤한 숫자를 넣어놓는다.
다음으로 `CustomDecider`를 실행시키도록 해놨는데, `CustomDecider`를 보면 `stepExecution`에서 `ExecutionContext`를 꺼내 `initStep`에서 넣어준 `random` 값을 꺼내고 있다. 이 값이 짝수면 EVEN을, 홀수면 ODD를 `FlowExecutionStatus`의 `name`으로 하여 반환한다.
그러면 `on` 메서드의 조건에 따라서, `decide` 메서드의 결과가 EVEN이면 stepA로 상태가 전이되고, 아니라면 stepB로 상태가 전이되는 것이다.
실제 로그를 한번 확인해보자.
랜덤한 값은 822가 나왔고, 짝수이기 때문에 stepA가 실행된 것을 볼 수 있다.
정리해보자면, `JobExecutionDecider`는 단지 현재 `Job`과, 이전 `Step`의 정보를 가지고 `FlowExecutionStatus`를 반환하여 단순히 다음 전이될 상태를 결정하는 역할만 한다고 보면 된다.
좀 헷갈릴 수 있는 점이, 이전에 `ExecutionContext`에 대해 다룰 때 `StepExecution`의 `ExecutionContext`는 각 `Step` 안에서만 공유가 된다고 했었다. 근데 `initStep`에서 넣어준 값을 어떻게 `JobExecutionDecider`에서 꺼내올 수 있는거지? 생각할 수 있는데, `JobExecutionDecider`의 인자로 들어오는 `StepExecution`은 이전 `Step`의 `StepExecution`이라는 점을 알아야 한다. 따라서 이전 `Step`의 정보만 가지고 결정을 하려면, 굳이 `JobExecution`의 `ExecutionContext`을 사용하지 않아도 된다.
DecisionState
`JobExecutionDecider`를 실행해야 하는 상태는 `DecisionState`로 표현된다.
public class DecisionState extends AbstractState {
private final JobExecutionDecider decider;
@Override
public FlowExecutionStatus handle(FlowExecutor executor) throws Exception {
return decider.decide(executor.getJobExecution(), executor.getStepExecution());
}
}
`handle`을 호출하면, 단지 멤버의 `decide` 메서드를 호출해서 반환한다.
여기서 `executor`를 통해 `JobExecution`과, `StepExecution`을 얻는걸 볼 수 있는데, `executor`는 `JobFlowExecutor` 클래스의 인스턴스가 사용되며 이 클래스의 구현을 보면 `StepExecution`을 얻을 때 `ThreadLocal`을 사용하는 것을 볼 수 있다.
Spring Batch는 기본적으로 단일 스레드로 작동하기 때문에, `StepExecution`을 `ThreadLocal`에 저장하여 최근 Step의 `StepExecution`을 가져올 수 있는 것이다.
FlowStep
`Step`의 실행을 `Job`으로 위임시키는 `JobStep`처럼, `Step`의 실행을 `Flow`로 위임시키는 클래스이다.
`StepBuilder`의 `flow` 메서드를 호출하여 `FlowStepBuilder`를 얻을 수 있으며, 해당 객체로 `FlowStep`을 생성하면 된다.
public class FlowStep extends AbstractStep {
private Flow flow;
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
try {
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
StepHandler stepHandler = new SimpleStepHandler(getJobRepository(), stepExecution.getExecutionContext());
FlowExecutor executor = new JobFlowExecutor(getJobRepository(), stepHandler,
stepExecution.getJobExecution());
executor.updateJobExecutionStatus(flow.start(executor).getStatus());
stepExecution.upgradeStatus(executor.getJobExecution().getStatus());
stepExecution.setExitStatus(executor.getJobExecution().getExitStatus());
}
catch (FlowExecutionException e) {
if (e.getCause() instanceof JobExecutionException) {
throw (JobExecutionException) e.getCause();
}
throw new JobExecutionException("Flow execution ended unexpectedly", e);
}
}
}
`AbstractJob`에서 호출되는 `doExecute` 함수는 위와 같이 구현이 되어 있는데, 보면 `FlowJob`의 `doExecute`함수와 굉장히 유사하다. `JobFlowExecutor` 인스턴스를 생성한 후 `flow.start`를 호출하여 `Flow`를 실행시키는 부분은 동일하다.
정리
이전 포스팅과 이번 포스팅을 걸쳐서, `Job`의 흐름을 제어하는 `Flow`에 대해서 자세하게 알아보았다. 전체적으로 정리해보자.
순차적 흐름이든 조건부 흐름이든 `Flow`를 만들어서 `Job`의 흐름을 제어할 수 있고, 이 `Flow`는 `FlowBuilder`를 통해 생성할 수 있다.
그리고 생성된 `Flow`에 `Job`의 실행을 위임하는 `FlowJob`을 `FlowJobBuilder`를 통해 만들 수 있으며, `JobFlowBuilder`를 통해 `Flow`를 빌드함과 동시에 `FlowJobBuilder`에 빌드된 `Flow`를 초기화 시켜줄 수 있다는 점까지 알아보았다.
`Flow`는 조건에 따라 상태를 전이하기 위해 `State`, `StateTransition`을 사용하며 `State`는 상태 자체를, `StateTransition`은 기준 상태, 조건, 다음 상태로 구성되어 상태 전이를 표현한다.
실질직으로 상태를 전이하는 것은 `Flow`의 유일한 구현체 `SimpleFlow`에서 이루어지는데, `StateTransition`을 여러 데이터 형태로 가공하여 이전 상태의 결과에 따라 다음 상태로 넘기거나 `Flow`를 종료시키게 된다.
여기까지, `Flow`에 대해 정말 간략하게 정리했다. 이제 다음 포스팅에서는 Spring Batch가 제공하는 청크 기반 프로세싱에 대해서 알아보도록 하자.