#!/usr/bin/env python
# -------------------------------------------------------------------------------------
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# The text of the license is located in the root directory of the source package.
# -------------------------------------------------------------------------------------

"""
Tools to deal with low-level operations e.g. redirecting C stdout.
"""

__author__ = "Tristan Hoellinger, Mayeul Aubin"
__version__ = "0.1"
__date__ = "2024"
__license__ = "GPLv3"

from contextlib import contextmanager
import ctypes
from datetime import datetime
import io
import os
from os.path import isfile
import sys
import tempfile
from time import sleep

# Handles on the C runtime and its stdout/stderr FILE* objects, needed to flush
# the C-level buffers before the underlying file descriptors are swapped.
libc = ctypes.CDLL(None)
c_stdout = ctypes.c_void_p.in_dll(libc, "stdout")
c_stderr = ctypes.c_void_p.in_dll(libc, "stderr")


# Adapted from:
# https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
@contextmanager
def _std_stream_redirector(name, c_stream, stream):
    """Redirect the standard stream ``sys.<name>`` at file-descriptor level.

    Shared implementation behind `stdout_redirector` and `stderr_redirector`.

    Args:
        name (str): Attribute name of the stream on ``sys`` ("stdout" or "stderr").
        c_stream: ctypes handle on the matching C ``FILE*`` (flushed before each swap).
        stream (file-like object or None): Receives the captured bytes once the
            ``with`` body has completed normally. ``None`` discards them.
    """
    # The fd the Python-level stream currently writes to.
    original_fd = getattr(sys, name).fileno()

    def _redirect(to_fd):
        """Make the standard stream (C and Python level) write to ``to_fd``."""
        # Flush the C-level buffer so pending output lands in the old target.
        libc.fflush(c_stream)
        # Flush and close the Python-level stream - also closes its fd.
        getattr(sys, name).close()
        # Make original_fd point to the same file as to_fd.
        os.dup2(to_fd, original_fd)
        # Create a new Python-level stream on top of the redirected fd.
        setattr(sys, name, io.TextIOWrapper(os.fdopen(original_fd, "wb")))

    # Keep a copy of the original target so that it can be restored afterwards.
    saved_fd = os.dup(original_fd)
    try:
        with tempfile.TemporaryFile(mode="w+b") as tfile:
            _redirect(tfile.fileno())
            try:
                yield
            finally:
                # Always restore the stream, even if the caller's block raised;
                # otherwise every later write would go to a closed temp file.
                _redirect(saved_fd)
            # Only reached when the caller's block completed without raising.
            tfile.flush()
            tfile.seek(0, io.SEEK_SET)
            if stream is not None:
                stream.write(tfile.read())
    finally:
        os.close(saved_fd)


@contextmanager
def stdout_redirector(stream):
    """A context manager that redirects stdout to the given stream.

    Output written by Python code *and* by C code (through the C-level stdout)
    is captured. For instance, this can be used to silence C code output (to
    avoid cluttering the terminal eg when using tqdm).

    If the body of the ``with`` statement raises, stdout is restored and the
    exception propagates; the captured output is not copied to ``stream``.

    Args:
        stream (file-like object or None): The stream to which stdout should be
            redirected. Captured output is written to it as ``bytes`` (e.g. an
            ``io.BytesIO``). ``None`` discards the captured output.

    Example::

        buffer = io.BytesIO()
        with stdout_redirector(buffer):
            print("Hello world!")  # Ends up in buffer instead of the terminal.
    """
    with _std_stream_redirector("stdout", c_stdout, stream):
        yield


@contextmanager
def stderr_redirector(stream):
    """A context manager that redirects stderr to the given stream.

    For instance, this can be used to silence C code stderr (to avoid
    cluttering the terminal eg when using tqdm).

    WARNING: this should be used with CAUTION as it redirects all stderr, so
    NONE OF THE ERRORS WILL BE DISPLAYED, no matter the severity.

    If the body of the ``with`` statement raises, stderr is restored and the
    exception propagates; the captured output is not copied to ``stream``.

    Args:
        stream (file-like object or None): The stream to which stderr should be
            redirected. Captured output is written to it as ``bytes`` (e.g. an
            ``io.BytesIO``). ``None`` discards the captured output.
    """
    with _std_stream_redirector("stderr", c_stderr, stream):
        yield


def print_level(level: int, module: str):
    """
    Generate a string with the current time, the module name and the level of the message.

    Args:
        level (int): Nesting level of the message; rendered as ``level`` ">" characters.
        module (str): Module name; upper-cased and centred in a 14-character column.

    Returns:
        str: Prefix of the form ``"HH:MM:SS | <MODULE> | >>> "``.
    """
    max_len_module = 14
    timestamp = datetime.now().strftime("%H:%M:%S")
    return f"{timestamp} | {module.upper().center(max_len_module)} | {'>' * level} "


def print_message(message: str, level: int, module: str, verbose: int = 1):
    """
    Print a message with a given level and module name.

    Nothing is printed when ``verbose`` is lower than 1.
    """
    if verbose >= 1:
        print(print_level(level, module) + message)


def print_header(verbose: int = 1):
    """
    Print the header of the program (name and current date in a 40-column box).

    Nothing is printed when ``verbose`` is lower than 1.
    """
    if verbose >= 1:
        date = datetime.now()
        width = 40
        program_name = "SBMY CONTROL"
        print("#" * width)
        print("# " + program_name.center(width - 4) + " #")
        print("# " + date.strftime("%Y-%m-%d %H:%M:%S").center(width - 4) + " #")
        print("#" * width)
        print("")


def print_starting_module(module: str, verbose: int = 1):
    """
    Print the starting message of a module.
    """
    if verbose >= 1:
        print(print_level(0, module) + f"Starting {module} module.")


def print_ending_module(module: str, verbose: int = 1):
    """
    Print the ending message of a module.
    """
    if verbose >= 1:
        print(print_level(0, module) + f"Ending {module} module.")


def wait_until_file_exists(filename: str, verbose: int = 1, limit: int = 1200):
    """
    Wait until a file exists.

    The file is polled once per second.

    Args:
        filename (str): Path of the file to wait for.
        verbose (int): Verbosity forwarded to `print_message`.
        limit (int): Maximum number of seconds to wait.

    Raises:
        TimeoutError: If the file still does not exist after ``limit`` seconds.
    """
    k = 0
    while not isfile(filename):
        # Only give up if the file is still missing once the budget is spent.
        if k >= limit:
            raise TimeoutError(f"File {filename} not found after {limit} seconds.")
        sleep(1)
        k += 1
    if k > 60:
        print_message(
            f"File {filename} exists. {k//60} minutes elapsed.",
            3,
            "low level",
            verbose=verbose,
        )


def get_progress_from_logfile(filename):
    """
    Get the number of operations done and the total number of operations from a log file.

    Lines containing ``" operation <done>/<total>:"`` update the counters; the
    last well-formed one wins and malformed ones are ignored.

    Args:
        filename (str): Path of the log file.

    Returns:
        tuple[int, int]: ``(current_operation, total_operations)``; ``(0, 0)`` if
        no progress line was found, ``(-1, -1)`` as soon as a line without a
        progress marker contains "Fatal" or "Error".

    Raises:
        FileNotFoundError: If ``filename`` is not an existing file.
    """
    current_operation = 0
    total_operations = 0
    if not isfile(filename):
        raise FileNotFoundError(f"Log file {filename} not found.")
    with open(filename, "r") as f:
        for line in f:
            if " operation " in line:
                try:
                    fraction = line.split(" operation ")[1].split(":")[0].split("/")
                    done, total = int(fraction[0]), int(fraction[1])
                except (ValueError, IndexError):
                    # Not a well-formed "<done>/<total>" marker: skip the line.
                    continue
                # Update both counters together so they always stay consistent.
                current_operation, total_operations = done, total
            elif "Fatal" in line or "Error" in line:
                return -1, -1
    return current_operation, total_operations


def progress_bar_from_logfile(filename: str, desc: str = "", verbose: int = 1, **kwargs):
    """
    Print a progress bar from a log file.

    The log file is polled with `get_progress_from_logfile` and a tqdm bar is
    advanced accordingly, until the last operation is reached, an error shows
    up in the log, or the time budget is exhausted.

    Args:
        filename (str): Path of the log file to follow.
        desc (str): Description displayed next to the progress bar.
        verbose (int): Verbosity; 0 disables the bar and the messages.
        **kwargs: Extra keyword arguments forwarded to ``tqdm``.

    Raises:
        TimeoutError: If the log file does not appear in time
            (see `wait_until_file_exists`).
    """
    from tqdm import tqdm  # Third-party: imported lazily, only when a bar is requested.

    k = 0
    limit = 600
    update_interval = 0.2
    # Wait for the process to be launched, and for the previous log file to be
    # overwritten if necessary.
    sleep(2)
    wait_until_file_exists(filename, verbose=verbose, limit=limit)
    current_operation, total_operations = get_progress_from_logfile(filename)
    previous_operation = 0
    # Wait for the simulation to be initialised, and to run the first operation.
    # NOTE(review): this wait is unbounded if the log never reports progress.
    while total_operations == 0:
        sleep(2)
        current_operation, total_operations = get_progress_from_logfile(filename)
    if current_operation == total_operations:
        # Progress bar not needed: the process is already finished, or failed.
        if current_operation == -1:
            print_message(
                "Error appeared in log file, skipping it.",
                level=3,
                module="low level",
                verbose=verbose,
            )
        return
    with tqdm(desc=desc, total=total_operations, disable=(verbose == 0), **kwargs) as pbar:
        while current_operation < total_operations and k * update_interval < limit:
            sleep(update_interval)
            current_operation, total_operations = get_progress_from_logfile(filename)
            if current_operation > previous_operation:
                pbar.update(current_operation - previous_operation)
                previous_operation = current_operation
            if current_operation == -1:
                print_message(
                    "Error appeared in log file, skipping it.",
                    level=3,
                    module="low level",
                    verbose=verbose,
                )
                return
            k += 1
    if k * update_interval >= limit:
        print_message(
            f"Progress bar timed out after {limit} seconds.",
            3,
            "low level",
            verbose=verbose,
        )