📌 Job
Job 객체는 Job Configuration을 통해 생성되는 객체 단위로서, 배치 작업을 어떻게 구성하고 실행할 것인지 전체적으로 설정하고 명세해 놓은 객체다. 즉, Job은 작업의 명세서다.
배치 Job을 구성하기 위한 최상위 인터페이스이며, 스프링 배치가 기본 구현체를 제공한다.
여러 Step을 포함하고 있는 컨테이너로서 반드시 한개 이상의 Step으로 구성해야 한다.
Job의 기본 구현체
SimpleJob
순차적으로 Step을 실행시키는 Job이다.
모든 Job에서 유용하게 사용할 수 있는 표준 기능을 갖고 있다.
가장 보편적인 Job이다.
FlowJob
특정 조건과 흐름에 따라서 Step을 구성하여 실행시키는 Job이다. 예를 들어서, 특정 조건에 따라 어떤 Step이 실행되는 분기 처리 등이 필요할 때 쓰인다. 특정 조건에 따라 특정 Step만 실행시킨다.
Flow 객체를 가지고 있으며, Flow 객체를 실행시켜서 작업을 진행한다.
Job 동작 구조
JobLauncher가 Job을 실행시킨다.
JobLauncher가 Job을 실행시킬 때 필요한 인자가 Job과 JobParameters다.
JobLauncher는 job과 parameters를 인자로 받아서 run 메소드를 실행시키면 Job 내부에 있는 execute 메소드를 실행시켜서 각각의 Step을 실행시킨다.
Job은 인터페이스다. AbstractJob 추상 클래스가 Job 인터페이스를 구현하고 있다. AbstractJob 추상 클래스를 상속받고 있는 객체가 SimpleJob과 FlowJob이다.
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
}
📌 JobInstance
Job이 실행될 때 생성되는 Job의 실행 단위 객체다.
Job의 설정과 구성은 동일하지만 Job이 실행되는 시점에 처리하는 내용이 다르기 때문에 JobInstance를 만들어서 Job의 실행을 구분한다. 예를 들어서 하루에 한 번씩 배치 Job이 실행된다면, 매일 실행되는 각각의 Job을 JobInstance로 표현한다.
JobInstance 생성 및 실행
- 처음 시작하는 Job + JobParameter일 경우 새로운 JobInstance를 생성하고 DB에 저장한다.
- 이전에 실행한 적이 있는 Job + JobParameter일 경우 이미 DB에 존재하는 JobInstance를 반환한다.
- 내부적으로 JobName + JobKey(JobParameter의 해시값)를 가지고 JobInsance를 얻는다.
Job과 JobInstance는 1 : N 관계를 갖는다.
JobInstance 동작 구조
Job과 JobParameter의 값을 기반으로 JobRepository(Job 실행 메타 데이터 저장 역할)를 통해 동일한 JobInstance가 있는지 확인한다. 없으면 새로운 JobInstance를 반환한다. 이미 존재할 경우에는 DB에 존재하는 JobInstance를 반환하고 예외를 발생시켜서 이미 진행한 Job을 또 실행하지 못하도록 한다.
BATCH_JOB_INSTANCE 테이블과 매핑
BATCH_JOB_INSTACNE 테이블에는 JOB_NAME(Job)과 JOB_KEY(JobParameter 해시값)가 동일한 데이터는 중복해서 저장할 수 없다.
이미 실행된 적이 있는 JobInstance인지 판별 유무는 BATCH_JOB_INSTACNE 테이블의 데이터 존재 유무로 따진다.
실습
@Configuration
@RequiredArgsConstructor
public class JobInstanceConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job jobInstance() {
return jobBuilderFactory.get("jobInstance")
.start(stepA())
.next(stepB())
.build();
}
@Bean
public Step stepA() {
return stepBuilderFactory.get("stepA")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
})
.build();
}
@Bean
public Step stepB() {
return stepBuilderFactory.get("stepB")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
})
.build();
}
}
@Component
@RequiredArgsConstructor
public class JobRunner implements ApplicationRunner {
private final JobLauncher jobLauncher;
private final Job jobInstance;
@Override
public void run(ApplicationArguments args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("name", "user3")
.toJobParameters();
jobLauncher.run(jobInstance, jobParameters);
}
}
ApplicationRunner 인터페이스는 스프링 부트가 실행되면 가장 먼저 초기화되고 실행시켜주는 인터페이스다. run 메소드를 오버라이딩해서, 특정 기능을 수행할 수 있다.
JobLanuncher는 스프링 배치가 실행될 때, 빈 등록을 해놓기 때문에 의존성 주입을 받을 수 있다.
JobInstanceConfiguration 클래스의 Job 객체 의존성 주입 받을 수 있다.
해당 Job을 실행시키면 BATCH_JOB_INSTANCE 테이블과 BATCH_JOB_PARAMS 테이블에 메타 데이터가 저장된다.
그리고 만약에 똑같은 Job과 JobParameters로 수행할 경우 기존에 이미 존재하는 JobInstance이므로 예외가 발생한다.
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={name=user1}. If you want to run this job again, change the parameters.
참고로 배치를 수동으로 돌리기 위해선 false로 옵션을 주면 된다. 기본값은 true이다.
spring:
batch:
job:
enabled: false
📌 JobParameter
Job을 실행할 때 함께 포함되어 사용되는 파라미터를 가진 도메인 객체다.
하나의 Job에 존재할 수 있는 여러 개의 JobInstance를 구분하기 위한 용도다.
JobParameters와 JobInstance는 1:1 관계다.
JobParameter 생성 및 바인딩
- 애플리케이션 실행시 주입하는 방법
- java - jar LogBatch.jar requestDate=20221124
- 코드로 생성
- JobParameterBuilder, DefaultJobParametersConverter 둘 중 하나를 사용하여 JobParamter를 생성할 수 있다.
- 보편적으로 JobParameterBuilder 객체가 더 많이 쓰인다.
- SpEL 이용
- @Value(#{jobParameter[requestDate]}), @JobScope, @StepScope 선언 필수
JobParameter 구조
실습 (코드로 생성)
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
return RepeatStatus.FINISHED;
}
})
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
}
@Component
@RequiredArgsConstructor
public class JobRunner implements ApplicationRunner {
private final JobLauncher jobLauncher;
private final Job job;
@Override
public void run(ApplicationArguments args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("name", "user1")
.addLong("seq", 2L)
.addDate("date", new Date())
.addDouble("age", 16.5)
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
JobParameter의 값들이 해당 테이블에 저장된다.
날짜같은 경우 지정하지 않는 경우 1970-01-01 로 설정된다.
실습2
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
JobParameters jobParameters = contribution.getStepExecution().getJobExecution().getJobParameters();
String name = jobParameters.getString("name");
Long seq = jobParameters.getLong("seq");
Date date = jobParameters.getDate("date");
Double age = jobParameters.getDouble("age");
Map<String, Object> jobParameters1 = chunkContext.getStepContext().getJobParameters();
System.out.println("step1 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
}
step1() 메소드를 보면 StepContribution 클래스와 ChunkContext를 통해 JobParameter의 값을 얻을 수 있다.
StepContribution 클래스는 JobParameters 객체를 얻을 수 있고, ChunkContext 클래스는 Map<String, Object> 타입인 JobParameter를 얻을 수 있다.
실습3 (애플리케이션 실행시 주입하는 방법)
gradle → build → bootJar 누른다.
그러면 build폴더 → libs에 보면 jar 파일이 생긴것을 알 수 있다.
jar 파일이 존재하는 libs 폴더로 이동한다.
java -jar batch-0.0.1-SNAPSHOT.jar name=user1 seq(long)=2L date(date)=2022/11/24 age(double)=16.5
해당 문구를 통해 jar를 실행하면서 파라미터를 전달한다.
📌 JobExecution
JobInstance에 대한 한 번의 시도를 의미하는 객체로서, Job 실행 중에 발생하는 정보들을 저장하는 객체다.
시작시간, 종료시간, 상태(시작됨, 완료, 실패), 종료상태의 속성을 가지고 있다.
JobInstance과의 관계
- JobExecution은 FAILED 또는 COMPLETED 등의 Job의 실행 결과 상태를 가지고 있다.
- JobExecution의 실행 상태 결과가 COMPLETED면 JobInstance 실행이 완료된 것으로 간주해서 재 실행이 불가능하다.
- JobExecution의 실행 상태가 FAILED면 JobInstance가 실행이 완료되지 않은 것으로 간주해서 재실행이 가능하다.
- JobParameter가 동일한 값으로 job을 실행할지라도 JobInstance를 계속 실행할 수 있다.
- 이 때, 이미 생성된 JobInstance는 새로 생성되지 않고 재사용된다, 하지만 JobExecution은 재시작할 때 마다 새로 생성된다.
- JobExecution의 실행 상태 결과가 COMPLETED 될 때까지 하나의 JobInstance 내에서 여러 번의 시도가 생길 수 있다.
동작 구조
JobInstance와 JobExecution은 1:N 관계로서 JobInstance에 대한 성공/실패의 내역을 가지고 있다.
위의 BATCH_JOB_EXECUTION 테이블에는 칼럼들이 더 있지만 생략했다.
JobExecution 실행 상태가 FAILED면 언제든지 재실행이 가능하다.
실습
@Configuration
@RequiredArgsConstructor
public class JobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
}
@Component
@RequiredArgsConstructor
public class JobRunner implements ApplicationRunner {
private final JobLauncher jobLauncher;
private final Job job;
@Override
public void run(ApplicationArguments args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("name", "user1")
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
📌 Step
Batch Job을 구성하는 독립적인 하나의 단계로서 실제 배치 처리를 정의하고, 컨트롤하는 데 필요한 모든 정보를 가지고 있는 도메인 객체다.
단일 테스크 뿐 아니라 입력과 처리 그리고 출력과 관련된 복잡한 비즈니스 로직을 포함하는 모든 설정들을 담고있다.
배치작업을 어떻게 구성하고 실행할 것인지 Job의 세부 작업을 Task 기반으로 설정하고 명세해 놓은 객체다.
모든 Job은 하나 이상의 Step으로 구성된다.
Step 구현체
TaskletStep
가장 기본이 되는 클래스로서 Tasklet 타입의 구현체들을 제어한다.
PartitionStep
멀티 스레드 방식으로 Step을 여러 개로 분리해서 실행한다.
JobStep
Step 내에서 Job을 실행하도록 한다.
FlowStep
Step 내에서 Flow를 실행하도록 한다.
Step 동작 구조
실습
public class CustomTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1 was executed");
return RepeatStatus.FINISHED;
}
}
Tasklet을 @Component로 빈으로 등록해서 사용해도 된다.
@Configuration
@RequiredArgsConstructor
public class StepConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new CustomTasklet())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2 was executed");
return RepeatStatus.FINISHED;
}
})
.build();
}
}
@Component
@RequiredArgsConstructor
public class JobRunner implements ApplicationRunner {
private final JobLauncher jobLauncher;
private final Job job;
@Override
public void run(ApplicationArguments args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("name", "user1")
.toJobParameters();
jobLauncher.run(job, jobParameters);
}
}
📌 StepExecution
Step에 대한 한번의 시도를 의미하는 객체로서 Step 실행 중에 발생한 정보들을 저장하고 있는 객체다. 시작시간, 종료시간, 상태(시작됨, 완료, 실패), commit count, rollback count 등의 속성을 가진다.
Step이 매번 시도될 때 마다 생성되며 각 Step별로 생성한다. Step끼리 공유가 되지 않는다.
Job이 재시작하더라도, 이미 성공적으로 완료된 Step은 재 실행되지 않고 실패한 Step만 실행된다. 하지만 옵션을 통해, 완료한 Step을 재실행할 수 있다.
Job에 속해있는 모든 Step의 StepExecution의 작업 상태가 완료 상태가 되어야 JobExecution 상태도 완료된다. Step의 StepExecution 중 하나라도 실패하면 JobExecution도 실패한다.
동작 구조
JobExecution과 StepExecution은 1:N 관계다.
하나의 Job에 여러 개의 Step으로 구성했을 경우 각 StepExecution은 하나의 JobExecution을 부모로 가진다.
📌 StepContribution
청크 프로세스의 변경 사항을 버퍼링 한 후 StepExecution 상태를 업데이트하는 도메인 객체다.
청크 커밋 직전에 StepExecution의 apply 메소드를 호출하여 상태를 업데이트한다.
ExistStatus의 기본 종료 코드 외 사용자 정의 종료코드를 생성해서 적용할 수 있다.
동작 구조
'[ Spring ] > Batch' 카테고리의 다른 글
[Batch] DB 스키마 생성 및 메타 데이터 테이블 (0) | 2022.11.20 |
---|---|
[Batch] Job 만들어서 실행해보기 (0) | 2022.10.23 |
[Batch] 스프링 배치 초기화 설정 클래스 (0) | 2022.10.22 |