1 Introduction:
Spring Batch is an open-source framework for batch processing.
Batch processing is the execution of a series of programs (jobs) on a computer without manual intervention.
Spring Batch provides the mechanisms needed to process large amounts of data, such as transaction management, job processing, resource management, logging, tracing, data conversion, interfaces, etc.
A typical batch job can be divided into three parts:
1. Reading the data
2. Processing the data
3. Writing the data
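The three parts can be pictured as a simple loop. The plain-Java sketch below is only an illustration: `ThreePartBatch` and its `run` method are hypothetical, and in Spring Batch the three roles are filled by the `ItemReader`, `ItemProcessor` and `ItemWriter` interfaces described in section 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration of read -> process -> write; not Spring Batch API
class ThreePartBatch {

    // Reads each input item, processes it, and passes the result to the writer
    static <I, O> void run(Iterator<I> reader, Function<I, O> processor, Consumer<O> writer) {
        while (reader.hasNext()) {
            I item = reader.next();            // 1. reading the data
            O result = processor.apply(item);  // 2. processing the data
            writer.accept(result);             // 3. writing the data
        }
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("alice", "bob");
        List<String> target = new ArrayList<>();
        run(source.iterator(), s -> s.toUpperCase(), s -> target.add(s));
        System.out.println(target); // prints [ALICE, BOB]
    }
}
```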
2 Spring Batch concepts:
Spring Batch has the following concepts:
1. Jobs
2. Job launcher
3. Job instance
4. Job repositories
5. Steps
6. Item readers
7. Item processors
8. Item writers
2.1 Job:
A job is the entity that encapsulates an entire batch process; launching a job is what starts the batch. A job contains a list of steps, and each step carries out a single task.
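<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

    <!-- context.xml provides the jobRepository and jobLauncher beans -->
    <import resource="context.xml" />

    <!-- The job is declared in the batch namespace; step1 runs a single tasklet -->
    <job id="welcomeJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="step1">
            <tasklet ref="welcomeTask" />
        </step>
    </job>

    <bean name="welcomeTask" class="com.chandra.batch.WelcomeTask" />
</beans>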
2.2 Job Launcher:
JobLauncher is a simple interface for launching jobs. Its run method takes a Job and its JobParameters and returns a JobExecution. SimpleJobLauncher, configured below, is the standard implementation and needs a JobRepository.
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"
/>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
String[] springConfig = { "com/chandra/batch/job-read-files.xml" };
ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
2.3 Job Instance:
A job instance is a single logical run of a given job. It is unique and identifiable by the job name together with its job parameters. A job instance that did not complete successfully can be restarted, provided the job is restartable; otherwise an error is raised.
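The restart rules above can be modelled in a few lines of plain Java. This is a hypothetical sketch (`JobInstanceRules`, its `Status` enum and the `launch` method are not Spring Batch API): it identifies an instance by job name plus parameters, refuses to rerun a completed instance, and restarts a failed one only when the job is restartable. In Spring Batch itself these two refusals surface roughly as `JobInstanceAlreadyCompleteException` and `JobRestartException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the job-instance rules above; not Spring Batch's implementation
class JobInstanceRules {

    enum Status { COMPLETED, FAILED }

    // Last recorded status per job instance, keyed by job name + parameters
    private final Map<String, Status> lastStatus = new HashMap<>();

    // TreeMap sorts the parameters, so the same values in any order give the same key
    static String instanceKey(String jobName, Map<String, String> params) {
        return jobName + new TreeMap<>(params);
    }

    // Checks whether this launch is allowed, then records the outcome of the run
    Status launch(String jobName, Map<String, String> params, boolean restartable, boolean succeeds) {
        String key = instanceKey(jobName, params);
        Status previous = lastStatus.get(key);
        if (previous == Status.COMPLETED) {
            throw new IllegalStateException("Job instance already completed: " + key);
        }
        if (previous == Status.FAILED && !restartable) {
            throw new IllegalStateException("Job instance is not restartable: " + key);
        }
        Status outcome = succeeds ? Status.COMPLETED : Status.FAILED;
        lastStatus.put(key, outcome);
        return outcome;
    }

    public static void main(String[] args) {
        JobInstanceRules rules = new JobInstanceRules();
        Map<String, String> params = new HashMap<>();
        params.put("date", "2014-01-01");
        System.out.println(rules.launch("reportJob", params, true, false)); // FAILED
        System.out.println(rules.launch("reportJob", params, true, true));  // COMPLETED (restart)
    }
}
```

Running the same job with different parameter values creates a new instance, which is why a completed job can still be launched again with, for example, a new date.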
2.4 Step:
Steps are the parts that compose a job. A step is a phase of a job that contains all the information needed to execute its part of the batch processing. A step may contain either a simple task (a tasklet) or a combination of a reader, a processor and a writer.
Example: suppose we want to save data to a database. Before saving, we need to collect some basic information about the user, such as the IP address, user ID and session ID. After saving, we write an audit record with the object ID, object name and table information to the database.
This example has three steps: 1. collecting user information, 2. saving the data to the database, 3. auditing the data.
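A job made of the three steps in this example can be sketched in plain Java. The names `SequentialJob` and `Step` are hypothetical, and the sketch only shows the ordering rule of a simple sequential job: steps run one after another, and a failed step stops the job before the later steps run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Spring Batch API): a job is an ordered list of steps
class SequentialJob {

    // Each step carries out one task and reports whether it succeeded
    interface Step {
        boolean execute(List<String> log);
    }

    private final List<Step> steps;

    SequentialJob(List<Step> steps) {
        this.steps = steps;
    }

    // Runs the steps in order; the first failing step stops the job
    boolean run(List<String> log) {
        for (Step step : steps) {
            if (!step.execute(log)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Step> steps = new ArrayList<>();
        steps.add(l -> l.add("user information collected"));
        steps.add(l -> l.add("data saved to DB"));
        steps.add(l -> l.add("data audited"));
        List<String> log = new ArrayList<>();
        // prints true [user information collected, data saved to DB, data audited]
        System.out.println(new SequentialJob(steps).run(log) + " " + log);
    }
}
```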
2.4.1 Step with a simple task:
<step id="step1">
<tasklet ref="welcomeTask" />
</step>
2.4.2 Step with a reader, processor and writer:
<step id="step" next="nextStep">
<tasklet>
<chunk reader="customItemReader" writer="customItemWriter"
processor="customItemProcessor" commit-interval="10" />
</tasklet>
</step>
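With `commit-interval="10"`, the step reads and processes items one at a time and hands them to the writer in chunks of ten, committing the transaction once per chunk. A plain-Java sketch of that loop follows; `ChunkLoop` and its `run` method are hypothetical names, transactions are only indicated by a comment, and an item for which the processor returns `null` is dropped from its chunk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch (not Spring Batch code) of chunk-oriented processing
class ChunkLoop {

    // Reads up to commitInterval items per chunk, processes each, writes the chunk once;
    // returns the number of chunks (commits)
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int commitInterval) {
        int commits = 0;
        while (reader.hasNext()) {
            List<O> chunk = new ArrayList<>();
            for (int read = 0; read < commitInterval && reader.hasNext(); read++) {
                O out = processor.apply(reader.next());
                if (out != null) {      // null means the processor filtered the item
                    chunk.add(out);
                }
            }
            if (!chunk.isEmpty()) {
                writer.accept(chunk);   // one write call per chunk
            }
            commits++;                  // in Spring Batch the chunk's transaction commits here
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            input.add(i);
        }
        // With a commit interval of 10, 25 items are written as chunks of 10, 10 and 5
        int commits = run(input.iterator(), x -> x * 2,
                chunk -> System.out.println("writing " + chunk.size() + " items"), 10);
        System.out.println(commits + " commits");
    }
}
```

A larger commit interval means fewer commits, but more work is rolled back when a chunk fails.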
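2.5 Job Repositories:
Job repositories are responsible for storing and updating metadata about job instances, their executions and the execution context. MapJobRepositoryFactoryBean, used below, keeps this metadata in memory only, so it is lost when the application stops.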
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"
/>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
2.6 Item readers:
Readers are responsible for data retrieval. A reader reads data from a source and passes each item on to the processor. Some of the available readers are:
· AmqpItemReader
· AggregateItemReader
· FlatFileItemReader
· HibernateCursorItemReader
· HibernatePagingItemReader
· IbatisPagingItemReader
· ItemReaderAdapter
· JdbcCursorItemReader
· JdbcPagingItemReader
· JmsItemReader
· JpaPagingItemReader
· ListItemReader
· MongoItemReader
· Neo4jItemReader
· RepositoryItemReader
· StoredProcedureItemReader
· StaxEventItemReader
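Every reader in this list follows the same `ItemReader` contract: `read()` returns one item per call and returns `null` once the input is exhausted. The sketch below mirrors that contract with a hypothetical `SimpleItemReader` interface and a list-backed `InMemoryReader`, loosely modelled on `ListItemReader`; it is not the Spring Batch source. Because `null` signals the end, the source list must not contain `null` items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring the ItemReader contract: read() returns null at end of input
interface SimpleItemReader<T> {
    T read();
}

// List-backed reader in the spirit of ListItemReader (simplified, not the real class)
class InMemoryReader<T> implements SimpleItemReader<T> {
    private final List<T> items;
    private int next = 0;

    InMemoryReader(List<T> items) {
        this.items = new ArrayList<>(items); // defensive copy of the source data
    }

    @Override
    public T read() {
        // Returning null tells the caller there is nothing more to read
        return next < items.size() ? items.get(next++) : null;
    }
}

class ReaderDemo {
    public static void main(String[] args) {
        SimpleItemReader<String> reader = new InMemoryReader<>(Arrays.asList("John", "Mary"));
        String item;
        while ((item = reader.read()) != null) {
            System.out.println("read " + item);
        }
    }
}
```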
2.7 Item writers:
Writers are responsible for writing the data to the desired destination, such as a database or an xls file. Some of the available writers are:
· AbstractItemStreamItemWriter
· AmqpItemWriter
· CompositeItemWriter
· FlatFileItemWriter
· GemfireItemWriter
· HibernateItemWriter
· IbatisBatchItemWriter
· ItemWriterAdapter
· JdbcBatchItemWriter
· JmsItemWriter
· JpaItemWriter
· MimeMessageItemWriter
· MongoItemWriter
· Neo4jItemWriter
· StaxEventItemWriter
· RepositoryItemWriter
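A writer receives a whole chunk (a `List` of items) per call rather than one item at a time. As an illustration, the hypothetical `FanOutWriter` below mimics what `CompositeItemWriter` does, passing each chunk to every delegate in order. `ChunkWriter` is a simplified stand-in for `ItemWriter`; unlike the real interface in the Spring Batch 2.x API used here, it does not declare `throws Exception`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ItemWriter: one call receives a whole chunk of items
interface ChunkWriter<T> {
    void write(List<? extends T> items);
}

// Simplified analogue of CompositeItemWriter: each chunk goes to every delegate, in order
class FanOutWriter<T> implements ChunkWriter<T> {
    private final List<ChunkWriter<? super T>> delegates;

    FanOutWriter(List<ChunkWriter<? super T>> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public void write(List<? extends T> items) {
        for (ChunkWriter<? super T> delegate : delegates) {
            delegate.write(items);
        }
    }
}

class WriterDemo {
    public static void main(String[] args) {
        List<ChunkWriter<? super String>> delegates = new ArrayList<>();
        delegates.add(items -> System.out.println("save to DB: " + items));
        delegates.add(items -> System.out.println("audit: " + items.size() + " items"));
        new FanOutWriter<String>(delegates).write(Arrays.asList("John", "Mary"));
    }
}
```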
2.8 Item processor:
Processors are responsible for transforming each item read from the input into the desired output. Some of the available processors are:
· AsyncItemProcessor
· ClassifierCompositeItemProcessor
· CompositeItemProcessor
· CustomerCreditIncreaseProcessor
· CustomerUpdateProcessor
· ItemProcessorAdapter
· MessageProcessor
· PassThroughItemProcessor
· ScriptItemProcessor
· SleepingItemProcessor
· StagingItemProcessor
· TradeProcessor
· UserMailItemProcessor
· ValidatingItemProcessor
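Two behaviours matter when combining processors: returning `null` from a processor filters the item out so it is never written, and `CompositeItemProcessor` feeds each delegate's output into the next one, stopping as soon as one returns `null`. The plain-Java sketch below shows both; `ProcessorChain` is a hypothetical name, and for simplicity every delegate keeps the same type, which the real composite does not require.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch (plain Java, not the Spring Batch classes) of chaining processors
class ProcessorChain {

    // Feeds each processor's output into the next; a null result stops the chain
    static <T> T process(T item, List<UnaryOperator<T>> processors) {
        T current = item;
        for (UnaryOperator<T> processor : processors) {
            current = processor.apply(current);
            if (current == null) {
                return null;
            }
        }
        return current;
    }

    // Applies the chain to every item and keeps only non-null results (filtered items are dropped)
    static <T> List<T> processAll(List<T> items, List<UnaryOperator<T>> processors) {
        List<T> kept = new ArrayList<>();
        for (T item : items) {
            T result = process(item, processors);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = new ArrayList<>();
        chain.add(s -> s.trim());
        chain.add(s -> s.isEmpty() ? null : s); // validation: blank names are filtered out
        chain.add(s -> s.toUpperCase());
        System.out.println(processAll(Arrays.asList(" ann ", "   ", "bob"), chain)); // prints [ANN, BOB]
    }
}
```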
3 Spring Batch application that reads data from a CSV file and stores it in a database
The following is the project structure for a batch application that reads data from a CSV file and writes it to a database.
· To create the application, we need the following:
  o Database configuration (database.xml)
  o Job repositories (context.xml)
  o Simple job (job-read-files.xml)
  o CSV file that holds the data (Employee.csv)
  o Item object (EmployeeItem.java), which holds the CSV columns as strings
  o Domain object (Employee.java), which mirrors the database table and carries the Hibernate annotations
  o Mapper class (EmployeeFieldSetMapper.java), which maps each CSV line onto an EmployeeItem
  o Reader
  o Processor
  o Writer
  o Listener
  o Main application
3.1 Database configuration:
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

    <bean name="dataSource"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
        <property name="url" value="jdbc:oracle:thin:@localhost:1521:EPHSW01" />
        <property name="username" value="sql" />
        <property name="password" value="sql" />
    </bean>

    <bean name="sessionFactory"
        class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="hibernateProperties">
            <props>
                <prop key="hibernate.dialect">org.hibernate.dialect.OracleDialect</prop>
                <prop key="hibernate.show_sql">true</prop>
                <!-- "create" drops and re-creates the schema every time the application starts -->
                <prop key="hibernate.hbm2ddl.auto">create</prop>
            </props>
        </property>
        <!-- <property name="mappingResources">
            <list><value>com/chandra/hibernate/Emp.hbm.xml</value></list>
        </property> -->
        <property name="annotatedClasses">
            <list>
                <value>com.chandra.batch.Employee</value>
            </list>
        </property>
    </bean>

    <bean name="hibernateTemplate" class="org.springframework.orm.hibernate3.HibernateTemplate">
        <property name="sessionFactory" ref="sessionFactory" />
    </bean>
</beans>
3.2 Job repositories:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"
/>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>
3.3 Simple job configuration:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
<batch:job id="reportJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="csvReader" processor="csvProcessor"
writer="csvWriter" commit-interval="1">
</batch:chunk>
</batch:tasklet>
</batch:step>
<batch:listeners merge="true">
<batch:listener ref="jobLogListener" />
</batch:listeners>
</batch:job>
<bean name="csvReader" class="com.chandra.batch.CsvReader" />
<bean name="csvProcessor" class="com.chandra.batch.CsvProcessor" />
<bean name="csvWriter" class="com.chandra.batch.CsvWriter">
<property name="hibernateTemplate" ref="hibernateTemplate" />
</bean>
<bean name="jobLogListener" class="com.chandra.batch.JobLogListener"
/>
</beans>
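3.4 Item class:
import java.io.Serializable;

// Holds one CSV line; every column is kept as a raw String
public class EmployeeItem implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String name;
    private String salary;
    private String address;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getSalary() { return salary; }
    public void setSalary(String salary) { this.salary = salary; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
}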
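3.5 Domain object:
import java.io.Serializable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

// Hibernate entity mapped to the EMP table; ENAME is the primary key
@Entity
@Table(name = "EMP")
public class Employee implements Serializable
{
    private static final long serialVersionUID = 1L;

    @Id
    @Column(name = "ENAME")
    private String name;

    @Column(name = "ADDRESS")
    private String address;

    @Column(name = "SALARY")
    private Double salary;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public Double getSalary() { return salary; }
    public void setSalary(Double salary) { this.salary = salary; }
}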
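3.6 Field set mapper:
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

// Maps one tokenized CSV line (name, address, salary) onto an EmployeeItem
public class EmployeeFieldSetMapper<T extends EmployeeItem> implements FieldSetMapper<EmployeeItem>
{
    @Override
    public EmployeeItem mapFieldSet(FieldSet fieldSet) throws BindException
    {
        EmployeeItem item = new EmployeeItem();
        item.setName(fieldSet.readString(0));
        item.setAddress(fieldSet.readString(1));
        item.setSalary(fieldSet.readString(2));
        return item;
    }
}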
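`EmployeeFieldSetMapper` relies on the tokenizer having already split each line into indexed fields. The hypothetical `SimpleCsvMapper` below sketches both halves in plain Java: a comma split that, in non-strict mode, tolerates a missing column (padded here with an empty string), followed by the same index-based mapping into a hypothetical `EmployeeRow` holder. It deliberately ignores quoted values, which the real `DelimitedLineTokenizer` handles.

```java
// Hypothetical, simplified tokenizer and mapper; ignores quoting, unlike DelimitedLineTokenizer
class SimpleCsvMapper {

    static final String[] FIELDS = { "Employee Name", "Employee Address", "Employee salary" };

    // Hypothetical holder with the same three String fields as EmployeeItem
    static class EmployeeRow {
        final String name;
        final String address;
        final String salary;

        EmployeeRow(String name, String address, String salary) {
            this.name = name;
            this.address = address;
            this.salary = salary;
        }
    }

    // Splits on commas and trims each value; non-strict mode pads missing columns with ""
    static String[] tokenize(String line, boolean strict) {
        String[] raw = line.split(",", -1); // limit -1 keeps trailing empty columns
        if (strict && raw.length != FIELDS.length) {
            throw new IllegalArgumentException(
                    "Expected " + FIELDS.length + " columns but found " + raw.length + ": " + line);
        }
        String[] tokens = new String[FIELDS.length];
        for (int i = 0; i < FIELDS.length; i++) {
            tokens[i] = i < raw.length ? raw[i].trim() : "";
        }
        return tokens; // columns beyond FIELDS are ignored
    }

    // Same index-based mapping as EmployeeFieldSetMapper: 0 = name, 1 = address, 2 = salary
    static EmployeeRow map(String[] tokens) {
        return new EmployeeRow(tokens[0], tokens[1], tokens[2]);
    }

    public static void main(String[] args) {
        EmployeeRow row = map(tokenize("John, Pune ,5000", true));
        System.out.println(row.name + "|" + row.address + "|" + row.salary); // prints John|Pune|5000
    }
}
```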
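3.7 Reader:
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.FileSystemResource;

// FlatFileItemReader preconfigured for Employee.csv (name, address, salary)
public class CsvReader extends FlatFileItemReader<EmployeeItem>
{
    private static final String[] FIELDS = { "Employee Name", "Employee Address", "Employee salary" };

    @Override
    public void afterPropertiesSet() throws Exception
    {
        // Employee.csv is resolved relative to the working directory; unlike a one-shot
        // InputStreamResource, a FileSystemResource can be reopened (e.g. on restart)
        setResource(new FileSystemResource("Employee.csv"));

        // Split each line on commas; strict=false tolerates lines whose column count differs
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(FIELDS);
        tokenizer.setStrict(false);

        DefaultLineMapper<EmployeeItem> lineMapper = new DefaultLineMapper<EmployeeItem>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new EmployeeFieldSetMapper<EmployeeItem>());
        setLineMapper(lineMapper);

        // Skip the header line
        setLinesToSkip(1);

        // Let FlatFileItemReader validate the configuration
        super.afterPropertiesSet();
    }
}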
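`setLinesToSkip(1)` makes the reader discard the header row before any line reaches the line mapper. The same idea in plain Java, using a hypothetical `HeaderSkippingReader` over an in-memory string instead of Employee.csv:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the effect of setLinesToSkip(1); not FlatFileItemReader's code
class HeaderSkippingReader {

    // Returns the lines of the CSV text after dropping the first linesToSkip lines
    static List<String> dataLines(String csv, int linesToSkip) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new StringReader(csv))) {
            String line;
            int lineNumber = 0;
            while ((line = in.readLine()) != null) {
                lineNumber++;
                if (lineNumber <= linesToSkip) {
                    continue; // header: never handed to the line mapper
                }
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        String csv = "Employee Name,Employee Address,Employee salary\nJohn,Pune,5000\n";
        System.out.println(dataLines(csv, 1)); // prints [John,Pune,5000]
    }
}
```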
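3.8 Processor:
import org.springframework.batch.item.ItemProcessor;

// Converts the all-String EmployeeItem into the typed Employee entity
public class CsvProcessor implements ItemProcessor<EmployeeItem, Employee>
{
    @Override
    public Employee process(EmployeeItem employeeItem) throws Exception
    {
        Employee emp = new Employee();
        emp.setName(employeeItem.getName());
        emp.setAddress(employeeItem.getAddress());
        // Throws NumberFormatException if the salary column is not a valid number
        emp.setSalary(Double.parseDouble(employeeItem.getSalary()));
        return emp;
    }
}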
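3.9 Writer:
import java.util.List;
import org.springframework.batch.item.ItemWriter;
import org.springframework.orm.hibernate3.HibernateTemplate;

// Saves every Employee of a chunk through Hibernate
public class CsvWriter implements ItemWriter<Employee>
{
    // Injected through the hibernateTemplate property (see job-read-files.xml)
    private HibernateTemplate hibernateTemplate;

    @Override
    public void write(List<? extends Employee> items) throws Exception
    {
        if (items == null)
        {
            return;
        }
        for (Employee item : items)
        {
            if (item != null)
            {
                hibernateTemplate.save(item);
            }
        }
    }

    public HibernateTemplate getHibernateTemplate()
    {
        return hibernateTemplate;
    }

    public void setHibernateTemplate(HibernateTemplate hibernateTemplate)
    {
        this.hibernateTemplate = hibernateTemplate;
    }
}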
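3.10 Listener:
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.InitializingBean;

// Logs the job lifecycle; registered on reportJob in job-read-files.xml
public class JobLogListener implements JobExecutionListener, InitializingBean
{
    @Override
    public void afterPropertiesSet() throws Exception
    {
        System.out.println("after properties");
    }

    @Override
    public void beforeJob(JobExecution jobExecution)
    {
        System.out.println("before job");
    }

    @Override
    public void afterJob(JobExecution jobExecution)
    {
        // Called after both successful and failed executions
        System.out.println("after job: " + jobExecution.getStatus());
    }
}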
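`JobExecutionListener` callbacks wrap the whole job: `beforeJob` fires before the first step and `afterJob` fires after the last one, whether the job succeeded or failed. The hypothetical `ListenerLifecycle` class below sketches that ordering in plain Java with a `finally` block; statuses are plain strings here rather than Spring Batch's `BatchStatus`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of when job listener callbacks fire; not Spring Batch code
class ListenerLifecycle {

    interface JobListener {
        void beforeJob(String jobName);
        void afterJob(String jobName, String status);
    }

    // beforeJob runs first; afterJob runs in finally, so it also fires when the job fails
    static String runJob(String jobName, Runnable steps, JobListener listener) {
        listener.beforeJob(jobName);
        String status = "FAILED";
        try {
            steps.run();
            status = "COMPLETED";
        } catch (RuntimeException e) {
            System.out.println("job failed: " + e.getMessage());
        } finally {
            listener.afterJob(jobName, status);
        }
        return status;
    }

    public static void main(String[] args) {
        final List<String> events = new ArrayList<>();
        JobListener logger = new JobListener() {
            public void beforeJob(String jobName) { events.add("before " + jobName); }
            public void afterJob(String jobName, String status) { events.add("after " + jobName + ": " + status); }
        };
        runJob("reportJob", () -> { throw new IllegalStateException("bad salary"); }, logger);
        System.out.println(events); // prints [before reportJob, after reportJob: FAILED]
    }
}
```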
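3.11 Main app:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App
{
    public static void main(String[] args)
    {
        // Job definition, batch infrastructure (repository, launcher) and database setup
        String[] springConfig = { "com/chandra/batch/job-read-files.xml",
                "com/chandra/batch/context.xml", "com/chandra/batch/database.xml" };
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
        try
        {
            JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
            Job job = (Job) context.getBean("reportJob");
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Exit status: " + execution.getStatus());
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            context.close();
        }
    }
}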