sbmy_control/low_level.py
2025-03-05 16:13:35 +00:00

185 lines
No EOL
6.1 KiB
Python

#!/usr/bin/env python
# -------------------------------------------------------------------------------------
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# The text of the license is located in the root directory of the source package.
# -------------------------------------------------------------------------------------
"""
Tools to deal with low-level operations e.g. redirecting C stdout.
"""
# NOTE: the module docstring has to be the first statement of the module
# (comments may precede it) for Python to expose it as ``__doc__``; it is
# therefore placed before the metadata assignments below.
__author__ = "Tristan Hoellinger, Mayeul Aubin"
__version__ = "0.1"
__date__ = "2024"
__license__ = "GPLv3"
from contextlib import contextmanager
import ctypes
import io
import os, sys
import tempfile
# Handle on the C runtime of the running process, used below to call the C
# function fflush(). Per the ctypes documentation, passing None as the library
# name on POSIX yields a handle on the main program (this does not work on
# Windows).
libc = ctypes.CDLL(None)
# The C-level `stdout` symbol looked up in libc. It is passed to libc.fflush()
# so that output buffered on the C side is flushed before the file descriptor
# is swapped.
c_stdout = ctypes.c_void_p.in_dll(libc, "stdout")
# Taken from:
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
@contextmanager
def stdout_redirector(stream):
    """A context manager that redirects stdout to the given stream.

    The redirection happens at the file-descriptor level, so it captures
    output written through Python's ``sys.stdout`` as well as output written
    by C code through the C-level ``stdout``. For instance, this can be used
    to silence C code (to avoid cluttering the terminal e.g. when using tqdm).

    While the context is active, output goes to a temporary binary file. On a
    normal exit the content of that file is written to ``stream``. If the body
    raises, stdout is still restored and the exception propagates; the
    captured output is not copied to ``stream`` in that case.

    Args:
        stream (file-like object or None): The stream to which stdout should
            be redirected. It receives ``bytes``, so it must be a binary
            stream (e.g. ``io.BytesIO``). If None, the captured output is
            discarded.

    Example:
        >>> with stdout_redirector(stream):
        ...     print("Hello world!")  # Written to stream instead of stdout.
    """
    # The original fd stdout points to. Usually 1 on POSIX systems.
    original_stdout_fd = sys.stdout.fileno()

    def _redirect_stdout(to_fd):
        """Redirect stdout to the given file descriptor."""
        # Flush the C-level buffer stdout
        libc.fflush(c_stdout)
        # Flush and close sys.stdout - also closes the file descriptor (fd)
        sys.stdout.close()
        # Make original_stdout_fd point to the same file as to_fd
        os.dup2(to_fd, original_stdout_fd)
        # Create a new sys.stdout that points to the redirected fd
        sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, "wb"))

    # Save a copy of the original stdout fd so it can be restored afterwards
    saved_stdout_fd = os.dup(original_stdout_fd)
    try:
        # The `with` statement guarantees the temporary file is closed, and
        # avoids referencing an unbound name if its creation fails.
        with tempfile.TemporaryFile(mode="w+b") as tfile:
            _redirect_stdout(tfile.fileno())
            try:
                # Yield to caller while stdout points to the temporary file
                yield
            finally:
                # Always restore stdout, even if the body raised; otherwise
                # the process would keep writing to the temporary file and
                # the real stdout would be lost.
                _redirect_stdout(saved_stdout_fd)
            # Copy contents of temporary file to the given stream
            tfile.flush()
            tfile.seek(0, io.SEEK_SET)
            if stream is not None:
                stream.write(tfile.read())
    finally:
        os.close(saved_stdout_fd)
# Adapted from:
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
# The C-level `stderr` symbol looked up in libc. It is passed to libc.fflush()
# by stderr_redirector so that output buffered on the C side is flushed before
# the file descriptor is swapped.
c_stderr = ctypes.c_void_p.in_dll(libc, "stderr")
@contextmanager
def stderr_redirector(stream):
    """A context manager that redirects stderr to the given stream.

    The redirection happens at the file-descriptor level, so it captures
    output written through Python's ``sys.stderr`` as well as output written
    by C code through the C-level ``stderr``. For instance, this can be used
    to silence C code (to avoid cluttering the terminal e.g. when using tqdm).

    WARNING: this should be used with CAUTION as it redirects all stderr, so
    NONE OF THE ERRORS WILL BE DISPLAYED, no matter the severity.

    While the context is active, output goes to a temporary binary file. On a
    normal exit the content of that file is written to ``stream``. If the body
    raises, stderr is still restored and the exception propagates; the
    captured output is not copied to ``stream`` in that case.

    Args:
        stream (file-like object or None): The stream to which stderr should
            be redirected. It receives ``bytes``, so it must be a binary
            stream (e.g. ``io.BytesIO``). If None, the captured output is
            discarded.
    """
    # The original fd stderr points to. Usually 2 on POSIX systems.
    original_stderr_fd = sys.stderr.fileno()

    def _redirect_stderr(to_fd):
        """Redirect stderr to the given file descriptor."""
        # Flush the C-level buffer stderr
        libc.fflush(c_stderr)
        # Flush and close sys.stderr - also closes the file descriptor (fd)
        sys.stderr.close()
        # Make original_stderr_fd point to the same file as to_fd
        os.dup2(to_fd, original_stderr_fd)
        # Create a new sys.stderr that points to the redirected fd
        sys.stderr = io.TextIOWrapper(os.fdopen(original_stderr_fd, "wb"))

    # Save a copy of the original stderr fd so it can be restored afterwards
    saved_stderr_fd = os.dup(original_stderr_fd)
    try:
        # The `with` statement guarantees the temporary file is closed, and
        # avoids referencing an unbound name if its creation fails.
        with tempfile.TemporaryFile(mode="w+b") as tfile:
            _redirect_stderr(tfile.fileno())
            try:
                # Yield to caller while stderr points to the temporary file
                yield
            finally:
                # Always restore stderr, even if the body raised; otherwise
                # the traceback of that very exception (and every later
                # error) would be written to the temporary file.
                _redirect_stderr(saved_stderr_fd)
            # Copy contents of temporary file to the given stream
            tfile.flush()
            tfile.seek(0, io.SEEK_SET)
            if stream is not None:
                stream.write(tfile.read())
    finally:
        os.close(saved_stderr_fd)
def print_level(level:int, module:str):
    """
    Build the prefix put in front of every log line.

    The prefix has the form ``HH:MM:SS | MODULE     | >>> `` where the time is
    the current local time, the module name is upper-cased and left-justified
    to 10 characters (longer names are not truncated), and the number of ``>``
    characters equals ``level``. A single trailing space ends the prefix.

    Args:
        level (int): Number of ``>`` markers to emit.
        module (str): Name of the module issuing the message.

    Returns:
        str: The formatted prefix.
    """
    from datetime import datetime

    module_column_width = 10
    timestamp = datetime.now().strftime("%H:%M:%S")
    module_tag = module.upper().ljust(module_column_width)
    markers = ">" * level
    return f"{timestamp} | {module_tag} | {markers} "
def print_message(message:str, level:int, module:str, verbose:int = 1):
    """
    Print ``message`` preceded by the prefix built by ``print_level``.

    Args:
        message (str): Text to print.
        level (int): Level forwarded to ``print_level``.
        module (str): Module name forwarded to ``print_level``.
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not (verbose >= 1):
        return
    prefix = print_level(level, module)
    print(prefix + message)
def print_header(verbose:int=1):
    """
    Print the program banner.

    The banner is a 40-character wide box of ``#`` containing the program name
    and the current local date and time, both centered, followed by an empty
    line.

    Args:
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not (verbose >= 1):
        return

    from datetime import datetime

    now = datetime.now()
    banner_width = 40
    inner_width = banner_width - 4
    title = "SBMY CONTROL"
    rule = "#" * banner_width
    banner_lines = (
        rule,
        "# " + title.center(inner_width) + " #",
        "# " + now.strftime("%Y-%m-%d %H:%M:%S").center(inner_width) + " #",
        rule,
        "",
    )
    for banner_line in banner_lines:
        print(banner_line)
def print_starting_module(module:str, verbose:int=1):
    """
    Announce that a module is starting.

    Prints ``Starting <module> module.`` preceded by the level-0 prefix built
    by ``print_level``.

    Args:
        module (str): Name of the module being started.
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not (verbose >= 1):
        return
    prefix = print_level(0, module)
    print(prefix + f"Starting {module} module.")
def print_ending_module(module:str, verbose:int=1):
    """
    Announce that a module has finished.

    Prints ``Ending <module> module.`` preceded by the level-0 prefix built by
    ``print_level``.

    Args:
        module (str): Name of the module that ended.
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not (verbose >= 1):
        return
    prefix = print_level(0, module)
    print(prefix + f"Ending {module} module.")