workflow / ewoksapps / ewoksdata

Merge request !23: Hdf5 dataset options (merged)

Wout De Nolf requested to merge hdf5_dataset_options into main, 2 years ago.
2 commits, 1 pipeline, 2 files changed (+190 −1), latest version e5a7c674.

src/ewoksdata/data/hdf5_config.py (new file, 0 → 100644, +189 −0)

"""
HDF5 configuration of optimal data storage (IO speed, compression, ...)
"""
from
typing
import
Optional
,
Tuple
,
NewType
from
numbers
import
Integral
from
collections.abc
import
Mapping
import
numpy
from
numpy.typing
import
DTypeLike
try
:
import
hdf5plugin
except
ImportError
:
hdf5plugin
=
None
PositiveIntegral
=
NewType
(
"
PositiveIntegral
"
,
Integral
)
# >= 0
StrictPositiveIntegral
=
NewType
(
"
StrictPositiveIntegral
"
,
Integral
)
# > 0
ShapeType
=
Tuple
[
StrictPositiveIntegral
]
SizeType
=
PositiveIntegral
VarShapeType
=
Tuple
[
Optional
[
PositiveIntegral
]]
# 0 or None mark a variable dimension
VarH5pyShapeType
=
Tuple
[
Optional
[
StrictPositiveIntegral
]
]
# None marks a variable dimension
DEFAULT_CHUNK_NBYTES
=
1
<<
20
DEFAULT_COMPRESSION_LIMIT_NBYTES
=
1
<<
20
DEFAULT_CHUNK_SPLIT
=
4
DEFAULT_COMPRESSION_SCHEME
=
"
gzip-byteshuffle
"
# Default data size
# 0D detector: 2 KB
# 1D detector: 2 MB
# 2D detector: 2 GB
DEFAULT_SCAN_DIM_SIZE
=
512
DEFAULT_DETECTOR_DIM_SIZE
=
1024
DEFAULT_DTYPE
=
numpy
.
int32
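
# Worked out from the constants above (a 512-point scan of int32 data, 4 bytes per item):
#   0D detector: 512 * 4 B               = 2 KiB
#   1D detector: 512 * 1024 * 4 B        = 2 MiB
#   2D detector: 512 * 1024 * 1024 * 4 B = 2 GiB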


def dtype_nbytes(dtype: DTypeLike) -> int:
    return numpy.dtype(dtype).itemsize


def shape_to_size(shape: ShapeType) -> int:
    return numpy.prod(shape, dtype=int)


def shape_to_nbytes(shape: ShapeType, dtype: DTypeLike) -> int:
    return shape_to_size(shape) * dtype_nbytes(dtype)


def guess_data_shape(
    scan_shape: VarShapeType,
    detector_shape: VarShapeType,
    max_shape: Optional[VarH5pyShapeType],
) -> ShapeType:
    scan_shape = tuple(n if n else DEFAULT_SCAN_DIM_SIZE for n in scan_shape)
    detector_shape = tuple(
        n if n else DEFAULT_DETECTOR_DIM_SIZE for n in detector_shape
    )
    data_shape = scan_shape + detector_shape
    if max_shape:
        assert len(max_shape) == len(
            data_shape
        ), "HDF5 dataset shape must have the same dimensions as maxshape"
        data_shape = tuple(
            n1 if not n2 else max(n1, n2) for n1, n2 in zip(data_shape, max_shape)
        )
    return data_shape
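
# Illustrative examples: scan or detector dimensions given as 0 or None fall
# back to the defaults above, and `max_shape` can only enlarge the result.
# >>> guess_data_shape((None, 10), (2048, 2048), None)
# (512, 10, 2048, 2048)
# >>> guess_data_shape((100,), (2048, 2048), (500, 2048, 2048))
# (500, 2048, 2048)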


def guess_chunk_shape(
    data_shape: ShapeType,
    dtype: DTypeLike,
    chunk_split: Optional[Integral] = None,
    chunk_nbytes: Optional[Integral] = None,
) -> Optional[ShapeType]:
    """
    Try to guess the optimal chunk shape with these constraints:

    * Split any dimension for partial access
    * Stay below the maximal chunk size (1 MB by default, uncompressed)

    The innermost dimensions are split in `chunk_split` parts until
    `chunk_nbytes` is reached. The chunk size in the outer dimensions
    will be 1, unless the data size is too small.
    """
    if chunk_nbytes is None:
        chunk_nbytes = DEFAULT_CHUNK_NBYTES
    if chunk_split is None:
        chunk_split = DEFAULT_CHUNK_SPLIT
    itemsize = dtype_nbytes(dtype)
    size = shape_to_size(data_shape)
    nbytes = size * itemsize
    if nbytes <= chunk_nbytes:
        return None
    max_size = chunk_nbytes // itemsize
    current_size = 1
    chunk_shape = []
    for n_i in data_shape[-1::-1]:
        if current_size >= max_size:
            c_i = 1
        else:
            a = int(numpy.ceil(n_i / chunk_split))
            b = int(numpy.ceil(max_size / current_size))
            c_i = min(a, b)
        chunk_shape.append(c_i)
        current_size *= c_i
    chunk_shape = tuple(chunk_shape[::-1])
    if chunk_shape == data_shape:
        return None
    return chunk_shape
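
# Illustrative example with the defaults (1 MiB chunk limit, chunk_split=4):
# >>> guess_chunk_shape((512, 1024, 1024), numpy.int32)
# (4, 256, 256)
# The innermost (detector) dimensions are split first; the resulting chunk
# holds 4 * 256 * 256 * 4 bytes = 1 MiB of uncompressed int32 data.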


def guess_compression(
    data_shape: ShapeType,
    dtype: DTypeLike,
    compression_limit_nbytes: Optional[Integral] = None,
) -> bool:
    """
    Compression is needed when the total data size exceeds the limit (1 MB by default).
    """
    if compression_limit_nbytes is None:
        compression_limit_nbytes = DEFAULT_COMPRESSION_LIMIT_NBYTES
    nbytes = shape_to_nbytes(data_shape, dtype)
    return nbytes > compression_limit_nbytes
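
# Illustrative examples with the default 1 MB limit:
# >>> guess_compression((512, 1024, 1024), numpy.int32)  # 2 GiB of data
# True
# >>> guess_compression((512,), numpy.int32)  # 2 KiB of data
# False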


def get_compression_arguments(compression_scheme: Optional[str] = None) -> Mapping:
    if compression_scheme:
        compression_scheme = compression_scheme.lower()
    if compression_scheme is None:
        compression_scheme = DEFAULT_COMPRESSION_SCHEME
    if compression_scheme == "none":
        return dict()
    elif compression_scheme == "gzip":
        return {"compression": "gzip"}
    elif compression_scheme == "byteshuffle":
        return {"shuffle": True}
    elif compression_scheme == "gzip-byteshuffle":
        return {"compression": "gzip", "shuffle": True}
    elif compression_scheme == "bitshuffle":
        if hdf5plugin is None:
            raise RuntimeError(
                "Writer does not support HDF5 'bitshuffle' compression. Install the hdf5plugin library"
            )
        return hdf5plugin.Bitshuffle(nelems=0, lz4=False)
    elif compression_scheme == "lz4-bitshuffle":
        if hdf5plugin is None:
            raise RuntimeError(
                "Writer does not support HDF5 'bitshuffle' compression. Install the hdf5plugin library"
            )
        return hdf5plugin.Bitshuffle(nelems=0, lz4=True)
    else:
        raise ValueError(f"Unknown HDF5 compression '{compression_scheme}'")
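
# Illustrative examples; the bitshuffle-based schemes additionally require
# the hdf5plugin library to be installed:
# >>> get_compression_arguments("gzip-byteshuffle")
# {'compression': 'gzip', 'shuffle': True}
# >>> get_compression_arguments("none")
# {}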


def guess_dataset_config(
    scan_shape: VarShapeType,
    detector_shape: VarShapeType,
    dtype: Optional[DTypeLike] = None,
    chunk_split: Optional[Integral] = None,
    chunk_nbytes: Optional[Integral] = None,
    compression_limit_nbytes: Optional[Integral] = None,
    compression_scheme: Optional[str] = None,
    max_shape: Optional[VarH5pyShapeType] = None,
) -> dict:
    """
    Dataset configuration passed to `h5py.Group.create_dataset` for optimal
    storage (IO speed, compression, ...)
    """
    data_shape = guess_data_shape(
        scan_shape=scan_shape, detector_shape=detector_shape, max_shape=max_shape
    )
    if dtype is None:
        dtype = DEFAULT_DTYPE
    chunk_shape = guess_chunk_shape(
        data_shape=data_shape,
        dtype=dtype,
        chunk_split=chunk_split,
        chunk_nbytes=chunk_nbytes,
    )
    config = {"chunks": chunk_shape}
    compression = guess_compression(
        data_shape=data_shape,
        dtype=dtype,
        compression_limit_nbytes=compression_limit_nbytes,
    )
    if compression:
        config.update(get_compression_arguments(compression_scheme=compression_scheme))
    chunking_required = compression or max_shape is not None
    if chunking_required and chunk_shape is None:
        # Do not let h5py guess the chunk size
        config["chunks"] = data_shape
    return config
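
# Illustrative usage sketch (assumes h5py is installed; the file name and
# dataset name are arbitrary):
#
#   import h5py
#   import numpy
#   from ewoksdata.data.hdf5_config import guess_dataset_config
#
#   config = guess_dataset_config(
#       scan_shape=(512,), detector_shape=(2048, 2048), dtype=numpy.uint16
#   )
#   # config == {"chunks": (2, 512, 512), "compression": "gzip", "shuffle": True}
#   with h5py.File("scan.h5", "w") as h5file:
#       h5file.create_dataset(
#           "data", shape=(512, 2048, 2048), dtype=numpy.uint16, **config
#       )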