Initial Commit

2025-07-07 14:12:53 +12:00
commit fc07e157c8
7 changed files with 640 additions and 0 deletions

122
util/builder/cpp_builder.py Normal file

@@ -0,0 +1,122 @@
from util.generic.multithreader import MultiprocessWorker, WorkerProcess, WorkerReturn, Spinners
from util.generic.log import Log
from util.builder.info import Build, BuildType, Platform
from typing import List, Optional
import subprocess
import os
import json
class CppBuilder:
    def __init__(self, builds: Optional[List[Build]] = None, log: Optional[Log] = None,
                 build_dir: str = "build/", source_dir: str = ".", include_dir: str = "include/"):
        if log is None:
            log = Log(self.__class__.__name__, Log.Level.DEBUG)
        self.logger = log.create_logger()
        # avoid the shared-mutable-default pitfall: each instance gets its own build list
        self._builds: List[Build] = builds if builds is not None else []
        self._build_dir: str = build_dir
        self._source_dir: str = source_dir
        self._include_dir: str = include_dir
        self.multiprocesser = MultiprocessWorker(spinner_set=Spinners.SPIN_OPEN_CUBE)
def find_source_files(self, root_dir:str) -> List[str]:
cpp_files = []
for root, _, files in os.walk(root_dir):
for file in files:
if file.endswith('.c') or file.endswith('.cpp'):
relative_path = os.path.relpath(os.path.join(root, file), root_dir)
cpp_files.append(relative_path)
return cpp_files
def build(self):
if len(self._builds) > 1:
self.logger.log(Log.Level.INFO, "Starting builds...")
else:
self.logger.log(Log.Level.INFO, "Starting build...")
os.makedirs(self._build_dir, exist_ok=True)
        for build in self._builds:
            # find_source_files() returns paths relative to source_dir, so re-anchor them for the compiler
            sources = [os.path.join(self._source_dir, f) for f in self.find_source_files(self._source_dir)]
            instruction = [
                build.platform.value.compiler,
                "-o", os.path.join(self._build_dir, f"{build.name}_{build.platform.value}"),
                f"-march={build.platform.value.architecture}",
                "-I", self._include_dir,
                *sources
            ]
instruction.append(build.platform.value.code_specification)
instruction.extend(build.type.value)
instruction.extend(build.additional_instructions)
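            # Illustrative only: for Build("demo", BuildType.DEBUG, Platform.LINUX_x86_64, ["-lm"]) and the
            # default directories, the assembled instruction comes out roughly as
            #   clang -o build/demo_linux_x86-64 -march=x86-64 -I include/ <sources> -DSPEC_LINUX -g -O0 -DLOG_LEVEL=DEBUG -lm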
self.multiprocesser.add_task(WorkerProcess(CppBuilder._build_worker, instruction))
results: List[WorkerReturn] = self.multiprocesser.run()
for res in results:
res.output_result(self.logger, r'-o (\S+)', "SUCCESS: Compiled '{output}'")
@staticmethod
def _build_worker(instruction):
try:
process = subprocess.run(instruction, capture_output=True, text=True, check=True)
return WorkerReturn(True, ' '.join(instruction), process.stdout, process.stderr, None)
except subprocess.CalledProcessError as e:
return WorkerReturn(False, ' '.join(instruction), e.stdout, e.stderr, str(e))
except Exception as e:
return WorkerReturn(False, ' '.join(instruction), "", "", str(e))
def load_config(self, file_path: str):
try:
with open(file_path, "r") as file:
config = json.load(file)
except FileNotFoundError:
self.logger.log(Log.Level.ERROR, f"File '{file_path}' not found.")
return False
except json.JSONDecodeError:
self.logger.log(Log.Level.ERROR, f"File '{file_path}' does not contain valid JSON data.")
return False
except Exception as e:
self.logger.log(Log.Level.ERROR, f"An unexpected error occurred: {e}")
return False
if config is None:
return False
configurations = config["configurations"]
global_build_args = config["global_build_args"]
for i in configurations:
build_name = i["name"]
build_type_str = i["build_type"]
platform_str = i["platform"]
if build_type_str not in BuildType.__members__:
self.logger.log(
Log.Level.ERROR,
f"Invalid build_type '{build_type_str}' for build '{build_name}'. Skipping..."
)
continue
if platform_str not in Platform.__members__:
self.logger.log(
Log.Level.ERROR,
f"Invalid platform '{platform_str}' for build '{build_name}'. Skipping..."
)
continue
build_type = BuildType[build_type_str]
platform = Platform[platform_str]
self._builds.append(Build(
build_name,
build_type,
platform,
i["args"] + global_build_args
))
self.logger.log(Log.Level.DEBUG, f"'{build_name}' config loaded from file.")
        self.logger.log(Log.Level.INFO, f"Configurations successfully loaded from '{file_path}'.")
        return True
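For reviewers, a minimal sketch of how this class might be driven end to end; the script, the build_config.json name and contents, and the src/ layout are assumptions for illustration, only the JSON keys mirror what load_config() reads:

from util.builder.cpp_builder import CppBuilder

# assumed build_config.json:
# {
#   "global_build_args": ["-Wall"],
#   "configurations": [
#     {"name": "demo", "build_type": "DEBUG", "platform": "LINUX_x86_64", "args": ["-std=c++17"]}
#   ]
# }
if __name__ == "__main__":  # required: the builder spawns worker processes
    builder = CppBuilder(source_dir="src/", include_dir="include/")
    builder.load_config("build_config.json")
    builder.build()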

36
util/builder/info.py Normal file

@@ -0,0 +1,36 @@
from dataclasses import dataclass, field
from enum import Enum
from typing import List
class BuildType(Enum):
    DEBUG = ["-g", "-O0", "-DLOG_LEVEL=DEBUG"]
    TRACE = ["-g", "-O0", "-DLOG_LEVEL=TRACE"]
    RELEASE = ["-O3", "-DREL_BUILD", "-DLOG_LEVEL=ERROR"]
    PROFILE = ["-O2", "-g", "-pg", "-DLOG_LEVEL=ERROR"]
    RELWITHDEBINFO = ["-O2", "-g", "-DLOG_LEVEL=DEBUG"]
    # -xHost is an Intel-compiler flag; clang/gcc expect -march=native for the same intent
    FASTEST = ["-Ofast", "-ffast-math", "-xHost", "-DLOG_LEVEL=ERROR"]
@dataclass
class PlatformInfo:
name: str
compiler: str
architecture: str
file_type: str
code_specification: str
def __str__(self):
return f"{self.name}_{self.architecture}{self.file_type}"
class Platform(Enum):
LINUX_x86_64 = PlatformInfo("linux", "clang", "x86-64", "", "-DSPEC_LINUX")
LINUX_x86 = PlatformInfo("linux", "clang", "x86", "", "-DSPEC_LINUX")
LINUX_x86_64_V4 = PlatformInfo("linux", "clang", "x86-64-v4", "", "-DSPEC_LINUX")
MACOS = PlatformInfo("macos", "gcc", "arm", "", "-DSPEC_DARWIN")
@dataclass
class Build:
name: str
type: BuildType
    platform: Platform  # a Platform member; build() and load_config() read .value for the PlatformInfo
additional_instructions: List[str] = field(default_factory=list)
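A Build can also be assembled by hand rather than through JSON; the "demo" name and extra flag below are illustrative:

from util.builder.info import Build, BuildType, Platform

dbg = Build("demo", BuildType.DEBUG, Platform.LINUX_x86_64, ["-std=c++17"])
print(dbg.platform.value)  # linux_x86-64  (via PlatformInfo.__str__)
print(dbg.type.value)      # ['-g', '-O0', '-DLOG_LEVEL=DEBUG']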

37
util/generic/log.py Normal file

@@ -0,0 +1,37 @@
from dataclasses import dataclass
from logging.handlers import RotatingFileHandler
from enum import IntEnum
from typing import Optional
import logging
@dataclass
class Log:
class Level(IntEnum):
CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
NOTSET = 0
name: str
level: Level
file: Optional[str] = None
format: str = '[%(name)s] %(asctime)s - %(levelname)s - %(message)s'
date_format: str = '%Y-%m-%d %H:%M:%S'
    def create_logger(self):
        logger = logging.getLogger(self.name)
        logger.setLevel(self.level)
        # only attach handlers once, so repeated create_logger() calls don't duplicate output
        if not logger.handlers:
            formatter = logging.Formatter(self.format, self.date_format)
            self._add_handler(logger, logging.StreamHandler(), formatter)
            if self.file:
                self._add_handler(logger, RotatingFileHandler(self.file, maxBytes=1000000, backupCount=3), formatter)
        return logger
def _add_handler(self, logger: logging.Logger, handler: logging.Handler, formatter: logging.Formatter):
handler.setLevel(self.level)
handler.setFormatter(formatter)
logger.addHandler(handler)
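Typical usage is a one-liner; the "demo" name and demo.log path below are assumptions:

from util.generic.log import Log

logger = Log("demo", Log.Level.INFO, file="demo.log").create_logger()
logger.info("written to the console and to a rotating demo.log")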

129
util/generic/multithreader.py Normal file

@@ -0,0 +1,129 @@
from util.generic.log import Log
from dataclasses import dataclass
from enum import Enum
import sys
import time
from datetime import datetime
from multiprocessing import Pool, Manager
import threading
from logging import Logger
from typing import Callable, Optional
import re
@dataclass
class Spinner:
char_list: list[str]
delay: float
class Spinners(Enum):
BASIC = Spinner(["|", "/", "-", "\\"], 0.1)
    SPIN_TRI_BLOCK = Spinner(["◢", "◣", "◤", "◥"], 0.1)
SPIN_RIGHT_ANGLE = Spinner(["🮤", "🮧", "🮥", "🮦"], 0.1)
SPIN_OPEN_CUBE = Spinner(["🮪", "🮫", "🮭", "🮬"], 0.1)
    SPIN_DOTS = Spinner(["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"], 0.1)
GUY_DANCING = Spinner(["🯇", "🯈"], 0.3)
GUY_JUMPING = Spinner(["🯅", "🯆"], 0.3)
SHIFTING_PATTERN = Spinner(["🮕", "🮖"], 0.1)
SHIFTING_GRADIENT = Spinner(["░▒▓█▓▒░", "▒▓█▓▒░░", "▓█▓▒░▒▓", "█▓▒░▒▓█", "▓▒░▒▓█▓", "░░▒▓█▓▒"], 0.1)
@dataclass
class WorkerProcess:
    function: Callable  # invoked in a worker process with `instructions` as its single argument
    instructions: tuple
def dispatch_worker(worker_process: WorkerProcess):
    # module-level wrapper so multiprocessing.Pool can pickle the call and unpack it in the child process
    return worker_process.function(worker_process.instructions)
@dataclass
class WorkerReturn:
status: bool
command: str
stdout: str = ""
stderr: str = ""
error: Optional[str] = None
    def output_result(self, logger: Logger, success_regex: Optional[str] = None, success_message: Optional[str] = None):
if self.status:
result_level = Log.Level.INFO
if success_regex and success_message:
success_match = re.search(success_regex, self.command)
success_string = success_match.group(1) if success_match else "unknown_target"
logger.log(result_level, success_message.format(output = success_string))
else:
logger.log(result_level, f"SUCCESS: {self.command}")
else:
result_level = Log.Level.ERROR
logger.log(result_level, f"FAILURE: {self.command}")
if self.error:
logger.log(result_level, f"Error: {self.error}")
if self.stdout:
logger.log(result_level, f"STDOUT:\n{self.stdout}")
if self.stderr:
logger.log(result_level, f"STDERR:\n{self.stderr}")
class MultiprocessWorker:
    def __init__(self, logger: Optional[Logger] = None, max_processes: int = 4, spinner_set: Spinners = Spinners.BASIC):
if logger is None: logger = Log(self.__class__.__name__, Log.Level.DEBUG).create_logger()
self.logger = logger
self.tasks: list[WorkerProcess] = []
self.max_processes = max_processes
self.spinner:Spinner = spinner_set.value
self.manager = Manager()
self.active_processes = self.manager.Value('i', 0)
self.stop_display = self.manager.Event()
self._process_lock = threading.Lock()
def add_tasks(self, instructions: list[WorkerProcess]):
self.logger.log(Log.Level.DEBUG, f"Adding {len(instructions)} tasks.")
self.tasks.extend(instructions)
def add_task(self, instruction: WorkerProcess):
self.logger.log(Log.Level.DEBUG, f"Adding task with callable {instruction.function.__name__}")
self.tasks.append(instruction)
    def _spinner_display(self, total_tasks, results):
        idx = 0
        list_length = len(self.spinner.char_list)
        # also watch stop_display so the thread exits even if a callback never fires (e.g. a worker raised)
        while len(results) < total_tasks and not self.stop_display.is_set():
            sys.stdout.write(f"\r{self.spinner.char_list[idx]} | Completed: {len(results)}/{total_tasks} | Time: {datetime.now().strftime('%H:%M:%S')} ")
            sys.stdout.flush()
            idx = (idx + 1) % list_length
            time.sleep(self.spinner.delay)
        sys.stdout.write("\r")
        sys.stdout.flush()
def run(self):
self.logger.log(Log.Level.DEBUG, f"Starting with {len(self.tasks)} tasks and max {self.max_processes} processes.")
results = []
spinner_thread = threading.Thread(target=self._spinner_display, args=(len(self.tasks), results))
spinner_thread.daemon = True
spinner_thread.start()
try:
with Pool(self.max_processes) as pool:
for wp in self.tasks:
                    pool.apply_async(
                        dispatch_worker,
                        args=(wp,),
                        callback=results.append,
                        # error_callback keeps the completed count accurate if a worker raises instead of returning
                        error_callback=lambda exc, wp=wp: results.append(
                            WorkerReturn(False, ' '.join(map(str, wp.instructions)), error=str(exc))
                        )
                    )
pool.close()
pool.join()
        except Exception:
self.logger.exception("Unexpected error during multiprocessing run")
finally:
self.stop_display.set()
spinner_thread.join()
self.logger.log(Log.Level.DEBUG, "All multiprocessing tasks completed.")
return results
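
A self-contained sketch of the pool outside the builder; slow_echo is a made-up stand-in for real work and must live at module level so it can be pickled into the worker processes:

from util.generic.multithreader import MultiprocessWorker, WorkerProcess, WorkerReturn, Spinners
import time

def slow_echo(args):
    time.sleep(1)  # stand-in for real work
    return WorkerReturn(True, ' '.join(args), stdout="done")

if __name__ == "__main__":  # required: run() hands tasks to worker processes
    mp = MultiprocessWorker(spinner_set=Spinners.BASIC)
    mp.add_tasks([WorkerProcess(slow_echo, ["echo", str(i)]) for i in range(4)])
    for res in mp.run():
        res.output_result(mp.logger)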