sbmy_control/low_level.py

269 lines
No EOL
9.4 KiB
Python

#!/usr/bin/env python
# -------------------------------------------------------------------------------------
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# The text of the license is located in the root directory of the source package.
# -------------------------------------------------------------------------------------
__author__ = "Tristan Hoellinger, Mayeul Aubin"
__version__ = "0.1"
__date__ = "2024"
__license__ = "GPLv3"
"""
Tools to deal with low-level operations e.g. redirecting C stdout.
"""
from contextlib import contextmanager
import ctypes
import io
import os, sys
import tempfile
libc = ctypes.CDLL(None)
c_stdout = ctypes.c_void_p.in_dll(libc, "stdout")
# Taken from:
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
@contextmanager
def stdout_redirector(stream):
"""A context manager that redirects stdout to the given stream. For instance, this
could be used to redirect C code stdout to None (to avoid cluttering the terminal eg
when using tqdm).
Args:
stream (file-like object): The stream to which stdout should be redirected.
Example:
>>> with stdout_redirector(stream):
>>> print("Hello world!") # Will be printed to stream instead of stdout.
"""
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stdout_fd = sys.stdout.fileno()
def _redirect_stdout(to_fd):
"""Redirect stdout to the given file descriptor."""
# Flush the C-level buffer stdout
libc.fflush(c_stdout)
# Flush and close sys.stdout - also closes the file descriptor (fd)
sys.stdout.close()
# Make original_stdout_fd point to the same file as to_fd
os.dup2(to_fd, original_stdout_fd)
# Create a new sys.stdout that points to the redirected fd
sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, "wb"))
# Save a copy of the original stdout fd in saved_stdout_fd
saved_stdout_fd = os.dup(original_stdout_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode="w+b")
_redirect_stdout(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stdout(saved_stdout_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stdout_fd)
# Adapted from:
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
c_stderr = ctypes.c_void_p.in_dll(libc, "stderr")
@contextmanager
def stderr_redirector(stream):
"""A context manager that redirects stderr to the given stream.
For instance, this could be used to redirect C code stderr to None (to avoid
cluttering the terminal eg when using tqdm).
WARNING: this should be used with CAUTION as it redirects all stderr, so NONE OF THE
ERRORS WILL BE DISPLAYED, no matter the severity.
Args:
stream (file-like object): The stream to which stdout should be redirected.
"""
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stderr_fd = sys.stderr.fileno()
def _redirect_stderr(to_fd):
"""Redirect stderr to the given file descriptor."""
# Flush the C-level buffer stderr
libc.fflush(c_stderr)
# Flush and close sys.stderr - also closes the file descriptor (fd)
sys.stderr.close()
# Make original_stderr_fd point to the same file as to_fd
os.dup2(to_fd, original_stderr_fd)
# Create a new sys.stderr that points to the redirected fd
sys.stderr = io.TextIOWrapper(os.fdopen(original_stderr_fd, "wb"))
# Save a copy of the original stdout fd in saved_stdout_fd
saved_stderr_fd = os.dup(original_stderr_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode="w+b")
_redirect_stderr(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stderr(saved_stderr_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stderr_fd)
def print_level(level:int, module:str):
"""
Generate a string with the current time, the module name and the level of the message.
"""
from datetime import datetime
max_len_module = 14
date = datetime.now()
out=""
out+=date.strftime("%H:%M:%S")
out+=" | "
out+=module.upper().center(max_len_module)
out+=" | "
out+=">"*level
out+=" "
return out
def print_message(message:str, level:int, module:str, verbose:int = 1):
"""
Print a message with a given level and module name.
"""
if verbose >= 1:
print(print_level(level, module)+message)
def print_header(verbose:int=1):
"""
Print the header of the program.
"""
if verbose >= 1:
from datetime import datetime
date = datetime.now()
width=40
program_name="SBMY CONTROL"
print("#"*width)
print("# "+program_name.center(width-4)+" #")
print("# "+date.strftime("%Y-%m-%d %H:%M:%S").center(width-4)+" #")
print("#"*width)
print("")
def print_starting_module(module:str, verbose:int=1):
"""
Print the starting message of a module.
"""
if verbose >= 1:
print(print_level(0, module)+f"Starting {module} module.")
def print_ending_module(module:str, verbose:int=1):
"""
Print the ending message of a module.
"""
if verbose >= 1:
print(print_level(0, module)+f"Ending {module} module.")
def wait_until_file_exists(filename:str, verbose:int=1, limit:int=1200):
"""
Wait until a file exists.
"""
from time import sleep
from os.path import isfile
k=0
while not isfile(filename) and k<limit:
if k%60 == -1:
print_message(f"Waiting for {filename} to exist. {k//60} minutes elapsed.", 3, "low level", verbose=verbose)
sleep(1)
if k>=limit:
raise TimeoutError(f"File {filename} not found after {limit} seconds.")
if k>60:
print_message(f"File {filename} exists. {k//60} minutes elapsed.", 3, "low level", verbose=verbose)
def get_progress_from_logfile(filename):
"""
Get the number of operations done and the total number of operations from a log file.
"""
current_operation = 0
total_operations = 0
from os.path import isfile
if not isfile(filename):
raise FileNotFoundError(f"Log file {filename} not found.")
with open(filename, "r") as f:
lines = f.readlines()
for line in lines:
if " operation " in line:
try:
splitted_line = line.split(" operation ")[1]
splitted_line = splitted_line.split(":")[0]
current_operation = int(splitted_line.split("/")[0])
total_operations = int(splitted_line.split("/")[1])
except:
pass
elif "Fatal" in line or "Error" in line:
return -1, -1
return current_operation, total_operations
def progress_bar_from_logfile(filename:str, desc:str="", verbose:int=1, **kwargs):
"""
Print a progress bar from a log file.
"""
from tqdm import tqdm
from time import sleep
k=0
limit=600
update_interval=0.2
sleep(2) # Wait for the process to be launched, and for the previous log file to be overwritten if necessary.
wait_until_file_exists(filename, verbose=verbose, limit=limit)
current_operation, total_operations = get_progress_from_logfile(filename)
previous_operation = 0
while total_operations == 0: # Wait for the simulation to be initialised, and to run the first operation
sleep(2)
current_operation, total_operations = get_progress_from_logfile(filename)
if current_operation == total_operations:
# print_message("Progress bar not needed, the process is already finished.", 3, "low level", verbose=verbose)
if current_operation == -1:
print_message("Error appeared in log file, skipping it.",level=3,module="low level",verbose=verbose)
return
with tqdm(desc=desc, total=total_operations, disable=(verbose==0), **kwargs) as pbar:
while current_operation < total_operations and k*update_interval < limit:
sleep(update_interval)
current_operation, total_operations = get_progress_from_logfile(filename)
if current_operation > previous_operation:
pbar.update(current_operation-previous_operation)
previous_operation = current_operation
if current_operation == -1:
print_message("Error appeared in log file, skipping it.",level=3,module="low level",verbose=verbose)
return
k+=1
if k*update_interval >= limit:
print_message(f"Progress bar timed out after {limit} seconds.", 3, "low level", verbose=verbose)
return