Spring Batch 시리즈를 포스팅하고 있습니다. 기준이 되는 버전은 5.x 버전입니다.
1. Spring Batch 소개
2. Job과 Step의 구조 및 생성
3. Job과 Step의 실행 과정 - 현재 포스팅
4. Job의 흐름 제어 - Flow
5. FlowJob의 실행 과정
6. 청크 기반 프로세싱의 구조 및 생성
7. 청크 기반 프로세싱의 진행 과정
8. Flat File, JSON 형식 파일 읽기 및 쓰기
9. DB 데이터 읽기 및 쓰기
10. Step의 예외 제어하기(스킵, 재시도)
11. 배치 작업 확장하기
이번 포스팅에서는 `Job`을 실행하면 어떤 과정으로 실행되는지, `Step`은 또 어떻게 실행되는 건지 한번 알아보자.
이 과정을 통해 1번 포스팅에서 설명했던 도메인 객체들이 어떤 역할을 하는지, 언제 생성이 되는지 확인할 수 있으며, 2번 포스팅에서 다뤘던 `JobBuilder`, `StepBuilder`에서 설정했던 부분들이 어느 시점에 작동하는지 확인할 수 있을 것이다.
직접 Spring Batch의 소스코드를 들여다보며 그 과정을 천천히 설명할 건데, 소스코드 자체가 너무 장황하기 때문에 필자가 생각하기에 `Job`과 `Step`의 실행 과정을 알기 위한 최소한의 코드만 살펴볼 것이다. 포스팅 내의 소스코드에는 생략된 코드들이 굉장히 많다는 점을 유의(?)하며 읽기를 권장한다. 만약 필요하다면 직접 디버깅 해보면서 실제로 어떤 과정들이 더 이루어지는지 알아보면 좋을 거 같다.
Job의 실행
만약 아무런 설정도 하지 않았다면, `application.yaml`에 `spring.batch.job.enabled = true`로 설정해두고, 스프링 빈으로 `Job`이을 한 개만 등록할 경우 자동으로 `Job`이 실행된다. 이는 Spring Batch에서 만들어 놓은 `BatchAutoConfiguration` 덕분이다. Spring의 Auto-configuration을 이용해 기본적으로 Batch 작업에 필요한 기본 객체들을 빈으로 등록한다. 더 자세한 내용은 Spring Batch 5.x 버전과 그 이하 버전간의 차이에 대해 다루면서 다룰 예정이다.
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
JobRepository jobRepository, BatchProperties properties) {
JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobLauncher, jobExplorer, jobRepository);
String jobName = properties.getJob().getName();
if (StringUtils.hasText(jobName)) {
runner.setJobName(jobName);
}
return runner;
}
`BatchAutoConfiguration`을 보면 `JobLauncherApplicationRunner`를 빈으로 등록하는 부분이 있는데, 이 객체가 `Job`을 실행시켜주는 역할을 한다. 보면 `spring.batch.job.enabled = true` 인 경우 스프링 빈으로 등록되도록 정의되어 있다.
JobLauncherApplicationRunner
`JobLauncherApplicationRunner`는 `ApplicationRunner` 인터페이스의 구현체로, `ApplicationRunner`의 구현체는 Spring Boot를 실행하는 경우 `ApplicationArguments`와 함께 `run`메서드를 호출하게 된다. 따라서 애플리케이션을 실행하기만 해도 `Job`이 실행되는 것이다.
protected void execute(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
JobParameters parameters = getNextJobParameters(job, jobParameters);
JobExecution execution = this.jobLauncher.run(job, parameters);
}
private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
if (this.jobRepository != null && this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) {
return getNextJobParametersForExisting(job, jobParameters);
}
if (job.getJobParametersIncrementer() == null) {
return jobParameters;
}
JobParameters nextParameters = new JobParametersBuilder(jobParameters, this.jobExplorer)
.getNextJobParameters(job)
.toJobParameters();
return merge(nextParameters, jobParameters);
}
`JobLauncherApplicationRunner`의 `run`메서드가 호출되면, `JobLauncherApplicationRunner`의 `execute`가 최종적으로 호출되면서 `jobLauncher.run`을 호출하게 된다.
`execute`에서는 `getNextJobParameters`를 통해 `JobParameters`를 가져오게 되는데, 이 과정에서 `JobParameteresBuilder`를 이용하게 된다.
public JobParametersBuilder getNextJobParameters(Job job) {
JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
...
else {
JobExecution previousExecution = this.jobExplorer.getLastJobExecution(lastInstance);
if (previousExecution == null) {
nextParameters = incrementer.getNext(new JobParameters());
}
else {
nextParameters = incrementer.getNext(previousExecution.getJobParameters());
}
}
return this;
}
`JobParameteresBuilder`는 `incrementer`를 통해 다음 파라미터를 가져오게 되는데, 이 `incrementer`는 `JobBuilder`를 통해 `Job`을 생성할 당시에 설정했던 `JobParametersIncrementer`이다. 바로 이 시점에 작동하게 되는 것이다.
JobLauncher
`JobLauncher`는 `run` 메서드 하나만 가지고 있는 SAM 인터페이스이다. 5.0.0 버전 기준으로 `SimpleJobLauncher`, `TaskExecutorJobLauncher` 두 개의 구현체가 있는데, `TaskExecutorJobLauncher`는 부모인 `SimpleJobLauncher`의 메서드를 호출하는게 끝이다. `SimpleJobLauncher`의 이름이 인프라에 대해 명시적이지 못하기 때문에 이름을 `TaskExecutorJobLauncher`로 변경하기 위해서 이렇게 되어있다고 한다(참고). 5.2.0 버전 이후에 `SimpleJobLauncher`는 삭제될 예정이라고 한다.
아무튼 이어서 `JobLauncher`의 `run`메서드에서 어떻게 흘러가는지 살펴보자.
@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException {
final JobExecution jobExecution;
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters); // (1) 최근 JobExecution 조회
if (lastExecution != null) {
if (!job.isRestartable()) { // (2) 재시작 가능한 Job인지 확인
throw new JobRestartException("JobInstance already exists and is not restartable");
}
for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
if (status.isRunning()) {
throw new JobExecutionAlreadyRunningException(
"A job execution for this job is already running: " + lastExecution);
}
else if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException(
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
}
}
job.getJobParametersValidator().validate(jobParameters); // (3) JobParameter 검증
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters); // (4) JobExecution 생성
try {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
job.execute(jobExecution); // (5) Job 실행
}
}
});
}
return jobExecution;
}
우선 (1)에서 `jobRepository`로 부터 `JobParameters`와 `Job`의 이름을 가지고 최근 실행을 가져온다. 이를 통해 이전에 시작한 적 있는 Job인지 판단하게 된다.
그리고 (2)에서 `Job`의 재시작 가능 여부를 확인하여 예외를 던지게 된다. 이전에 `JobBuilder`를 통해 `Job`을 생성할 때, `preventRestart`를 통해 재시작을 허용하지 않을 수 있었는데, 바로 이 부분에서 걸러지는 것이다.
(3)에서는 `JobParameters`를 검증하게 된다. 이전에 `JobBuilder`에서 `validator` 메서드를 통해 `JobParametersValidator`를 설정할 수 있었는데, 바로 여기서 설정한 `JobParametersValidator`의 `validate`를 호출해 검증하게 된다.
(4)에서는 `JobExcution` 도메인 객체를 생성한 후, (5)에서 `execute`를 통해 `Job`을 실행하게 된다.
(4)번 과정에서 `JobRepository`에서 `JobExecution`을 생성할 때 `JobExecution`의 `BatchStatus`는 `STARTING`이 된다.
`JobRepository`에서 이전 `JobExecution`을 가져오고, 새로운 `JobExecution`을 생성하는 부분에 대해서 필요하다면 따로 디버깅 해보길 권장한다.
AbstractJob
`SimpleJobLauncher`에서 `execute`를 호출하게 되면, `AbstractJob`의 `execute`가 호출된다. 한번 메서드를 살펴보자.
@Override
public final void execute(JobExecution execution) {
try (Observation.Scope scope = observation.openScope()) {
jobParametersValidator.validate(execution.getJobParameters()); // (1) JobParameters가 한번 더 검증된다.
if (execution.getStatus() != BatchStatus.STOPPING) {
updateStatus(execution, BatchStatus.STARTED); // (2) // BatchStatus가 STARTED로 변경된다.
listener.beforeJob(execution); // (3) // JobExecutionListener의 beforeJob이 실행된다.
try {
doExecute(execution); // (4) // Job을 실행한다.
}
}
}
catch (Throwable t) { // (5) 예외가 발생한 경우 ExitStatus와 BatchStatus를 변경한다.
execution.setExitStatus(getDefaultExitStatusForFailure(t, execution));
execution.setStatus(BatchStatus.FAILED);
execution.addFailureException(t);
}
finally {
try {
try {
listener.afterJob(execution); // (6) JobExecutionListener의 afterJob이 실행된다.
}
jobRepository.update(execution); // (7) JobExecution의 정보를 저장한다.
}
}
}
정말 간단하게 요약된 코드이다.
(1)에서 우선 `JobParameters`가 한번 더 검증된다.
(2)에서는 `JobExecution`의 `status`가 `STOPPING`이 아니라면 `status`를 `STARTED`로 바꾸고, (3)에서 `JobExecutionListener`의 `beforeJob`을 실행시킨다. 이 때 `JobExecutionListener`는 `JobBuilder`를 통해 `Job`을 생성할 때 당시 `listener` 메서드를 호출해 설정한 객체가 들어가게 된다.
(4)에서 `doExecute`를 통해 `Job`을 실행하게 되는데, 이 메서드는 추상 메서드라서 `AbstractJob`의 구현체에서 구현한 메서드가 호출되게 된다.
`Job` 실행 중 예외가 발생할 경우 (5)에서 `BatchStatus`는 `FAILED`로, `ExitStatus`는 예외에 따라 적합한 객체로 변경된다. 보통 `BatchStatus`와 대응되는 값으로 설정된다.
최종적으로 (6)에서 `JobExecutionListener`의 `afterJob`을 호출하게 되고, (7)에서 `JobExecution`을 영속하게 된다.
Step의 실행
`AbstractJob`에서 `doExecute`를 호출하면 구현체의 메서드가 호출된다고 했다. 위에 명시된 코드로 `Job`을 생성하면 `SimpleJob`이 생성되기 때문에, 이 클래스의 구현부를 살펴보자.
SimpleJob
@Override
protected void doExecute(JobExecution execution)
throws JobInterruptedException, JobRestartException, StartLimitExceededException {
StepExecution stepExecution = null;
for (Step step : steps) {
stepExecution = handleStep(step, execution); // (1) Job에 정의된 step들을 루프돌며 처리한다.
if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
break;
}
}
if (stepExecution != null) {
// (2) 가장 마지막에 실행된 stepExecution의 BatchStatus와 ExitStatus를 Job에 반영시킨다.
execution.upgradeStatus(stepExecution.getStatus());
execution.setExitStatus(stepExecution.getExitStatus());
}
}
(1)에서 `handleStep`을 통해 `Job`에 정의된 모든 `step`을 루프 돌며 처리하게 된다. 처리 중 `Step`의 상태가 `COMPLETED`가 아니라면 루프를 탈출하게 되면서 이후의 `Step`은 진행하지 않게 된다.
그 이후에는 (2)에서 가장 마지막 `StepExecution`의 `BatchStatus`와 `ExitStatus`를 `JobExecution`에 반영하게 된다. 만약 `Step`이 실패하게 되는 경우, `Job`도 실패에 따른 `BatchStatus`와 `ExistStatus`를 가지게 되는 것이다.
참고로 이는 `SimpleJob`만 해당된다. 추후에 `FlowJob`은 다른 조건으로 `BatchStatus`와 `ExitStatus`가 업데이트 되니 혼동되지 않도록 주의하자.
protected final StepExecution handleStep(Step step, JobExecution execution)
throws JobInterruptedException, JobRestartException, StartLimitExceededException {
return stepHandler.handleStep(step, execution);
`handleStep`은 부모인 `AbstractJob`에 정의된 메서드로, `StepHandler`의 `handleStep`을 호출한다.
`StepHandler`는 `Job`을 대신하여 `Step`을 처리하는 전략 인터페이스로, `StepExecution`을 생성하고 관리하는 역할을 수행한다. 5.x 버전 기준으로는 `SimpleStepHandler` 구현체만 있다.
SimpleStepHandler
그렇다면 `SimpleStepHandler`의 `handleStep`메서드를 한번 살펴보자. 코드가 굉장히 긴데, 이것도 많이 요약한거다.. 차근차근 살펴보자.
@Override
public StepExecution handleStep(Step step, JobExecution execution)
throws JobInterruptedException, JobRestartException, StartLimitExceededException {
JobInstance jobInstance = execution.getJobInstance();
// (1) 해당 JobInstance에서 실행되었던 가장 마지막 해당 Step을 가져온다.
StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
// (2) 중복된 Step인지 확인하여 로그를 남기고, Step이 실행될 수 있게 한다.
if (logger.isInfoEnabled()) { logger.info("Duplicate step...") }
lastStepExecution = null;
}
StepExecution currentStepExecution = lastStepExecution;
if (shouldStart(lastStepExecution, execution, step)) { // (3) Step을 실행해야 하는지 확인한다.
currentStepExecution = execution.createStepExecution(step.getName()); // (4) 새로운 StepExecution을 생성한다.
boolean isRestart = (lastStepExecution != null
&& !lastStepExecution.getStatus().equals(BatchStatus.COMPLETED));
// (5) 만약 재시작된 Step이라면 이전의 ExecutionContext를 그대로 설정한다. 아니라면 새로운 ExecutionContext를 설정한다.
if (isRestart) {
currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
}
else {
currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
}
// (6) 생성된 StepExecution을 영속한다.
jobRepository.add(currentStepExecution);
// (7) Step을 실행한다.
try {
step.execute(currentStepExecution);
}
// (8) 실행 후 변경된 StepExecution을 저장한다.
jobRepository.updateExecutionContext(execution);
}
// (9) Step이 실행되지 않았다면 이전 StepExecution을 반환한다.(이전에 실행된적이 있는 Step이며 COMPLETED거나, ABANDONED 상태일 때만)
return currentStepExecution;
}
protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step)
throws JobRestartException, StartLimitExceededException {
BatchStatus stepStatus;
if (lastStepExecution == null) {
stepStatus = BatchStatus.STARTING;
}
else {
stepStatus = lastStepExecution.getStatus();
}
// (3-1) Step이 UNKNOWN 상태일 때는 예외를 던진다.
if (stepStatus == BatchStatus.UNKNOWN) {
throw new JobRestartException(...);
}
// (3-2) Step이 COMPLETE 상태인데 다시 시작하는게 허용되지 않았거나, ABANDONED 상태일 경우는 false를 반환한다..
if ((stepStatus == BatchStatus.COMPLETED && !step.isAllowStartIfComplete())
|| stepStatus == BatchStatus.ABANDONED) {
return false;
}
// (3-3) 해당 Step이 해당 JobInstance에서 실행된 횟수를 가져와, startLimit 값보다 적다면 true를 반환한다.
if (jobRepository.getStepExecutionCount(jobExecution.getJobInstance(), step.getName()) < step.getStartLimit()) {
return true;
}
// (3-4) startLimit을 초과했다면 예외를 던진다.
else {
throw new StartLimitExceededException(
"Maximum start limit exceeded for step: " + step.getName() + "StartMax: " + step.getStartLimit());
}
}
// (2) 참고
private boolean stepExecutionPartOfExistingJobExecution(JobExecution jobExecution, StepExecution stepExecution) {
return stepExecution != null && stepExecution.getJobExecutionId() != null
&& stepExecution.getJobExecutionId().equals(jobExecution.getId());
}
우선 `StepExecution`과 `JobExecution`, `JobInstance`의 관계부터 다시 한번 상기하고 가보자. `JobInstance`는 `JobParameters`를 가지고 `Job`을 실체화 한 것을 의미하며, `JobParameters`에 유일하게 만들어지는 객체이다. `JobExecution`은 해당 `JobInstance`에 종속되어 `Job`의 실행 한번을 나타내는 객체이다. 그리고 이 `JobExecution`에서 실행된 `Step`을 `StepExecution`으로 나타낸다.
자 여기서 `StepExecution`은 결국 `JobInstance`에도 종속될 수 있고, `JobExecution`에도 종속될 수가 있다. `JobInstance`, `JobExecution`, `StepExecution` 순으로 관계가 1:N:N 이기 때문이다.
(1)에서 조회하는 마지막 `StepExecution`은 `JobInstance`에 종속된 `StepExecution`이다. 즉, 해당 `JobParameters`로 실행된 `Job`에 대해서, 이전에 해당 `Step`이 실행된 적이 있다면 `null`이 아니게 된다.
한번 그림을 통해 이해해보자.
위 그림처럼 Job_1과 이걸 파라미터 1로 실체화 한 JobInstance_1이 있고, 이 인스턴스를 실행해 JobExecution_1과, Job_1에 속한 `Step`을 실행시켜 StepExecution_1이 생성되었다고 해보자. 이 상황에서 StepExecution_1에서 예외가 발생해 Job_1이 실패했고, 파라미터 1로 다시 실행하는 상황이라고 가정해보자.
그러면 JobInstance_1에 종속된 JobExecution_2가 생성이 될 것이고, `SimpleStepHandler`로 들어와 `getLastStepExecution`을 호출하면, 이전에 JobInstance_1에서 생성된 StepExecution_1이 있기 때문에 이게 조회가 된다. 즉, 현재 `JobExecution`에 종속된 `StepExecution`을 조회하는 것이 아니라 `JobInstance`에 종속된 `StepExecution`을 조회하는게 바로 (1)번 과정이다.
(2)번 과정은 조회된 `lastStepExecution`이 현재 `JobExecution`에 종속되는지 확인하는 과정이다. 위 그림만 봤을 때, `lastStepExecution`이 현재 `JobInstance`에 종속되며 현재 `JobExecution`에도 종속되는 경우가 생길 수 있을까? 싶을 것이다. 하지만, 동일 `Job`내에, 중복된 `Step`이 등록된 경우에 이런 상황이 발생한다.
Job_1에 Step_1이 중복으로 2번 구성되어 있는 걸 그림으로 표현해봤다. StepExecution_1이 생길 때 `lastStepExecution`을 구해보면, 이전에 해당 `Step`이 실행된 적이 없이 때문에 `null`이 반환될 것이다. 그런데 StepExecution_2가 생길 때 `lastStepExecution`을 구해보면, 동일 JobInstance_1 내에 이전에 Step_1의 실행을 나타내는 StepExecution_1이 있기 때문에 이게 조회가 된다.
즉, 이 상황에서 StepExecution_1은 동일한 JobInstance_1에 종속되어 있으면서도, JobExecution_1에도 종속되어 있는 `StepExecution`에 해당된다.
Step을 중복으로 구성하는 것이 이 경우는 무조건 오류라고 봐야 할까? 일반적인 경우는 아니지만, 무조건 오류라고 판단하는 건 아니다. 개발자의 의도가 담겨있을 수도 있다. 따라서 Spring Batch도 이 경우를 예외로 처리하지 않는다. 단지 로그를 남기고, 이미 실행된 적이 있는 `Step`이지만, 다시 새롭게 실행될 수 있도록 `lastStepExecution`을 `null`로 설정해버린다.
이제 (3)번 과정을 봐보자. 여기서는 이 `Step`이 실행되어야 하는지 판단한다.
`shouldStart` 메서드에 가서 (3-2)를 보면, `lastStepExecution`이 `COMPLETED`인데, `allowStartIfComplete`가 아니거나, `ABANDONED` 상태라면 `false`를 반환한다. 따라서 이후의 과정이 진행되지 않고, `lastStepExecution`을 (9)번에서 그대로 반환하게 된다.
여기서 `allowStartIfComplete`는 이전에 `StepBuilder`에서 `Step`을 생성할 때 `allowStartIfComplete`를 통해 설정한 값이다. 바로 이 부분에서 작동되게 되는 것이다.
이에 해당하지 않는다면 (3-3)으로 가는데, 여기서는 `startLimit`을 검증하게 된다. 여기서 `jobRepository.getStepExecutionCount` 메서드는 해당`JobInstance`에 종속된 해당 `Step`의 `StepExecution`의 수를 조회하게되고, 이 값이 `startLimit`으로 설정한 값 보다 크다면 (3-4)에서 예외가 발생하게 된다.
`startLimit`에 대해서는 이전 포스팅에서 자세히 설명했으니 참고하길 바란다.
`shouldStart`에서 true을 응답한 경우 드디어 (4)에서 새로운 `StepExecution`을 생성하게 된다.
그리고 (5)에서는 재시작된 `Step`인지 확인하여, 최근 `StepExecution`의 `ExecutionContext`를 그대로 물려준다. 아니라면 새로운 `ExecutionContext`가 설정된다.
(6)에서는 새롭게 생성한 `StepExecution`을 저장하고, (7)에서 `Step`의 `execute` 메서드를 호출하여 `Step`이 실행된다.
`Step`이 실행된 후에는 (8)에서 `StepExecution`을 저장한 후, (9)에서 최종적으로 해당 `StepExecution`을 반환한다.
반환된 `StepExecution`은 위에서 봤던 `SimpleJob`의 `doExecute` 함수에서 받아, 최종적으로 `JobExecution`의 `BatchStatus`와 `ExitStatus`를 해당 `StepExecution`의 `BatchStatus`와 `ExitStatus`로 설정하게 된다.
AbstractStep
`SimpleStepHandler`에서 `Step`의 `execute` 메서드를 호출하면, `AbstractStep.execute`가 호출된다.
@Override
public final void execute(StepExecution stepExecution)
throws JobInterruptedException, UnexpectedJobExecutionException {
// (1) Step의 시작과 관련된 속성을 저장한다.
stepExecution.setStartTime(LocalDateTime.now());
stepExecution.setStatus(BatchStatus.STARTED);
getJobRepository().update(stepExecution);
// (2) exitStatus가 EXECUTING 상태로 변경된다.
ExitStatus exitStatus = ExitStatus.EXECUTING;
try (Observation.Scope scope = observation.openScope()) {
// (3) StepExecutionListener의 beforeStep을 호출한다.
getCompositeListener().beforeStep(stepExecution);
// (4) 추후 ItemStream을 다룰 때 사용한다.
open(stepExecution.getExecutionContext());
try {
// (5) Step을 실행한다.
doExecute(stepExecution);
}
// (6) Step의 실행 결과에 따라 exitStatus가 설정된다.
exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
}
catch (Throwable e) {
// (7) 실행 중 발생한 예외에 따라 BatchStatus, ExitStatus을 업데이트 한다.
stepExecution.upgradeStatus(determineBatchStatus(e));
exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
stepExecution.addFailureException(e);
}
finally {
try {
exitStatus = exitStatus.and(stepExecution.getExitStatus());
stepExecution.setExitStatus(exitStatus);
// (8) StepExecutionListener의 afterStep에 따라 ExitStatus을 설정한다.
exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
}
// (9) ExecutionContext를 저장하고, 예외 발생 시 UNKNOWN 상태로 변경한다.
try {
getJobRepository().updateExecutionContext(stepExecution);
}
catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN);
exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
stepExecution.addFailureException(e);
}
// (10) Step의 종료와 관련된 속성을 설정한다.
stepExecution.setEndTime(LocalDateTime.now());
stepExecution.setExitStatus(exitStatus);
// (11) StepExecution을 저장하고, 예외 발생 시 UNKNOWN 상태로 변경한다.
try {
getJobRepository().update(stepExecution);
}
catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN);
stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
stepExecution.addFailureException(e);
}
try {
// (12) 추후 ItemStream을 다룰 때 사용한다.
close(stepExecution.getExecutionContext());
}
}
}
(1)에서는 `StepExecution`에 `Step`이 시작하면 설정해야 하는 속성들을 설정한 후 이를 저장한다.
그리고 `Step`을 실행하기 전, (2)에서 `ExitStatus`를 `EXECUTING`으로 변경하게 된다.
(3)은 `CompositeStepExecutionListener`를 통해 Step이 실행되기 전에 실행해야 할 콜백을 호출한다. `StepBuilder`에서 `listener`를 통해 `StepExecutionListener` 구현체를 설정했다면, 바로 이 과정에서 `CompositeStepExecutionListener`로 부터 위임된 구현체가 실행되게 된다.
(4)의 `open` 메서드는 `AbstractStep`의 하위 구현체에서 `Step`을 실행하기 전에 취해야 할 행동을 지정할 수 있도록 만들어진 확정 지점이다. 추후에 `ItemStream`에 대해 다룰 때 이 부분이 나오니 기억해두자.
(5)에서 `doExecute`를 통해 `Step`을 실행시킨다. 이 메서드는 추상 메서드로, 각 `Step` 구현체에 구현되어 있다.
(6)에서는 `Step`의 실행 결과에 따라 `ExitStatus`를 업데이트 하게 된다. `ExitStatus`는 심각도(?)를 나타내는 `severity`라는 속성을 가지고 있는데, `and` 메서드를 통해 `severity`에 따라 더 심각한 `ExitStatus`를 반환하게 된다. `severity`는 `EXECUTING`, `COMPLETED`, `NOOP`, `STOPPED`, `FAILED`, `UNKNOWN` 순으로 더 높아진다.
지금 (6)번 과정을 예로 들면, `Step`이 완료되었기 때문에 무조건 `COMPLETED` 상태보다 이전 상태를 지닐 수는 없도록 해야 한다. 즉, 누군가 `Step`을 잘못 구현해서, `Step`이 완료되었음에도 불구하고 상태는 계속 `EXECUTING`이라면 문제가 되기 때문에 최소한 `COMPLETED`가 될 수 있도록 구현을 해 놓은것이다.
근데 `Step`을 진행하는 도중에 예외가 발생해서 `FAILED`로 상태가 설정되었다면, `COMPLETED` 상태보다 더 심각한 `FAILED`로 상태를 설정해야 할 것이다. 이 역할을 `and`메서드가 담당한다. 인자로 넘어오는 `ExitStatus`의 `severity`가 더 높다면 그 값을, 아니라면 현재 값을 반환해준다.
(7)에서는 `Step` 실행중에 예외가 발생한 경우 예외에 따라 적합한 `BatchStatus`와 `ExitStatus`를 지정한다.
(8)에서는 (3)처럼 `CompositeStepExecutionListener`를 통해 `Step`이 실행 된 후의 실행해야 할 콜백을 호출하여 `ExitStatus` 값을 변경한다. (3)과 마찬가지로 `StepBuilder`에서 `listener`를 통해 설정한 `StepExecutionListener` 구현체가 바로 이 부분에서 작동된다.
(9), (10), (11)은 주석에 설명된 그대로이다. `ExitStatus`가 `UNKNOWN`으로 설정되는 경우는 대부분 `StepExecution`을 영속하는 과정에서 발생한 예외로 인해 설정된다고 보면 된다.
(12)는 (4)와 비슷한 맥락인데, `Step`이 실행된 후에 취해야 할 행동을 하위 구현체에서 지정할 수 있도록 만들어진 확장 지점이다. 이 역시 추후 `ItemStream`에 대해 다룰 때 한번 더 보게 될 것이다.
(5)에서 `doExecute`를 통해 하위 구현부를 호출하는 부분은 Spring Batch가 제공하는 기능들을 점차 배우면서 각 주제별로 적합한 구현체들에 대해 다룰 예정이다. 지금은 우선 간단하게 `Step`을 생성할 때 지정한 `Tasklet`이 실행된다고만 생각하자.
JobStep
`Step`의 실행을 `Job`으로 위임하는 구현체이다.
`StepBuilder`에서 `job` 메서드를 통해 `JobStepBuilder`을 얻을 수 있으며, 해당 객체로 `JobStep`을 만들면 된다.
public class JobStep extends AbstractStep {
private Job job;
private JobLauncher jobLauncher;
private JobParametersExtractor jobParametersExtractor = new DefaultJobParametersExtractor();
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
ExecutionContext executionContext = stepExecution.getExecutionContext();
executionContext.put(STEP_TYPE_KEY, this.getClass().getName());
JobParameters jobParameters;
if (executionContext.containsKey(JOB_PARAMETERS_KEY)) {
jobParameters = (JobParameters) executionContext.get(JOB_PARAMETERS_KEY);
}
else {
jobParameters = jobParametersExtractor.getJobParameters(job, stepExecution);
executionContext.put(JOB_PARAMETERS_KEY, jobParameters);
}
JobExecution jobExecution = jobLauncher.run(job, jobParameters);
stepExecution.setExitStatus(determineStepExitStatus(stepExecution, jobExecution));
if (jobExecution.getStatus().isUnsuccessful()) {
// AbstractStep will take care of the step execution status
throw new UnexpectedJobExecutionException("Step failure: the delegate Job failed in JobStep.");
}
else if (jobExecution.getStatus().equals(BatchStatus.STOPPED)) {
stepExecution.setStatus(BatchStatus.STOPPED);
}
}
}
`doExecute` 함수를 보면, `jobLauncher.run`을 통해 `Job`을 실행시키는 것을 볼 수 있다. 따라서 그 이후의 과정은 이번 포스팅에서 다뤘던 `Job`이 실행되는 과정과 동일하다.
보통 `Step`의 로직이 크고 복잡한 경우 `Job`을 통해 이를 모듈화하기 위해 사용하거나, 여러 `Job 간에 종속성이 있는 경우 이를 관리하기 위해 사용한다.
당연한 소리이긴 한데 내부의 `Job`이 실패하는 경우, `JobStep` 역시 실패 상태가 되며, 해당 `JobStep`을 실행시킨 `Job 역시 실패된다.
정리
이번 포스팅에서는 Job과 Step이 어떤 과정을 거쳐 실행되는지, 관련 도메인이 어떻게 생성되고 작동되는지 알아보았다. 다음 포스팅에서는 Flow에 대해 알아보며 특정 상태에 따라 Job의 흐름을 전환하는 방법에 대해서 알아보자.