Spring Batch has good support for reading the input data of a batch job from different data sources, such as files (CSV, XML) and databases.
However, it's quite common that you have to read the input data from a data source that's not supported out of the box. This means that you have to implement a component that reads the input data from your data source.
This blog post helps you to solve that problem. After you have read this blog post, you:

- Understand how you can implement a custom `ItemReader`.
- Know how you can configure the `ItemReader` bean which provides the input data for your batch job.
This blog post assumes that:

- You are familiar with Spring Batch.
- You can get the required dependencies with Maven or Gradle.

Let's begin.
Creating a Custom ItemReader
You can create a custom `ItemReader` by following these steps:

First, you have to create a class that implements the `ItemReader<T>` interface and provide the type of the returned object as a type parameter.

Second, you have to implement the `T read()` method of the `ItemReader<T>` interface by following these rules:
- The `read()` method returns an object that contains the information of the next item.
- If the next item isn't found, the `read()` method must return `null`.
Let's create a custom `ItemReader` that returns the student information of an online testing course as `StudentDTO` objects which are read from memory.

The `StudentDTO` class is a simple data transfer object, and its source code looks as follows:
```java
public class StudentDTO {

    private String emailAddress;
    private String name;
    private String purchasedPackage;

    public StudentDTO() {}

    public String getEmailAddress() {
        return emailAddress;
    }

    public String getName() {
        return name;
    }

    public String getPurchasedPackage() {
        return purchasedPackage;
    }

    public void setEmailAddress(String emailAddress) {
        this.emailAddress = emailAddress;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setPurchasedPackage(String purchasedPackage) {
        this.purchasedPackage = purchasedPackage;
    }
}
```
You can implement your `ItemReader` by following these steps:

First, you have to create a class that implements the `ItemReader<T>` interface and specify the type of the object which is returned by the `T read()` method. After you have created this class, its source code looks as follows:
```java
import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StudentDTO> {
}
```
Second, you have to initialize the input data that's returned by your `ItemReader`. You can initialize your input data by following these steps:

- Add a `List<StudentDTO>` field to your `ItemReader` class. This field contains the student information of the course.
- Add an `int` field called `nextStudentIndex` to your `ItemReader` class. This field contains the index of the next `StudentDTO` object that's returned by your `ItemReader`.
- Add a private `initialize()` method to your `ItemReader` class. This method creates the student data and sets the index of the next student to 0.
- Create a constructor that invokes the `initialize()` method.
After you have initialized your input data, the source code of your `ItemReader` class looks as follows:
```java
import org.springframework.batch.item.ItemReader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InMemoryStudentReader implements ItemReader<StudentDTO> {

    private int nextStudentIndex;
    private List<StudentDTO> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StudentDTO tony = new StudentDTO();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StudentDTO nick = new StudentDTO();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StudentDTO ian = new StudentDTO();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }
}
```
Third, you have to implement the `read()` method of the `ItemReader` interface by following these rules:

- If the next student is found, return the found `StudentDTO` object and increase the value of the `nextStudentIndex` field by 1.
- If the next student isn't found, set the value of the `nextStudentIndex` field to 0.
- If the next student isn't found, return `null`.
After you have implemented the `read()` method, the source code of your `ItemReader` class looks as follows:
```java
import org.springframework.batch.item.ItemReader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InMemoryStudentReader implements ItemReader<StudentDTO> {

    private int nextStudentIndex;
    private List<StudentDTO> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StudentDTO tony = new StudentDTO();
        tony.setEmailAddress("tony.tester@gmail.com");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StudentDTO nick = new StudentDTO();
        nick.setEmailAddress("nick.newbie@gmail.com");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StudentDTO ian = new StudentDTO();
        ian.setEmailAddress("ian.intermediate@gmail.com");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StudentDTO read() throws Exception {
        StudentDTO nextStudent = null;

        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        else {
            nextStudentIndex = 0;
        }

        return nextStudent;
    }
}
```
After you have created your custom `ItemReader` class, you have to configure the `ItemReader` bean that provides the input data for your Spring Batch job. Next, you will find out how you can configure this bean.
Configuring the ItemReader Bean
You can configure your `ItemReader` bean by following these steps:
First, you have to create the configuration class that contains the beans which describe the flow of your batch job. The source code of your configuration class looks as follows:
```java
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringBatchExampleJobConfig {
}
```
Second, you have to create the method that configures your `ItemReader` bean. This method must return an `ItemReader<StudentDTO>` object. After you have created this method, the source code of your configuration class looks as follows:
```java
import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringBatchExampleJobConfig {

    @Bean
    public ItemReader<StudentDTO> itemReader() {
    }
}
```
Third, you have to ensure that the `itemReader()` method returns a new `InMemoryStudentReader` object. After you have implemented the `itemReader()` method, the source code of your configuration class looks as follows:
```java
import org.springframework.batch.item.ItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringBatchExampleJobConfig {

    @Bean
    public ItemReader<StudentDTO> itemReader() {
        return new InMemoryStudentReader();
    }
}
```
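Configuring the step and job that use this reader is outside the scope of this post, but a rough sketch of that wiring, assuming Spring Batch's `StepBuilderFactory`/`JobBuilderFactory` API and a hypothetical `itemWriter()` bean (not shown here), might look like this:

```java
@Bean
public Step exampleStep(StepBuilderFactory stepBuilderFactory) {
    // The chunk size (3) is an arbitrary value chosen for this sketch.
    return stepBuilderFactory.get("exampleStep")
            .<StudentDTO, StudentDTO>chunk(3)
            .reader(itemReader())
            .writer(itemWriter()) // itemWriter() is a hypothetical ItemWriter<StudentDTO> bean
            .build();
}

@Bean
public Job exampleJob(JobBuilderFactory jobBuilderFactory, Step exampleStep) {
    return jobBuilderFactory.get("exampleJob")
            .start(exampleStep)
            .build();
}
```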
You can now create a custom `ItemReader` class, and you understand how you can configure an `ItemReader` bean which provides the input data for your batch job. Let's summarize what you learned from this blog post.
Summary
This blog post has taught you four things:
- You can create a custom `ItemReader` by implementing the `ItemReader<T>` interface.
- When you implement the `ItemReader<T>` interface, you must provide the type of the returned object as a type parameter.
- The `T read()` method of the `ItemReader<T>` interface must return the next `T` object. If the next object isn't found, it must return `null`.
- After you have created your custom `ItemReader` class, you have to configure the `ItemReader` bean that provides the input data for your Spring Batch job.
The next part of this tutorial describes how you can create a custom `ItemReader` that reads the input data of your batch job by using an external REST API.

P.S. You can get the example applications of this blog post from GitHub.
Thank you!
You are welcome!
Hi Petri,
I have come across a scenario where I have declared one exception as a skippable exception to skip the invalid records detected in the item processor. Could you please help me with one possible way to keep track of all the skipped records and, at the end, send an email containing all those skipped records by using a job execution listener?
Any suggestions are appreciated.
Hi Naveen,
You can use a `SkipListener` for this purpose. It seems that you should implement your own `SkipListener` that keeps track of the skipped records (for example, it can save them to a database). When the job is finished, you can simply fetch the skipped records and send the email that contains the required information.

Hi Petri,
I am using Spring Boot. My integration context XML file has an int:jdbc inbound channel where I give a database select query. I want Spring Batch to load multiple tables/records, process them, and write the XML output. Please give me some suggestions.
Hi,
Take a look at this blog post. It explains how you can use the Spring Integration JDBC inbound channel adapter.
Hi there,
I have tried this, but I am having problems when I run my code with an IllegalArgumentException: Assertion Failure - this argument is required; it must not be null.
Any ideas what could be causing that?
Hi,
Did you try to create a batch job that doesn't have a reader and a writer?
I can run my batch with a FlatFileItemReader no problem, it's the custom reader which causes this problem for me.
It looks like the problem is caused by either the custom item reader or the configuration class that configures your batch job. Can you add the source code of these classes to Pastebin?
Here you go...
http://pastebin.com/FEG6Ti1G
http://pastebin.com/rq6s7Fj3
http://pastebin.com/Huu2ufeZ
http://pastebin.com/FL9uUAEY
Hi,
Unfortunately I cannot seem to find the problem. Could you add the stack trace to Pastebin as well?
I have a batch flow which has a reader and a writer. The reader is a `JdbcCursorItemReader`. The writer is a custom `ItemWriter` which has a `FlatFileItemWriter` property.
My question is, if the reader, does not fetch any row, will the writer still execute?
PS: in case the reader fetches 0 records, I have to write an empty file using the `FlatFileItemWriter`.
Hi,
If the reader cannot find the next input record, its `read()` method must return `null`. This means that the writer won't write any lines to the file if the reader cannot find any records from the database. That being said, it does create the file and doesn't delete it even if it is empty (as long as you use the default settings).

@Petri,
What about the thread safety of the primitives used in the `ItemReader` (`nextStudentIndex`)?
Well, since you asked that question, I assume that you realized that the example is not thread safe. If you need to ensure that the primitives are thread safe, you need to replace the `int` variable (`nextStudentIndex`) with an `AtomicInteger`.
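To illustrate the idea, here is a minimal sketch of the thread-safe index logic. The Spring Batch `ItemReader` interface is left out so the snippet stays self-contained; in a real reader the class would implement `ItemReader<StudentDTO>` just like the example in the blog post:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// A sketch of the thread-safe index logic; in a real reader this class
// would implement ItemReader<StudentDTO> (interface omitted so the
// snippet stays self-contained).
class ThreadSafeReader {

    private final AtomicInteger nextStudentIndex = new AtomicInteger(0);
    private final List<String> studentData = Arrays.asList("Tony", "Nick", "Ian");

    public String read() {
        // getAndIncrement() reserves an index atomically, so two threads
        // can never receive the same item.
        int index = nextStudentIndex.getAndIncrement();
        if (index < studentData.size()) {
            return studentData.get(index);
        }
        return null;
    }
}
```

Note that, unlike the original example, this sketch does not reset the index when the data is exhausted; restartable readers need more careful state handling than an in-memory counter.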
Hi Petri,
I want to create an `ItemReader` for reading data from a database, and the query for this reader needs a parameter to be replaced with a value I get from the previous step (I was able to pass the value using the execution context).

For example, my query is "select some_field from some_table where id = X", and I got the X value from the previous step. How do I construct the `JdbcCursorItemReader`?
I was thinking maybe I can use `setPreparedStatementSetter()` like in your GitHub example, and I can get the value from the execution context of the previous step. What do you think?

I'm working with 2 data sources (MySQL and PostgreSQL), so I'm thinking maybe I create a first step which reads the value I need and then passes that value (a `Long`) to the next step. In the second step I will construct the `JdbcCursorItemReader` and set the prepared statement. I was thinking maybe I need a custom `ItemReader` for this?
Hi,
Like you said, if you need to pass information between steps, you can save this information to the job execution context.
You can construct the `JdbcCursorItemReader` by following the instructions given in this blog post. Also, your idea sounds good to me.

You can read the required information and save it to the job execution context without writing a custom `ItemReader` (check the Spring Batch reference manual). However, you need to create a custom `ItemReader` that retrieves the information from the job execution context and finds the processed data by using this information.

If you have any additional questions, don't hesitate to ask them.
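A step-scoped reader bean along these lines might work. The table and column names are made up for this sketch, and the `studentId` entry is assumed to have been put into the job execution context by the previous step:

```java
@Bean
@StepScope
public JdbcCursorItemReader<StudentDTO> studentReader(
        DataSource dataSource,
        @Value("#{jobExecutionContext['studentId']}") Long studentId) {
    JdbcCursorItemReader<StudentDTO> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    // The SQL and the StudentDTO row mapping are hypothetical examples.
    reader.setSql("SELECT email_address, name FROM students WHERE id = ?");
    reader.setPreparedStatementSetter(ps -> ps.setLong(1, studentId));
    reader.setRowMapper(new BeanPropertyRowMapper<>(StudentDTO.class));
    return reader;
}
```

Because the bean is step scoped, the SpEL expression is evaluated when the step starts, which is what makes late binding from the execution context possible.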
Hi Petri,
I have a Spring Batch job that fetches data from a MongoDB database and stores the data in a Postgres database. The job is scheduled to run at intervals (30 mins) so newly inserted documents in MongoDB are migrated to the Postgres database.
On the first run, records are migrated successfully, but subsequent triggers by the scheduler don't migrate the new records. I'm using Spring's `MongoItemReader` implementation, and I've validated that the queries are fine.
Any idea what i could be doing wrong? I would appreciate your thoughts on this
Please help with some examples on `LdifReader` for LDIF processing.

Thanks
Hi Petri,
Thank you for your help.

In my case I implement `ItemReader` and use `JdbcTemplate` to fetch data from the database. When the application runs for the first time, it works well. However, I have a `@Scheduled` method that checks every 5 minutes whether there is new data to read, and it doesn't work.
Do you mean that the method that's annotated with the `@Scheduled` annotation isn't invoked when it should be run? If so, could you let me see the method declaration of the scheduled method (including the `@Scheduled` annotation)? Also, are you using Spring Boot?

Hi Petri,
I have a Spring Batch job that fetches data from Elasticsearch and stores the data in a Postgres database. The job is scheduled to run at intervals (30 mins) so newly inserted documents in Elasticsearch are migrated to the Postgres database.

On the first run, records are migrated successfully, but subsequent triggers by the scheduler don't migrate the new records.

I created a custom `ElasticItemReader`, and in its constructor I get the records from Elasticsearch and store them in a list, just like your `initialize()` example. I think this is the problem because in the subsequent triggers this constructor doesn't get called. I need it to call Elasticsearch every time the job gets triggered.
I'm a newbie to this, so your help would be much appreciated. Thanks!
Any idea what i could be doing wrong? I would appreciate your thoughts on this
Hi Mahith,
I just realized that I made a terrible mistake because I didn't emphasize that my example shouldn't be used in real software projects. Real item readers should read the input data when the `read()` method is invoked, not when the reader is created by the Spring container.

In other words, like you suspected, the problem is that your item reader reads the input data only once. You have to move the logic which reads the input data from Elasticsearch to the `read()` method. Also, remember that because your batch job processes only "new" documents, you need to have a way to query the documents which were saved to the Elasticsearch index after your batch job was run. One way to solve this is to ensure that each record found from the Elasticsearch index has a creation timestamp. Then you can simply keep track of the previous execution time of your Spring Batch job and query only the new documents from Elasticsearch.

If you have any additional questions, don't hesitate to ask them. I am more than happy to help.
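The "fetch on first `read()`" pattern can be sketched in plain Java like this. The `Supplier` stands in for the Elasticsearch query; in a real reader the class would implement `ItemReader<T>`, and the supplier would query only the documents created after the previous job execution:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// A sketch of the "fetch on first read()" pattern. The Supplier stands in
// for the real data-source query (e.g. Elasticsearch); in a real reader
// this class would implement ItemReader<T>.
class LazyFetchReader<T> {

    private final Supplier<List<T>> fetchItems;
    private Iterator<T> items;

    LazyFetchReader(Supplier<List<T>> fetchItems) {
        this.fetchItems = fetchItems;
    }

    public T read() {
        // The data source is queried on the first read() call, not when
        // the Spring container creates the reader bean.
        if (items == null) {
            items = fetchItems.get().iterator();
        }
        return items.hasNext() ? items.next() : null;
    }
}
```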
Thanks a lot! Yes I moved the logic there and it's working now. I actually do have another question.
So the problem here is that I will be getting some job information along with the trigger event, which I need to use inside my job across the reader, processor, and writer. The information is in the form of an object (say `JobInformation`). How do I do that? I read a few questions on Stack Overflow, and they suggested using `JobParameters`. But this doesn't support a custom object; it only supports primitive ones (integer, double, string).
Thank you.
Hi,
As you already figured out, job parameters don't support "custom" objects. However, you can get around this restriction by setting multiple job parameters. For example, if your `JobInformation` class would have two fields, `currentTime` and `lastExecutionTime`, you could use job parameters called `JobInformation_currentTime` and `JobInformation_lastExecutionTime`. Also, it's probably a good idea to implement a "mapper" that can set these job parameters and create a new `JobInformation` object by using the values of these job parameters.

Again, if you have any questions, don't hesitate to ask them.
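A minimal sketch of that "mapper" idea. The `JobInformation` class and its fields are the hypothetical ones mentioned above, and a plain `Map` stands in for Spring Batch's `JobParameters` so the snippet stays self-contained:

```java
import java.util.HashMap;
import java.util.Map;

// JobInformation and its fields are hypothetical; a Map stands in for
// Spring Batch's JobParameters so the snippet stays self-contained.
class JobInformation {
    String currentTime;
    String lastExecutionTime;
}

class JobInformationMapper {

    // Flatten the object into one job parameter per field.
    static Map<String, String> toJobParameters(JobInformation info) {
        Map<String, String> params = new HashMap<>();
        params.put("JobInformation_currentTime", info.currentTime);
        params.put("JobInformation_lastExecutionTime", info.lastExecutionTime);
        return params;
    }

    // Rebuild the object from the job parameters.
    static JobInformation fromJobParameters(Map<String, String> params) {
        JobInformation info = new JobInformation();
        info.currentTime = params.get("JobInformation_currentTime");
        info.lastExecutionTime = params.get("JobInformation_lastExecutionTime");
        return info;
    }
}
```

With a real `JobParameters` object, the same mapping would use `JobParametersBuilder` on the way in and `getString()`-style accessors on the way out.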
Ok that would work. Thanks a lot! You've been really helpful :)
You are welcome. It's good to hear that I was able to help you out.
Hi Petri,

Can we implement this for a larger chunk size?
Hi,
Sure. The `ItemReader` isn't aware of the chunk size because the chunk size is a property of a step, and the chunk size specifies the number of items which are persisted into the database inside one transaction. That being said, you should take the chunk size into account when you implement your `ItemReader` because it doesn't make any sense to read items from the database one by one if you use a "large" chunk size.

If you need additional information, you should take a look at the Spring Batch reference manual. Also, if you have any additional questions, don't hesitate to ask them.
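One way to take the chunk size into account is to fetch a whole page of items per query and hand them out one at a time from a buffer. Here is a sketch of that idea in plain Java, with a `BiFunction` standing in for a paged database query taking `(offset, pageSize)`; in a real reader the class would implement `ItemReader<T>`:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.BiFunction;

// A sketch of reading one page per query instead of one item per query.
// The BiFunction stands in for a paged database query that takes
// (offset, pageSize) and returns at most pageSize items; in a real
// reader this class would implement ItemReader<T>.
class PageBufferedReader<T> {

    private final BiFunction<Integer, Integer, List<T>> fetchPage;
    private final int pageSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private int offset = 0;
    private boolean exhausted = false;

    PageBufferedReader(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
        this.fetchPage = fetchPage;
        this.pageSize = pageSize;
    }

    public T read() {
        if (buffer.isEmpty() && !exhausted) {
            // Fetch the next page with a single query.
            List<T> page = fetchPage.apply(offset, pageSize);
            offset += page.size();
            exhausted = page.size() < pageSize;
            buffer.addAll(page);
        }
        // poll() returns null when the buffer is empty, which matches
        // the ItemReader contract for end of input.
        return buffer.poll();
    }
}
```

Matching the page size to the chunk size means each transaction's worth of items costs roughly one query. Spring Batch's own `JdbcPagingItemReader` implements this idea for you.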
Hi Petri!
I'm wondering if you have any implementation of `JdbcPagingItemReader` that has a dynamic query driven by job parameters?
Hi,
Unfortunately I don't have anything which I can share right away. That said, if you can clarify your requirements a bit, I might write a small proof of concept kind of thing. I am especially interested in one thing: are the job parameters static (values are same for every execution) or dynamic (like the last execution time)?