Spring Batch Tutorial: Creating a Custom ItemReader

Spring Batch has good support for reading the input data of a batch job from different data sources, such as files (CSV, XML) and databases.

However, it's quite common that you have to read the input data from a data source that's not supported out of the box. This means that you have to implement a component that reads the input data from your data source.

This blog post helps you to solve that problem. After you have read this blog post, you:

  • Understand how you can implement a custom ItemReader.
  • Know how you can configure the ItemReader bean which provides the input data for your batch job.

Let's begin.

This blog post assumes that you are already familiar with the basics of Spring Batch.

Creating a Custom ItemReader

You can create a custom ItemReader by following these steps:

First, you have to create a class that implements the ItemReader<T> interface and provide the type of the returned object as a type parameter.

Second, you have to implement the T read() method of the ItemReader<T> interface by following these rules:

  • The read() method returns an object that contains the information of the next item.
  • If the next item isn't found, the read() method must return null.

Let’s create a custom ItemReader that returns the student information of an online testing course as StudentDTO objects that are read from memory.

The StudentDTO class is a simple data transfer object, and its source code looks as follows:

public class StudentDTO {
  
    private String emailAddress;
    private String name;
    private String purchasedPackage;
  
    public StudentDTO() {}
  
    public String getEmailAddress() {
        return emailAddress;
    }
  
    public String getName() {
        return name;
    }
  
    public String getPurchasedPackage() {
        return purchasedPackage;
    }
  
    public void setEmailAddress(String emailAddress) {
        this.emailAddress = emailAddress;
    }
  
    public void setName(String name) {
        this.name = name;
    }
  
    public void setPurchasedPackage(String purchasedPackage) {
        this.purchasedPackage = purchasedPackage;
    }
}

You can implement your ItemReader by following these steps:

First, you have to create a class that implements the ItemReader<T> interface and specify the type of the object which is returned by the T read() method. After you have created this class, its source code looks as follows:

import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StudentDTO> {
}

Second, you have to initialize the input data that's returned by your ItemReader. You can initialize your input data by following these steps:

  1. Add a List<StudentDTO> field to your ItemReader class. This field contains the student information of the course.
  2. Add an int field called nextStudentIndex to your ItemReader class. This field contains the index of the next StudentDTO object that's returned by your ItemReader.
  3. Add a private initialize() method to your ItemReader class. This method creates the student data and sets the index of the next student to 0.
  4. Create a constructor that invokes the initialize() method.

After you have initialized your input data, the source code of your ItemReader class looks as follows:

import org.springframework.batch.item.ItemReader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InMemoryStudentReader implements ItemReader<StudentDTO> {

    private int nextStudentIndex;
    private List<StudentDTO> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StudentDTO tony = new StudentDTO();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StudentDTO nick = new StudentDTO();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StudentDTO ian = new StudentDTO();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }
}

Third, you have to implement the read() method of the ItemReader interface by following these rules:

  • If the next student is found, return the found StudentDTO object and increase the value of the nextStudentIndex field by 1.
  • If the next student isn't found, set the value of the nextStudentIndex field to 0.
  • If the next student isn't found, return null.

After you have implemented the read() method, the source code of your ItemReader class looks as follows:

import org.springframework.batch.item.ItemReader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InMemoryStudentReader implements ItemReader<StudentDTO> {

    private int nextStudentIndex;
    private List<StudentDTO> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StudentDTO tony = new StudentDTO();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StudentDTO nick = new StudentDTO();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StudentDTO ian = new StudentDTO();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StudentDTO read() throws Exception {
        StudentDTO nextStudent = null;

        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        else {
            nextStudentIndex = 0;
        }

        return nextStudent;
    }
}

After you have created your custom ItemReader class, you have to configure the ItemReader bean that provides the input data for your Spring Batch job. Next, you will find out how you can configure this bean.

Configuring the ItemReader Bean

You can configure your ItemReader bean by following these steps:

First, you have to create the configuration class that contains the beans which describe the flow of your batch job. The source code of your configuration class looks as follows:

import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringBatchExampleJobConfig {
}

Second, you have to create the method that configures your ItemReader bean. This method must return an ItemReader<StudentDTO> object. After you have created this method, the source code of your configuration class looks as follows:

import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringBatchExampleJobConfig {

    @Bean
    public ItemReader<StudentDTO> itemReader() {

    }
}

Third, you have to ensure that the itemReader() method returns a new InMemoryStudentReader object. After you have implemented the itemReader() method, the source code of your configuration class looks as follows:

import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringBatchExampleJobConfig {

    @Bean
    public ItemReader<StudentDTO> itemReader() {
        return new InMemoryStudentReader();
    }
}
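
To see how this reader bean fits into a complete job, here is a minimal sketch of a chunk-oriented step and job configuration. It assumes that @EnableBatchProcessing is in use (so the JobBuilderFactory and StepBuilderFactory beans of Spring Batch 4.x are available) and that an itemWriter bean exists elsewhere in your configuration; both are assumptions that aren't part of this blog post:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringBatchExampleJobConfig {

    @Bean
    public ItemReader<StudentDTO> itemReader() {
        return new InMemoryStudentReader();
    }

    // A sketch of a chunk-oriented step that uses the custom reader.
    // The itemWriter bean is assumed to be declared elsewhere.
    @Bean
    public Step exampleStep(StepBuilderFactory stepBuilderFactory,
                            ItemReader<StudentDTO> itemReader,
                            ItemWriter<StudentDTO> itemWriter) {
        return stepBuilderFactory.get("exampleStep")
                .<StudentDTO, StudentDTO>chunk(3)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    // A sketch of a job that runs the step configured above.
    @Bean
    public Job exampleJob(JobBuilderFactory jobBuilderFactory, Step exampleStep) {
        return jobBuilderFactory.get("exampleJob")
                .start(exampleStep)
                .build();
    }
}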

You now know how to create a custom ItemReader class, and you understand how you can configure an ItemReader bean that provides the input data for your batch job. Let’s summarize what you learned from this blog post.

Summary

This blog post has taught you four things:

  • You can create a custom ItemReader by implementing the ItemReader<T> interface.
  • When you implement the ItemReader<T> interface, you must provide the type of the returned object as a type parameter.
  • The T read() method of the ItemReader<T> interface must return the next T object. If the next object isn't found, it must return null.
  • After you have created your custom ItemReader class, you have to configure the ItemReader bean that provides the input data for your Spring Batch job.

The next part of this tutorial describes how you can create a custom ItemReader that reads the input data of your batch job by using an external REST API.

P.S. You can get the example applications of this blog post from GitHub.

Comments
  • msj May 3, 2016 @ 5:59

    thank you

    • Petri May 4, 2016 @ 22:20

      You are welcome!

  • Naveen May 20, 2016 @ 13:45

    Hi Petri,

    I have come across a scenario, where I have declared one exception in skippable exception, to skip the invalid records detected in item processor. Could you please help me with one possible way through which I can keep track of the all skipped records and at the end using job execution listener send a mail of all those skipped records.

    Any suggestions are appreciated.

    • Petri May 21, 2016 @ 10:13

      Hi Naveen,

      You can use a SkipListener for this purpose. It seems that you should implement your own SkipListener that keeps track of the skipped records (for example, it can save them to database). When the job is finished, you can simply fetch the skipped records and send the email that contains the required information.
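
      For example, a minimal sketch of such a listener could look as follows. The class name is made up, it reuses the StudentDTO class of this blog post, and the in-memory list is only a placeholder for whatever storage you choose:

      import org.springframework.batch.core.SkipListener;

      import java.util.ArrayList;
      import java.util.List;

      public class SkippedStudentListener implements SkipListener<StudentDTO, StudentDTO> {

          private final List<StudentDTO> skippedStudents = new ArrayList<>();

          @Override
          public void onSkipInRead(Throwable t) {
              // The skipped item isn't available here because reading it failed.
          }

          @Override
          public void onSkipInProcess(StudentDTO item, Throwable t) {
              // Keep track of items that were skipped during processing.
              skippedStudents.add(item);
          }

          @Override
          public void onSkipInWrite(StudentDTO item, Throwable t) {
              // Keep track of items that were skipped during writing.
              skippedStudents.add(item);
          }

          public List<StudentDTO> getSkippedStudents() {
              return skippedStudents;
          }
      }

      You can register the listener on a fault-tolerant step with the listener() method of the step builder.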

    • Nizam Jan 2, 2017 @ 6:01

      Hi Petri,
      I am using Spring Boot. My integration context XML file has an int:jdbc inbound channel where I give a database select query. I want Spring Batch to load records from multiple tables, process them, and write them to XML output. Please give me some suggestions.

      • Petri Jan 2, 2017 @ 12:25

        Hi,

        Take a look at this blog post. It explains how you can use the Spring Integration JDBC inbound channel adapter.

  • Chris Nov 7, 2016 @ 22:22

    Hi there,

    I have tried this but am having problems when I run my code with an IllegalArgumentException - Assertion Failure - this argument is required; it must not be null.

    Any ideas what could be causing that?

    • Petri Nov 7, 2016 @ 22:30

      Hi,

      Did you try to create a batch job that doesn't have a reader and a writer?

  • Mayank Nov 15, 2016 @ 6:44

    I have a batch flow which has a reader and a writer. The reader is a JdbcCursorItemReader. The writer is a custom ItemWriter which has a FlatFileItemWriter property.

    My question is: if the reader does not fetch any rows, will the writer still execute?

    PS: in case the reader fetches 0 records I have to write an empty file using FlatFileItemWriter.

    • Petri Nov 16, 2016 @ 22:28

      Hi,

      If the reader cannot find the next input record, its read() method must return null. This means that the writer won't write any lines to the file if the reader cannot find any records from the database. That being said, it does create the file and doesn't delete it even if it is empty (as long as you use the default settings).
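
      For reference, here is a sketch of a FlatFileItemWriter bean with the relevant setting spelled out. The file path and the line aggregator are just examples; with the default value of shouldDeleteIfEmpty (false), the output file is created even when no items are written:

      import org.springframework.batch.item.file.FlatFileItemWriter;
      import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
      import org.springframework.context.annotation.Bean;
      import org.springframework.core.io.FileSystemResource;

      @Bean
      public FlatFileItemWriter<StudentDTO> studentFileWriter() {
          FlatFileItemWriter<StudentDTO> writer = new FlatFileItemWriter<>();
          writer.setResource(new FileSystemResource("target/students.txt"));
          writer.setLineAggregator(new PassThroughLineAggregator<>());
          // false is the default: the output file is created even if it stays empty.
          writer.setShouldDeleteIfEmpty(false);
          return writer;
      }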

  • saisurya.k Dec 30, 2016 @ 13:45

    @Petri,
    What about the thread safety of the primitives used in the ItemReader (nextStudentIndex)?

    • Petri Dec 31, 2016 @ 9:58

      Well, since you asked that question, I assume that you realized that the example is not thread safe. If you need to ensure that the primitives are thread safe, you need to replace the int variable (nextStudentIndex) with AtomicInteger.
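
      A sketch of a thread-safe variant of the example reader could look like this (the class name is made up, and the student list is passed in through the constructor):

      import org.springframework.batch.item.ItemReader;

      import java.util.List;
      import java.util.concurrent.atomic.AtomicInteger;

      public class ThreadSafeInMemoryStudentReader implements ItemReader<StudentDTO> {

          private final AtomicInteger nextStudentIndex = new AtomicInteger(0);
          private final List<StudentDTO> studentData;

          public ThreadSafeInMemoryStudentReader(List<StudentDTO> studentData) {
              this.studentData = studentData;
          }

          @Override
          public StudentDTO read() {
              // getAndIncrement() claims the next index atomically, so two threads
              // never return the same student.
              int index = nextStudentIndex.getAndIncrement();
              return index < studentData.size() ? studentData.get(index) : null;
          }
      }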

  • hartono Jan 18, 2018 @ 8:52

    Hi Petri,
    I want to create an ItemReader for reading data from database and the query for this reader i need parameter on the query to be replaced with some value i get from previous step (I was able to pass the value using execution context)

    for example my query is "select some_field from some_table where id = X"
    and I got the X value from previous step. How to construct the JdbcCursorItemReader?
    I was thinking maybe I can use setPreparedStatementSetter() like in your github example
    and the value I can get it using execution context from previous step.
    What do you think?

    I'm working with 2 data sources (MySQL and PostgreSQL), so I'm thinking maybe I create a first step which reads the value I need and then pass that value (a Long) to the next step, and then in the second step I will construct it using JdbcCursorItemReader and set the prepared statement.
    I was thinking maybe I need a custom ItemReader for this?

    • Petri Jan 20, 2018 @ 13:05

      Hi,

      Like you said, if you need to pass information between steps, you can save this information to the job execution context.

      How to construct the JdbcCursorItemReader?
      I was thinking maybe I can use setPreparedStatementSetter() like in your github example
      and the value I can get it using execution context from previous step.
      What do you think?

      You can construct the JdbcCursorItemReader by following the instructions given in this blog post. Also, your idea sounds good to me.

      I was thinking maybe I need a custom ItemReader for this?

      You can read the required information and save it to the job execution context without writing a custom ItemReader (check the Spring Batch reference manual). However, you need to create a custom ItemReader that retrieves the information from the job execution context and finds the processed data by using this information.

      If you have any additional questions, don't hesitate to ask them.
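
      To make the idea more concrete, here is a sketch of a step-scoped JdbcCursorItemReader whose query parameter comes from the job execution context. The context key (someId), the SomeDTO class, and the column names are hypothetical placeholders:

      import org.springframework.batch.core.configuration.annotation.StepScope;
      import org.springframework.batch.item.database.JdbcCursorItemReader;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;

      import javax.sql.DataSource;

      @Bean
      @StepScope
      public JdbcCursorItemReader<SomeDTO> someReader(DataSource dataSource,
              @Value("#{jobExecutionContext['someId']}") Long someId) {
          JdbcCursorItemReader<SomeDTO> reader = new JdbcCursorItemReader<>();
          reader.setName("someReader");
          reader.setDataSource(dataSource);
          reader.setSql("select some_field from some_table where id = ?");
          // The value stored in the job execution context by the previous step
          // is bound to the query parameter here.
          reader.setPreparedStatementSetter(ps -> ps.setLong(1, someId));
          reader.setRowMapper((rs, rowNum) -> new SomeDTO(rs.getString("some_field")));
          return reader;
      }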

  • Michael Sep 6, 2018 @ 5:09

    Hi Petri,

    I have this spring batch job that fetches data from a mongodb database and stores the data in a postgres database. The job is scheduled to run at intervals (30mins) so newly inserted documents to the mongodb are migrated to the postgres database.

    On the first run, records are migrated successfully, but subsequent triggers by the scheduler don't migrate the new records. I'm using the MongoItemReader implementation in Spring and I've validated that the queries are fine.

    Any idea what I could be doing wrong? I would appreciate your thoughts on this.

  • Pradeep Nov 10, 2021 @ 7:38

    Please help with some examples on LdifReader for Ldif processing.
    Thanks

  • Mamadou Seck May 11, 2022 @ 4:45

    Hi Petri,
    Thank you for your help.
    In my case I implement ItemReader and use JdbcTemplate to fetch data from the database. When the application runs for the first time it works well, but I also have a @Scheduled method that checks every 5 minutes whether there is new data to read, and that doesn't work.

    • Petri May 11, 2022 @ 19:35

      Do you mean that the method that's annotated with the @Scheduled annotation isn't invoked when it should be run? If so, could you let me see the method declaration of the scheduled method (including the @Scheduled annotation)? Also, are you using Spring Boot?

  • Mahith Kumar Jul 26, 2022 @ 23:29

    Hi Petri,
    I have this Spring Batch job that fetches data from an Elasticsearch database and stores the data in a Postgres database. The job is scheduled to run at intervals (30 mins) so newly inserted documents in Elasticsearch are migrated to the Postgres database.
    On the first run, records are migrated successfully, but subsequent triggers by the scheduler don't migrate the new records.
    I created a custom ElasticItemReader, and in its constructor I get the records from Elasticsearch and store them in a list just like your initialize() example. I think this is the problem because in the subsequent triggers, this constructor doesn't get called. I need it to call Elasticsearch every time the job gets triggered.
    I'm a newbie to this so your help would be much appreciated. Thanks

    Any idea what I could be doing wrong? I would appreciate your thoughts on this.

    • Petri Jul 26, 2022 @ 23:46

      Hi Mahith,

      I just realized that I made a terrible mistake because I didn't emphasize that my example shouldn't be used in real software projects. Real item readers should read the input data when its read() method is invoked, not when the reader is created by the Spring container.

      In other words, like you suspected, the problem is that your item reader reads the input data only once. You have to move the logic which reads the input data from Elasticsearch to the read() method. Also, remember that because your batch job processes only "new" documents, you need to have a way to query the documents which were saved to the Elasticsearch index after your batch job was run. One way to solve this is to ensure that each record found from the Elasticsearch index has a creation timestamp. Then you can simply keep track of the previous execution time of your Spring Batch job and query only the new documents from Elasticsearch.

      If you have any additional questions, don't hesitate to ask them. I am more than happy to help.
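
      In other words, the reader could follow this kind of pattern, where the data is fetched lazily on the first read() call. The ElasticStudentReader class name, the ElasticStudentClient class, and its fetchStudentsCreatedAfter() method are hypothetical placeholders for whatever Elasticsearch client you use:

      import org.springframework.batch.item.ItemReader;

      import java.util.Iterator;

      public class ElasticStudentReader implements ItemReader<StudentDTO> {

          private final ElasticStudentClient client;
          private final long previousExecutionTime;
          private Iterator<StudentDTO> students;

          public ElasticStudentReader(ElasticStudentClient client, long previousExecutionTime) {
              this.client = client;
              this.previousExecutionTime = previousExecutionTime;
          }

          @Override
          public StudentDTO read() {
              if (students == null) {
                  // Fetch the input data when the step runs, not when the bean is created.
                  students = client.fetchStudentsCreatedAfter(previousExecutionTime).iterator();
              }
              return students.hasNext() ? students.next() : null;
          }
      }

      If the reader is a singleton bean, remember to declare it with @StepScope (or create a new instance per run) so that the data is fetched again on every execution.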

      • Mahith Kumar Jul 27, 2022 @ 14:05

        Thanks a lot! Yes, I moved the logic there and it's working now. I actually do have another question.
        The problem here is that I will be getting some job information along with the trigger event, which I need to use inside my job across the reader, processor and writer. The information is in the form of an object (say JobInformation). How do I do that? I read a few questions on Stack Overflow and they told me to use JobParameters. But these don't support a custom object; they only support primitive types (integer, double, string).
        Thank you.

        • Petri Jul 28, 2022 @ 9:37

          Hi,

          As you already figured out, job parameters don't support "custom" objects. However, you can get around this restriction by setting multiple job parameters. For example, if your JobInformation class would have two fields: currentTime and lastExecutionTime, you could use job parameters called: JobInformation_currentTime and JobInformation_lastExecutionTime. Also, it's probably a good idea to implement a "mapper" that can set these job parameters and create a new JobInformation object by using the values of these job parameters.

          Again, if you have any questions, don't hesitate to ask them.
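
          A sketch of such a mapper could look like this. The JobInformation class and its two fields are hypothetical, so adjust the parameter names to match your own class:

          import org.springframework.batch.core.JobParameters;
          import org.springframework.batch.core.JobParametersBuilder;

          public class JobInformationMapper {

              // Flattens the JobInformation object into individual job parameters.
              public JobParameters toJobParameters(JobInformation info) {
                  return new JobParametersBuilder()
                          .addLong("JobInformation_currentTime", info.getCurrentTime())
                          .addLong("JobInformation_lastExecutionTime", info.getLastExecutionTime())
                          .toJobParameters();
              }

              // Rebuilds the JobInformation object from the job parameters.
              public JobInformation fromJobParameters(JobParameters parameters) {
                  JobInformation info = new JobInformation();
                  info.setCurrentTime(parameters.getLong("JobInformation_currentTime"));
                  info.setLastExecutionTime(parameters.getLong("JobInformation_lastExecutionTime"));
                  return info;
              }
          }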

          • Mahith Kumar Jul 28, 2022 @ 14:29

            Ok that would work. Thanks a lot! You've been really helpful :)

          • Petri Jul 29, 2022 @ 20:52

            You are welcome. It's good to hear that I was able to help you out.

  • Anonymous May 25, 2023 @ 10:20

    Hi Petri.

    Can we implement this for a larger chunk size?

    • Petri May 25, 2023 @ 21:09

      Hi,

      Sure. The ItemReader isn't aware of the chunk size because the chunk size is a property of a step, and the chunk size specifies the number of items which are persisted into the database inside one transaction. That being said, you should take the chunk size into account when you implement your ItemReader because it doesn't make any sense to read items from the database one by one if you use a "large" chunk size.

      If you need additional information, you should take a look at the Spring Batch reference manual. Also, if you have any additional questions, don't hesitate to ask them.
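
      For example, a reader that takes the chunk size into account could fetch items in batches and hand them out one at a time. The BufferingStudentReader class name, the StudentRepository class, and its findNextBatch() method are hypothetical placeholders, and findNextBatch() is assumed to keep track of its own position:

      import org.springframework.batch.item.ItemReader;

      import java.util.ArrayDeque;
      import java.util.Deque;
      import java.util.List;

      public class BufferingStudentReader implements ItemReader<StudentDTO> {

          private final StudentRepository repository;
          private final int fetchSize;
          private final Deque<StudentDTO> buffer = new ArrayDeque<>();
          private boolean exhausted;

          public BufferingStudentReader(StudentRepository repository, int fetchSize) {
              this.repository = repository;
              this.fetchSize = fetchSize;
          }

          @Override
          public StudentDTO read() {
              if (buffer.isEmpty() && !exhausted) {
                  // Fetch the next batch instead of querying the database one row at a time.
                  List<StudentDTO> batch = repository.findNextBatch(fetchSize);
                  buffer.addAll(batch);
                  exhausted = batch.isEmpty();
              }
              // poll() returns null when the buffer is empty, which ends the step.
              return buffer.poll();
          }
      }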

  • Kasik May 26, 2023 @ 9:15

    Hi Petri!

    Wondering if you have any implementation for JdbcPagingItemReader in ItemReader that has a dynamic query driven by Job Parameters?

    • Petri Jun 1, 2023 @ 18:14

      Hi,

      Unfortunately I don't have anything which I can share right away. That said, if you can clarify your requirements a bit, I might write a small proof of concept kind of thing. I am especially interested in one thing: are the job parameters static (values are same for every execution) or dynamic (like the last execution time)?
