Source code for expelliarmus.wizard.clib

import os
import pathlib
import re
from ctypes import (
    CDLL,
    POINTER,
    Structure,
    c_char_p,
    c_int,
    c_int16,
    c_int64,
    c_size_t,
    c_uint,
    c_uint8,
    c_uint16,
    c_uint64,
)

from numpy import zeros
from numpy.ctypeslib import ndpointer

from expelliarmus.utils import _ROOT_PATH

# Searching for the shared library.
[docs]lib_re = r"^expelliarmus\..*\.(so|pyd)$"
for root, dirs, files in os.walk(_ROOT_PATH): for f in files: if re.match(lib_re, f):
[docs] lib_path = pathlib.Path(os.path.join(root, f))
break
[docs]clib = CDLL(str(lib_path))
[docs]class event_t(Structure): _fields_ = [ ("t", c_int64), ("x", c_int16), ("y", c_int16), ("p", c_uint8),
]
[docs]class events_cargo_t(Structure): _fields_ = [ ("dim", c_size_t), ("is_chunk", c_uint8), ("time_window", c_size_t), ("is_time_window", c_uint8), ("start_byte", c_size_t), ("finished", c_uint8),
]
[docs]class dat_cargo_t(Structure): _fields_ = [ ("events_info", events_cargo_t), ("last_t", c_uint64), ("time_ovfs", c_uint64),
]
[docs]class evt2_cargo_t(Structure): _fields_ = [ ("events_info", events_cargo_t), ("last_t", c_int64), ("time_high", c_uint64),
]
[docs]class evt3_cargo_t(Structure): _fields_ = [ ("events_info", events_cargo_t), ("time_high", c_uint64), ("time_low", c_uint64), ("time_high_ovfs", c_uint64), ("time_low_ovfs", c_uint64), ("base_x", c_uint16), ("last_event", event_t),
]
[docs]c_cargos_t = dict(dat=dat_cargo_t, evt2=evt2_cargo_t, evt3=evt3_cargo_t)
# Setting up C wrappers. # Read functions.
[docs]c_read_dat = clib.read_dat
[docs]c_read_evt2 = clib.read_evt2
[docs]c_read_evt3 = clib.read_evt3
for fn, cargo_t in zip( (c_read_dat, c_read_evt2, c_read_evt3), (dat_cargo_t, evt2_cargo_t, evt3_cargo_t) ): fn.argtypes = [ c_char_p, ndpointer(ndim=1), POINTER(cargo_t), c_size_t, ] fn.restype = c_int
[docs]c_read_fns = dict(dat=c_read_dat, evt2=c_read_evt2, evt3=c_read_evt3)
# Compression functions.
[docs]c_save_dat = clib.save_dat
[docs]c_save_evt2 = clib.save_evt2
[docs]c_save_evt3 = clib.save_evt3
for fn, cargo_t in zip( (c_save_dat, c_save_evt2, c_save_evt3), (dat_cargo_t, evt2_cargo_t, evt3_cargo_t) ): fn.argtypes = [ c_char_p, ndpointer(ndim=1), POINTER(cargo_t), c_size_t, ] fn.restype = c_int
[docs]c_save_fns = dict(dat=c_save_dat, evt2=c_save_evt2, evt3=c_save_evt3)
# Cut functions.
[docs]ARGTYPES_CUT = [c_char_p, c_char_p, c_size_t, c_size_t]
[docs]RESTYPE_CUT = c_size_t
[docs]c_cut_dat = clib.cut_dat
[docs]c_cut_evt2 = clib.cut_evt2
[docs]c_cut_evt3 = clib.cut_evt3
for fn in (c_cut_dat, c_cut_evt2, c_cut_evt3):
[docs] fn.restype = RESTYPE_CUT
fn.argtypes = ARGTYPES_CUT
[docs]c_cut_fns = dict(dat=c_cut_dat, evt2=c_cut_evt2, evt3=c_cut_evt3)
# Measure functions.
[docs]c_measure_dat = clib.measure_dat
[docs]c_measure_evt2 = clib.measure_evt2
[docs]c_measure_evt3 = clib.measure_evt3
for fn, cargo_t in zip( (c_measure_dat, c_measure_evt2, c_measure_evt3), (dat_cargo_t, evt2_cargo_t, evt3_cargo_t), ): fn.argtypes = [c_char_p, POINTER(cargo_t), c_size_t] fn.restype = None
[docs]c_measure_fns = dict(dat=c_measure_dat, evt2=c_measure_evt2, evt3=c_measure_evt3)
# Time window functions.
[docs]c_get_time_window_dat = clib.get_time_window_dat
[docs]c_get_time_window_evt2 = clib.get_time_window_evt2
[docs]c_get_time_window_evt3 = clib.get_time_window_evt3
for fn, cargo_t in zip( (c_get_time_window_dat, c_get_time_window_evt2, c_get_time_window_evt3), (dat_cargo_t, evt2_cargo_t, evt3_cargo_t), ):
[docs] fn.argtypes = [c_char_p, POINTER(cargo_t), c_size_t]
fn.restype = None
[docs]c_get_time_window_fns = dict( dat=c_get_time_window_dat, evt2=c_get_time_window_evt2, evt3=c_get_time_window_evt3
)