Skip to content

Streaming large dataset over Rest

How to use StreamingResonseBody to stream large amount of data to a client using SpringBoot.

Sumit Gaur
Sumit Gaur
5 min read
Streaming large dataset over Rest

Java8 streams provide a convenient way to process large amounts of data leveraging the underlying multi-core hardware capabilities (in most cases). But is it possible to extend these features to our DAO layer as well? We'll try to explore the options in this post.

Please note that we'll not be focusing on the reactive streams implementations like WebFlux, RSocket, etc., in this post. We will focus on the native streaming capabilities provided by SpringMVC out of the box.

Introduction

Suppose, as a service provider, we need to publish large amounts of data to our consumers over REST API. We can implement this use-case quickly by creating the following three components:

  1. Controller: an endpoint to accept requests from clients.
  2. Service: a service class containing business logic to transform the data (if required)
  3. DAO: the persistence layer to fetch the dataset from the database (ex: Spring Data JPA repositories).

Assuming we need to publish the entire dataset for an Entity, the most common approach is to create a new interface extending the JpaRepositoty<T, ID> interface and call the findAll() method on it from our Service class. The findAll() method returns a list of records that can then be published to the client as a response body. But it will load the entire dataset in memory in one go, which (depending on the data volume) could be catastrophic.

Now there are options available like pagination to only load a subset of records in the memory at any given point in time, but that expects the service provider to handle additional logic of maintaining the page number.

To overcome these issues, we will explore the possibility of streaming the data to the client. Streaming data has multiple benefits like:

  1. As a service provider, we do not load the entire dataset in the memory. We can fetch data from the database asynchronously and can publish it to the client.
  2. The same is true from a client's perspective as well. It does not have to deal with the entire dataset in one go and can process the streaming data as and when it's available.
  3. Another benefit is that the client does not have to wait for the availability of the entire dataset. It can start processing the data as soon as the records are available.

Service Changes

The following code demonstrates the three components that we mentioned earlier in the post:

RestController Endpoint

The first task is to create an endpoint to accept the requests. The following code shows an example to fetch all the Employees available in the system:

@GetMapping(value = "/employee")
public ResponseEntity<StreamingResponseBody> findAllEmployee() throws ExecutionException, InterruptedException {
   logger.info("request received to fetch all employee details");
   return employeePersistenceService.findActiveEmployee();
}

Please note the return type of the method here - StreamingResponseBody. It will help us return the original HTTP thread as part of the response and allow us to asynchronously write employee records in a new thread on the output stream. The AsyncConfig class does the thread management:

@Bean
public WebMvcConfigurer webMvcConfigurer(ConcurrentTaskExecutor concurrentTaskExecutor) {
    return new WebMvcConfigurer() {
        @Override
        public void configureAsyncSupport(@NonNull AsyncSupportConfigurer configurer) {
            configurer.setDefaultTimeout(-1);
            configurer.setTaskExecutor(concurrentTaskExecutor);
        }
    };
}
@Bean
public ConcurrentTaskExecutor concurrentTaskExecutor() {
    return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5, new ThreadFactory() {
        private final AtomicInteger threadCounter = new AtomicInteger(0);
        @Override
        public Thread newThread(@NonNull Runnable runnable) {
            return new Thread(runnable, "asyncThread-" + threadCounter.incrementAndGet());
        }
    }));
}

Service Class

The following service class is used to trigger a JpaRepository call to fetch the employee records. The Stream of records returned from the JPA is written on the OutputStream as follows:

@Override
@Transactional(readOnly = true)
public ResponseEntity<StreamingResponseBody> findActiveEmployee() {
   Stream<Employee> employees = employeeRepository.getAllEmployees().map(EmployeePhysical::toModel);
   StreamingResponseBody responseBody = httpResponseOutputStream -> {
       try (Writer writer = new BufferedWriter(new OutputStreamWriter(httpResponseOutputStream))) {
           employees.forEach(employee -> {
               try {
                   writer.write(gson.toJson(employee));
                   logger.info("streamed record");
                   writer.flush();
               } catch (IOException exception) {
                   logger.error("exception occurred while writing object to stream", exception);
               }
           });
       } catch (Exception exception) {
           logger.error("exception occurred while publishing data", exception);
       }
       logger.info("finished streaming records");
   };

   return ResponseEntity.status(HttpStatus.OK).contentType(MediaType.APPLICATION_JSON).body(responseBody);
}

As we can see, the method is annotated with the @Transactional annotation as otherwise, we cannot use the Stream in this context, and Spring fails the call with the following exception logged:

org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.

Repository (DAO interface)

The following interface is used to fetch the employee as a stream from the database:

@Repository
public interface EmployeeRepository extends JpaRepository<EmployeePhysical, Long> {
   
    @QueryHints(value = {
            @QueryHint(name = HINT_FETCH_SIZE, value = "50"),
            @QueryHint(name = HINT_CACHEABLE, value = "false"),
            @QueryHint(name = READ_ONLY, value = "true")
    })
    @Query("SELECT EMP FROM Employees EMP")
    Stream<EmployeePhysical> getAllEmployees();
}

The HINT_FETCH_SIZE attribute directs the persistence context to fetch only so many records in one go. In case there is no fetch size specified, the responsibility lies with the actual JDBC driver implementation to decide how many records it will fetch depending on the default value of its fetchSize property.

Assuming we have covered the bare minimum components required to stream the records to the client; when triggered, it sadly fails with the following exception logged:

org.hibernate.exception.GenericJDBCException: could not advance using next()
----
----
Caused by: java.sql.SQLException: Operation not allowed after ResultSet closed

Any guesses about the failure? If you look closely at the findActiveEmployee method, the lambda expression to write the response on OutputStream is triggered lazily only when the control goes back to the RestController endpoint, and it starts writing asynchronously. But the moment control flow leaves the @Transactional method; the ResultSet is marked closed, due to which the callback can not retrieve the records.

The same is the case while using EntityManager to stream the records. Even if you try to manage transaction yourself, the Spring-managed EntityManager throws the following exception:

Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT

JDBC template to the rescue

Lets, refactor the code to use the JDBC template and use the streaming to publish data:

private ResponseEntity<StreamingResponseBody> findActiveEmployeeUsingEntityManager() {
    jdbcTemplate.setFetchSize(50);
    Stream<Employee> employees = jdbcTemplate.queryForStream("Select FIRST_NAME, LAST_NAME, HIRE_DATE, BIRTH_DATE, GENDER from employees",
            (resultSet, rowNum) ->
                    new Employee(resultSet.getString("FIRST_NAME"), resultSet.getString("LAST_NAME"),
                            resultSet.getDate("HIRE_DATE").toLocalDate(),
                            resultSet.getDate("BIRTH_DATE").toLocalDate(), resultSet.getString("GENDER"))
    );

    StreamingResponseBody responseBody = httpResponseOutputStream -> {
        try (Writer writer = new BufferedWriter(new OutputStreamWriter(httpResponseOutputStream))) {
            employees.forEach(employee -> {
                try {
                    writer.write(gson.toJson(employee));
                    logger.info("streamed record");
                    writer.flush();
                } catch (IOException exception) {
                    logger.error("exception occurred while writing object to stream", exception);
                }
            });
        } catch (Exception exception) {
            logger.error("exception occurred while publishing data", exception);
        }
        logger.info("finished streaming records");
    };

    return ResponseEntity.status(HttpStatus.OK).contentType(MediaType.APPLICATION_JSON).body(responseBody);
}

And now, when we try to publish the data, it works as expected. We can refactor it a bit more to return only the Stream from records from the Service and prepare the results in the Controller class itself:

// service
public Stream<Employee> findActiveEmployee() {
    jdbcTemplate.setFetchSize(10);
    return jdbcTemplate.queryForStream("Select FIRST_NAME, LAST_NAME, HIRE_DATE, BIRTH_DATE, GENDER fromemployees",
            (resultSet, rowNum) ->
                    new Employee(resultSet.getString("FIRST_NAME"), resultSet.getString("LAST_NAME"),
                            resultSet.getDate("HIRE_DATE").toLocalDate(),
                            resultSet.getDate("BIRTH_DATE").toLocalDate(), resultSet.getString("GENDER"))
    );
}

// controller
@GetMapping(value = "/employee")
public ResponseEntity<StreamingResponseBody> findAllEmployee() {
    logger.info("request received to fetch all employee details");
    Stream<Employee> employees = employeePersistenceService.findActiveEmployee();
    StreamingResponseBody responseBody = httpResponseOutputStream -> {
        try (Writer writer = new BufferedWriter(new OutputStreamWriter(httpResponseOutputStream))) {
            employees.forEach(employee -> {
                try {
                    writer.write(gson.toJson(employee));
                    writer.flush();
                } catch (IOException exception) {
                    logger.error("exception occurred while writing object to stream", exception);
                }
            });
        } catch (Exception exception) {
            logger.error("exception occurred while publishing data", exception);
        }
        logger.info("finished streaming records");
    };
    return ResponseEntity.status(HttpStatus.OK).contentType(MediaType.APPLICATION_JSON).bod(responseBody);
}

To check the performance (GC cycles, heap usage, etc.), run the application with the following switches: -verbose:gc -Xms128m -Xmx128m -XX:+PrintGCDetails. I have tested it to Stream around 300K records, and it works just fine, without any issues.

As always, the sample code (and a client as well) is available in the Github repository.

References

How To

Sumit Gaur

A passionate java developer and open source advocate, who understands the value of continuous learning and sharing knowledge.