269 lines
No EOL
9.4 KiB
Python
269 lines
No EOL
9.4 KiB
Python
#!/usr/bin/env python
|
|
# -------------------------------------------------------------------------------------
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but
|
|
# WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# General Public License for more details.
|
|
#
|
|
# The text of the license is located in the root directory of the source package.
|
|
# -------------------------------------------------------------------------------------
|
|
|
|
__author__ = "Tristan Hoellinger, Mayeul Aubin"
|
|
__version__ = "0.1"
|
|
__date__ = "2024"
|
|
__license__ = "GPLv3"
|
|
|
|
"""
|
|
Low-level utilities: redirecting C-level stdout/stderr, printing formatted console
messages, and tracking the progress of a run from its log file.
|
|
"""
|
|
|
|
from contextlib import contextmanager
|
|
import ctypes
|
|
import io
|
|
import os, sys
|
|
import tempfile
|
|
|
|
# Handle obtained with CDLL(None); the redirectors below use it to call the C
# runtime's fflush. NOTE(review): CDLL(None) relies on POSIX dlopen(NULL)
# semantics and is not expected to work on Windows - confirm target platforms.
libc = ctypes.CDLL(None)
|
|
# The C-level `stdout` stream object looked up by symbol name in libc; it is passed
# to fflush before the underlying file descriptor is swapped.
# NOTE(review): the exported symbol name is platform-dependent - confirm for non-glibc systems.
c_stdout = ctypes.c_void_p.in_dll(libc, "stdout")
|
|
|
|
|
|
# Taken from:
|
|
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
|
|
@contextmanager
def stdout_redirector(stream):
    """Redirect stdout (Python-level and C-level) to the given stream.

    Everything written to the stdout file descriptor while the context is active
    (including output of C code called through extensions) is captured in a
    temporary file and copied to ``stream`` when the context exits normally. This
    can be used to keep C code from cluttering the terminal, e.g. when using tqdm.

    stdout is always restored when the context exits, even if the body raises; in
    that case the captured output is dropped and the exception propagates.

    Args:
        stream (binary file-like object or None): Receives the captured output as
            bytes through ``stream.write``. If None, the captured output is
            discarded.

    Example:
        >>> f = io.BytesIO()
        >>> with stdout_redirector(f):
        >>>     print("Hello world!")  # Ends up in f instead of the terminal.
    """
    # The original fd stdout points to. Usually 1 on POSIX systems.
    original_stdout_fd = sys.stdout.fileno()

    def _redirect_stdout(to_fd):
        """Redirect stdout to the given file descriptor."""
        # Flush the C-level buffer stdout
        libc.fflush(c_stdout)
        # Flush and close sys.stdout - also closes the file descriptor (fd)
        sys.stdout.close()
        # Make original_stdout_fd point to the same file as to_fd
        os.dup2(to_fd, original_stdout_fd)
        # Create a new sys.stdout that points to the redirected fd
        sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, "wb"))

    # Save a copy of the original stdout fd so it can be restored later
    saved_stdout_fd = os.dup(original_stdout_fd)
    try:
        # The temporary file is created inside the `with` so that a failure to
        # create it cannot leave an unbound name behind in the cleanup path.
        with tempfile.TemporaryFile(mode="w+b") as tfile:
            try:
                _redirect_stdout(tfile.fileno())
                yield
            finally:
                # Restore stdout unconditionally: without this, an exception in
                # the caller's body would leave stdout pointing at a temp file
                # that is about to be closed.
                _redirect_stdout(saved_stdout_fd)
            # Copy contents of temporary file to the given stream
            if stream is not None:
                tfile.flush()
                tfile.seek(0, io.SEEK_SET)
                stream.write(tfile.read())
    finally:
        os.close(saved_stdout_fd)
|
|
|
|
|
|
# Adapted from:
|
|
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
|
|
# The C-level `stderr` stream object looked up by symbol name in libc; it is passed
# to fflush before the underlying file descriptor is swapped.
# NOTE(review): the exported symbol name is platform-dependent - confirm for non-glibc systems.
c_stderr = ctypes.c_void_p.in_dll(libc, "stderr")
|
|
|
|
|
|
@contextmanager
def stderr_redirector(stream):
    """Redirect stderr (Python-level and C-level) to the given stream.

    Everything written to the stderr file descriptor while the context is active
    is captured in a temporary file and copied to ``stream`` when the context exits
    normally. This can be used to keep C code from cluttering the terminal, e.g.
    when using tqdm.

    WARNING: this should be used with CAUTION as it redirects all stderr, so NONE OF
    THE ERRORS WILL BE DISPLAYED, no matter the severity.

    stderr is always restored when the context exits, even if the body raises; in
    that case the captured output is dropped and the exception propagates.

    Args:
        stream (binary file-like object or None): Receives the captured output as
            bytes through ``stream.write``. If None, the captured output is
            discarded.
    """
    # The original fd stderr points to. Usually 2 on POSIX systems.
    original_stderr_fd = sys.stderr.fileno()

    def _redirect_stderr(to_fd):
        """Redirect stderr to the given file descriptor."""
        # Flush the C-level buffer stderr
        libc.fflush(c_stderr)
        # Flush and close sys.stderr - also closes the file descriptor (fd)
        sys.stderr.close()
        # Make original_stderr_fd point to the same file as to_fd
        os.dup2(to_fd, original_stderr_fd)
        # Create a new sys.stderr that points to the redirected fd
        sys.stderr = io.TextIOWrapper(os.fdopen(original_stderr_fd, "wb"))

    # Save a copy of the original stderr fd so it can be restored later
    saved_stderr_fd = os.dup(original_stderr_fd)
    try:
        # The temporary file is created inside the `with` so that a failure to
        # create it cannot leave an unbound name behind in the cleanup path.
        with tempfile.TemporaryFile(mode="w+b") as tfile:
            try:
                _redirect_stderr(tfile.fileno())
                yield
            finally:
                # Restore stderr unconditionally: without this, an exception in
                # the caller's body would leave stderr pointing at a temp file
                # that is about to be closed, hiding the traceback itself.
                _redirect_stderr(saved_stderr_fd)
            # Copy contents of temporary file to the given stream
            if stream is not None:
                tfile.flush()
                tfile.seek(0, io.SEEK_SET)
                stream.write(tfile.read())
    finally:
        os.close(saved_stderr_fd)
|
|
|
|
|
|
def print_level(level:int, module:str):
    """
    Build the prefix placed in front of every console message.

    The prefix has three " | "-separated fields: the current wall-clock time
    (HH:MM:SS), the upper-cased module name centred in a 14-character column, and
    ``level`` ">" characters followed by a single space.

    Args:
        level (int): Number of ">" characters marking the nesting depth.
        module (str): Name of the module emitting the message.

    Returns:
        str: The formatted prefix.
    """
    from datetime import datetime

    column_width = 14

    fields = (
        datetime.now().strftime("%H:%M:%S"),
        module.upper().center(column_width),
        ">" * level + " ",
    )
    return " | ".join(fields)
|
|
|
|
|
|
def print_message(message:str, level:int, module:str, verbose:int = 1):
    """
    Print ``message`` preceded by the time / module / level prefix.

    Args:
        message (str): Text to display.
        level (int): Nesting depth passed to ``print_level``.
        module (str): Module name passed to ``print_level``.
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not verbose >= 1:
        return
    prefix = print_level(level, module)
    print(prefix + message)
|
|
|
|
|
|
def print_header(verbose:int=1):
    """
    Print the program banner: a 40-column box of "#" holding the program name and
    the current date and time, followed by an empty line.

    Args:
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not verbose >= 1:
        return

    from datetime import datetime

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    width = 40
    inner_width = width - 4  # room left once "# " and " #" are accounted for
    border = "#" * width

    rows = (
        border,
        "# " + "SBMY CONTROL".center(inner_width) + " #",
        "# " + timestamp.center(inner_width) + " #",
        border,
        "",
    )
    for row in rows:
        print(row)
|
|
|
|
|
|
def print_starting_module(module:str, verbose:int=1):
    """
    Announce that a module is starting, using a level-0 message prefix.

    Args:
        module (str): Name of the module being started.
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not verbose >= 1:
        return
    prefix = print_level(0, module)
    print(prefix + f"Starting {module} module.")
|
|
|
|
|
|
def print_ending_module(module:str, verbose:int=1):
    """
    Announce that a module has finished, using a level-0 message prefix.

    Args:
        module (str): Name of the module that ended.
        verbose (int): Nothing is printed unless this is at least 1.
    """
    if not verbose >= 1:
        return
    prefix = print_level(0, module)
    print(prefix + f"Ending {module} module.")
|
|
|
|
|
|
def wait_until_file_exists(filename:str, verbose:int=1, limit:int=1200):
    """
    Block until ``filename`` exists, polling once per second.

    Args:
        filename (str): Path of the file to wait for.
        verbose (int): Verbosity forwarded to ``print_message``.
        limit (int): Maximum number of one-second polls before giving up.

    Raises:
        TimeoutError: If the file still does not exist after ``limit`` polls.
    """
    from time import sleep
    from os.path import isfile

    k = 0
    while not isfile(filename) and k < limit:
        # Report once per elapsed minute (not at k == 0, where nothing has elapsed).
        if k > 0 and k % 60 == 0:
            print_message(f"Waiting for {filename} to exist. {k//60} minutes elapsed.", 3, "low level", verbose=verbose)
        sleep(1)
        # Count the poll; without this the timeout below can never trigger.
        k += 1
    # Decide on the file itself rather than on the counter, so that a file that
    # is present (e.g. appearing on the very last poll) is never reported missing.
    if not isfile(filename):
        raise TimeoutError(f"File {filename} not found after {limit} seconds.")
    if k > 60:
        print_message(f"File {filename} exists. {k//60} minutes elapsed.", 3, "low level", verbose=verbose)
|
|
|
|
|
|
def get_progress_from_logfile(filename):
    """
    Get the number of operations done and the total number of operations from a log file.

    Lines containing " operation " are expected to look like
    "... operation <done>/<total>: ..."; the last well-formed one wins. Malformed
    progress lines are ignored entirely, so the returned pair always comes from a
    single line. Any other line containing "Fatal" or "Error" aborts the scan.

    Args:
        filename (str): Path of the log file to scan.

    Returns:
        tuple[int, int]: ``(current_operation, total_operations)``; ``(0, 0)`` if no
        progress line was found, ``(-1, -1)`` if an error line was found.

    Raises:
        FileNotFoundError: If ``filename`` is not an existing regular file.
    """
    from os.path import isfile

    if not isfile(filename):
        raise FileNotFoundError(f"Log file {filename} not found.")

    current_operation = 0
    total_operations = 0
    # errors="replace": the log is written by an external program, so stray
    # undecodable bytes should not abort progress tracking.
    with open(filename, "r", errors="replace") as f:
        # Stream the file instead of loading every line into memory.
        for line in f:
            if " operation " in line:
                progress = line.split(" operation ")[1].split(":")[0]
                try:
                    # Unpacking fails with ValueError when "/<total>" is missing,
                    # int() fails with ValueError on non-numeric fields.
                    done, total = [int(part) for part in progress.split("/")[:2]]
                except ValueError:
                    continue
                current_operation, total_operations = done, total
            elif "Fatal" in line or "Error" in line:
                return -1, -1
    return current_operation, total_operations
|
|
|
|
|
|
def progress_bar_from_logfile(filename:str, desc:str="", verbose:int=1, **kwargs):
    """
    Display a tqdm progress bar driven by the progress lines of a log file.

    The function waits for the log file to appear, then for a first progress line,
    and finally polls the file every 0.2 s, advancing the bar until the operation
    count reaches the total, an error is reported in the log, or the polling budget
    (600 s of nominal polling time) is used up.

    Args:
        filename (str): Path of the log file to follow.
        desc (str): Description shown next to the bar.
        verbose (int): Verbosity for messages; the bar is disabled when it is 0.
        **kwargs: Extra keyword arguments forwarded to ``tqdm``.
    """
    from tqdm import tqdm
    from time import sleep

    timeout = 600
    poll_period = 0.2

    # Wait for the process to be launched, and for the previous log file to be overwritten if necessary.
    sleep(2)
    wait_until_file_exists(filename, verbose=verbose, limit=timeout)

    # Wait for the simulation to be initialised, and to run the first operation.
    # NOTE(review): this wait has no timeout of its own.
    done, total = get_progress_from_logfile(filename)
    while total == 0:
        sleep(2)
        done, total = get_progress_from_logfile(filename)

    # Nothing to display: either already finished, or the log reports an error (-1, -1).
    if done == total:
        if done == -1:
            print_message("Error appeared in log file, skipping it.",level=3,module="low level",verbose=verbose)
        return

    displayed = 0
    polls = 0
    with tqdm(desc=desc, total=total, disable=(verbose==0), **kwargs) as pbar:
        while done < total and polls*poll_period < timeout:
            sleep(poll_period)
            done, total = get_progress_from_logfile(filename)
            if done > displayed:
                pbar.update(done-displayed)
                displayed = done
            if done == -1:
                print_message("Error appeared in log file, skipping it.",level=3,module="low level",verbose=verbose)
                return
            polls += 1

    if polls*poll_period >= timeout:
        print_message(f"Progress bar timed out after {timeout} seconds.", 3, "low level", verbose=verbose)