from ctypes import byref, c_char_p, c_size_t
from pathlib import Path
from typing import Optional, Union
from numpy import empty, ndarray
from expelliarmus.utils import _SUPPORTED_ENCODINGS
from expelliarmus.wizard.clib import (
c_cargos_t,
c_cut_fns,
c_get_time_window_fns,
c_measure_fns,
c_read_fns,
c_save_fns,
dat_cargo_t,
event_t,
events_cargo_t,
evt2_cargo_t,
evt3_cargo_t,
)
[docs]def c_read_wrapper(encoding: str, fpath: Union[str, Path], buff_size: int):
c_fpath = c_char_p(bytes(str(fpath), "utf-8"))
c_buff_size = c_size_t(buff_size)
cargo = c_cargos_t[encoding](events_info=events_cargo_t())
c_measure_fns[encoding](c_fpath, byref(cargo), c_buff_size)
status = 0
if cargo.events_info.dim > 0:
arr = empty((cargo.events_info.dim,), dtype=event_t)
status = c_read_fns[encoding](c_fpath, arr, byref(cargo), c_buff_size)
return (
(arr, status) if cargo.events_info.dim > 0 and status == 0 else (None, status)
)
[docs]def c_save_wrapper(
encoding: str,
fpath: Union[str, Path],
arr: ndarray,
buff_size: int,
):
c_fpath = c_char_p(bytes(str(fpath), "utf-8"))
c_buff_size = c_size_t(buff_size)
dim = len(arr)
if arr.dtype != event_t:
loc_arr = empty((dim,), dtype=event_t)
for k in ("t", "y", "x", "p"):
loc_arr[k] = arr[k].astype(loc_arr[k].dtype)
else:
loc_arr = arr
cargo = c_cargos_t[encoding](events_info=events_cargo_t(dim=dim, start_byte=0))
return c_save_fns[encoding](c_fpath, loc_arr, byref(cargo), c_buff_size)
[docs]def c_read_time_window_wrapper(
encoding: str,
fpath: Union[str, Path],
cargo: Union[dat_cargo_t, evt2_cargo_t, evt3_cargo_t],
buff_size: int,
):
c_fpath = c_char_p(bytes(str(fpath), "utf-8"))
c_buff_size = c_size_t(buff_size)
c_get_time_window_fns[encoding](c_fpath, byref(cargo), c_buff_size)
status = 0
if cargo.events_info.dim > 0:
arr = empty((cargo.events_info.dim,), dtype=event_t)
status = c_read_fns[encoding](c_fpath, arr, byref(cargo), c_buff_size)
return (
(arr, cargo, status)
if cargo.events_info.dim > 0 and status == 0
else (None, cargo, status)
)
[docs]def c_read_chunk_wrapper(
encoding: str,
fpath: Union[str, Path],
cargo: Union[dat_cargo_t, evt2_cargo_t, evt3_cargo_t],
buff_size: int,
):
c_fpath = c_char_p(bytes(str(fpath), "utf-8"))
c_buff_size = c_size_t(buff_size)
arr = empty(
(cargo.events_info.dim + (12 if encoding == "evt3" else 0),), dtype=event_t
)
status = c_read_fns[encoding](c_fpath, arr, byref(cargo), c_buff_size)
return (
(arr[: cargo.events_info.dim], cargo, status)
if cargo.events_info.dim > 0 and status == 0
else (None, cargo, status)
)
[docs]def c_cut_wrapper(
encoding: str,
fpath_in: Union[str, Path],
fpath_out: Union[str, Path],
new_duration: int,
buff_size: int,
):
c_fpath_in = c_char_p(bytes(str(fpath_in), "utf-8"))
c_fpath_out = c_char_p(bytes(str(fpath_out), "utf-8"))
c_new_duration = c_size_t(new_duration)
c_buff_size = c_size_t(buff_size)
return c_cut_fns[encoding](c_fpath_in, c_fpath_out, c_new_duration, c_buff_size)