Commit 328f746d authored by Alejandro Homs Puron's avatar Alejandro Homs Puron
Browse files

[IO] Refactor multi to support independent I/O drivers

parent 13a23596
// Copyright (C) 2020 Alejandro Homs Puron, ESRF.
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <filesystem>
#include <boost/exception/info.hpp>
#include <boost/exception/errinfo_file_name.hpp>
#include <lima/exceptions.hpp>
#include <lima/io/enums.hpp>
namespace lima
{
namespace io
{
/// Policy defining the action when the writer finds an existing file
struct write_file_exists_policy
{
write_file_exists_policy(file_exists_policy_enum file_exists_policy) : m_file_exists_policy(file_exists_policy)
{
}
void operator()(std::filesystem::path filepath)
{
if (std::filesystem::exists(filepath) && m_file_exists_policy == file_exists_policy_enum::abort)
LIMA_THROW_EXCEPTION(io_error("File exists and policy is abort")
<< boost::errinfo_file_name(filepath.string()));
}
file_exists_policy_enum m_file_exists_policy;
};
} //namespace io
} //namespace lima
......@@ -33,6 +33,7 @@
#include <lima/core/image/view.hpp>
#include <lima/io/compression/zip.hpp>
#include <lima/io/compression/bshuf_lz4.hpp>
#include <lima/io/file_exists_policy.hpp>
#include <lima/io/h5/enums.hpp>
#include <lima/io/h5/nexus.hpp>
......@@ -125,6 +126,7 @@ namespace io
{
public:
using dimensions_t = lima::point<hsize_t>;
using file_exists_policy_t = write_file_exists_policy;
writer(std::filesystem::path const& filename, // Common
int nb_frames, int nb_frames_per_chunk, // Data
......@@ -395,6 +397,7 @@ namespace io
{
public:
using dimensions_t = lima::point<hsize_t>;
using file_exists_policy_t = write_file_exists_policy;
writer_sparse(std::filesystem::path const& filename, // Common
int nb_frames, int nb_frames_per_chunk, //
......
......@@ -13,6 +13,7 @@
#include <functional>
#include <memory>
#include <vector>
#include <type_traits>
#if defined(LIMA_ENABLE_MPI)
#include <mpi.h>
......@@ -20,11 +21,9 @@
#include <fmt/format.h>
#include <boost/exception/info.hpp>
#include <boost/exception/errinfo_file_name.hpp>
#include <lima/exceptions.hpp>
#include <lima/logging.hpp>
#include <lima/core/frame.hpp>
#include <lima/io/const.hpp>
#include <lima/io/enums.hpp>
......@@ -32,25 +31,26 @@ namespace lima
{
namespace io
{
/// A Writer adapter that splits a frame sequence into multiple files
template <typename Writer>
/// An I/O Driver adapter that splits a frame sequence into multiple files
template <typename Driver>
class multi
{
// An underlying writter and a counter of the number of frames to be written
struct writer_counter
struct driver_counter
{
writer_counter(Writer&& w) : writer(std::move(w)) {}
writer_counter(writer_counter&& w) = default;
driver_counter(Driver&& d) : driver(std::move(d)) {}
driver_counter(driver_counter&& c) = default;
std::atomic_int nb_frames_written = 0;
Writer writer;
std::atomic_int nb_frames_xferred = 0;
Driver driver;
};
public:
using writer_t = Writer;
using driver_t = Driver;
using file_exists_policy_t = typename driver_t::file_exists_policy_t;
/// Construct a multi file writer
/// \tparams Args Parameters that are forwarded to the underlying writer
/// Construct a multi file driver
/// \tparams Args Parameters that are forwarded to the underlying driver
template <typename... Args>
multi(std::filesystem::path base_path, std::string filename_format, std::string filename_prefix,
std::string filename_suffix, int start_number, int rank, file_exists_policy_enum file_exists_policy,
......@@ -71,15 +71,15 @@ namespace io
int number_of_files = int_ceiling(m_nb_frames, m_nb_frames_per_file);
LIMA_LOG(trace, io) << "Expecting " << number_of_files << " files";
// Reserve writers
m_writers.resize(number_of_files);
// Reserve drivers
m_drivers.resize(number_of_files);
// Create the factory (bind the writer specific arguments)
m_writer_factory = [nb_frames_per_file, nb_frames_per_chunk, args...](std::string const& filename) {
return writer_t{filename, nb_frames_per_file, nb_frames_per_chunk, args...};
// Create the factory (bind the driver specific arguments)
m_driver_factory = [nb_frames_per_file, nb_frames_per_chunk, args...](std::string const& filename) {
return driver_t{filename, nb_frames_per_file, nb_frames_per_chunk, args...};
};
// Create the initial writer
// Create the initial driver
open(0);
}
......@@ -87,24 +87,24 @@ namespace io
template <typename Callable>
void apply(int frame_idx, int nb_frames, Callable func)
{
// Check if the writer is available
auto writer_idx = frame_idx / m_nb_frames_per_file;
if (!is_writer_available(writer_idx))
// Create the next writer
open(writer_idx);
// Check if the driver is available
auto driver_idx = frame_idx / m_nb_frames_per_file;
if (!is_driver_available(driver_idx))
// Create the next driver
open(driver_idx);
// Get the writer counter
auto& wcounter = m_writers[writer_idx];
// Get the driver counter
auto& dcounter = m_drivers[driver_idx];
// Write the data
func(wcounter->writer, frame_idx % m_nb_frames_per_file);
func(dcounter->driver, frame_idx % m_nb_frames_per_file);
// Increment counter
wcounter->nb_frames_written += nb_frames;
dcounter->nb_frames_xferred += nb_frames;
// Release the writer if we are done with it
if (wcounter->nb_frames_written >= m_nb_frames_per_file)
wcounter.reset();
// Release the driver if we are done with it
if (dcounter->nb_frames_xferred >= m_nb_frames_per_file)
dcounter.reset();
}
/// Write a frame view
......@@ -127,63 +127,61 @@ namespace io
});
}
/// Close the writer for the given frame idx
/// Close the driver for the given frame idx
/// \param frame_idx is the frame idx (or idx of the first frame in the chunk)
void close(int frame_idx)
{
// Check if the writer is available
auto writer_idx = frame_idx / m_nb_frames_per_file;
if (is_writer_available(writer_idx)) {
LIMA_LOG(trace, io) << "Closing writer " << writer_idx;
// Close writer
m_writers[writer_idx].reset();
// Check if the driver is available
auto driver_idx = frame_idx / m_nb_frames_per_file;
if (is_driver_available(driver_idx)) {
LIMA_LOG(trace, io) << "Closing I/O driver " << driver_idx;
// Close driver
m_drivers[driver_idx].reset();
}
}
protected:
// Settings
int m_nb_frames; //!< The number of frames expected to be saved in the file
int m_nb_frames_per_file; //!< The number of frames per file
int m_nb_frames_per_chunk; //!< The number of frames per chunk
int m_start_number; //!< The start number of the
int m_rank; //!< The MPI rank of the process
std::filesystem::path m_base_path; //!< The directory where to save the files
std::string m_filename_format; //!< The format of the
std::string m_filename_prefix; //!< The file prefix
std::string m_filename_suffix; //!< The file suffix
file_exists_policy_enum m_file_exists_policy; //!< Behavior when the file already exists
int m_nb_frames; //!< The number of frames expected to be read/saved in the file
int m_nb_frames_per_file; //!< The number of frames per file
int m_nb_frames_per_chunk; //!< The number of frames per chunk
int m_start_number; //!< The start number of the
int m_rank; //!< The MPI rank of the process
std::filesystem::path m_base_path; //!< The directory where to save the files
std::string m_filename_format; //!< The format of the
std::string m_filename_prefix; //!< The file prefix
std::string m_filename_suffix; //!< The file suffix
file_exists_policy_t m_file_exists_policy; //!< Behavior when the file already exists
private:
std::vector<std::unique_ptr<writer_counter>> m_writers;
std::function<writer_t(std::string const&)> m_writer_factory;
std::vector<std::shared_ptr<driver_counter>> m_drivers; // Vector elements must be copy-constructible
std::function<driver_t(std::string const&)> m_driver_factory;
/// Format the filename according to base path, prefix, start_number and suffix
std::filesystem::path filename(std::size_t writer_idx = 0)
std::filesystem::path filename(std::size_t driver_idx = 0)
{
using namespace fmt::literals;
return fmt::format(m_filename_format, "filename_prefix"_a = m_filename_prefix, "rank"_a = m_rank,
"file_number"_a = m_start_number + writer_idx, "filename_suffix"_a = m_filename_suffix);
"file_number"_a = m_start_number + driver_idx, "filename_suffix"_a = m_filename_suffix);
}
bool is_writer_available(std::size_t writer_idx) { return (bool) m_writers[writer_idx]; }
bool is_driver_available(std::size_t driver_idx) { return (bool) m_drivers[driver_idx]; }
/// Open and initialize a file
void open(std::size_t writer_idx)
void open(std::size_t driver_idx)
{
if (writer_idx >= m_writers.size())
LIMA_THROW_EXCEPTION(io_error("Unexpected writer index"));
if (driver_idx >= m_drivers.size())
LIMA_THROW_EXCEPTION(io_error("Unexpected I/O driver index"));
std::filesystem::path filepath = m_base_path / filename(writer_idx);
std::filesystem::path filepath = m_base_path / filename(driver_idx);
LIMA_LOG(trace, io) << "Opening file " << filepath;
//Check whether the file exists, etc
if (std::filesystem::exists(filepath) && m_file_exists_policy == file_exists_policy_enum::abort)
LIMA_THROW_EXCEPTION(io_error("File exists and policy is abort")
<< boost::errinfo_file_name(filepath.string()));
m_file_exists_policy(filepath);
// Create the writer (should call filename.native() but HDF5 does not support unicode path yet on Windows)
m_writers[writer_idx] = std::make_unique<writer_counter>(m_writer_factory(filepath.string()));
// Create the driver (should call filename.native() but HDF5 does not support unicode path yet on Windows)
m_drivers[driver_idx] = std::make_shared<driver_counter>(m_driver_factory(filepath.string()));
}
};
......
......@@ -10,6 +10,7 @@
#include <boost/test/unit_test.hpp>
#include <lima/io/multi.hpp>
#include <lima/io/file_exists_policy.hpp>
namespace gil = boost::gil;
namespace io = lima::io;
......@@ -28,6 +29,8 @@ struct frame_view
struct writer
{
using file_exists_policy_t = io::write_file_exists_policy;
inline static const std::size_t max_nb_frames_per_file = 1000;
template <typename String, typename... Settings>
......@@ -98,4 +101,4 @@ BOOST_AUTO_TEST_CASE(test_task_io_multi_sequence)
for (std::size_t i = 0; i < 1000; i++)
multi.write_view(mock::frame_view{i});
}
\ No newline at end of file
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment