全面解释!使用SpringBatch进行批处理
首先
这篇文章将从零开始介绍Spring Batch,并通过利用它进行环境配置来实现批处理。总体而言,批处理过程是这样的:当批处理程序运行时,它会从某个JSON文件中读取数据,并将其写入MySQL数据库中。
SpringBatch是什么
Spring Batch是Spring的子项目之一。它以Spring为基础,是一个批处理应用程序框架。
SpringBatch的配置
在下图中展示了Spring Batch的主要组成部分和整个处理流程(块模型)。
请注意:本机中文文本是由人工智能生成的,仅供参考。
構成要素役割JobSpring Batchにおけるバッチアプリケーションの一連の処理をまとめた1実行単位。StepJobを構成する処理の単位。1つのJobに1~N個のStepをもたせることが可能。JobLauncherJobを起動するためのインターフェース。ItemReaderチャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。ItemProcessorチャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。ItemWriterチャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。JobRepositoryJobやStepの状況を管理する機構。これらの管理情報は、Spring Batchが規定するテーブルスキーマを元にデータベース上に永続化される。
环境构建
-
- SpringBatch
-
- SpringBoot
-
- Java
- Grade
源代码的实现
接下来,我们将创建一个项目并进行说明。
引入图书馆
这个文件是一个图书馆请求。
buildscript {
ext {
springBootVersion = '2.0.4.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-batch')
compile('org.springframework.boot:spring-boot-starter-jdbc')
compile("org.springframework.boot:spring-boot-starter-data-jpa")
compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4'
compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA'
compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6',
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('org.springframework.batch:spring-batch-test')
}
SpringBatch的部署
首先,进行全局设置。然后,进行JOB的配置。
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
@Bean
public ApplicationContextFactory firstJobContext() {
return new GenericApplicationContextFactory(FirstJobConfiguration.class);
}
@Bean
public ApplicationContextFactory secondJobContext() {
return new GenericApplicationContextFactory(SecondJobConfiguration.class);
}
}
创建实体类
这是按照Json数据的格式。
@Entity
@Table(name = "message")
public class Message {
@Id
@Column(name = "object_id", nullable = false)
private String objectId;
@Column(name = "content")
private String content;
@Column(name = "last_modified_time")
private LocalDateTime lastModifiedTime;
@Column(name = "created_time")
private LocalDateTime createdTime;
}
创建工作
public class MessageMigrationJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory
@Autowired
private EntityManagerFactory entityManager;
// Jobを作成
@Bean
public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) {
return jobBuilderFactory.get("messageMigrationJob")
.start(messageMigrationStep)
.build();
}
// Stepを作成
@Bean
public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader,
@Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter,
@Qualifier("errorWriter") Writer errorWriter) {
return stepBuilderFactory.get("messageMigrationStep")
.<Message, Message>chunk(CHUNK_SIZE)
.reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
.listener(new MessageItemReadListener(errorWriter))
.writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
.listener(new MessageWriteListener())
.build();
}
// Readerを作成
@Bean
public FlatFileItemReader<Message> jsonMessageReader() {
FlatFileItemReader<Message> reader = new FlatFileItemReader<>();
// 実際のJson Dataファイルパスを入れる
reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
reader.setLineMapper(new MessageLineMapper());
return reader;
}
// Writerを作成
@Bean
public JpaItemWriter<Message> messageItemWriter() {
JpaItemWriter<Message> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManager);
return writer;
}
}
实现数据处理方法。
在这里,使用LineMapper从txt源文件逐行读取数据并将其转换为Message对象。
public class MessageLineMapper implements LineMapper<Message> {
private MappingJsonFactory factory = new MappingJsonFactory();
@Override
public Message mapLine(String line, int lineNumber) throws Exception {
JsonParser parser = factory.createParser(line);
Map<String, Object> map = (Map) parser.readValueAs(Map.class);
Message message = new Message();
... // TODO データ転換ロジック
return message;
}
}
创建properties文件
spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect
spring.jpa.show-sql=true
spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true
spring.jackson.serialization.write-dates-as-timestamps=false
spring.batch.initialize-schema=ALWAYS
spring.jpa.hibernate.ddl-auto=update
实现Listener
当发生错误时,监听器将错误消息输出到文件中。
阅读监听器
public class MessageItemReadListener implements ItemReadListener<Message> {
private Writer errorWriter;
public MessageItemReadListener(Writer errorWriter) {
this.errorWriter = errorWriter;
}
@Override
public void beforeRead() {
}
@Override
public void afterRead(Message item) {
}
@Override
public void onReadError(Exception ex) {
errorWriter.write(format("%s%n", ex.getMessage()));
}
}
2. 写监听器 (Xiě qì)
public class MessageWriteListener implements ItemWriteListener<Message> {
@Autowired
private Writer errorWriter;
@Override
public void beforeWrite(List<? extends Message> items) {
}
@Override
public void afterWrite(List<? extends Message> items) {
}
@Override
public void onWriteError(Exception exception, List<? extends Message> items) {
errorWriter.write(format("%s%n", exception.getMessage()));
for (Message message : items) {
errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
}
}
}
执行工作
首先,我正在创建一个测试方法。
public static void main(String[] args) {
String jobName = args[0];
try {
ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
JobRegistry jobRegistry = context.getBean(JobRegistry.class);
Job job = jobRegistry.getJob(jobName);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
JobExecution jobExecution = jobLauncher.run(job, createJobParams());
if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
throw new RuntimeException(format("%s Job execution failed.", jobName));
}
} catch (Exception e) {
throw new RuntimeException(format("%s Job execution failed.", jobName));
}
}
private static JobParameters createJobParams() {
return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}
将项目打包成jar文件,并尝试通过以下命令来执行。
使用Java命令运行”YOUR_BATCH_NAME.jar”文件,并执行”YOUR_JOB_NAME”作业。
运行结果
最后
非常感谢您读到最后。
如果有任何觉得不对的地方,请毫不犹豫地指出。
谢谢合作。
参考资料:
1. https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/zh-cn/Ch02_SpringBatchArchitecture.html
2. 这是关于Spring批处理的官方网站。