Skip to content

pybasemkit API Documentation

argparse_action

Created on 2025-08-29

@author: wf

StoreDictKeyPair

Bases: Action

Custom argparse action to store key-value pairs as a dictionary.

This class implements an argparse action to parse and store command-line arguments in the form of key-value pairs. The pairs should be separated by a comma and each key-value pair should be separated by an equals sign.

Example

--option key1=value1,key2=value2,key3=value3

Reference

https://stackoverflow.com/a/42355279/1497139

Source code in basemkit/argparse_action.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class StoreDictKeyPair(argparse.Action):
    """
    Custom argparse action to store key-value pairs as a dictionary.

    This class implements an argparse action to parse and store command-line
    arguments in the form of key-value pairs. The pairs should be separated by
    a comma and each key-value pair should be separated by an equals sign.
    Only the first equals sign of a pair separates key from value, so a value
    may itself contain equals signs.

    Example:
        --option key1=value1,key2=value2,key3=value3

    Reference:
        https://stackoverflow.com/a/42355279/1497139
    """

    def __call__(
        self,
        _parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str,
        _option_string: Optional[str] = None,
    ) -> None:
        """
        Parse key-value pairs and store them as a dictionary in the namespace.

        Args:
            _parser (argparse.ArgumentParser): The argument parser object (unused).
            namespace (argparse.Namespace): The namespace to store the parsed values.
            values (str): The string containing key-value pairs separated by commas.
            _option_string (Optional[str]): The option string, if provided (unused).

        Raises:
            ValueError: If a pair does not contain an equals sign.
        """
        pairs = {}
        for pair in values.split(","):
            # partition splits on the first "=" only, so "url=http://x/?a=b"
            # keeps the remaining "=" characters inside the value
            key, separator, value = pair.partition("=")
            if not separator:
                raise ValueError(f"invalid key-value pair {pair!r}: expected key=value")
            pairs[key] = value
        setattr(namespace, self.dest, pairs)

__call__(_parser, namespace, values, _option_string=None)

Parse key-value pairs and store them as a dictionary in the namespace.

Parameters:

Name Type Description Default
_parser ArgumentParser

The argument parser object.

required
namespace Namespace

The namespace to store the parsed values.

required
values str

The string containing key-value pairs separated by commas.

required
_option_string Optional[str]

The option string, if provided.

None
Source code in basemkit/argparse_action.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __call__(
    self,
    _parser: argparse.ArgumentParser,
    namespace: argparse.Namespace,
    values: str,
    _option_string: Optional[str] = None,
) -> None:
    """
    Parse key-value pairs and store them as a dictionary in the namespace.

    Only the first equals sign of a pair separates key from value, so a value
    may itself contain equals signs.

    Args:
        _parser (argparse.ArgumentParser): The argument parser object (unused).
        namespace (argparse.Namespace): The namespace to store the parsed values.
        values (str): The string containing key-value pairs separated by commas.
        _option_string (Optional[str]): The option string, if provided (unused).

    Raises:
        ValueError: If a pair does not contain an equals sign.
    """
    pairs = {}
    for pair in values.split(","):
        # partition splits on the first "=" only, so "url=http://x/?a=b"
        # keeps the remaining "=" characters inside the value
        key, separator, value = pair.partition("=")
        if not separator:
            raise ValueError(f"invalid key-value pair {pair!r}: expected key=value")
        pairs[key] = value
    setattr(namespace, self.dest, pairs)

base_cmd

Created on 2025-06-16

Minimal reusable command line base class with standard options.

@author: wf

BaseCmd

Minimal reusable command line base class with standard options: --about, --debug, --force, --quiet, --verbose, --version.

Intended to be subclassed by tools requiring consistent CLI behavior.

Source code in basemkit/base_cmd.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class BaseCmd:
    """
    Minimal reusable command line base class with standard options:
    --about, --debug, --force, --quiet, --verbose, --version.

    Intended to be subclassed by tools requiring consistent CLI behavior.
    """

    def __init__(self, version: Any, description: Optional[str] = None):
        """
        Initialize the BaseCmd instance.

        Args:
            version: An object with .name, .version, .description, and .doc_url attributes.
            description (str): Optional CLI description. Defaults to version.description.
        """
        self.version = version
        self.description = description or self.version.description
        # missing attributes fall back to "" so the message can still be built
        name = getattr(self.version, "name", "")
        version_str = getattr(self.version, "version", "")
        updated = getattr(self.version, "updated", "")
        self.version_msg = f"{name} {version_str} {updated}".strip()
        self.program_version_message = self.version_msg
        self.debug = False
        self.quiet = False
        self.verbose = False
        self.force = False
        self.parser = None
        self.exit_code = 0
        self.args = None

    def add_arguments(self, parser: ArgumentParser):
        """
        Add standard CLI arguments to the parser, sorted by long option name.

        Args:
            parser (ArgumentParser): The parser to add arguments to.
        """
        parser.add_argument("-a", "--about", action="store_true", help="show version info and open documentation")
        parser.add_argument("-d", "--debug", action="store_true", help="enable debug output")
        parser.add_argument(
            "--debugLocalPath", help="remote debug Server path mapping - localPath - path on machine where python runs"
        )
        parser.add_argument("--debugPort", type=int, default=5678, help="remote debug Port [default: %(default)s]")
        parser.add_argument(
            "--debugRemotePath", help="remote debug Server path mapping - remotePath - path on debug server"
        )
        parser.add_argument("--debugServer", help="remote debug Server")
        parser.add_argument("-f", "--force", action="store_true", help="force overwrite or unsafe actions")
        parser.add_argument("-q", "--quiet", action="store_true", help="suppress all output")
        parser.add_argument("-v", "--verbose", action="store_true", help="increase output verbosity")
        parser.add_argument("-V", "--version", action="version", version=self.program_version_message)

    def getArgParser(self, description: str, version_msg: str) -> ArgumentParser:
        """
        Compatibility layer for legacy camelCase contract.

        Stores the given description and version message on the instance,
        creates the parser and adds the standard arguments to it.
        get_arg_parser delegates to this method.

        Args:
            description (str): Description shown in the CLI help.
            version_msg (str): Message printed by the --version option.

        Returns:
            ArgumentParser: The configured argument parser.
        """
        self.description = description
        # must be set before add_arguments, which reads it for --version
        self.program_version_message = version_msg
        parser = ArgumentParser(description=description, formatter_class=RawDescriptionHelpFormatter)
        self.add_arguments(parser)
        return parser

    def get_arg_parser(self) -> ArgumentParser:
        """
        Create and configure the argument parser.

        Returns:
            ArgumentParser: The configured argument parser.
        """
        parser = self.getArgParser(self.description, self.version_msg)
        return parser

    def parse_args(self, argv=None) -> Namespace:
        """
        Parse command line arguments.

        The parser is created on first use and reused on later calls.

        Args:
            argv (list): Optional list of command line arguments. Defaults to sys.argv.

        Returns:
            Namespace: Parsed argument values.
        """
        if self.parser is None:
            self.parser = self.get_arg_parser()
        self.args = self.parser.parse_args(argv)
        args = self.args
        return args

    def cmd_parse(self, argv: List[str]) -> Namespace:
        """delegate method - forwards to parse_args"""
        args = self.parse_args(argv)
        return args

    def optional_debug(self, args: Namespace):
        """
        Optionally start pydevd remote debugging if debugServer is specified.

        see https://www.pydev.org/manual_adv_remote_debugger.html

        Args:
            args (Namespace): Parsed CLI arguments

        Raises:
            ValueError: If debugRemotePath and debugLocalPath list a different
                number of comma separated entries.
        """
        if args.debugServer:
            # imported lazily so pydevd is only required when remote debugging is requested
            import pydevd
            import pydevd_file_utils

            remote_path = args.debugRemotePath
            local_path = args.debugLocalPath

            # note the complexity of https://stackoverflow.com/a/41765551/1497139
            # discussed in 2011
            if remote_path and local_path:
                remotes = [r.strip() for r in remote_path.split(",")]
                locals_ = [entry.strip() for entry in local_path.split(",")]
                if len(remotes) != len(locals_):
                    raise ValueError("debugRemotePath and debugLocalPath must have the same number of entries")
                mappings = list(zip(remotes, locals_))
                pydevd_file_utils.setup_client_server_paths(mappings)

            pydevd.settrace(
                args.debugServer,
                port=args.debugPort,
                stdoutToServer=True,
                stderrToServer=True,
            )
            print("Remote debugger attached.")

    def handle_args(self, args: Namespace) -> bool:
        """
        Handle parsed arguments. Intended to be overridden in subclasses.

        Args:
            args (Namespace): Parsed argument namespace.

        Returns:
            bool: True if argument was handled and no further processing is required.
        """
        self.args = args
        self.debug = args.debug
        self.quiet = args.quiet
        self.verbose = args.verbose
        self.force = args.force
        self.optional_debug(args)
        if args.about:
            print(self.program_version_message)
            print(f"see {self.version.doc_url}")
            webbrowser.open(self.version.doc_url)
            return True

        return False

    def handle_exception(self, e: BaseException) -> int:
        """
        Handle exceptions occurring during execution.
        Subclasses can override this to provide custom error handling.

        Args:
            e (BaseException): The exception that was raised.

        Returns:
            int: The exit code (1 for KeyboardInterrupt, 2 for other exceptions).
        """
        exit_code = 0
        if isinstance(e, KeyboardInterrupt):
            exit_code = 1
        else:
            # Check self.debug or args.debug specifically for traceback logic
            is_debug = self.debug or (self.args and getattr(self.args, "debug", False))

            if is_debug:
                # prints the exception currently being handled
                traceback.print_exc()
            else:
                msg = f"{self.version.name}: {e}\n"
                sys.stderr.write(msg)
            exit_code = 2
        return exit_code

    def run(self, argv=None) -> int:
        """
        Execute the command line logic.

        Args:
            argv (list): Optional command line arguments.

        Returns:
            int: Exit code: 0 = OK, 1 = KeyboardInterrupt, 2 = Exception.
                A SystemExit carrying an integer code (argparse uses 0 for
                --help/--version and 2 for usage errors) yields that code.
        """
        exit_code = 0
        try:
            args = self.parse_args(argv)
            self.handle_args(args)
        except SystemExit as e:
            # argparse terminates via SystemExit; --help and --version are
            # successful exits and must not be reported as errors
            if e.code is None:
                exit_code = 0
            elif isinstance(e.code, int):
                exit_code = e.code
            else:
                exit_code = self.handle_exception(e)
        except BaseException as e:
            exit_code = self.handle_exception(e)
        return exit_code

    @classmethod
    def main(cls, version: Any, argv=None) -> int:
        """
        Entry point for scripts using this command line interface.

        Args:
            version: Version metadata object passed to constructor.
            argv (list): Optional command line arguments.

        Returns:
            int: Exit code from `run()`.
        """
        instance = cls(version)
        exit_code = instance.run(argv)
        return exit_code

__init__(version, description=None)

Initialize the BaseCmd instance.

Parameters:

Name Type Description Default
version Any

An object with .name, .version, .description, and .doc_url attributes.

required
description str

Optional CLI description. Defaults to version.description.

None
Source code in basemkit/base_cmd.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, version: Any, description: Optional[str] = None):
    """
    Set up a command instance from version metadata.

    Args:
        version: Metadata object; .description is read when no description
            is given, .name/.version/.updated are optional and default to "".
        description (str): Optional CLI description. Defaults to version.description.
    """
    self.version = version
    self.description = description or self.version.description

    # assemble "<name> <version> <updated>", tolerating missing attributes
    msg_parts = [getattr(self.version, attr, "") for attr in ("name", "version", "updated")]
    self.version_msg = "{} {} {}".format(*msg_parts).strip()
    self.program_version_message = self.version_msg

    # standard option flags start switched off
    self.debug = self.quiet = self.verbose = self.force = False

    # parser and parsed arguments are filled in later by parse_args
    self.parser = None
    self.args = None
    self.exit_code = 0

add_arguments(parser)

Add standard CLI arguments to the parser, sorted by long option name.

Parameters:

Name Type Description Default
parser ArgumentParser

The parser to add arguments to.

required
Source code in basemkit/base_cmd.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def add_arguments(self, parser: ArgumentParser):
    """
    Register the standard options on the given parser.

    The options are added in alphabetical order of their long names.

    Args:
        parser (ArgumentParser): The parser to add arguments to.
    """
    add = parser.add_argument

    add("-a", "--about", action="store_true", help="show version info and open documentation")
    add("-d", "--debug", action="store_true", help="enable debug output")
    # remote debugging options
    add("--debugLocalPath", help="remote debug Server path mapping - localPath - path on machine where python runs")
    add("--debugPort", type=int, default=5678, help="remote debug Port [default: %(default)s]")
    add("--debugRemotePath", help="remote debug Server path mapping - remotePath - path on debug server")
    add("--debugServer", help="remote debug Server")
    add("-f", "--force", action="store_true", help="force overwrite or unsafe actions")
    add("-q", "--quiet", action="store_true", help="suppress all output")
    add("-v", "--verbose", action="store_true", help="increase output verbosity")
    add("-V", "--version", action="version", version=self.program_version_message)

cmd_parse(argv)

delegate method

Source code in basemkit/base_cmd.py
112
113
114
115
def cmd_parse(self, argv: List[str]) -> Namespace:
    """Delegate to parse_args and return its result."""
    return self.parse_args(argv)

getArgParser(description, version_msg)

Compatibility layer for legacy camelCase contract. Stores the given description and version_msg on the instance and builds the parser; get_arg_parser delegates to this method.

Source code in basemkit/base_cmd.py
75
76
77
78
79
80
81
82
83
84
def getArgParser(self, description: str, version_msg: str) -> ArgumentParser:
    """
    Legacy camelCase entry point that builds the argument parser.

    Records description and version_msg on the instance, creates the parser
    and lets add_arguments register the options on it.

    Args:
        description (str): Description shown in the CLI help.
        version_msg (str): Message stored as program_version_message.

    Returns:
        ArgumentParser: The configured argument parser.
    """
    # both values are stored before add_arguments runs
    self.description, self.program_version_message = description, version_msg
    arg_parser = ArgumentParser(
        description=description,
        formatter_class=RawDescriptionHelpFormatter,
    )
    self.add_arguments(arg_parser)
    return arg_parser

get_arg_parser()

Create and configure the argument parser.

Returns:

Name Type Description
ArgumentParser ArgumentParser

The configured argument parser.

Source code in basemkit/base_cmd.py
86
87
88
89
90
91
92
93
94
def get_arg_parser(self) -> ArgumentParser:
    """
    Build the argument parser from the instance's description and version message.

    Returns:
        ArgumentParser: The parser produced by getArgParser.
    """
    return self.getArgParser(self.description, self.version_msg)

handle_args(args)

Handle parsed arguments. Intended to be overridden in subclasses.

Parameters:

Name Type Description Default
args Namespace

Parsed argument namespace.

required

Returns:

Name Type Description
bool bool

True if argument was handled and no further processing is required.

Source code in basemkit/base_cmd.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def handle_args(self, args: Namespace) -> bool:
    """
    Apply the standard options to the instance. Meant to be overridden in subclasses.

    Copies the debug, quiet, verbose and force flags onto the instance, starts
    remote debugging if requested and handles the --about option.

    Args:
        args (Namespace): Parsed argument namespace.

    Returns:
        bool: True if --about was handled and no further processing is required,
            False otherwise.
    """
    self.args = args
    for flag in ("debug", "quiet", "verbose", "force"):
        setattr(self, flag, getattr(args, flag))
    self.optional_debug(args)

    if not args.about:
        return False

    # --about: show the version and open the documentation
    doc_url = self.version.doc_url
    print(self.program_version_message)
    print(f"see {doc_url}")
    webbrowser.open(self.version.doc_url)
    return True

handle_exception(e)

Handle exceptions occurring during execution. Subclasses can override this to provide custom error handling.

Parameters:

Name Type Description Default
e BaseException

The exception that was raised.

required

Returns:

Name Type Description
int int

The exit code (1 for KeyboardInterrupt, 2 for other exceptions).

Source code in basemkit/base_cmd.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def handle_exception(self, e: BaseException) -> int:
    """
    Translate an exception raised during execution into an exit code.
    Subclasses can override this to provide custom error handling.

    Args:
        e (BaseException): The exception that was raised.

    Returns:
        int: The exit code (1 for KeyboardInterrupt, 2 for other exceptions).
    """
    if isinstance(e, KeyboardInterrupt):
        return 1

    # a traceback is shown when debug is set on the instance or on the parsed args
    show_traceback = self.debug or (self.args and getattr(self.args, "debug", False))
    if show_traceback:
        traceback.print_exc()
    else:
        sys.stderr.write(f"{self.version.name}: {e}\n")
    return 2

main(version, argv=None) classmethod

Entry point for scripts using this command line interface.

Parameters:

Name Type Description Default
version Any

Version metadata object passed to constructor.

required
argv list

Optional command line arguments.

None

Returns:

Name Type Description
int int

Exit code from run().

Source code in basemkit/base_cmd.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
@classmethod
def main(cls, version: Any, argv=None) -> int:
    """
    Script entry point: create an instance and run it.

    Args:
        version: Version metadata object passed to constructor.
        argv (list): Optional command line arguments.

    Returns:
        int: Exit code from `run()`.
    """
    return cls(version).run(argv)

optional_debug(args)

Optionally start pydevd remote debugging if debugServer is specified.

see https://www.pydev.org/manual_adv_remote_debugger.html

Parameters:

Name Type Description Default
args Namespace

Parsed CLI arguments

required
Source code in basemkit/base_cmd.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def optional_debug(self, args: Namespace):
    """
    Attach to a pydevd remote debug server when args.debugServer is set.

    see https://www.pydev.org/manual_adv_remote_debugger.html

    Args:
        args (Namespace): Parsed CLI arguments

    Raises:
        ValueError: If debugRemotePath and debugLocalPath list a different
            number of comma separated entries.
    """
    if not args.debugServer:
        return

    # pydevd is only imported when remote debugging was requested
    import pydevd
    import pydevd_file_utils

    remote_path = args.debugRemotePath
    local_path = args.debugLocalPath

    # note the complexity of https://stackoverflow.com/a/41765551/1497139
    # discussed in 2011
    if remote_path and local_path:
        remote_entries = [entry.strip() for entry in remote_path.split(",")]
        local_entries = [entry.strip() for entry in local_path.split(",")]
        if len(remote_entries) != len(local_entries):
            raise ValueError("debugRemotePath and debugLocalPath must have the same number of entries")
        # pair each remote path with the local path at the same position
        path_pairs = list(zip(remote_entries, local_entries))
        pydevd_file_utils.setup_client_server_paths(path_pairs)

    pydevd.settrace(
        args.debugServer,
        port=args.debugPort,
        stdoutToServer=True,
        stderrToServer=True,
    )
    print("Remote debugger attached.")

parse_args(argv=None)

Parse command line arguments.

Parameters:

Name Type Description Default
argv list

Optional list of command line arguments. Defaults to sys.argv.

None

Returns:

Name Type Description
Namespace Namespace

Parsed argument values.

Source code in basemkit/base_cmd.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def parse_args(self, argv=None) -> Namespace:
    """
    Parse the command line and remember the result on the instance.

    The parser is created via get_arg_parser on first use and reused afterwards.

    Args:
        argv (list): Optional list of command line arguments. Defaults to sys.argv.

    Returns:
        Namespace: Parsed argument values.
    """
    if self.parser is None:
        self.parser = self.get_arg_parser()
    parsed = self.parser.parse_args(argv)
    self.args = parsed
    return parsed

run(argv=None)

Execute the command line logic.

Parameters:

Name Type Description Default
argv list

Optional command line arguments.

None

Returns:

Name Type Description
int int

Exit code: 0 = OK, 1 = KeyboardInterrupt, 2 = Exception.

Source code in basemkit/base_cmd.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def run(self, argv=None) -> int:
    """
    Execute the command line logic.

    Args:
        argv (list): Optional command line arguments.

    Returns:
        int: Exit code: 0 = OK, 1 = KeyboardInterrupt, 2 = Exception.
            A SystemExit carrying an integer code (argparse uses 0 for
            --help/--version and 2 for usage errors) yields that code.
    """
    exit_code = 0
    try:
        args = self.parse_args(argv)
        self.handle_args(args)
    except SystemExit as e:
        # argparse terminates via SystemExit; --help and --version are
        # successful exits and must not be reported as errors
        if e.code is None:
            exit_code = 0
        elif isinstance(e.code, int):
            exit_code = e.code
        else:
            exit_code = self.handle_exception(e)
    except BaseException as e:
        exit_code = self.handle_exception(e)
    return exit_code

basetest

Created on 2021-08-19

@author: wf

Basetest

Bases: TestCase

base test case

Source code in basemkit/basetest.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class Basetest(unittest.TestCase):
    """
    base test case

    Creates a Profiler for every test and offers helpers to detect the
    execution environment and to limit the run time of test methods.
    """

    def setUp(self, debug=False, profile=True):
        """
        setUp test environment

        Args:
            debug (bool): Stored as self.debug.
            profile (bool): Stored as self.profile and passed on to the Profiler.
        """
        unittest.TestCase.setUp(self)
        self.debug = debug
        self.profile = profile
        msg = f"test {self._testMethodName}, debug={self.debug}"
        self.profiler = Profiler(msg, profile=self.profile)

    def tearDown(self):
        """
        tearDown test environment
        """
        unittest.TestCase.tearDown(self)
        # presumably reports the elapsed time of the test - see Profiler
        self.profiler.time()

    @staticmethod
    def inPublicCI():
        """
        are we running in a public Continuous Integration Environment?

        Returns:
            bool: True if the current user is "travis" or "runner" or the
                JENKINS_HOME environment variable is set.
        """
        publicCI = getpass.getuser() in ["travis", "runner"]
        jenkins = "JENKINS_HOME" in os.environ
        return publicCI or jenkins

    @staticmethod
    def isUser(name: str):
        """Checks whether the current user has the given name"""
        return getpass.getuser() == name

    @staticmethod
    def timeout(seconds: float) -> Callable:
        """
        Decorator to enforce a timeout on test methods.

        The wrapped function runs in a daemon worker thread. A function that
        exceeds the timeout is abandoned - it cannot be interrupted - but it
        no longer keeps the interpreter alive at exit.

        Args:
            seconds (float): Timeout duration in seconds.

        Returns:
            Callable: A decorator that wraps a function and raises TimeoutError
                      if it exceeds the allowed execution time.

        Raises:
            TimeoutError: If the wrapped function exceeds the timeout.
            Exception: If the wrapped function raises any other exception.
        """

        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # one-element lists let the worker hand results back to the caller
                result = [None]
                exception = [None]

                def target():
                    try:
                        result[0] = func(*args, **kwargs)
                    except Exception as e:
                        exception[0] = e

                # daemon: a worker stuck past the timeout must not block interpreter exit
                thread = threading.Thread(target=target, daemon=True)
                thread.start()
                thread.join(seconds)

                if thread.is_alive():
                    raise TimeoutError(f"Test timed out after {seconds} seconds")

                if exception[0] is not None:
                    raise exception[0]

                return result[0]

            return wrapper

        return decorator

inPublicCI() staticmethod

are we running in a public Continuous Integration Environment?

Source code in basemkit/basetest.py
36
37
38
39
40
41
42
43
@staticmethod
def inPublicCI():
    """
    Tell whether the tests run in a public Continuous Integration environment.

    Returns:
        bool: True if the current user is "travis" or "runner" or the
            JENKINS_HOME environment variable is set.
    """
    ci_users = ("travis", "runner")
    if getpass.getuser() in ci_users:
        return True
    return "JENKINS_HOME" in os.environ

isUser(name) staticmethod

Checks whether the current user has the given name

Source code in basemkit/basetest.py
45
46
47
48
@staticmethod
def isUser(name: str):
    """Checks whether the current user (getpass.getuser) has the given name"""
    current_user = getpass.getuser()
    return current_user == name

setUp(debug=False, profile=True)

setUp test environment

Source code in basemkit/basetest.py
22
23
24
25
26
27
28
29
30
def setUp(self, debug=False, profile=True):
    """
    Prepare the test environment and create the profiler for this test.

    Args:
        debug (bool): Stored as self.debug.
        profile (bool): Stored as self.profile and passed on to the Profiler.
    """
    unittest.TestCase.setUp(self)
    self.debug, self.profile = debug, profile
    # the profiler message names the running test method
    self.profiler = Profiler(
        f"test {self._testMethodName}, debug={self.debug}",
        profile=self.profile,
    )

timeout(seconds) staticmethod

Decorator to enforce a timeout on test methods.

Parameters:

Name Type Description Default
seconds float

Timeout duration in seconds.

required

Returns:

Name Type Description
Callable Callable

A decorator that wraps a function and raises TimeoutError if it exceeds the allowed execution time.

Raises:

Type Description
TimeoutError

If the wrapped function exceeds the timeout.

Exception

If the wrapped function raises any other exception.

Source code in basemkit/basetest.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@staticmethod
def timeout(seconds: float) -> Callable:
    """
    Decorator to enforce a timeout on test methods.

    The wrapped function runs in a daemon worker thread. A function that
    exceeds the timeout is abandoned - it cannot be interrupted - but it
    no longer keeps the interpreter alive at exit.

    Args:
        seconds (float): Timeout duration in seconds.

    Returns:
        Callable: A decorator that wraps a function and raises TimeoutError
                  if it exceeds the allowed execution time.

    Raises:
        TimeoutError: If the wrapped function exceeds the timeout.
        Exception: If the wrapped function raises any other exception.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # one-element lists let the worker hand results back to the caller
            result = [None]
            exception = [None]

            def target():
                try:
                    result[0] = func(*args, **kwargs)
                except Exception as e:
                    exception[0] = e

            # daemon: a worker stuck past the timeout must not block interpreter exit
            thread = threading.Thread(target=target, daemon=True)
            thread.start()
            thread.join(seconds)

            if thread.is_alive():
                raise TimeoutError(f"Test timed out after {seconds} seconds")

            if exception[0] is not None:
                raise exception[0]

            return result[0]

        return wrapper

    return decorator

docker_util

Created on 2025-05-14

@author: wf

DockerUtil

docker utilities

Source code in basemkit/docker_util.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class DockerUtil:
    """
    Docker utilities for a single named container.

    All commands are executed through the injected shell object; the
    higher-level helpers report success and failure through the injected log.
    """

    def __init__(self, shell: Shell, container_name: str, log: Log, verbose: bool = False, debug: bool = False):
        """
        Store the collaborators and flags used by all other methods.

        Args:
            shell: Shell used to run every docker command.
            container_name: Name of the container the commands target.
            log: Log that receives success and error messages.
            verbose: Passed as ``tee`` to the shell by run_shell_command.
            debug: Passed on to the shell and enables traceback printing
                in handle_exception.
        """
        self.shell = shell
        self.container_name = container_name
        self.log = log
        self.verbose = verbose
        self.debug = debug

    def handle_exception(self, context: str, ex: Exception):
        """
        Log the given exception and, in debug mode, print its traceback.

        Args:
            context: Short description of what was being done.
            ex: The exception that occurred.
        """
        container_name = self.container_name
        self.log.log("❌", container_name, f"Exception {context}: {ex}")
        if self.debug:
            # print exception stack details
            traceback.print_exception(type(ex), ex, ex.__traceback__)

    def patch_file(self, file_path: str, callback, push_back: bool = True):
        """
        Copy a file from the container, apply a patch callback, and optionally copy it back.

        The local temporary copy is removed in all cases, including when a
        docker cp fails or the callback raises.

        Args:
            file_path (str): Absolute path to the file inside the container.
            callback (Callable[[str], None]): Function to apply changes to the local copy.
            push_back (bool): If True, copy the modified file back to the container.

        Raises:
            RuntimeError: If one of the docker cp commands returns a non-zero code.
        """
        # Only the name is needed; docker cp overwrites the (closed) file.
        with NamedTemporaryFile(delete=False) as tmp:
            local_path = tmp.name

        try:
            # Copy file from container
            result = self.shell.run(
                f"docker cp {self.container_name}:{file_path} {local_path}",
                tee=self.debug,
            )
            if result.returncode != 0:
                raise RuntimeError(f"docker cp from {file_path} failed")

            # Apply patch callback
            callback(local_path)

            # Copy file back to container
            if push_back:
                result = self.shell.run(
                    f"docker cp {local_path} {self.container_name}:{file_path}",
                    tee=self.debug,
                )
                if result.returncode != 0:
                    raise RuntimeError(f"docker cp back to {file_path} failed")
        finally:
            # Best-effort clean up; a missing temp file is not an error.
            try:
                os.unlink(local_path)
            except OSError:
                pass

    def line_patch(self, path: str, line_callback, title: str, msg: str):
        """
        Patch a file in the container line-by-line via callback and check in using RCS.

        Args:
            path (str): Path to file inside container.
            line_callback (Callable[[str], Tuple[str, bool]]): Function to patch a line. Returns (line, found).
            title (str): What is being patched, used for error message.
            msg (str): RCS check-in message.

        Raises:
            RuntimeError: If the callback reported no matching line.
        """

        def patch_callback(local_path):
            with open(local_path, "r") as f:
                lines = f.readlines()
            found = False
            with open(local_path, "w") as f:
                for line in lines:
                    patched_line, was_found = line_callback(line)
                    f.write(patched_line)
                    found = found or was_found
            if not found:
                raise RuntimeError(f"⚠️  No matching line found for {title} in {path}")

        self.patch_file(path, patch_callback)
        self.run(f"""ci -l -m"{msg}" {path}""")

    def run_script(self, name: str, script_content: str, tee: bool = False, *args):
        """
        Run a script in the container with parameters.

        The script is written to a local temporary file, copied to
        ``/tmp/{name}.sh`` in the container and executed there with bash.
        The local temporary file is removed even when a command raises.

        Args:
            name: Base name of the script inside the container.
            script_content: Content of the script.
            tee: Passed to run_local for the execution step.
            *args: Script arguments, joined with spaces. Because they follow
                ``tee`` they can only be given positionally after it.

        Returns:
            The result of run_local for the execution step.
        """
        with NamedTemporaryFile(mode="w", suffix=".sh", delete=False) as tmp:
            tmp.write(script_content)
            tmp_file = tmp.name

        try:
            os.chmod(tmp_file, 0o755)

            # Copy script to container
            container_script_path = f"/tmp/{name}.sh"
            self.run_local(f"docker cp {tmp_file} {self.container_name}:{container_script_path}")

            # Execute script in container with args
            args_str = " ".join(args)
            process = self.run_local(
                cmd=f"docker exec -i {self.container_name} bash {container_script_path} {args_str}",
                tee=tee,
            )
        finally:
            # Clean up local temporary file (best effort)
            try:
                os.unlink(tmp_file)
            except OSError:
                pass

        return process

    def run(self, command):
        """
        Run a command in the container via ``bash -c``.

        Args:
            command: The command line to execute inside the container.

        Returns:
            The result of run_local for the docker exec call.
        """
        import shlex

        # shlex.quote keeps commands that contain single quotes intact
        cmd = f"docker exec -i {self.container_name} bash -c {shlex.quote(command)}"
        return self.run_local(cmd)

    def run_local(self, cmd: str, tee: bool = False) -> subprocess.CompletedProcess:
        """
        Run a command with the configured shell.

        Args:
            cmd: The command to run
            tee: if true show stdout/stderr while running the command

        Returns:
            subprocess.CompletedProcess: The result of the command
        """
        process = self.shell.run(cmd, tee=tee, debug=self.debug)
        return process

    def inspect(self) -> Optional[Dict[str, Any]]:
        """
        Retrieve full .State of the Docker container.

        Returns:
            dict: parsed .State structure or None on error
        """
        inspect_dict = None
        # the quadruple braces render as a literal {{json .State}} Go template
        cmd = f'docker inspect -f "{{{{json .State}}}}" {self.container_name}'
        result = self.shell.run(cmd, debug=self.debug)
        if result.returncode == 0:
            try:
                json_text = result.stdout.strip()
                inspect_dict = json.loads(json_text)
            except Exception as ex:
                if self.debug:
                    print(f"Failed to parse Docker state JSON: {ex}")
        return inspect_dict

    def run_shell_command(
        self, command: str, success_msg: Optional[str] = None, error_msg: Optional[str] = None
    ) -> ShellResult:
        """
        Helper function for running shell commands with consistent error handling.

        Exceptions raised while running the command are passed to
        handle_exception and reported as a failed result instead of propagating.

        Args:
            command: Shell command to run
            success_msg: Message to log on success
            error_msg: Message to log on error

        Returns:
            shell_result: a shell result
        """
        container_name = self.container_name
        command_success = False
        proc = None
        try:
            proc = self.shell.run(command, debug=self.debug, tee=self.verbose)
            if proc.returncode == 0:
                if success_msg:
                    self.log.log("✅", container_name, success_msg)
                command_success = True
            else:
                error_detail = error_msg or f"Command failed: {command}"
                if proc.stderr:
                    error_detail += f" - {proc.stderr}"
                self.log.log("❌", container_name, error_detail)
                command_success = False
        except Exception as ex:
            self.handle_exception(f"command '{command}'", ex)
            command_success = False
        shell_result = ShellResult(proc, command_success)
        return shell_result

    def docker_cmd(self, cmd: str, options: str = "", args: str = "") -> str:
        """
        Create the given docker command with the given options.

        The container name is inserted after the options, except for the
        ``info`` command, which gets no container name.

        Args:
            cmd: docker sub command, e.g. ``stop``.
            options: options placed before the container name.
            args: arguments placed after the container name.

        Returns:
            str: the full docker command line
        """
        container_name = "" if cmd == "info" else self.container_name
        if options:
            options = f" {options}"
        if args:
            args = f" {args}"
        full_cmd = f"docker {cmd}{options} {container_name}{args}"
        return full_cmd

    def run_docker_cmd(self, cmd: str, options: str = "", args: str = "") -> ShellResult:
        """
        Run the given docker command with the given options.

        Args:
            cmd: docker sub command, e.g. ``stop``.
            options: options placed before the container name.
            args: arguments placed after the container name.

        Returns:
            ShellResult: the result of run_shell_command
        """
        container_name = self.container_name
        full_cmd = self.docker_cmd(cmd, options, args)
        shell_result = self.run_shell_command(
            full_cmd,
            success_msg=f"{cmd} container {container_name}",
            error_msg=f"Failed to {cmd} container {container_name}",
        )
        return shell_result

    def logs(self) -> ShellResult:
        """show the logs of the container"""
        logs_result = self.run_docker_cmd("logs")
        return logs_result

    def docker_info(self) -> ShellResult:
        """
        Check if Docker is responsive on the host system.
        """
        info_result = self.run_docker_cmd("info")
        return info_result

    def stop(self) -> ShellResult:
        """stop the server container"""
        stop_result = self.run_docker_cmd("stop")
        return stop_result

    def rm(self) -> ShellResult:
        """remove the server container."""
        rm_result = self.run_docker_cmd("rm")
        return rm_result

    def bash(self) -> bool:
        """
        Print the docker command that opens a bash in the server container.

        Returns:
            bool: always True
        """
        bash_cmd = self.docker_cmd("exec", "-it", "/bin/bash")
        print(bash_cmd)
        return True

bash()

bash into the server container.

Source code in basemkit/docker_util.py
254
255
256
257
258
def bash(self) -> bool:
    """
    Print the docker command that opens a bash in the server container.

    Returns:
        bool: always True
    """
    print(self.docker_cmd("exec", "-it", "/bin/bash"))
    return True

docker_cmd(cmd, options='', args='')

create the given docker command with the given options

Source code in basemkit/docker_util.py
207
208
209
210
211
212
213
214
215
216
217
def docker_cmd(self, cmd: str, options: str = "", args: str = "") -> str:
    """
    Build the command line for the given docker sub command.

    The container name follows the options; the ``info`` command gets
    no container name.
    """
    target = self.container_name if cmd != "info" else ""
    option_part = f" {options}" if options else options
    arg_part = f" {args}" if args else args
    return f"docker {cmd}{option_part} {target}{arg_part}"

docker_info()

Check if Docker is responsive on the host system.

Source code in basemkit/docker_util.py
237
238
239
240
241
242
def docker_info(self) -> ShellResult:
    """
    Run ``docker info`` to check whether Docker responds on the host system.
    """
    return self.run_docker_cmd("info")

handle_exception(context, ex)

handle the given exception

Source code in basemkit/docker_util.py
30
31
32
33
34
35
36
37
38
39
40
41
def handle_exception(self, context: str, ex: Exception):
    """
    Log the given exception; in debug mode also print its traceback.
    """
    self.log.log("❌", self.container_name, f"Exception {context}: {ex}")
    if not self.debug:
        return
    # exception type and traceback are taken from the exception itself
    traceback.print_exception(type(ex), ex, ex.__traceback__)

inspect()

Retrieve full .State of the Docker container.

Returns:

Name Type Description
dict Optional[Dict[str, Any]]

parsed .State structure or None on error

Source code in basemkit/docker_util.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def inspect(self) -> Optional[Dict[str, Any]]:
    """
    Fetch the .State section of the container via ``docker inspect``.

    Returns:
        dict: the parsed .State JSON, or None if the command fails
        or its output cannot be parsed
    """
    # quadruple braces render as a literal {{json .State}} template
    cmd = f'docker inspect -f "{{{{json .State}}}}" {self.container_name}'
    outcome = self.shell.run(cmd, debug=self.debug)
    if outcome.returncode != 0:
        return None
    try:
        return json.loads(outcome.stdout.strip())
    except Exception as ex:
        if self.debug:
            print(f"Failed to parse Docker state JSON: {ex}")
    return None

line_patch(path, line_callback, title, msg)

Patch a file in the container line-by-line via callback and check in using RCS.

Parameters:

Name Type Description Default
path str

Path to file inside container.

required
line_callback Callable[[str], Tuple[str, bool]]

Function to patch a line. Returns (line, found).

required
title str

What is being patched, used for error message.

required
msg str

RCS check-in message.

required
Source code in basemkit/docker_util.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def line_patch(self, path: str, line_callback, title: str, msg: str):
    """
    Rewrite a container file line by line and check the result in with RCS.

    Args:
        path (str): Path to file inside container.
        line_callback (Callable[[str], Tuple[str, bool]]): Called per line; returns (line, found).
        title (str): Name of the patched item, used in the error message.
        msg (str): RCS check-in message.

    Raises:
        RuntimeError: If the callback never reported a match.
    """

    def patch_callback(local_path):
        # read everything first, then rewrite the same file in place
        with open(local_path, "r") as src:
            original_lines = src.readlines()
        matched = False
        with open(local_path, "w") as out:
            for original in original_lines:
                new_line, hit = line_callback(original)
                out.write(new_line)
                if hit:
                    matched = True
        if not matched:
            raise RuntimeError(f"⚠️  No matching line found for {title} in {path}")

    self.patch_file(path, patch_callback)
    self.run(f"""ci -l -m"{msg}" {path}""")

logs()

show the logs of the container

Source code in basemkit/docker_util.py
232
233
234
235
def logs(self) -> ShellResult:
    """Run ``docker logs`` for the container and return the result."""
    return self.run_docker_cmd("logs")

patch_file(file_path, callback, push_back=True)

Copy a file from the container, apply a patch callback, and optionally copy it back.

Parameters:

Name Type Description Default
file_path str

Absolute path to the file inside the container.

required
callback Callable[[str], None]

Function to apply changes to the local copy.

required
push_back bool

If True, copy the modified file back to the container.

True
Source code in basemkit/docker_util.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def patch_file(self, file_path: str, callback, push_back: bool = True):
    """
    Copy a file from the container, apply a patch callback, and optionally copy it back.

    The local temporary copy is removed in all cases, including when a
    docker cp fails or the callback raises.

    Args:
        file_path (str): Absolute path to the file inside the container.
        callback (Callable[[str], None]): Function to apply changes to the local copy.
        push_back (bool): If True, copy the modified file back to the container.

    Raises:
        RuntimeError: If one of the docker cp commands returns a non-zero code.
    """

    # Only the name is needed; docker cp overwrites the (closed) file.
    with NamedTemporaryFile(delete=False) as tmp:
        local_path = tmp.name

    try:
        # Copy file from container
        result = self.shell.run(
            f"docker cp {self.container_name}:{file_path} {local_path}",
            tee=self.debug,
        )
        if result.returncode != 0:
            raise RuntimeError(f"docker cp from {file_path} failed")

        # Apply patch callback
        callback(local_path)

        # Copy file back to container
        if push_back:
            result = self.shell.run(
                f"docker cp {local_path} {self.container_name}:{file_path}",
                tee=self.debug,
            )
            if result.returncode != 0:
                raise RuntimeError(f"docker cp back to {file_path} failed")
    finally:
        # Best-effort clean up; a missing temp file is not an error.
        try:
            os.unlink(local_path)
        except OSError:
            pass

rm()

remove the server container.

Source code in basemkit/docker_util.py
249
250
251
252
def rm(self) -> ShellResult:
    """Run ``docker rm`` for the server container and return the result."""
    return self.run_docker_cmd("rm")

run(command)

Run a command in the container

Source code in basemkit/docker_util.py
135
136
137
138
139
def run(self, command):
    """
    Run a command in the container via ``bash -c``.

    Args:
        command: The command line to execute inside the container.

    Returns:
        The result of run_local for the docker exec call.
    """
    import shlex

    # shlex.quote keeps commands that contain single quotes intact
    cmd = f"docker exec -i {self.container_name} bash -c {shlex.quote(command)}"
    return self.run_local(cmd)

run_docker_cmd(cmd, options='', args='')

run the given docker command with the given options

Source code in basemkit/docker_util.py
219
220
221
222
223
224
225
226
227
228
229
230
def run_docker_cmd(self, cmd: str, options: str = "", args: str = "") -> ShellResult:
    """
    Build the given docker command and run it with standard log messages.
    """
    name = self.container_name
    return self.run_shell_command(
        self.docker_cmd(cmd, options, args),
        success_msg=f"{cmd} container {name}",
        error_msg=f"Failed to {cmd} container {name}",
    )

run_local(cmd, tee=False)

Run a command with sourced profile

Parameters:

Name Type Description Default
cmd str

The command to run

required
tee

if true show stdout/stderr while running the command

required

Returns:

Type Description
CompletedProcess

subprocess.CompletedProcess: The result of the command

Source code in basemkit/docker_util.py
141
142
143
144
145
146
147
148
149
150
151
152
153
def run_local(self, cmd: str, tee: bool = False) -> subprocess.CompletedProcess:
    """
    Run a command with the configured shell.

    Args:
        cmd: The command to run
        tee: if true show stdout/stderr while running the command

    Returns:
        subprocess.CompletedProcess: The result of the command
    """
    return self.shell.run(cmd, tee=tee, debug=self.debug)

run_script(name, script_content, tee=False, *args)

Run a script in the container with parameters

Source code in basemkit/docker_util.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def run_script(self, name: str, script_content: str, tee: bool = False, *args):
    """
    Run a script in the container with parameters.

    The script is written to a local temporary file, copied to
    ``/tmp/{name}.sh`` in the container and executed there with bash.
    The local temporary file is removed even when a command raises.

    Args:
        name: Base name of the script inside the container.
        script_content: Content of the script.
        tee: Passed to run_local for the execution step.
        *args: Script arguments, joined with spaces. Because they follow
            ``tee`` they can only be given positionally after it.

    Returns:
        The result of run_local for the execution step.
    """
    with NamedTemporaryFile(mode="w", suffix=".sh", delete=False) as tmp:
        tmp.write(script_content)
        tmp_file = tmp.name

    try:
        os.chmod(tmp_file, 0o755)

        # Copy script to container
        container_script_path = f"/tmp/{name}.sh"
        self.run_local(f"docker cp {tmp_file} {self.container_name}:{container_script_path}")

        # Execute script in container with args
        args_str = " ".join(args)
        process = self.run_local(
            cmd=f"docker exec -i {self.container_name} bash {container_script_path} {args_str}",
            tee=tee,
        )
    finally:
        # Clean up local temporary file (best effort)
        try:
            os.unlink(tmp_file)
        except OSError:
            pass

    return process

run_shell_command(command, success_msg=None, error_msg=None)

Helper function for running shell commands with consistent error handling.

Parameters:

Name Type Description Default
command str

Shell command to run

required
success_msg str

Message to log on success

None
error_msg str

Message to log on error

None

Returns:

Name Type Description
shell_result ShellResult

a shell result

Source code in basemkit/docker_util.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def run_shell_command(self, command: str, success_msg: str = None, error_msg: str = None) -> ShellResult:
    """
    Run a shell command and report the outcome through the log.

    Exceptions raised while running or logging are passed to
    handle_exception and yield a failed result instead of propagating.

    Args:
        command: Shell command to run
        success_msg: Message to log on success
        error_msg: Message to log on error

    Returns:
        shell_result: the process (None if running failed) with a success flag
    """
    name = self.container_name
    proc = None
    succeeded = False
    try:
        proc = self.shell.run(command, debug=self.debug, tee=self.verbose)
        succeeded = proc.returncode == 0
        if succeeded:
            if success_msg:
                self.log.log("✅", name, success_msg)
        else:
            detail = error_msg or f"Command failed: {command}"
            if proc.stderr:
                detail += f" - {proc.stderr}"
            self.log.log("❌", name, detail)
    except Exception as ex:
        self.handle_exception(f"command '{command}'", ex)
        succeeded = False
    return ShellResult(proc, succeeded)

stop()

stop the server container

Source code in basemkit/docker_util.py
244
245
246
247
def stop(self) -> ShellResult:
    """Run ``docker stop`` for the server container and return the result."""
    return self.run_docker_cmd("stop")

persistent_log

Created on 2024-10-04

@author: wf

Log

Wrapper for persistent logging.

Source code in basemkit/persistent_log.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@lod_storable
class Log:
    """
    Persistent log that keeps its entries and per-level counts by kind.
    """

    entries: List[LogEntry] = field(default_factory=list)

    def color_msg(self, color, msg):
        """Print msg wrapped in the given color and the END_COLOR suffix."""
        colored = f"{color}{msg}{END_COLOR}"
        print(colored)

    def __post_init__(self):
        """
        Set the output flags and the icon/level lookup tables, then
        compute the level counts for the entries already present.
        """
        self.do_log = True
        self.do_print = False
        # icon -> logging level
        self.levels = {"❌": logging.ERROR, "⚠️": logging.WARNING, "✅": logging.INFO}
        # logging level -> name used in entries and counters
        self.level_names = {
            logging.ERROR: "error",
            logging.WARNING: "warn",
            logging.INFO: "info",
        }
        self.update_level_counts()

    def clear(self):
        """
        Drop all entries and reset the level counts.
        """
        self.entries = list()
        self.update_level_counts()

    def update_level_counts(self):
        """
        Rebuild the per-level counters from the current entries.
        """
        self.level_counts = {"error": Counter(), "warn": Counter(), "info": Counter()}
        for item in self.entries:
            tally = self.get_counter(item.level_name)
            if tally is None:
                # entry with a level name that has no counter
                continue
            tally[item.kind] += 1

    def get_counter(self, level: str) -> Counter:
        """
        Look up the counter for the given level name (None if there is none).
        """
        counters = self.level_counts
        return counters.get(level)

    def get_level_summary(self, level: str, limit: int = 7) -> Tuple[int, str]:
        """
        Summarize the most common kinds for the given log level.

        Args:
            level (str): The log level name ('error', 'warn', 'info').
            limit (int): Maximum number of most common kinds in the summary (default is 7).

        Returns:
            Tuple[int, str]: The total count for the level and a summary message.
        """
        tally = self.get_counter(level)
        if not tally:
            # unknown level or nothing counted yet
            return 0, f"No entries found for level: {level}"
        total = sum(tally.values())
        top = dict(tally.most_common(limit))
        return total, f"{level.capitalize()} entries: {top}"

    def log(self, icon: str, kind: str, msg: str):
        """
        Record a message under the level that belongs to the icon.

        Args:
            icon (str): The icon representing the log level ('❌', '⚠️', '✅').
            kind (str): The category or type of the log message.
            msg (str): The log message to record.
        """
        # unknown icons are treated as INFO
        severity = self.levels.get(icon, logging.INFO)
        severity_name = self.level_names[severity]
        text = f"{icon}:{msg}"
        self.entries.append(LogEntry(msg=text, level_name=severity_name, kind=kind))

        # keep the counters in step with the entries
        self.level_counts[severity_name][kind] += 1

        if self.do_log:
            logging.log(severity, text)
        if self.do_print:
            print(text)

    def dump(self):
        """
        Print the text form of every entry.
        """
        for item in self.entries:
            line = item.as_text()
            print(line)

__post_init__()

Initializes the log with level mappings and updates the level counts.

Source code in basemkit/persistent_log.py
62
63
64
65
66
67
68
69
70
71
72
73
74
def __post_init__(self):
    """
    Set the output flags and the icon/level lookup tables, then
    compute the level counts for the entries already present.
    """
    self.do_log, self.do_print = True, False
    # icon -> logging level
    self.levels = {"❌": logging.ERROR, "⚠️": logging.WARNING, "✅": logging.INFO}
    # logging level -> name used in entries and counters
    self.level_names = {
        logging.ERROR: "error",
        logging.WARNING: "warn",
        logging.INFO: "info",
    }
    self.update_level_counts()

clear()

Clears all log entries.

Source code in basemkit/persistent_log.py
76
77
78
79
80
81
def clear(self):
    """
    Drop all entries and reset the level counts.
    """
    self.entries = list()
    self.update_level_counts()

color_msg(color, msg)

Display a colored message

Source code in basemkit/persistent_log.py
58
59
60
def color_msg(self, color, msg):
    """Print msg wrapped in the given color and the END_COLOR suffix."""
    colored = f"{color}{msg}{END_COLOR}"
    print(colored)

dump()

dump my entries

Source code in basemkit/persistent_log.py
141
142
143
144
145
146
def dump(self):
    """
    Print the text form of every entry.
    """
    for item in self.entries:
        line = item.as_text()
        print(line)

get_counter(level)

Returns the counter for the specified log level.

Source code in basemkit/persistent_log.py
93
94
95
96
97
def get_counter(self, level: str) -> Counter:
    """
    Look up the counter for the given level name (None if there is none).
    """
    counters = self.level_counts
    return counters.get(level)

get_level_summary(level, limit=7)

Get a summary of the most common counts for the specified log level.

Parameters:

Name Type Description Default
level str

The log level name ('error', 'warn', 'info').

required
limit int

The maximum number of most common entries to include in the summary (default is 7).

7

Returns:

Type Description
Tuple[int, str]

Tuple[int, str]: A tuple containing the count of log entries and a summary message.

Source code in basemkit/persistent_log.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_level_summary(self, level: str, limit: int = 7) -> Tuple[int, str]:
    """
    Summarize the most common kinds for the given log level.

    Args:
        level (str): The log level name ('error', 'warn', 'info').
        limit (int): Maximum number of most common kinds in the summary (default is 7).

    Returns:
        Tuple[int, str]: The total count for the level and a summary message.
    """
    tally = self.get_counter(level)
    if not tally:
        # unknown level or nothing counted yet
        return 0, f"No entries found for level: {level}"
    total = sum(tally.values())
    top = dict(tally.most_common(limit))
    return total, f"{level.capitalize()} entries: {top}"

log(icon, kind, msg)

Log a message with the specified icon and kind.

Parameters:

Name Type Description Default
icon str

The icon representing the log level ('❌', '⚠️', '✅').

required
kind str

The category or type of the log message.

required
msg str

The log message to record.

required
Source code in basemkit/persistent_log.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def log(self, icon: str, kind: str, msg: str):
    """
    Record a message under the level that belongs to the icon.

    Args:
        icon (str): The icon representing the log level ('❌', '⚠️', '✅').
        kind (str): The category or type of the log message.
        msg (str): The log message to record.
    """
    # unknown icons are treated as INFO
    severity = self.levels.get(icon, logging.INFO)
    severity_name = self.level_names[severity]
    text = f"{icon}:{msg}"
    self.entries.append(LogEntry(msg=text, level_name=severity_name, kind=kind))

    # keep the counters in step with the entries
    self.level_counts[severity_name][kind] += 1

    if self.do_log:
        logging.log(severity, text)
    if self.do_print:
        print(text)

update_level_counts()

Updates the counts for each log level based on the existing entries.

Source code in basemkit/persistent_log.py
83
84
85
86
87
88
89
90
91
def update_level_counts(self):
    """
    Rebuild the per-level counters from the current entries.
    """
    self.level_counts = {"error": Counter(), "warn": Counter(), "info": Counter()}
    for item in self.entries:
        tally = self.get_counter(item.level_name)
        if tally is None:
            # entry with a level name that has no counter
            continue
        tally[item.kind] += 1

LogEntry

Represents a log entry with a message, kind, and log level name.

Source code in basemkit/persistent_log.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@lod_storable
class LogEntry:
    """
    A single log record: message, kind, level name and timestamp.
    """

    msg: str
    kind: str
    level_name: str
    timestamp: Optional[str] = None

    def __post_init__(self):
        """Fill in the current time in ISO format if no timestamp was given."""
        if self.timestamp is not None:
            return
        self.timestamp = datetime.now().isoformat()

    def as_text(self) -> str:
        """Return the message prefixed with the bracketed timestamp."""
        return f"[{self.timestamp}] {self.msg}"

    def as_html(self) -> str:
        """Return the text form as a paragraph colored by level name."""
        if self.level_name == "error":
            color = "red"
        elif self.level_name == "warn":
            color = "orange"
        else:
            color = "black"
        body = self.as_text()
        return f'<p style="color: {color};">{body}</p>'

as_html()

Return HTML representation of log entry with appropriate styling

Source code in basemkit/persistent_log.py
42
43
44
45
46
47
def as_html(self) -> str:
    """Return the text form as a paragraph colored by level name."""
    if self.level_name == "error":
        color = "red"
    elif self.level_name == "warn":
        color = "orange"
    else:
        color = "black"
    body = self.as_text()
    return f'<p style="color: {color};">{body}</p>'

as_text()

Return text representation of log entry with timestamp

Source code in basemkit/persistent_log.py
37
38
39
40
def as_text(self) -> str:
    """Return the message prefixed with the bracketed timestamp."""
    return f"[{self.timestamp}] {self.msg}"

profiler

Created on 2022-11-18

@author: wf

Profiler

simple profiler

Source code in basemkit/profiler.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Profiler:
    """
    Simple profiler that measures the wall-clock time since it was started.
    """

    def __init__(self, msg: str, profile=True, with_start: bool = True):
        """
        Construct the profiler with the given message and flags.

        Args:
            msg (str): The message to show if profiling is active.
            profile (bool): True if profiling messages should be shown.
            with_start (bool): If True, show start message immediately.
        """
        self.msg = msg
        self.profile = profile
        # Always set a reference time so time() also works when
        # with_start is False and start() was never called.
        self.starttime = time.time()
        if with_start:
            self.start()

    def start(self):
        """
        Start profiling: reset the reference time and, if profiling
        is active, print the start message.
        """
        self.starttime = time.time()
        if self.profile:
            print(f"Starting {self.msg} ...")

    def time(self, extraMsg: str = ""):
        """
        Time the action and print the result if profiling is active.

        Args:
            extraMsg (str): Text appended to the message in the output.

        Returns:
            float: Seconds elapsed since the reference time.
        """
        elapsed = time.time() - self.starttime
        if self.profile:
            print(f"{self.msg}{extraMsg} took {elapsed:5.1f} s")
        return elapsed

__init__(msg, profile=True, with_start=True)

Construct the profiler with the given message and flags.

Parameters:

Name Type Description Default
msg str

The message to show if profiling is active.

required
profile bool

True if profiling messages should be shown.

True
with_start bool

If True, show start message immediately.

True
Source code in basemkit/profiler.py
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, msg: str, profile=True, with_start: bool = True):
    """
    Construct the profiler with the given message and flags.

    Args:
        msg (str): The message to show if profiling is active.
        profile (bool): True if profiling messages should be shown.
        with_start (bool): If True, show start message immediately.
    """
    self.msg = msg
    self.profile = profile
    # Always set a reference time so time() also works when
    # with_start is False and start() was never called.
    self.starttime = time.time()
    if with_start:
        self.start()

start()

start profiling

Source code in basemkit/profiler.py
29
30
31
32
33
34
35
def start(self):
    """
    Remember the current time as start time and announce the start if profiling is active.
    """
    self.starttime = time.time()
    if not self.profile:
        return
    print(f"Starting {self.msg} ...")

time(extraMsg='')

time the action and print if profile is active

Source code in basemkit/profiler.py
37
38
39
40
41
42
43
44
def time(self, extraMsg: str = ""):
    """
    Measure the seconds elapsed since start() and report them if profiling is active.

    Args:
        extraMsg (str): Text placed directly after the message in the report line.

    Returns:
        float: Seconds elapsed since the recorded start time.
    """
    now = time.time()
    elapsed = now - self.starttime
    if self.profile:
        print(f"{self.msg}{extraMsg} took {elapsed:5.1f} s")
    return elapsed

shell

Created on 2025-05-14

@author: wf

Shell

Runs commands with environment from profile

Source code in basemkit/shell.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class Shell:
    """
    Runs commands with environment from profile
    """

    def __init__(self, profile: str = None, shell_path: str = None):
        """
        Initialize shell with optional profile

        Args:
            profile: Path to profile file to source e.g. ~/.zprofile
            shell_path: the shell_path e.g. /bin/zsh
        """
        self.profile = profile
        self.shell_path = shell_path
        if self.shell_path is None:
            self.shell_path = os.environ.get("SHELL", "/bin/bash")
        self.shell_name = os.path.basename(self.shell_path)
        self.source_op = "source"
        if self.profile is None:
            self.profile = self.find_profile()
            # find_profile() returns an absolute path (or None), so only the
            # file name can be compared against ".profile"
            if self.profile and os.path.basename(self.profile) == ".profile":
                self.source_op = "."

    def find_profile(self) -> str:
        """
        Find the appropriate profile file for the current shell

        Searches for the profile file corresponding to the shell_name
        in the user's home directory.

        Returns:
            str: Path to the profile file or None if not found
        """
        profile = None
        home = os.path.expanduser("~")
        # Try common profile files
        profiles = {"zsh": ".zprofile", "bash": ".bash_profile", "sh": ".profile"}
        if self.shell_name in profiles:
            profile_name = profiles[self.shell_name]
            path = os.path.join(home, profile_name)
            if os.path.exists(path):
                profile = path
        return profile

    @classmethod
    def ofArgs(cls, args: Namespace) -> "Shell":
        """
        Create Shell from command line args

        Args:
            args: Arguments with optional profile

        Returns:
            Shell: Configured Shell
        """
        # Use explicit profile or detect
        profile = getattr(args, "profile", None)
        shell = cls(profile=profile)
        return shell

    def run(self, cmd: str, text: bool = True, debug: bool = False, tee: bool = False) -> subprocess.CompletedProcess:
        """
        Run command with profile, always capturing output and optionally teeing it.

        Args:
            cmd: Command to run
            text: Text mode for subprocess I/O
            debug: Print the command to be run
            tee: If True, also print output live while capturing

        Returns:
            subprocess.CompletedProcess: args, return code and the captured stdout/stderr
        """
        if self.profile:
            # use the operator selected in __init__: "." for a ~/.profile,
            # because "source" is not a POSIX sh builtin
            shell_cmd = f"{self.source_op} {self.profile} && {cmd}"
        else:
            shell_cmd = cmd

        if debug:
            print(f"Running: {shell_cmd}")

        popen_process = subprocess.Popen(
            [self.shell_path, "-c", shell_cmd],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=text,
        )

        # StdTee.run returns only after both output streams have been drained
        std_tee = StdTee.run(popen_process, tee=tee)
        returncode = popen_process.wait()

        process = subprocess.CompletedProcess(
            args=popen_process.args,
            returncode=returncode,
            stdout=std_tee.stdout_buffer.getvalue(),
            stderr=std_tee.stderr_buffer.getvalue(),
        )

        # failures are only reported in debug mode; the caller inspects returncode
        if process.returncode != 0 and debug:
            msg = f"""{process.args} failed:
  returncode: {process.returncode}
  stdout    : {process.stdout.strip()}
  stderr    : {process.stderr.strip()}
"""
            print(msg, file=sys.stderr)

        return process

    def proc_stats(
        self,
        title: str,
        procs: Dict[Path, subprocess.CompletedProcess],
        ignores: List[str] = [],
    ):
        """
        Show process statistics with checkmark/crossmark and success/failure summary.

        A result counts as failed if it has stderr output that matches none of the
        ignores, or if its stdout contains the text "Error". The return code is not
        considered.

        Args:
            title (str): A short title to label the output section.
            procs (Dict[Path, subprocess.CompletedProcess]): Mapping of input files to their process results.
            ignores (List[str], optional): List of substrings. If any is found in stderr, the error is ignored.
                The default list is only read, never modified.
        """
        total = len(procs)
        failures = 0
        print(f"\n{total} {title}:")
        for idx, (path, result) in enumerate(procs.items(), start=1):
            stderr = result.stderr or ""
            stdout = result.stdout or ""
            ignored = any(ignore in stderr for ignore in ignores)
            has_error = (stderr and not ignored) or ("Error" in stdout)
            if has_error:
                symbol = "❌"
                failures += 1
            else:
                symbol = "✅"
            print(f"{symbol} {idx}/{total}: {path.name}")
        # with nothing to report both percentages are 0 instead of claiming 100% failures
        percent_ok = ((total - failures) / total) * 100 if total > 0 else 0
        percent_failed = (failures / total) * 100 if total > 0 else 0
        print(f"\n{total - failures}/{total} ({percent_ok:.1f}%), ❌ {failures}/{total} ({percent_failed:.1f}%)")

__init__(profile=None, shell_path=None)

Initialize shell with optional profile

Parameters:

Name Type Description Default
profile str

Path to profile file to source e.g. ~/.zprofile

None
shell_path str

the shell_path e.g. /bin/zsh

None
Source code in basemkit/shell.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def __init__(self, profile: str = None, shell_path: str = None):
    """
    Initialize shell with optional profile

    If no profile is given, find_profile() is asked for one. When the detected
    profile file is named ".profile", the POSIX "." operator is selected as
    source_op instead of "source".

    Args:
        profile: Path to profile file to source e.g. ~/.zprofile
        shell_path: the shell_path e.g. /bin/zsh; defaults to $SHELL or /bin/bash
    """
    self.profile = profile
    self.shell_path = shell_path
    if self.shell_path is None:
        self.shell_path = os.environ.get("SHELL", "/bin/bash")
    self.shell_name = os.path.basename(self.shell_path)
    self.source_op = "source"
    if self.profile is None:
        self.profile = self.find_profile()
        # find_profile() returns an absolute path (or None), so only the
        # file name can be compared against ".profile"
        if self.profile and os.path.basename(self.profile) == ".profile":
            self.source_op = "."

find_profile()

Find the appropriate profile file for the current shell

Searches for the profile file corresponding to the shell_name in the user's home directory.

Returns:

Name Type Description
str str

Path to the profile file or None if not found

Source code in basemkit/shell.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def find_profile(self) -> str:
    """
    Locate the profile file that belongs to the current shell.

    The shell_name is mapped to its conventional profile file name and that
    file is looked up in the user's home directory.

    Returns:
        str: Path to the profile file, or None if the shell is unknown
            or the file does not exist
    """
    # conventional profile file per supported shell
    known_profiles = {"zsh": ".zprofile", "bash": ".bash_profile", "sh": ".profile"}
    home_dir = os.path.expanduser("~")
    file_name = known_profiles.get(self.shell_name)
    if file_name is None:
        return None
    candidate = os.path.join(home_dir, file_name)
    return candidate if os.path.exists(candidate) else None

ofArgs(args) classmethod

Create Shell from command line args

Parameters:

Name Type Description Default
args Namespace

Arguments with optional profile

required

Returns:

Name Type Description
Shell Shell

Configured Shell

Source code in basemkit/shell.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@classmethod
def ofArgs(cls, args: Namespace) -> "Shell":
    """
    Build a Shell from parsed command line arguments.

    Args:
        args: Parsed arguments; a "profile" attribute is used if present

    Returns:
        Shell: Shell configured with the explicit profile, or with a detected one
            when args carries no profile
    """
    # a missing attribute becomes None, which lets the constructor detect the profile
    return cls(profile=getattr(args, "profile", None))

proc_stats(title, procs, ignores=[])

Show process statistics with checkmark/crossmark and success/failure summary.

Parameters:

Name Type Description Default
title str

A short title to label the output section.

required
procs Dict[Path, CompletedProcess]

Mapping of input files to their process results.

required
ignores List[str]

List of substrings. If any is found in stderr, the error is ignored.

[]
Source code in basemkit/shell.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def proc_stats(
    self,
    title: str,
    procs: Dict[Path, subprocess.CompletedProcess],
    ignores: List[str] = [],
):
    """
    Show process statistics with checkmark/crossmark and success/failure summary.

    A result counts as failed if it has stderr output that matches none of the
    ignores, or if its stdout contains the text "Error". The return code is not
    considered.

    Args:
        title (str): A short title to label the output section.
        procs (Dict[Path, subprocess.CompletedProcess]): Mapping of input files to their process results.
        ignores (List[str], optional): List of substrings. If any is found in stderr, the error is ignored.
            The default list is only read, never modified.
    """
    total = len(procs)
    failures = 0
    print(f"\n{total} {title}:")
    for idx, (path, result) in enumerate(procs.items(), start=1):
        stderr = result.stderr or ""
        stdout = result.stdout or ""
        ignored = any(ignore in stderr for ignore in ignores)
        has_error = (stderr and not ignored) or ("Error" in stdout)
        if has_error:
            symbol = "❌"
            failures += 1
        else:
            symbol = "✅"
        print(f"{symbol} {idx}/{total}: {path.name}")
    # with nothing to report both percentages are 0 instead of claiming 100% failures
    percent_ok = ((total - failures) / total) * 100 if total > 0 else 0
    percent_failed = (failures / total) * 100 if total > 0 else 0
    print(f"\n{total - failures}/{total} ({percent_ok:.1f}%), ❌ {failures}/{total} ({percent_failed:.1f}%)")

run(cmd, text=True, debug=False, tee=False)

Run command with profile, always capturing output and optionally teeing it.

Parameters:

Name Type Description Default
cmd str

Command to run

required
text bool

Text mode for subprocess I/O

True
debug bool

Print the command to be run

False
tee bool

If True, also print output live while capturing

False

Returns:

Type Description
CompletedProcess

subprocess.CompletedProcess

Source code in basemkit/shell.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
    def run(self, cmd: str, text: bool = True, debug: bool = False, tee: bool = False) -> subprocess.CompletedProcess:
        """
        Run command with profile, always capturing output and optionally teeing it.

        Args:
            cmd: Command to run
            text: Text mode for subprocess I/O
            debug: Print the command to be run
            tee: If True, also print output live while capturing

        Returns:
            subprocess.CompletedProcess
        """
        shell_cmd = f"source {self.profile} && {cmd}" if self.profile else cmd

        if debug:
            print(f"Running: {shell_cmd}")

        popen_process = subprocess.Popen(
            [self.shell_path, "-c", shell_cmd],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=text,
        )

        std_tee = StdTee.run(popen_process, tee=tee)
        returncode = popen_process.wait()

        process = subprocess.CompletedProcess(
            args=popen_process.args,
            returncode=returncode,
            stdout=std_tee.stdout_buffer.getvalue(),
            stderr=std_tee.stderr_buffer.getvalue(),
        )

        if process.returncode != 0:
            if debug:
                msg = f"""{process.args} failed:
  returncode: {process.returncode}
  stdout    : {process.stdout.strip()}
  stderr    : {process.stderr.strip()}
"""
                print(msg, file=sys.stderr)
            pass

        return process

ShellResult

result of a command line call

Source code in basemkit/shell.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class ShellResult:
    """
    Outcome of a command line call: the process object plus a success flag.
    """

    def __init__(self, proc, success: bool):
        """
        Args:
            proc: Process result; its args and returncode attributes are used for display
            success (bool): Whether the call is considered successful
        """
        self.proc, self.success = proc, success

    def __str__(self):
        """Return the short (non-debug) text form."""
        return self.as_text()

    def as_text(self, debug: bool = False):
        """
        Render the result as text.

        Args:
            debug (bool): If True, include args, return code and success flag;
                otherwise show a checkmark, or a crossmark with the return code.

        Returns:
            str: The rendered text
        """
        if debug:
            return f"{self.proc.args} → rc={self.proc.returncode}, success={self.success}"
        if self.success:
            return "✅"
        return f"❌ → rc={self.proc.returncode}"

StdTee

Manages teeing for both stdout and stderr using StreamTee instances. Captures output in instance variables.

Source code in basemkit/shell.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class StdTee:
    """
    Pairs two StreamTee instances, one for a process' stdout and one for its stderr.
    The captured output ends up in stdout_buffer and stderr_buffer.
    """

    def __init__(self, process, tee=True):
        """
        Args:
            process: Process whose stdout and stderr streams are read
            tee: If True, output is also mirrored to sys.stdout / sys.stderr
        """
        self.stdout_buffer = io.StringIO()
        self.stderr_buffer = io.StringIO()
        self.out_tee = StreamTee(process.stdout, sys.stdout, self.stdout_buffer, tee)
        self.err_tee = StreamTee(process.stderr, sys.stderr, self.stderr_buffer, tee)

    def start(self):
        """Start both tees, stdout first."""
        for stream_tee in (self.out_tee, self.err_tee):
            stream_tee.start()

    def join(self):
        """Wait for both tees to finish, stdout first."""
        for stream_tee in (self.out_tee, self.err_tee):
            stream_tee.join()

    @classmethod
    def run(cls, process, tee=True):
        """
        Create a StdTee for the process, start it and wait until both streams are done.

        Returns:
            StdTee: The instance holding the captured stdout/stderr
        """
        tee_pair = cls(process, tee=tee)
        tee_pair.start()
        tee_pair.join()
        return tee_pair

run(process, tee=True) classmethod

Run teeing and capture for the given process. Returns a StdTee instance with stdout/stderr captured.

Source code in basemkit/shell.py
111
112
113
114
115
116
117
118
119
120
@classmethod
def run(cls, process, tee=True):
    """
    Create an instance for the process, start it and wait until it is done.

    Returns:
        The created instance, which holds the captured stdout/stderr
    """
    tee_pair = cls(process, tee=tee)
    tee_pair.start()
    tee_pair.join()
    return tee_pair

StreamTee

Tees a single input stream to both a mirror and a capture buffer.

Source code in basemkit/shell.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class StreamTee:
    """
    Tees a single input stream to both a mirror and a capture buffer.

    The stream is read line by line on a daemon thread until end of file.
    """

    def __init__(self, source, mirror, buffer, tee=True):
        """
        Args:
            source: Stream to read from; must provide readline() and close()
            mirror: Stream that receives a live copy of each line when tee is True
            buffer: Text buffer that always receives each line
            tee: If True, lines are also written to the mirror
        """
        self.source = source
        self.mirror = mirror
        self.buffer = buffer
        self.tee = tee
        self.thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        """
        Copy the source line by line until EOF, then close the source.
        """
        try:
            while True:
                line = self.source.readline()
                # EOF is "" for text streams and b"" for binary streams
                if not line:
                    break
                if isinstance(line, bytes):
                    # mirror and buffer are text streams, so binary output is decoded
                    line = line.decode("utf-8", errors="replace")
                if self.tee:
                    self.mirror.write(line)
                    self.mirror.flush()
                self.buffer.write(line)
        finally:
            # close the source even if writing to the mirror or buffer failed
            self.source.close()

    def start(self):
        """Start the reader thread."""
        self.thread.start()

    def join(self):
        """Wait for the reader thread to finish."""
        self.thread.join()

SysTee

Tee sys.stdout and sys.stderr to a logfile while preserving original output.

Source code in basemkit/shell.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class SysTee:
    """
    Tee sys.stdout and sys.stderr to a logfile while preserving original output.

    Both streams are replaced by this object on construction and restored by
    close(). Everything written, including stderr output, is echoed to the
    original stdout and appended to the logfile. The object can also be used
    as a context manager so the streams are restored even if an error occurs.
    """

    def __init__(self, log_path: str):
        """
        Args:
            log_path (str): Path of the logfile; output is appended to it
        """
        # UTF-8 so non-ASCII output (e.g. the ✅/❌ status symbols) can always be
        # logged, independent of the locale's default encoding
        self.logfile = open(log_path, "a", encoding="utf-8")
        self.original_stdout = sys.stdout
        self.original_stderr = sys.stderr
        sys.stdout = self
        sys.stderr = self

    def __enter__(self):
        """Return self; the streams were already redirected by the constructor."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Restore the original streams; exceptions are not suppressed."""
        self.close()
        return False

    def write(self, data):
        """Write data to the original stdout and to the logfile."""
        self.original_stdout.write(data)
        self.logfile.write(data)

    def flush(self):
        """Flush the original stdout and the logfile."""
        self.original_stdout.flush()
        self.logfile.flush()

    def close(self):
        """Restore the original sys.stdout/sys.stderr and close the logfile."""
        sys.stdout = self.original_stdout
        sys.stderr = self.original_stderr
        self.logfile.close()

yamlable

Created on 2023-12-08, Extended on 2023-12-16 and 2024-01-25

@author: wf, ChatGPT

Prompts for the development and extension of the 'YamlAble' class within the 'yamlable' module:

  1. Develop 'YamlAble' class in 'yamlable' module. It should convert dataclass instances to/from YAML.
  2. Implement methods for YAML block scalar style and exclude None values in 'YamlAble' class.
  3. Add functionality to remove None values from dataclass instances before YAML conversion.
  4. Ensure 'YamlAble' processes only dataclass instances, with error handling for non-dataclass objects.
  5. Extend 'YamlAble' for JSON serialization and deserialization.
  6. Add methods for saving/loading dataclass instances to/from YAML and JSON files in 'YamlAble'.
  7. Implement loading of dataclass instances from URLs for both YAML and JSON in 'YamlAble'.
  8. Write tests for 'YamlAble' within the pyLodStorage context. Use 'samples 2' example from pyLoDStorage https://github.com/WolfgangFahl/pyLoDStorage/blob/master/lodstorage/sample2.py as a reference.
  9. Ensure tests cover YAML/JSON serialization, deserialization, and file I/O operations, using the sample-based approach.
  10. Use Google-style docstrings, comments, and type hints in 'YamlAble' class and tests.
  11. Adhere to instructions and seek clarification for any uncertainties.
  12. Add @lod_storable annotation support that automatically adds YamlAble support and applies the prerequisite @dataclass and @dataclass_json behavior to a class

DateConvert

date converter

Source code in basemkit/yamlable.py
81
82
83
84
85
86
87
88
89
class DateConvert:
    """
    Helper for converting ISO date strings.
    """

    @classmethod
    def iso_date_to_datetime(cls, iso_date: str) -> date:
        """
        Parse a date string in the format YYYY-MM-DD.

        Args:
            iso_date (str): The date string to parse

        Returns:
            date: The parsed date, or None if iso_date is empty or None
        """
        if not iso_date:
            return None
        parsed = datetime.strptime(iso_date, "%Y-%m-%d")
        return parsed.date()

YamlAble

Bases: Generic[T]

An extended YAML handler class for converting dataclass objects to and from YAML format, and handling loading from and saving to files and URLs.

Source code in basemkit/yamlable.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
class YamlAble(Generic[T]):
    """
    An extended YAML handler class for converting dataclass objects to and from YAML format,
    and handling loading from and saving to files and URLs.

    NOTE(review): from_dict, from_json and to_json are called but not defined in this class;
    presumably they are supplied by a dataclass_json-style decorator - confirm.
    """

    def _yaml_setup(self):
        """
        Initializes the YamlAble handler, setting up custom representers and preparing it for various operations.

        Raises:
            ValueError: if this object is not a dataclass instance.
        """
        if not is_dataclass(self):
            raise ValueError("I must be a dataclass instance.")
        if not hasattr(self, "_yaml_dumper"):
            # Register the representers on a private subclass so they do not
            # leak into the process-wide yaml.Dumper used by unrelated code.
            class _YamlAbleDumper(yaml.Dumper):
                pass

            _YamlAbleDumper.ignore_aliases = lambda *_args: True
            _YamlAbleDumper.add_representer(type(None), self.represent_none)
            _YamlAbleDumper.add_representer(str, self.represent_literal)
            self._yaml_dumper = _YamlAbleDumper

    def represent_none(self, _, __) -> yaml.Node:
        """
        Custom representer that renders None values as an empty YAML null scalar.

        Args:
            _: The dumper instance that is serializing the value.
            __: The value being represented (always None, unused).
        """
        # the first argument is the active dumper instance; represent_scalar must be
        # called on it (calling it on the Dumper class raises a TypeError)
        dumper = _
        return dumper.represent_scalar("tag:yaml.org,2002:null", "")

    def represent_literal(self, dumper: yaml.Dumper, data: str) -> yaml.Node:
        """
        Custom representer for block scalar style for strings.

        Strings containing a newline are emitted in literal block style ("|"),
        all other strings in the default style.
        """
        if "\n" in data:
            return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
        return dumper.represent_scalar("tag:yaml.org,2002:str", data)

    def to_yaml(
        self,
        ignore_none: bool = True,
        ignore_underscore: bool = True,
        allow_unicode: bool = True,
        sort_keys: bool = False,
    ) -> str:
        """
        Converts this dataclass object to a YAML string, with options to omit None values and/or underscore-prefixed variables,
        and using block scalar style for strings.

        Args:
            ignore_none: Flag to indicate whether None values should be removed from the YAML output.
            ignore_underscore: Flag to indicate whether attributes starting with an underscore should be excluded from the YAML output.
            allow_unicode: Flag to indicate whether to allow unicode characters in the output.
            sort_keys: Flag to indicate whether to sort the dictionary keys in the output.

        Returns:
            A string representation of the dataclass object in YAML format.

        Raises:
            ValueError: if this object is not a dataclass instance.
        """
        # validate first so a non-dataclass gets the documented ValueError
        # instead of the TypeError raised by asdict()
        self._yaml_setup()
        obj_dict = asdict(self)
        clean_dict = self.remove_ignored_values(obj_dict, ignore_none, ignore_underscore)
        yaml_str = yaml.dump(
            clean_dict,
            Dumper=self._yaml_dumper,
            default_flow_style=False,
            allow_unicode=allow_unicode,
            sort_keys=sort_keys,
        )
        return yaml_str

    @classmethod
    def from_yaml(cls: Type[T], yaml_str: str) -> T:
        """
        Deserializes a YAML string to a dataclass instance.

        Args:
            yaml_str (str): A string containing YAML formatted data.

        Returns:
            T: An instance of the dataclass.
        """
        data: dict[str, Any] = yaml.safe_load(yaml_str)
        instance: T = cls.from_dict(data)
        return instance

    @classmethod
    def load_from_yaml_stream(cls: Type[T], stream: TextIO) -> T:
        """
        Loads a dataclass instance from a YAML stream.

        Args:
            stream (TextIO): The input stream containing YAML data.

        Returns:
            T: An instance of the dataclass.
        """
        yaml_str: str = stream.read()
        instance: T = cls.from_yaml(yaml_str)
        return instance

    @classmethod
    def load_from_yaml_file(cls: Type[T], filename: str) -> T:
        """
        Loads a dataclass instance from a YAML file.

        Args:
            filename (str): The path to the YAML file.

        Returns:
            T: An instance of the dataclass.
        """
        # read as UTF-8 to match save_to_yaml_file, independent of the locale default
        with open(filename, "r", encoding="utf-8") as file:
            return cls.load_from_yaml_stream(file)

    @classmethod
    def load_from_yaml_url(cls: Type[T], url: str) -> T:
        """
        Loads a dataclass instance from a YAML string obtained from a URL.

        Args:
            url (str): The URL pointing to the YAML data.

        Returns:
            T: An instance of the dataclass.
        """
        yaml_str: str = cls.read_from_url(url)
        instance: T = cls.from_yaml(yaml_str)
        return instance

    def save_to_yaml_stream(self, file: TextIO):
        """
        Saves the current dataclass instance to the given YAML stream.

        Args:
            file (TextIO): The stream to which YAML content will be saved.
        """
        yaml_content: str = self.to_yaml()
        file.write(yaml_content)

    def save_to_yaml_file(self, filename: str):
        """
        Saves the current dataclass instance to a YAML file.

        Args:
            filename (str): The path where the YAML file will be saved.
        """

        with open(filename, "w", encoding="utf-8") as file:
            self.save_to_yaml_stream(file)

    @classmethod
    def load_from_json_file(cls: Type[T], filename: Union[str, Path]) -> T:
        """
        Loads a dataclass instance from a JSON file.

        Args:
            filename (Union[str, Path]): The path to the JSON file.

        Returns:
            T: An instance of the dataclass.
        """
        with open(filename, "r", encoding="utf-8") as file:
            json_str: str = file.read()
        instance: T = cls.from_json(json_str)
        return instance

    @classmethod
    def load_from_json_url(cls: Type[T], url: str) -> T:
        """
        Loads a dataclass instance from a JSON string obtained from a URL.

        Args:
            url (str): The URL pointing to the JSON data.

        Returns:
            T: An instance of the dataclass.
        """
        json_str: str = cls.read_from_url(url)
        instance: T = cls.from_json(json_str)
        return instance

    def save_to_json_file(self, filename: str, **kwargs: Any):
        """
        Saves the current dataclass instance to a JSON file.

        Args:
            filename (str): The path where the JSON file will be saved.
            **kwargs: Additional keyword arguments for the `to_json` method.
        """
        json_content: str = self.to_json(**kwargs)
        with open(filename, "w", encoding="utf-8") as file:
            file.write(json_content)

    @classmethod
    def read_from_url(cls, url: str) -> str:
        """
        Helper method to fetch content from a URL.

        Args:
            url (str): The URL to read from.

        Returns:
            str: The decoded response body.

        Raises:
            Exception: if the response status is not 200.
        """
        with urllib.request.urlopen(url) as response:
            if response.status == 200:
                return response.read().decode()
            else:
                raise Exception(f"Unable to load data from URL: {url}")

    @classmethod
    def remove_ignored_values(
        cls,
        value: Any,
        ignore_none: bool = True,
        ignore_underscore: bool = False,
        ignore_empty: bool = True,
    ) -> Any:
        """
        Recursively removes specified types of values from a dictionary or list.
        By default, it removes keys with None values. Optionally, it can also remove keys starting with an underscore.

        Args:
            value: The value to process (dictionary, list, or other).
            ignore_none: Flag to indicate whether None values should be removed.
            ignore_underscore: Flag to indicate whether keys starting with an underscore should be removed.
            ignore_empty: Flag to indicate whether empty collections should be removed.

        Returns:
            The cleaned value: a new dict for mappings, a new list for other
            non-string iterables, and the unchanged value otherwise.
        """

        def is_valid(v):
            """Check if the value is valid based on the specified flags."""
            if ignore_none and v is None:
                return False
            if ignore_empty:
                if isinstance(v, Mapping) and not v:
                    return False  # Empty dictionary
                if isinstance(v, Iterable) and not isinstance(v, (str, bytes)) and not v:
                    return False  # Empty list, set, tuple, etc., but not string or bytes
            return True

        def is_hidden(k):
            """Check if the key is an underscore-prefixed string; non-string keys are never hidden."""
            return isinstance(k, str) and k.startswith("_")

        if isinstance(value, Mapping):
            value = {
                k: YamlAble.remove_ignored_values(v, ignore_none, ignore_underscore, ignore_empty)
                for k, v in value.items()
                if is_valid(v) and (not ignore_underscore or not is_hidden(k))
            }
        elif isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
            value = [
                YamlAble.remove_ignored_values(v, ignore_none, ignore_underscore, ignore_empty)
                for v in value
                if is_valid(v)
            ]
        return value

    @classmethod
    def from_dict2(cls: Type[T], data: dict) -> T:
        """
        Creates an instance of a dataclass from a dictionary, typically used in deserialization.

        Args:
            data (dict): The field values.

        Returns:
            T: The created instance, or None if data is None or empty.
        """
        if not data:
            return None
        instance = from_dict(data_class=cls, data=data)
        return instance

from_dict2(data) classmethod

Creates an instance of a dataclass from a dictionary, typically used in deserialization.

Source code in basemkit/yamlable.py
334
335
336
337
338
339
340
341
342
@classmethod
def from_dict2(cls: Type[T], data: dict) -> T:
    """
    Build a dataclass instance from a dictionary via the module-level from_dict helper.

    Args:
        data (dict): The field values.

    Returns:
        T: The created instance, or None if data is None or empty.
    """
    return from_dict(data_class=cls, data=data) if data else None

from_yaml(yaml_str) classmethod

Deserializes a YAML string to a dataclass instance.

Parameters:

Name Type Description Default
yaml_str str

A string containing YAML formatted data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in basemkit/yamlable.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@classmethod
def from_yaml(cls: Type[T], yaml_str: str) -> T:
    """
    Create a dataclass instance from a YAML string.

    The string is parsed with yaml.safe_load and the result is handed to cls.from_dict.

    Args:
        yaml_str (str): YAML formatted data.

    Returns:
        T: An instance of the dataclass.
    """
    parsed: dict[str, Any] = yaml.safe_load(yaml_str)
    return cls.from_dict(parsed)

load_from_json_file(filename) classmethod

Loads a dataclass instance from a JSON file.

Parameters:

Name Type Description Default
filename str

The path to the JSON file.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in basemkit/yamlable.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
@classmethod
def load_from_json_file(cls: Type[T], filename: Union[str, Path]) -> T:
    """
    Read a JSON file (UTF-8) and create a dataclass instance from its content.

    Args:
        filename (Union[str, Path]): The path to the JSON file.

    Returns:
        T: An instance of the dataclass, created by cls.from_json.
    """
    with open(filename, "r", encoding="utf-8") as json_file:
        content: str = json_file.read()
    return cls.from_json(content)

load_from_json_url(url) classmethod

Loads a dataclass instance from a JSON string obtained from a URL.

Parameters:

Name Type Description Default
url str

The URL pointing to the JSON data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in basemkit/yamlable.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
@classmethod
def load_from_json_url(cls: Type[T], url: str) -> T:
    """
    Fetch JSON text from a URL and create a dataclass instance from it.

    Args:
        url (str): The URL pointing to the JSON data.

    Returns:
        T: An instance of the dataclass, created by cls.from_json.
    """
    content: str = cls.read_from_url(url)
    return cls.from_json(content)

load_from_yaml_file(filename) classmethod

Loads a dataclass instance from a YAML file.

Parameters:

Name Type Description Default
filename str

The path to the YAML file.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in basemkit/yamlable.py
186
187
188
189
190
191
192
193
194
195
196
197
198
@classmethod
def load_from_yaml_file(cls: Type[T], filename: str) -> T:
    """
    Loads a dataclass instance from a YAML file.

    Args:
        filename (str): The path to the YAML file.

    Returns:
        T: An instance of the dataclass.
    """
    # read as UTF-8 explicitly so the result does not depend on the locale's
    # default encoding (the JSON loader in this class also reads UTF-8)
    with open(filename, "r", encoding="utf-8") as file:
        return cls.load_from_yaml_stream(file)

load_from_yaml_stream(stream) classmethod

Loads a dataclass instance from a YAML stream.

Parameters:

Name Type Description Default
stream TextIO

The input stream containing YAML data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in basemkit/yamlable.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@classmethod
def load_from_yaml_stream(cls: Type[T], stream: TextIO) -> T:
    """
    Build an instance of this class from YAML text read from a stream.

    The stream is read to its end and the text is handed to
    ``cls.from_yaml``; the stream is not closed here.

    Args:
        stream (TextIO): Readable text stream providing the YAML document.

    Returns:
        T: Whatever ``cls.from_yaml`` creates from the text that was read.
    """
    return cls.from_yaml(stream.read())

load_from_yaml_url(url) classmethod

Loads a dataclass instance from a YAML string obtained from a URL.

Parameters:

Name Type Description Default
url str

The URL pointing to the YAML data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in basemkit/yamlable.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
@classmethod
def load_from_yaml_url(cls: Type[T], url: str) -> T:
    """
    Build an instance of this class from YAML text fetched from a URL.

    The text is obtained through ``cls.read_from_url`` and then handed to
    ``cls.from_yaml``.

    Args:
        url (str): Address from which the YAML text is fetched.

    Returns:
        T: Whatever ``cls.from_yaml`` creates from the fetched text.
    """
    return cls.from_yaml(cls.read_from_url(url))

read_from_url(url) classmethod

Helper method to fetch content from a URL.

Source code in basemkit/yamlable.py
279
280
281
282
283
284
285
286
287
288
@classmethod
def read_from_url(cls, url: str) -> str:
    """
    Helper method to fetch content from a URL.

    Args:
        url (str): The URL to read. Any scheme that
            ``urllib.request.urlopen`` can open is accepted.

    Returns:
        str: The response body, decoded with the ``bytes.decode()``
            default encoding (UTF-8).

    Raises:
        Exception: If the response reports an HTTP status other than 200.
    """
    with urllib.request.urlopen(url) as response:
        # Only HTTP(S) responses carry a status code. Responses for other
        # schemes (e.g. file: or data:) report None or lack the attribute,
        # which must not be mistaken for a failed request.
        status = getattr(response, "status", None)
        if status is not None and status != 200:
            raise Exception(f"Unable to load data from URL: {url}")
        return response.read().decode()

remove_ignored_values(value, ignore_none=True, ignore_underscore=False, ignore_empty=True) classmethod

Recursively removes specified types of values from a dictionary or list. By default, it removes None values and empty collections (but not empty strings). Optionally, it can also remove keys starting with an underscore.

Parameters:

Name Type Description Default
value Any

The value to process (dictionary, list, or other).

required
ignore_none bool

Flag to indicate whether None values should be removed.

True
ignore_underscore bool

Flag to indicate whether keys starting with an underscore should be removed.

False
ignore_empty bool

Flag to indicate whether empty collections should be removed.

True
Source code in basemkit/yamlable.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
@classmethod
def remove_ignored_values(
    cls,
    value: Any,
    ignore_none: bool = True,
    ignore_underscore: bool = False,
    ignore_empty: bool = True,
) -> Any:
    """
    Recursively removes specified types of values from a dictionary or list.
    By default, it removes None values and empty collections. Optionally, it
    can also remove keys starting with an underscore.

    Mappings are rebuilt as plain dicts and any other non-string iterable is
    rebuilt as a list; strings, bytes and non-iterable values are returned
    unchanged. Entries are filtered before their content is cleaned, so a
    container that only becomes empty through cleaning is kept.

    Args:
        value: The value to process (dictionary, list, or other).
        ignore_none: Flag to indicate whether None values should be removed.
        ignore_underscore: Flag to indicate whether keys starting with an
            underscore should be removed. Only string keys are checked.
        ignore_empty: Flag to indicate whether empty collections should be removed.

    Returns:
        The cleaned copy of ``value``, or ``value`` itself if it is not a
        mapping or a non-string iterable.
    """

    def is_valid(v: Any) -> bool:
        """Check if the value is valid based on the specified flags."""
        if ignore_none and v is None:
            return False
        if ignore_empty:
            if isinstance(v, Mapping) and not v:
                return False  # Empty dictionary
            if isinstance(v, Iterable) and not isinstance(v, (str, bytes)) and not v:
                return False  # Empty list, set, tuple, etc., but not string or bytes
        return True

    def is_hidden(key: Any) -> bool:
        """Check if the key is excluded by the underscore rule."""
        # Only strings can carry the underscore prefix; other key types
        # (e.g. int keys) have no startswith() and are never hidden.
        return ignore_underscore and isinstance(key, str) and key.startswith("_")

    # Recurse through cls rather than a hard-coded class name so the method
    # does not depend on the name of the class it is defined in.
    if isinstance(value, Mapping):
        value = {
            k: cls.remove_ignored_values(v, ignore_none, ignore_underscore, ignore_empty)
            for k, v in value.items()
            if is_valid(v) and not is_hidden(k)
        }
    elif isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
        value = [
            cls.remove_ignored_values(v, ignore_none, ignore_underscore, ignore_empty)
            for v in value
            if is_valid(v)
        ]
    return value

represent_literal(dumper, data)

Custom representer for block scalar style for strings.

Source code in basemkit/yamlable.py
116
117
118
119
120
121
122
def represent_literal(self, dumper: yaml.Dumper, data: str) -> yaml.Node:
    """
    Represent a string as a YAML string scalar.

    Strings containing a newline are emitted in literal block style (``|``);
    all other strings are emitted with the dumper's default scalar style.

    Args:
        dumper (yaml.Dumper): The dumper that creates the scalar node.
        data (str): The string to represent.

    Returns:
        yaml.Node: The node returned by ``dumper.represent_scalar``.
    """
    str_tag = "tag:yaml.org,2002:str"
    if "\n" not in data:
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style="|")

represent_none(_, __)

Custom representer for ignoring None values in the YAML output.

Source code in basemkit/yamlable.py
110
111
112
113
114
def represent_none(self, _, __) -> yaml.Node:
    """
    Custom representer that renders None as an empty YAML null scalar.

    PyYAML calls a registered representer with the active dumper instance
    and the value being represented; the scalar node is created through
    that dumper instance.

    Args:
        _: The dumper instance supplied by PyYAML.
        __: The value being represented (unused).

    Returns:
        yaml.Node: A null-tagged scalar node with an empty string value.
    """
    # Use the dumper instance handed in by PyYAML, as represent_literal does.
    # self._yaml_dumper is what to_yaml passes to yaml.dump as ``Dumper``,
    # i.e. a dumper class, so calling represent_scalar on it directly has no
    # instance to bind to.
    dumper = _
    return dumper.represent_scalar("tag:yaml.org,2002:null", "")

save_to_json_file(filename, **kwargs)

Saves the current dataclass instance to a JSON file.

Parameters:

Name Type Description Default
filename str

The path where the JSON file will be saved.

required
**kwargs Any

Additional keyword arguments for the to_json method.

{}
Source code in basemkit/yamlable.py
267
268
269
270
271
272
273
274
275
276
277
def save_to_json_file(self, filename: str, **kwargs: Any):
    """
    Write the JSON representation of this instance to a file.

    The JSON text is produced by ``self.to_json`` before the file is
    opened, then written as UTF-8, replacing any existing content.

    Args:
        filename (str): Location of the JSON file to write.
        **kwargs: Keyword arguments passed through to ``to_json``.
    """
    serialized: str = self.to_json(**kwargs)
    with open(filename, mode="w", encoding="utf-8") as json_file:
        json_file.write(serialized)

save_to_yaml_file(filename)

Saves the current dataclass instance to a YAML file.

Parameters:

Name Type Description Default
filename str

The path where the YAML file will be saved.

required
Source code in basemkit/yamlable.py
225
226
227
228
229
230
231
232
233
234
def save_to_yaml_file(self, filename: str):
    """
    Write the YAML representation of this instance to a file.

    The file is opened for writing as UTF-8, replacing any existing
    content, and the open stream is handed to ``save_to_yaml_stream``.

    Args:
        filename (str): Location of the YAML file to write.
    """
    with open(filename, mode="w", encoding="utf-8") as yaml_file:
        self.save_to_yaml_stream(yaml_file)

save_to_yaml_stream(file)

Saves the current dataclass instance to the given YAML stream.

Parameters:

Name Type Description Default
file TextIO

The stream to which YAML content will be saved.

required
Source code in basemkit/yamlable.py
215
216
217
218
219
220
221
222
223
def save_to_yaml_stream(self, file: TextIO):
    """
    Write the YAML representation of this instance to an open stream.

    The YAML text comes from ``self.to_yaml()`` called without arguments;
    the stream is neither flushed nor closed here.

    Args:
        file (TextIO): Writable text stream receiving the YAML text.
    """
    file.write(self.to_yaml())

to_yaml(ignore_none=True, ignore_underscore=True, allow_unicode=True, sort_keys=False)

Converts this dataclass object to a YAML string, with options to omit None values and/or underscore-prefixed variables, and using block scalar style for strings.

Parameters:

Name Type Description Default
ignore_none bool

Flag to indicate whether None values should be removed from the YAML output.

True
ignore_underscore bool

Flag to indicate whether attributes starting with an underscore should be excluded from the YAML output.

True
allow_unicode bool

Flag to indicate whether to allow unicode characters in the output.

True
sort_keys bool

Flag to indicate whether to sort the dictionary keys in the output.

False

Returns:

Type Description
str

A string representation of the dataclass object in YAML format.

Source code in basemkit/yamlable.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def to_yaml(
    self,
    ignore_none: bool = True,
    ignore_underscore: bool = True,
    allow_unicode: bool = True,
    sort_keys: bool = False,
) -> str:
    """
    Serialize this dataclass instance to a YAML string.

    The instance is converted with ``dataclasses.asdict``, cleaned with
    ``remove_ignored_values`` (which also drops empty collections, as its
    ``ignore_empty`` default applies) and dumped in block style with
    ``self._yaml_dumper``, which ``_yaml_setup`` is expected to provide.

    Args:
        ignore_none: Whether None values are left out of the output.
        ignore_underscore: Whether keys starting with an underscore are left out of the output.
        allow_unicode: Whether unicode characters may appear unescaped in the output.
        sort_keys: Whether dictionary keys are sorted in the output.

    Returns:
        The YAML text representing this instance.
    """
    field_values = asdict(self)
    self._yaml_setup()
    filtered = self.remove_ignored_values(field_values, ignore_none, ignore_underscore)
    dump_options = {
        "default_flow_style": False,
        "allow_unicode": allow_unicode,
        "sort_keys": sort_keys,
    }
    return yaml.dump(filtered, Dumper=self._yaml_dumper, **dump_options)

lod_storable(cls)

Decorator to make a class LoDStorable by inheriting from YamlAble. This decorator also ensures the class is a dataclass and has JSON serialization/deserialization capabilities.

Source code in basemkit/yamlable.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def lod_storable(cls):
    """
    Class decorator that turns ``cls`` into a YamlAble dataclass.

    ``cls`` is passed through ``dataclass`` and ``dataclass_json``, and a
    subclass combining ``YamlAble`` with the result is returned in its
    place. The subclass takes over the name, qualified name, docstring and
    module of ``cls``.
    """
    # Apply @dataclass first, then @dataclass_json on its result.
    json_dataclass = dataclass_json(dataclass(cls))

    class LoDStorable(YamlAble, json_dataclass):
        """
        decorator class
        """

        __qualname__ = json_dataclass.__qualname__

    # The returned subclass must keep the identity of the decorated class
    # so that serialization and module lookup still resolve to it.
    for attribute in ("__name__", "__doc__", "__module__"):
        setattr(LoDStorable, attribute, getattr(json_dataclass, attribute))

    return LoDStorable