Friday, 6 November 2015

Spring Batch

1      Introduction:

Spring Batch is an open-source framework for batch processing. Batch processing is the execution of a series of programs (jobs) on a computer without manual intervention.
Spring Batch provides the features needed when processing large volumes of data, such as transaction management, job processing, resource management, logging, tracing, data conversion and interfaces.
A typical batch job can be divided into three parts:
1.       Reading the data
2.       Processing the data
3.       Writing the data
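The three parts can be pictured as three small contracts. The sketch below is a plain-Java model, not Spring Batch code: MiniReader, MiniProcessor and MiniWriter are hypothetical stand-ins that only mirror the shape of ItemReader (returns null when the input is exhausted), ItemProcessor (returns null to drop an item) and ItemWriter (receives a list of items).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins that mirror the shape of Spring Batch's
// ItemReader, ItemProcessor and ItemWriter; they are not the library types.
interface MiniReader<T> { T read(); }                 // null means "no more input"
interface MiniProcessor<I, O> { O process(I item); }  // null means "drop this item"
interface MiniWriter<T> { void write(List<T> items); }

class ThreePhaseDemo {
    // Reads every item, processes it, then hands all surviving items to the writer.
    static <I, O> void run(MiniReader<I> reader, MiniProcessor<I, O> processor, MiniWriter<O> writer) {
        List<O> output = new ArrayList<>();
        for (I item = reader.read(); item != null; item = reader.read()) {
            O result = processor.process(item);
            if (result != null) {
                output.add(result);
            }
        }
        writer.write(output);
    }

    // Example: read names, skip blank ones, upper-case the rest, collect the result.
    static List<String> demo() {
        Iterator<String> source = Arrays.asList("alice", "", "bob").iterator();
        List<String> sink = new ArrayList<>();
        MiniReader<String> reader = () -> source.hasNext() ? source.next() : null;
        MiniProcessor<String, String> processor = s -> s.isEmpty() ? null : s.toUpperCase();
        MiniWriter<String> writer = sink::addAll;
        run(reader, processor, writer);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [ALICE, BOB]
    }
}
```

This model writes the whole input in one call; Spring Batch instead calls the writer once per chunk of items.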

2      Spring Batch concepts:

Spring Batch is built around the following concepts:
1.       Jobs
2.       Job launcher
3.       Job instance
4.       Job repositories
5.       Step
6.       Item readers
7.       Item processor
8.       Item writers

2.1    Job:

A job is the entity that encapsulates an entire batch process; it is what gets started when the batch runs. A job contains a list of steps, and each step carries out a single task.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:batch="http://www.springframework.org/schema/batch"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/batch
      http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
      http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

      <import resource="context.xml" />

      <job id="welcomeJob" xmlns="http://www.springframework.org/schema/batch">
            <step id="step1">
                  <tasklet ref="welcomeTask" />
            </step>
      </job>

      <bean name="welcomeTask" class="com.chandra.batch.WelcomeTask"/>
</beans>

2.2    Job Launcher

JobLauncher is a simple interface for launching jobs. Its run method takes a Job and its JobParameters and returns a JobExecution. The SimpleJobLauncher implementation needs a job repository, which in turn needs a transaction manager:
<bean id="transactionManager"
      class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
      <property name="transactionManager" ref="transactionManager" />
</bean>

<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
      <property name="jobRepository" ref="jobRepository" />
</bean>

The launcher is then looked up from the application context:
String[] springConfig = { "com/chandra/batch/job-read-files.xml" };
ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");

2.3    Job Instance:

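A job instance is a single logical run of a given job, uniquely identified by the job name together with the job parameters it is launched with. If a job instance did not complete successfully and the job is restartable, it can be restarted; otherwise an error is raised. Launching an instance that has already completed successfully is also rejected.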
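The identity and restart rules can be illustrated with a toy in-memory model. This is not the JobRepository API: JobInstanceDemo, launch and its boolean flags are hypothetical names chosen for the sketch, which only reproduces the rules stated above (same job name and parameters means the same instance; a failed instance can be rerun only if the job is restartable; a completed instance cannot be rerun).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of job instance identity; not Spring Batch's JobRepository.
class JobInstanceDemo {
    enum Status { COMPLETED, FAILED }

    // Key: job name + sorted parameters. Value: status of the last execution.
    private final Map<String, Status> lastStatus = new HashMap<>();

    // Simulates one execution; 'succeed' decides how this attempt ends.
    Status launch(String jobName, Map<String, String> params, boolean restartable, boolean succeed) {
        String key = jobName + new TreeMap<>(params); // same name + same params = same instance
        Status previous = lastStatus.get(key);
        if (previous == Status.COMPLETED) {
            throw new IllegalStateException("Instance already completed: " + key);
        }
        if (previous == Status.FAILED && !restartable) {
            throw new IllegalStateException("Instance is not restartable: " + key);
        }
        Status result = succeed ? Status.COMPLETED : Status.FAILED;
        lastStatus.put(key, result);
        return result;
    }

    public static void main(String[] args) {
        JobInstanceDemo repository = new JobInstanceDemo();
        Map<String, String> params = new HashMap<>();
        params.put("date", "2015-11-06");
        System.out.println(repository.launch("reportJob", params, true, false)); // FAILED
        System.out.println(repository.launch("reportJob", params, true, true));  // COMPLETED (restart)
        try {
            repository.launch("reportJob", params, true, true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Instance already completed: reportJob{date=2015-11-06}
        }
    }
}
```

Changing any parameter value produces a different key, which is why a new set of job parameters starts a new job instance.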

2.4    Step:

Steps are the parts that make up a job. A step contains all the information needed to define and control one phase of the batch processing. A step can run a simple tasklet or a chunk made of a reader, a processor and a writer.
Example: suppose we want to save data in a database. Before saving, we need to collect some basic information about the user, such as the IP address, user id and session id. After saving, we log the object id, object name and table information to the database for auditing.
This example has three steps: 1. collecting the user information, 2. saving the data in the database, 3. auditing the data.
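The three-step example can be modelled in plain Java to show the default sequential behaviour: steps run in the order they are declared, and when one fails the job fails and the remaining steps are not run. SequentialJobDemo and runJob are hypothetical names; in Spring Batch the order is declared with the next attribute, as in the step configuration of 2.4.2, rather than with a map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a sequential job (not Spring Batch code).
class SequentialJobDemo {
    // Runs the steps in insertion order and records each outcome in 'log'.
    // Returns "FAILED" at the first step that throws, otherwise "COMPLETED".
    static String runJob(Map<String, Runnable> steps, List<String> log) {
        for (Map.Entry<String, Runnable> step : steps.entrySet()) {
            try {
                step.getValue().run();
                log.add(step.getKey() + ": COMPLETED");
            } catch (RuntimeException e) {
                log.add(step.getKey() + ": FAILED");
                return "FAILED"; // later steps are never executed
            }
        }
        return "COMPLETED";
    }

    public static void main(String[] args) {
        Map<String, Runnable> steps = new LinkedHashMap<>(); // keeps declaration order
        steps.put("userInformation", () -> { /* collect IP address, user id, session id */ });
        steps.put("saveData", () -> { /* save the record in the database */ });
        steps.put("auditData", () -> { /* log object id, object name and table */ });
        List<String> log = new ArrayList<>();
        System.out.println(runJob(steps, log) + " " + log);
    }
}
```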

2.4.1    Step with simple task:

<step id="step1">
      <tasklet ref="welcomeTask" />
</step>

2.4.2    Step with a reader, processor and writer:

<step id="step" next="nextStep">
      <tasklet>
            <chunk reader="customItemReader" writer="customItemWriter"
                        processor="customItemProcessor" commit-interval="10" />
      </tasklet>
</step>
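The commit-interval attribute (10 above) sets how many items make up one chunk: Spring Batch reads that many items, passes each one to the processor, writes the results in a single call and commits the transaction, then starts the next chunk. The plain-Java model below only shows that grouping; ChunkDemo and run are hypothetical names, not Spring Batch's chunk classes, and transactions, retries and skips are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model of chunk-oriented processing; not Spring Batch's implementation.
class ChunkDemo {
    // Returns the number of chunks written.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int commitInterval) {
        int chunks = 0;
        while (reader.hasNext()) {
            // 1. Read up to commitInterval items.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process each item; a null result filters the item out.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Write the whole chunk at once (Spring Batch would commit here).
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        Iterator<String> reader = Arrays.asList("1", "2", "3", "4", "5", "6", "7").iterator();
        Function<String, Integer> parse = Integer::parseInt;
        List<Integer> chunkSizes = new ArrayList<>();
        Consumer<List<Integer>> writer = chunk -> chunkSizes.add(chunk.size());
        int chunks = run(reader, parse, writer, 3);
        System.out.println(chunks + " chunks, sizes " + chunkSizes); // prints 3 chunks, sizes [3, 3, 1]
    }
}
```

A larger commit interval means fewer, larger transactions; a value of 1, as used later in job-read-files.xml, commits after every item.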

2.5    Job Repositories:

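The job repository is responsible for storing and updating metadata about job instances, their executions and the job execution context.
<bean id="transactionManager"
      class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
      <property name="transactionManager" ref="transactionManager" />
</bean>
MapJobRepositoryFactoryBean keeps this metadata in memory, so it is lost when the application exits; restarting a failed job in a later run requires a database-backed repository, such as the one created by JobRepositoryFactoryBean.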

2.6    Item readers:

Readers are responsible for data retrieval. A reader reads data from a source and passes each item on to the processor. Some of the available readers are:
·         AmqpItemReader
·         AggregateItemReader
·         FlatFileItemReader
·         HibernateCursorItemReader
·         HibernatePagingItemReader
·         IbatisPagingItemReader
·         ItemReaderAdapter
·         JdbcCursorItemReader
·         JdbcPagingItemReader
·         JmsItemReader
·         JpaPagingItemReader
·         ListItemReader
·         MongoItemReader
·         Neo4jItemReader
·         RepositoryItemReader
·         StoredProcedureItemReader
·         StaxEventItemReader

2.7    Item writers:

Writers are responsible for writing the data to its destination, such as a database or a file. Some of the available writers are:
·         AbstractItemStreamItemWriter
·         AmqpItemWriter
·         CompositeItemWriter
·         FlatFileItemWriter
·         GemfireItemWriter
·         HibernateItemWriter
·         IbatisBatchItemWriter
·         ItemWriterAdapter
·         JdbcBatchItemWriter
·         JmsItemWriter
·         JpaItemWriter
·         MimeMessageItemWriter
·         MongoItemWriter
·         Neo4jItemWriter
·         StaxEventItemWriter
·         RepositoryItemWriter

2.8    Item processor:

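Processors transform the data read from the input into the desired output. Some of the available processors are listed below; several of them (for example CustomerCreditIncreaseProcessor and TradeProcessor) come from the Spring Batch samples rather than the core library: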
·         AsyncItemProcessor
·         ClassifierCompositeItemProcessor
·         CompositeItemProcessor
·         CustomerCreditIncreaseProcessor
·         CustomerUpdateProcessor
·         ItemProcessorAdapter
·         MessageProcessor
·         PassThroughItemProcessor
·         ScriptItemProcessor
·         SleepingItemProcessor
·         StagingItemProcessor
·         TradeProcessor
·         UserMailItemProcessor
·         ValidatingItemProcessor
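Several processors can be combined into one. The sketch below models that idea in plain Java, in the spirit of CompositeItemProcessor: each delegate receives the previous delegate's output, and a null result ends the chain so the item is filtered. ProcessorChainDemo is a hypothetical name, and the delegates are plain java.util.function.Function objects, not ItemProcessor instances.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of chaining processors; not the Spring Batch class.
class ProcessorChainDemo {
    // Feeds the item through each delegate in turn; null stops the chain.
    static String process(String item, List<Function<String, String>> delegates) {
        String result = item;
        for (Function<String, String> delegate : delegates) {
            result = delegate.apply(result);
            if (result == null) {
                return null; // item filtered out; remaining delegates are skipped
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Function<String, String>> delegates = Arrays.asList(
                String::trim,                 // clean up whitespace
                s -> s.isEmpty() ? null : s,  // drop blank values
                String::toUpperCase);         // normalise case
        System.out.println(process("  chandra ", delegates)); // prints CHANDRA
        System.out.println(process("   ", delegates));        // prints null
    }
}
```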





3      Spring Batch application that reads data from a CSV file and stores it in a database

The following is the project structure of a batch application that reads data from a CSV file and writes it to a database.
·         To create the application we need the following:
o   Database configuration (database.xml)
o   Job repository and job launcher (context.xml)
o   Job configuration (job-read-files.xml)
o   CSV file that holds the data (Employee.csv)
o   Item object (EmployeeItem.java) that holds the CSV columns, all as strings
o   Domain object (Employee.java) that mirrors the database table and carries the Hibernate annotations
o   Mapper class (EmployeeFieldSetMapper.java) that maps the fields of each CSV line onto an EmployeeItem
o   Reader
o   Processor
o   Writer
o   Main application

3.1    Database configuration:

<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
      http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

      <bean name="dataSource"
            class="org.springframework.jdbc.datasource.DriverManagerDataSource">
            <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
            <property name="url" value="jdbc:oracle:thin:@localhost:1521:EPHSW01" />
            <property name="username" value="sql" />
            <property name="password" value="sql" />
      </bean>

      <bean name="sessionFactory"
            class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="hibernateProperties">
                  <props>
                        <prop key="hibernate.dialect">org.hibernate.dialect.OracleDialect</prop>
                        <prop key="hibernate.show_sql">true</prop>
                        <!-- "create" drops and re-creates the mapped EMP table on every start, deleting existing rows -->
                        <prop key="hibernate.hbm2ddl.auto">create</prop>
                  </props>
            </property>

            <!-- <property name="mappingResources"> <list> <value>com/chandra/hibernate/Emp.hbm.xml</value>
                  </list> </property> -->
            <property name="annotatedClasses">
                  <list>
                        <value>com.chandra.batch.Employee</value>
                  </list>
            </property>
      </bean>

      <bean name="hibernateTemplate" class="org.springframework.orm.hibernate3.HibernateTemplate">
            <property name="sessionFactory" ref="sessionFactory"></property>
      </bean>
</beans>

3.2    Job repositories

<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
      http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

      <bean id="transactionManager"
            class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

      <bean id="jobRepository"
            class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
            <property name="transactionManager" ref="transactionManager" />
      </bean>

      <bean id="jobLauncher"
            class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
            <property name="jobRepository" ref="jobRepository" />
      </bean>
</beans>

3.3    Simple job configuration:

<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:batch="http://www.springframework.org/schema/batch" xmlns:task="http://www.springframework.org/schema/task"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/batch
      http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
      http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

      <batch:job id="reportJob">
            <batch:step id="step1">
                  <batch:tasklet>
                        <batch:chunk reader="csvReader" processor="csvProcessor"
                              writer="csvWriter" commit-interval="1">
                        </batch:chunk>
                  </batch:tasklet>
            </batch:step>
            <batch:listeners merge="true">
                  <batch:listener ref="jobLogListener" />
            </batch:listeners>
      </batch:job>
      <bean name="csvReader" class="com.chandra.batch.CsvReader" />
      <bean name="csvProcessor" class="com.chandra.batch.CsvProcessor" />
      <bean name="csvWriter" class="com.chandra.batch.CsvWriter">
            <property name="hibernateTemplate" ref="hibernateTemplate" />
      </bean>
      <bean name="jobLogListener" class="com.chandra.batch.JobLogListener" />

</beans>

3.4    Item class

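import java.io.Serializable;

// Raw CSV row: every column is kept as a String
public class EmployeeItem implements Serializable
{
    private static final long serialVersionUID = 1L;
    private String name;
    private String salary;
    private String address;

    // Getters and setters for name, salary and address (omitted here) are
    // required by EmployeeFieldSetMapper and CsvProcessor.
}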

3.5    Domain object:

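import java.io.Serializable;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "EMP")
public class Employee implements Serializable
{
    private static final long serialVersionUID = 1L;
    @Id
    @Column(name = "ENAME")
    private String name;
    @Column(name = "ADDRESS")
    private String address;
    @Column(name = "SALARY")
    private Double salary;

    // Getters and setters for name, address and salary (omitted here) are
    // required by CsvProcessor.
}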

3.6    Field set mapper

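import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

// Maps the tokens of one CSV line, by position, onto an EmployeeItem
public class EmployeeFieldSetMapper<T extends EmployeeItem> implements FieldSetMapper<EmployeeItem>
{
    @Override
    public EmployeeItem mapFieldSet(FieldSet fieldSet) throws BindException
    {
      EmployeeItem item = new EmployeeItem();
      item.setName(fieldSet.readString(0));
      item.setAddress(fieldSet.readString(1));
      item.setSalary(fieldSet.readString(2));

      return item;
    }
}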

3.7    Reader:

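import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.FileSystemResource;

public class CsvReader extends FlatFileItemReader<EmployeeItem>
{
    private static final String[] FIELDS = { "Employee Name", "Employee Address", "Employee salary" };

    @Override
    public void afterPropertiesSet() throws Exception
    {
      // Employee.csv is resolved relative to the working directory.
      // A FileSystemResource can be reopened, unlike an InputStreamResource.
      setResource(new FileSystemResource("Employee.csv"));

      // Split each line on commas and name the tokens
      DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
      tokenizer.setNames(FIELDS);
      tokenizer.setStrict(false);

      // Turn the tokens of each line into an EmployeeItem
      DefaultLineMapper<EmployeeItem> lineMapper = new DefaultLineMapper<EmployeeItem>();
      lineMapper.setLineTokenizer(tokenizer);
      lineMapper.setFieldSetMapper(new EmployeeFieldSetMapper<EmployeeItem>());
      setLineMapper(lineMapper);

      // Skip header line
      setLinesToSkip(1);

      // Let FlatFileItemReader validate its configuration
      super.afterPropertiesSet();
    }
}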
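To see what this reader configuration does with each line, here is a plain-Java approximation that uses only java.io: it skips linesToSkip header lines, splits each remaining line on commas and maps the fields by position, using the same indexes as EmployeeFieldSetMapper (0 = name, 1 = address, 2 = salary). CsvLineDemo and its sample rows are made up for illustration, and unlike DelimitedLineTokenizer this sketch does not handle quoted fields.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation of the CsvReader setup; not FlatFileItemReader itself.
class CsvLineDemo {
    // Returns one String[] {name, address, salary} per data line.
    static List<String[]> read(BufferedReader in, int linesToSkip) throws IOException {
        List<String[]> rows = new ArrayList<>();
        String line;
        int lineNumber = 0;
        while ((line = in.readLine()) != null) {
            lineNumber++;
            if (lineNumber <= linesToSkip) {
                continue; // header line, like setLinesToSkip(1)
            }
            String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
            // Map by index, as EmployeeFieldSetMapper does with readString(0..2)
            rows.add(new String[] { field(tokens, 0), field(tokens, 1), field(tokens, 2) });
        }
        return rows;
    }

    // In this sketch, missing trailing columns become empty strings instead of an exception.
    private static String field(String[] tokens, int index) {
        return index < tokens.length ? tokens[index] : "";
    }

    public static void main(String[] args) throws IOException {
        String csv = "Employee Name,Employee Address,Employee salary\n"
                + "Alice,London,1000.50\n"
                + "Bob,Paris,2000\n";
        for (String[] row : read(new BufferedReader(new StringReader(csv)), 1)) {
            System.out.println(String.join(" | ", row));
        }
    }
}
```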

3.8    Processor:

import org.springframework.batch.item.ItemProcessor;

// Converts the raw CSV item (all strings) into the Employee entity
public class CsvProcessor implements ItemProcessor<EmployeeItem, Employee>
{

    @Override
    public Employee process(EmployeeItem employeeItem) throws Exception
    {
      Employee emp = new Employee();
      emp.setName(employeeItem.getName());
      emp.setAddress(employeeItem.getAddress());
      emp.setSalary(Double.parseDouble(employeeItem.getSalary()));
      return emp;
    }
}
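Double.parseDouble throws a NumberFormatException when a salary value is not a number, and an exception thrown from a processor fails the step unless the step is configured to skip that exception. One alternative is to convert the value defensively and return null from process for bad rows, since Spring Batch treats a null returned by an ItemProcessor as "filter this item". SalaryParseDemo and parseSalary are hypothetical helpers, not part of the original project.

```java
// Hypothetical helper for CsvProcessor: returns null instead of throwing
// when the salary column cannot be parsed.
class SalaryParseDemo {
    static Double parseSalary(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Double.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null; // caller can return null from process() to filter the row
        }
    }

    public static void main(String[] args) {
        System.out.println(parseSalary("1000.50")); // prints 1000.5
        System.out.println(parseSalary("n/a"));     // prints null
    }
}
```

Inside CsvProcessor.process, a null salary from this helper would then lead to returning null before the Employee is built, so the row never reaches the writer.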

3.9    Writer:

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.orm.hibernate3.HibernateTemplate;

// Saves each Employee of a chunk through Hibernate
public class CsvWriter implements ItemWriter<Employee>
{

    private HibernateTemplate hibernateTemplate;

    @Override
    public void write(List<? extends Employee> items) throws Exception
    {
      if (items != null)
      {
          for (Employee item : items)
          {
            if (item == null)
            {
                continue;
            }
            hibernateTemplate.save(item);
          }
      }
    }

    public HibernateTemplate getHibernateTemplate()
    {
      return hibernateTemplate;
    }

    public void setHibernateTemplate(HibernateTemplate hibernateTemplate)
    {
      this.hibernateTemplate = hibernateTemplate;
    }

}

3.10   Listener:

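import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.InitializingBean;

// Prints messages around the job; registered in job-read-files.xml under batch:listeners
public class JobLogListener implements JobExecutionListener, InitializingBean
{

    @Override
    public void afterPropertiesSet() throws Exception
    {
      System.out.println("after properties");
    }

    @Override
    public void afterJob(JobExecution jobExecution)
    {
      System.out.println("after job: " + jobExecution.getStatus());
    }

    @Override
    public void beforeJob(JobExecution jobExecution)
    {
      System.out.println("before job");
    }
}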

3.11   Main app:

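import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App
{
    public static void main(String[] args)
    {
      // Load the job, the job repository/launcher and the database configuration together
      String[] springConfig = { "com/chandra/batch/job-read-files.xml", "com/chandra/batch/context.xml", "com/chandra/batch/database.xml" };
      ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
      JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
      Job job = (Job) context.getBean("reportJob");
      try
      {
          JobExecution execution = jobLauncher.run(job, new JobParameters());
          System.out.println("Exit status: " + execution.getStatus());
      } catch (Exception e)
      {
          e.printStackTrace();
      }
    }
}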
