全面解释!使用SpringBatch进行批处理

首先

这篇文章将从零开始介绍Spring Batch,并通过利用它进行环境配置来实现批处理。总体而言,批处理过程是这样的:当批处理程序运行时,它会从某个JSON文件中读取数据,并将其写入MySQL数据库中。

SpringBatch是什么

Spring Batch是Spring的子项目之一。它以Spring为基础,是一个批处理应用程序框架。

SpringBatch的配置

在下图中展示了Spring Batch的主要组成部分和整个处理流程(块模型)。

请注意:本机中文文本是由人工智能生成的,仅供参考。

springbatch.png
構成要素役割JobSpring Batchにおけるバッチアプリケーションの一連の処理をまとめた1実行単位。StepJobを構成する処理の単位。1つのJobに1~N個のStepをもたせることが可能。JobLauncherJobを起動するためのインターフェース。ItemReaderチャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。ItemProcessorチャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。ItemWriterチャンクモデルを実装する際に、データの入力/加工/出力の3つに分割するためのインターフェース。JobRepositoryJobやStepの状況を管理する機構。これらの管理情報は、Spring Batchが規定するテーブルスキーマを元にデータベース上に永続化される。

环境构建

    • SpringBatch

 

    • SpringBoot

 

    • Java

 

    Grade
grade.png

源代码的实现

接下来,我们将创建一个项目并进行说明。

引入图书馆

这个文件是一个图书馆请求。

buildscript {
   ext {
      springBootVersion = '2.0.4.RELEASE'
   }
   repositories {
      mavenCentral()
   }
   dependencies {
      classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
   }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
   mavenCentral()
}

dependencies {
   compile('org.springframework.boot:spring-boot-starter-batch')
   compile('org.springframework.boot:spring-boot-starter-jdbc')
   compile("org.springframework.boot:spring-boot-starter-data-jpa")
   compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4'
   compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA'
   compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6',
   testCompile('org.springframework.boot:spring-boot-starter-test')
   testCompile('org.springframework.batch:spring-batch-test')
}

SpringBatch的部署

首先,进行全局设置。然后,进行JOB的配置。

@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
    @Bean
    public ApplicationContextFactory firstJobContext() {
        return new GenericApplicationContextFactory(FirstJobConfiguration.class);
    }

    @Bean
    public ApplicationContextFactory secondJobContext() {
        return new GenericApplicationContextFactory(SecondJobConfiguration.class);
    }

}

创建实体类

这是按照Json数据的格式。

@Entity
@Table(name = "message")
public class Message {
    @Id
    @Column(name = "object_id", nullable = false)
    private String objectId;

    @Column(name = "content")
    private String content;

    @Column(name = "last_modified_time")
    private LocalDateTime lastModifiedTime;

    @Column(name = "created_time")
    private LocalDateTime createdTime;
}

创建工作

public class MessageMigrationJobConfiguration {
  @Autowired
  private JobBuilderFactory jobBuilderFactory;

  @Autowired
  private StepBuilderFactory stepBuilderFactory

  @Autowired
  private EntityManagerFactory entityManager;

  // Jobを作成
  @Bean
  public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step    messageMigrationStep) {
    return jobBuilderFactory.get("messageMigrationJob")
            .start(messageMigrationStep)
            .build();
  }

  // Stepを作成
  @Bean
  public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader,
                                 @Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter,
                                 @Qualifier("errorWriter") Writer errorWriter) {
    return stepBuilderFactory.get("messageMigrationStep")
            .<Message, Message>chunk(CHUNK_SIZE)
            .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageItemReadListener(errorWriter))
            .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageWriteListener())
            .build();
  }

  // Readerを作成
  @Bean
  public FlatFileItemReader<Message> jsonMessageReader() {
    FlatFileItemReader<Message> reader = new FlatFileItemReader<>();
    // 実際のJson Dataファイルパスを入れる
    reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
    reader.setLineMapper(new MessageLineMapper());
    return reader;
 }

  // Writerを作成
  @Bean
  public JpaItemWriter<Message> messageItemWriter() {
    JpaItemWriter<Message> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManager);
    return writer;
  }
}

实现数据处理方法。

在这里,使用LineMapper从txt源文件逐行读取数据并将其转换为Message对象。

public class MessageLineMapper implements LineMapper<Message> {
    private MappingJsonFactory factory = new MappingJsonFactory();

    @Override
    public Message mapLine(String line, int lineNumber) throws Exception {   
        JsonParser parser = factory.createParser(line);
        Map<String, Object> map = (Map) parser.readValueAs(Map.class);
        Message message = new Message();
        ... // TODO データ転換ロジック
        return message;
    }
}

创建properties文件

spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect
spring.jpa.show-sql=true
spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true
spring.jackson.serialization.write-dates-as-timestamps=false
spring.batch.initialize-schema=ALWAYS
spring.jpa.hibernate.ddl-auto=update

实现Listener

当发生错误时,监听器将错误消息输出到文件中。

阅读监听器

public class MessageItemReadListener implements ItemReadListener<Message> {
    private Writer errorWriter;

    public MessageItemReadListener(Writer errorWriter) {
        this.errorWriter = errorWriter;
    }

    @Override
    public void beforeRead() {
    }

    @Override
    public void afterRead(Message item) {
    }

    @Override
    public void onReadError(Exception ex) {
         errorWriter.write(format("%s%n", ex.getMessage()));
    }
}

2. 写监听器 (Xiě qì)

public class MessageWriteListener implements ItemWriteListener<Message> {

    @Autowired
    private Writer errorWriter;

    @Override
    public void beforeWrite(List<? extends Message> items) {
    }

    @Override
    public void afterWrite(List<? extends Message> items) {
    }

    @Override
    public void onWriteError(Exception exception, List<? extends Message> items) {
        errorWriter.write(format("%s%n", exception.getMessage()));
        for (Message message : items) {
            errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
        }
    }
}

执行工作

首先,我正在创建一个测试方法。

public static void main(String[] args) {
    String jobName = args[0];

    try {
        ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
        JobRegistry jobRegistry = context.getBean(JobRegistry.class);
        Job job = jobRegistry.getJob(jobName);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        JobExecution jobExecution = jobLauncher.run(job, createJobParams());
        if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            throw new RuntimeException(format("%s Job execution failed.", jobName));
        }
    } catch (Exception e) {
        throw new RuntimeException(format("%s Job execution failed.", jobName));
    }
}

private static JobParameters createJobParams() {
    return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}

将项目打包成jar文件,并尝试通过以下命令来执行。

使用Java命令运行”YOUR_BATCH_NAME.jar”文件,并执行”YOUR_JOB_NAME”作业。

运行结果

data.png
erro.png

最后

非常感谢您读到最后。
如果有任何觉得不对的地方,请毫不犹豫地指出。
谢谢合作。

参考资料:
1. https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/zh-cn/Ch02_SpringBatchArchitecture.html

2. 这是关于Spring批处理的官方网站。

广告
将在 10 秒后关闭
bannerAds