Skip to content

omnigraph API Documentation

blazegraph

Blazegraph

Bases: SparqlServer

Dockerized Blazegraph SPARQL server

Source code in omnigraph/blazegraph.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class Blazegraph(SparqlServer):
    """
    Dockerized Blazegraph SPARQL server
    """

    def __init__(
        self,
        config: ServerConfig,
        env: ServerEnv
    ):
        """
        Initialize the Blazegraph manager.

        Args:
            config: Server configuration
            env: Server environment (includes log, shell, debug, verbose)
        """
        super().__init__(config=config, env=env)
        # REST endpoint of Blazegraph's bulk data loader servlet
        self.dataloader_url = f"{self.config.base_url}/dataloader"

    def status(self) -> dict:
        """
        Get Blazegraph status information.

        Returns:
            Dictionary with status information, empty dict if error
        """
        status_dict = {}

        result = self._make_request("GET", self.config.status_url, timeout=2)

        if result["success"]:
            status_dict["status"] = "ready"
            self._parse_status_html(result["content"], status_dict)
        else:
            if result.get("error"):
                status_dict["status"] = f"error: {result['error']}"
            else:
                status_dict["status"] = f"status_code: {result['status_code']}"

        return status_dict

    def _parse_status_html(self, html_content: str, status_dict: dict) -> None:
        """
        Extract name/value pairs from the Blazegraph status page HTML.

        Args:
            html_content: raw HTML returned by the status endpoint
            status_dict: dictionary updated in place with sanitized entries
        """
        # matches either <span id="name">value</span> pairs or
        # URL-encoded &#47;name=value pairs (&#47; is an escaped slash)
        name_value_pattern = r'(?:<span id="(?P<name1>[^"]+)">(?P<value1>[^<]+)</span[^>]*>|&#47;(?P<name2>[^=]+)=(?P<value2>[^\s&#]+))'
        for match in re.finditer(name_value_pattern, html_content, re.DOTALL):
            # each match populates exactly one of the two group pairs
            for name_group, value_group in {
                "name1": "value1",
                "name2": "value2",
            }.items():
                name = match.group(name_group)
                if name:
                    value = match.group(value_group)
                    sanitized_value = value.replace("</p", "").replace("&#47;", "/")
                    sanitized_name = name.replace("-", "_").replace("/", "_")
                    sanitized_name = sanitized_name.replace("&#47;", "/")
                    # skip path-like names such as "/bigdata/..."
                    if not sanitized_name.startswith("/"):
                        status_dict[sanitized_name] = sanitized_value
                    break

    def load_file(self, filepath: str) -> bool:
        """
        Load a single RDF file into Blazegraph.

        Args:
            filepath: Path to RDF file

        Returns:
            True if loaded successfully
        """
        load_success = False
        try:
            # the whole file is read into memory and posted as Turtle
            with open(filepath, "rb") as f:
                result = self._make_request(
                    "POST",
                    self.config.sparql_url,
                    headers={"Content-Type": "text/turtle"},
                    data=f.read(),
                    timeout=300,
                )

            if result["success"]:
                # use container_name consistently with the other loaders
                self.log.log("✅", self.container_name, f"Loaded {filepath}")
                load_success = True
            else:
                error_msg = result.get("error", f"HTTP {result['status_code']}")
                self.log.log("❌", self.container_name, f"Failed to load {filepath}: {error_msg}")
                load_success = False

        except Exception as e:
            # any I/O or request failure is logged and reported as False
            self.log.log("❌", self.container_name, f"Exception loading {filepath}: {e}")
            load_success = False

        return load_success

    def load_files_bulk(self, file_list: list) -> bool:
        """
        Load multiple files using Blazegraph's bulk loader REST API.

        Args:
            file_list: List of file paths to load

        Returns:
            True if loaded successfully
        """
        bulk_load_success = False

        if not file_list:
            bulk_load_success = False
        else:
            # Convert to absolute paths - the dataloader resolves them server-side
            abs_paths = [str(Path(f).absolute()) for f in file_list]

            properties = f"""<?xml version="1.0" encoding="UTF-8"?>
            <properties>
                <entry key="format">turtle</entry>
                <entry key="quiet">false</entry>
                <entry key="verbose">1</entry>
                <entry key="namespace">kb</entry>
                <entry key="fileOrDirs">{','.join(abs_paths)}</entry>
            </properties>"""

            # generous timeout: bulk loads can run for a long time
            result = self._make_request(
                "POST",
                self.dataloader_url,
                headers={"Content-Type": "application/xml"},
                data=properties,
                timeout=3600,
            )

            if result["success"]:
                self.log.log("✅", self.container_name, f"Bulk loaded {len(file_list)} files")
                bulk_load_success = True
            else:
                error_msg = result.get("error", f"HTTP {result['status_code']}")
                self.log.log("❌", self.container_name, f"Bulk load failed: {error_msg}")
                bulk_load_success = False

        return bulk_load_success

    def load_dump_files(self, file_pattern: str = "dump_*.ttl", use_bulk: bool = True) -> int:
        """
        Load all dump files matching pattern.

        Args:
            file_pattern: Glob pattern for dump files
            use_bulk: Use bulk loader if True, individual files if False

        Returns:
            Number of files loaded successfully
        """
        # NOTE: the glob pattern is resolved relative to the current working directory
        files = sorted(glob.glob(file_pattern))
        loaded_count = 0

        if not files:
            self.log.log(
                "⚠️",
                self.container_name,
                f"No files found matching pattern: {file_pattern}",
            )
            loaded_count = 0
        else:
            self.log.log("✅", self.container_name, f"Found {len(files)} files to load")

            if use_bulk:
                # bulk load is all-or-nothing: either every file counts or none
                bulk_result = self.load_files_bulk(files)
                loaded_count = len(files) if bulk_result else 0
            else:
                loaded_count = 0
                for filepath in tqdm(files, desc="Loading files"):
                    file_result = self.load_file(filepath)
                    if file_result:
                        loaded_count += 1
                    else:
                        self.log.log("❌", self.container_name, f"Failed to load: {filepath}")

        return loaded_count

    def test_geosparql(self) -> bool:
        """
        Test if GeoSPARQL functions work.

        Returns:
            True if GeoSPARQL is available
        """
        # probe query: compute the distance between two WKT points
        test_query = """
        PREFIX geo: <http://www.opengis.net/ont/geosparql#>
        PREFIX geof: <http://www.opengis.net/def/function/geosparql/>

        SELECT * WHERE {
            BIND(geof:distance("POINT(0 0)"^^geo:wktLiteral, "POINT(1 1)"^^geo:wktLiteral) AS ?dist)
        } LIMIT 1
        """

        # use self.config.sparql_url consistently with load_file;
        # self.sparql_url is never assigned on this class
        result = self._make_request(
            "POST",
            self.config.sparql_url,
            data={"query": test_query},
            headers={"Accept": "application/sparql-results+json"},
            timeout=10,
        )

        geosparql_available = result["success"]
        return geosparql_available

__init__(config, env)

Initialize the Blazegraph manager.

Parameters:

Name Type Description Default
config ServerConfig

Server configuration

required
env ServerEnv

Server environment (includes log, shell, debug, verbose)

required
Source code in omnigraph/blazegraph.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    config: ServerConfig,
    env: ServerEnv
):
    """
    Initialize the Blazegraph manager.

    Args:
        config: Server configuration
        env: Server environment (includes log, shell, debug, verbose)
    """
    super().__init__(config=config, env=env)
    # REST endpoint of Blazegraph's bulk data loader servlet
    self.dataloader_url = f"{self.config.base_url}/dataloader"

load_dump_files(file_pattern='dump_*.ttl', use_bulk=True)

Load all dump files matching pattern.

Parameters:

Name Type Description Default
file_pattern str

Glob pattern for dump files

'dump_*.ttl'
use_bulk bool

Use bulk loader if True, individual files if False

True

Returns:

Type Description
int

Number of files loaded successfully

Source code in omnigraph/blazegraph.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def load_dump_files(self, file_pattern: str = "dump_*.ttl", use_bulk: bool = True) -> int:
    """
    Load all dump files matching pattern.

    Args:
        file_pattern: Glob pattern for dump files
        use_bulk: Use bulk loader if True, individual files if False

    Returns:
        Number of files loaded successfully
    """
    # NOTE: the glob pattern is resolved relative to the current working directory
    files = sorted(glob.glob(file_pattern))
    loaded_count = 0

    if not files:
        self.log.log(
            "⚠️",
            self.container_name,
            f"No files found matching pattern: {file_pattern}",
        )
        loaded_count = 0
    else:
        self.log.log("✅", self.container_name, f"Found {len(files)} files to load")

        if use_bulk:
            # bulk load is all-or-nothing: either every file counts or none
            bulk_result = self.load_files_bulk(files)
            loaded_count = len(files) if bulk_result else 0
        else:
            loaded_count = 0
            for filepath in tqdm(files, desc="Loading files"):
                file_result = self.load_file(filepath)
                if file_result:
                    loaded_count += 1
                else:
                    self.log.log("❌", self.container_name, f"Failed to load: {filepath}")

    return loaded_count

load_file(filepath)

Load a single RDF file into Blazegraph.

Parameters:

Name Type Description Default
filepath str

Path to RDF file

required

Returns:

Type Description
bool

True if loaded successfully

Source code in omnigraph/blazegraph.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def load_file(self, filepath: str) -> bool:
    """
    Load a single RDF file into Blazegraph.

    Args:
        filepath: Path to RDF file

    Returns:
        True if loaded successfully
    """
    load_success = False
    try:
        # the whole file is read into memory and posted as Turtle
        with open(filepath, "rb") as f:
            result = self._make_request(
                "POST",
                self.config.sparql_url,
                headers={"Content-Type": "text/turtle"},
                data=f.read(),
                timeout=300,
            )

        if result["success"]:
            # NOTE(review): sibling methods log with self.container_name —
            # confirm whether self.name is intentional here
            self.log.log("✅", self.name, f"Loaded {filepath}")
            load_success = True
        else:
            error_msg = result.get("error", f"HTTP {result['status_code']}")
            self.log.log("❌", self.name, f"Failed to load {filepath}: {error_msg}")
            load_success = False

    except Exception as e:
        # any I/O or request failure is logged and reported as False
        self.log.log("❌", self.name, f"Exception loading {filepath}: {e}")
        load_success = False

    return load_success

load_files_bulk(file_list)

Load multiple files using Blazegraph's bulk loader REST API.

Parameters:

Name Type Description Default
file_list list

List of file paths to load

required

Returns:

Type Description
bool

True if loaded successfully

Source code in omnigraph/blazegraph.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def load_files_bulk(self, file_list: list) -> bool:
    """
    Load multiple files using Blazegraph's bulk loader REST API.

    Args:
        file_list: List of file paths to load

    Returns:
        True if loaded successfully
    """
    bulk_load_success = False

    if not file_list:
        bulk_load_success = False
    else:
        # Convert to absolute paths
        abs_paths = [str(Path(f).absolute()) for f in file_list]

        # properties payload understood by the Blazegraph dataloader servlet;
        # fileOrDirs is resolved on the server side, so the paths must be
        # visible from inside the container
        properties = f"""<?xml version="1.0" encoding="UTF-8"?>
        <properties>
            <entry key="format">turtle</entry>
            <entry key="quiet">false</entry>
            <entry key="verbose">1</entry>
            <entry key="namespace">kb</entry>
            <entry key="fileOrDirs">{','.join(abs_paths)}</entry>
        </properties>"""

        # generous timeout: bulk loads can run for a long time
        result = self._make_request(
            "POST",
            self.dataloader_url,
            headers={"Content-Type": "application/xml"},
            data=properties,
            timeout=3600,
        )

        if result["success"]:
            self.log.log("✅", self.container_name, f"Bulk loaded {len(file_list)} files")
            bulk_load_success = True
        else:
            error_msg = result.get("error", f"HTTP {result['status_code']}")
            self.log.log("❌", self.container_name, f"Bulk load failed: {error_msg}")
            bulk_load_success = False

    return bulk_load_success

status()

Get Blazegraph status information.

Returns:

Type Description
dict

Dictionary with status information, empty dict if error

Source code in omnigraph/blazegraph.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def status(self) -> dict:
    """
    Get Blazegraph status information.

    Returns:
        Dictionary with status information, empty dict if error
    """
    status_dict = {}

    result = self._make_request("GET", self.config.status_url, timeout=2)

    if result["success"]:
        status_dict["status"] = "ready"
        html_content = result["content"]
        # matches either <span id="name">value</span> pairs or
        # URL-encoded &#47;name=value pairs (&#47; is an escaped slash)
        name_value_pattern = r'(?:<span id="(?P<name1>[^"]+)">(?P<value1>[^<]+)</span[^>]*>|&#47;(?P<name2>[^=]+)=(?P<value2>[^\s&#]+))'
        matches = re.finditer(name_value_pattern, html_content, re.DOTALL)

        for match in matches:
            # each match populates exactly one of the two group pairs;
            # break after the populated pair has been processed
            for name_group, value_group in {
                "name1": "value1",
                "name2": "value2",
            }.items():
                name = match.group(name_group)
                if name:
                    value = match.group(value_group)
                    sanitized_value = value.replace("</p", "").replace("&#47;", "/")
                    sanitized_name = name.replace("-", "_").replace("/", "_")
                    sanitized_name = sanitized_name.replace("&#47;", "/")
                    # skip path-like names such as "/bigdata/..."
                    if not sanitized_name.startswith("/"):
                        status_dict[sanitized_name] = sanitized_value
                    break
    else:
        if result.get("error"):
            status_dict["status"] = f"error: {result['error']}"
        else:
            status_dict["status"] = f"status_code: {result['status_code']}"

    return status_dict

test_geosparql()

Test if GeoSPARQL functions work.

Returns:

Type Description
bool

True if GeoSPARQL is available

Source code in omnigraph/blazegraph.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def test_geosparql(self) -> bool:
    """
    Test if GeoSPARQL functions work.

    Returns:
        True if GeoSPARQL is available
    """
    # probe query: compute the distance between two WKT points
    test_query = """
    PREFIX geo: <http://www.opengis.net/ont/geosparql#>
    PREFIX geof: <http://www.opengis.net/def/function/geosparql/>

    SELECT * WHERE {
        BIND(geof:distance("POINT(0 0)"^^geo:wktLiteral, "POINT(1 1)"^^geo:wktLiteral) AS ?dist)
    } LIMIT 1
    """

    # NOTE(review): other methods use self.config.sparql_url; confirm that
    # self.sparql_url is actually set on this instance
    result = self._make_request(
        "POST",
        self.sparql_url,
        data={"query": test_query},
        headers={"Accept": "application/sparql-results+json"},
        timeout=10,
    )

    geosparql_available = result["success"]
    return geosparql_available

BlazegraphConfig dataclass

Bases: ServerConfig

Blazegraph configuration

Source code in omnigraph/blazegraph.py
12
13
14
15
16
17
18
19
20
21
22
23
24
@dataclass
class BlazegraphConfig(ServerConfig):
    """
    Blazegraph configuration
    """

    def __post_init__(self):
        # derive Blazegraph-specific endpoint URLs from the generic base_url
        super().__post_init__()
        blazegraph_base = f"{self.base_url}/bigdata"
        self.status_url = f"{blazegraph_base}/status"
        # SPARQL endpoint of the default "kb" namespace
        self.sparql_url = f"{blazegraph_base}/namespace/kb/sparql"
        self.dataloader_url = f"{blazegraph_base}/dataloader"
        # container port 8080 is mapped to the configured host port
        self.docker_run_command = f"docker run -d --name {self.container_name} -p {self.port}:8080 {self.image}"

jena

Created on 2025-05-28

Apache Jena SPARQL support

@author: wf

Jena

Bases: SparqlServer

Dockerized Apache Jena Fuseki SPARQL server

Source code in omnigraph/jena.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class Jena(SparqlServer):
    """
    Dockerized Apache Jena Fuseki SPARQL server
    """

    def __init__(
        self,
        config: ServerConfig,
        log: Log = None,
        shell: Shell = None,
        debug: bool = False,
    ):
        """
        Initialize the Jena Fuseki manager.

        Args:
            config: Jena server configuration
            log: Log instance for logging
            shell: Shell instance for Docker commands
            debug: Enable debug output
        """
        super().__init__(config=config, log=log, shell=shell, debug=debug)
        # fall back to Fuseki's conventional default dataset name "ds"
        self.dataset = getattr(config, "dataset", "ds")
        base_url = self.config.base_url
        self.update_url = f"{base_url}/{self.dataset}/update"
        self.upload_url = f"{base_url}/{self.dataset}/data"

    def status(self) -> dict:
        """
        Get Jena Fuseki status information.

        Returns:
            Dictionary with status information, empty dict if error
        """
        result = self._make_request("GET", self.config.status_url, timeout=2)
        if result["success"]:
            return {"status": "ready"}
        error_msg = result.get("error", f"status_code: {result['status_code']}")
        return {"status": f"error: {error_msg}"}

__init__(config, log=None, shell=None, debug=False)

Initialize the Jena Fuseki manager.

Parameters:

Name Type Description Default
config ServerConfig

Jena server configuration

required
log Log

Log instance for logging

None
shell Shell

Shell instance for Docker commands

None
debug bool

Enable debug output

False
Source code in omnigraph/jena.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def __init__(
    self,
    config: ServerConfig,
    log: Log = None,
    shell: Shell = None,
    debug: bool = False,
):
    """
    Initialize the Jena Fuseki manager.

    Args:
        config: Jena server configuration
        log: Log instance for logging
        shell: Shell instance for Docker commands
        debug: Enable debug output
    """
    super().__init__(config=config, log=log, shell=shell, debug=debug)
    # fall back to Fuseki's conventional default dataset name "ds"
    self.dataset = getattr(config, "dataset", "ds")
    self.update_url = f"{self.config.base_url}/{self.dataset}/update"
    self.upload_url = f"{self.config.base_url}/{self.dataset}/data"

status()

Get Jena Fuseki status information.

Returns:

Type Description
dict

Dictionary with status information, empty dict if error

Source code in omnigraph/jena.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def status(self) -> dict:
    """
    Get Jena Fuseki status information.

    Returns:
        Dictionary with status information, empty dict if error
    """
    result = self._make_request("GET", self.config.status_url, timeout=2)
    if result["success"]:
        return {"status": "ready"}
    # fall back to the HTTP status code when no error text is available
    error_msg = result.get("error", f"status_code: {result['status_code']}")
    return {"status": f"error: {error_msg}"}

JenaConfig dataclass

Bases: ServerConfig

Jena Fuseki configuration

Source code in omnigraph/jena.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@dataclass
class JenaConfig(ServerConfig):
    """
    Jena Fuseki configuration
    """

    # Fuseki dataset name used in all endpoint URLs
    dataset: str = "ds"

    def __post_init__(self):
        super().__post_init__()
        # Fuseki's admin ping endpoint reports server liveness
        self.status_url = f"{self.base_url}/$/ping"
        self.sparql_url = f"{self.base_url}/{self.dataset}/sparql"
        self.update_url = f"{self.base_url}/{self.dataset}/update"
        self.upload_url = f"{self.base_url}/{self.dataset}/data"
        # container port 3030 is mapped to the configured host port
        self.docker_run_command = (
            f"docker run -d --name {self.container_name} -p {self.port}:3030 -e ADMIN_PASSWORD=admin {self.image}"
        )

__init__(config, log=None, shell=None, debug=False)

Initialize the Jena Fuseki manager.

Parameters:

Name Type Description Default
config ServerConfig

Jena server configuration

required
log Log

Log instance for logging

None
shell Shell

Shell instance for Docker commands

None
debug bool

Enable debug output

False
Source code in omnigraph/jena.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    config: ServerConfig,
    log: Log = None,
    shell: Shell = None,
    debug: bool = False,
):
    """
    Initialize the Jena Fuseki manager.

    Args:
        config: Jena server configuration
        log: Log instance for logging
        shell: Shell instance for Docker commands
        debug: Enable debug output
    """
    super().__init__(config=config, log=log, shell=shell, debug=debug)
    # fall back to Fuseki's conventional default dataset name "ds"
    self.dataset = getattr(config, "dataset", "ds")
    self.update_url = f"{self.config.base_url}/{self.dataset}/update"
    self.upload_url = f"{self.config.base_url}/{self.dataset}/data"

ominigraph_paths

Created on 27.05.2025

@author: wf

OmnigraphPaths

Omnigraph Default Paths

Source code in omnigraph/ominigraph_paths.py
10
11
12
13
14
15
16
17
18
19
20
21
22
class OmnigraphPaths:
    """
    Omnigraph Default Paths
    """

    def __init__(self):
        """
        constructor
        """
        home = Path.home()
        self.home_dir = home
        # per-user configuration directory, created on demand
        self.omnigraph_dir = home / ".omnigraph"
        self.omnigraph_dir.mkdir(exist_ok=True)
        # examples shipped as package resources next to this module
        module_dir = Path(__file__).parent
        self.examples_dir = (module_dir / "resources" / "examples").resolve()

__init__()

constructor

Source code in omnigraph/ominigraph_paths.py
15
16
17
18
19
20
21
22
def __init__(self):
    """
    constructor
    """
    self.home_dir = Path.home()
    # per-user configuration directory, created on demand
    self.omnigraph_dir = self.home_dir / ".omnigraph"
    self.omnigraph_dir.mkdir(exist_ok=True)
    # examples shipped as package resources next to this module
    self.examples_dir = (Path(__file__).parent / "resources" / "examples").resolve()

omnigraph_cmd

Created on 2025-05-28

@author: wf

OmnigraphCmd

Command line interface for omnigraph.

Source code in omnigraph/omnigraph_cmd.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
class OmnigraphCmd:
    """
    Command line interface for omnigraph.
    """

    def __init__(self):
        """
        Initialize command line interface.
        """
        self.ogp = OmnigraphPaths()
        # default server configuration shipped with the package examples
        self.default_yaml_path = self.ogp.examples_dir / "servers.yaml"
        self.version = Version()
        self.program_version_message = f"{self.version.name} {self.version.version}"
        self.parser = self.getArgParser()

    def getArgParser(self, description: str = None, version_msg=None) -> ArgumentParser:
        """
        Setup command line argument parser

        Args:
            description(str): the description
            version_msg(str): the version message

        Returns:
            ArgumentParser: the argument parser
        """
        # fall back to the version metadata when no explicit texts are given
        if description is None:
            description = self.version.description
        if version_msg is None:
            version_msg = self.program_version_message

        parser = ArgumentParser(
            description=description, formatter_class=RawDescriptionHelpFormatter
        )
        parser.add_argument(
            "-a",
            "--about",
            help="show about info [default: %(default)s]",
            action="store_true",
        )
        parser.add_argument(
            "-c",
            "--config",
            type=str,
            default=str(self.default_yaml_path),
            help="Path to server configuration YAML file [default: %(default)s]",
        )
        parser.add_argument(
            "--cmd",
            nargs="+",
            help="commands to execute on servers: start, stop, status",
        )
        parser.add_argument(
            "-d",
            "--debug",
            action="store_true",
            help="show debug info [default: %(default)s]",
        )
        parser.add_argument(
            "-l",
            "--list-servers",
            action="store_true",
            help="List available servers [default: %(default)s]",
        )
        parser.add_argument(
            "-s",
            "--servers",
            nargs="+",
            default=["blazegraph"],
            help="servers: servers to work with - all is an alias for all servers [default: %(default)s]",
        )
        parser.add_argument(
            "-v",
            "--verbose",
            action="store_true",
            help="show verbose output [default: %(default)s]",
        )
        parser.add_argument("-V", "--version", action="version", version=version_msg)
        return parser

    def getServers(self) -> Dict[str, SparqlServer]:
        """
        Get the servers to work with.

        Returns:
            Dictionary of active servers
        """
        servers = {}
        server_names = self.args.servers
        # "all" expands to every configured server
        if "all" in server_names:
            server_names = list(self.all_servers.keys())
        for server_name in server_names:
            server = self.all_servers.get(server_name)
            if server:
                servers[server_name] = server
        return servers

    def run_cmds(self, server: SparqlServer, cmds: List[str]) -> bool:
        """
        Run commands on a server.

        Args:
            server: Server instance
            cmds: List of commands to execute

        Returns:
            True if any commands were handled
        """
        handled = False
        s_cmds = {
            "start": ServerCmd(title=f"start {server.name}", func=server.start),
            "stop": ServerCmd(title=f"stop {server.name}", func=server.stop),
            "status": ServerCmd(title=f"status {server.name}", func=server.status),
        }
        if cmds:
            for cmd in cmds:
                s_cmd = s_cmds.get(cmd)
                if s_cmd:
                    s_cmd.run()
                    handled = True
                else:
                    print(f"unsupported command {cmd}")
        return handled

    def handle_args(self, args: Namespace) -> bool:
        """
        Handle command line arguments.

        Args:
            args: parsed command line arguments

        Returns:
            bool: True if arguments were handled, False otherwise
        """
        self.args = args
        handled = False
        self.all_servers = {}
        if Path(self.args.config).exists():
            env = ServerEnv(debug=args.debug, verbose=args.verbose)
            omni_server = OmniServer(env=env)
            self.all_servers = omni_server.servers(args.config)
        else:
            print(f"Config file not found: {args.config}")

        # resolve the active servers before any branch reads self.servers
        # (previously it was assigned only at the end, so --about and
        # --list-servers raised AttributeError)
        self.servers = self.getServers()

        if args.about:
            print(self.program_version_message)
            print(f"{len(self.servers)} servers configured")
            print(f"see {self.version.doc_url}")
            webbrowser.open(self.version.doc_url)
            handled = True

        if self.args.list_servers:
            print("Available servers:")
            for name, server in self.servers.items():
                print(f"  {name}: {server.config.server}")
            handled = True

        for server in self.servers.values():
            # evaluate run_cmds first: "handled or run_cmds(...)" would
            # short-circuit and skip the commands once handled is True
            cmds_handled = self.run_cmds(server, cmds=args.cmd)
            handled = handled or cmds_handled

        return handled

__init__()

Initialize command line interface.

Source code in omnigraph/omnigraph_cmd.py
55
56
57
58
59
60
61
62
63
def __init__(self):
    """
    Initialize command line interface.
    """
    self.ogp = OmnigraphPaths()
    # default server configuration shipped with the package examples
    self.default_yaml_path = self.ogp.examples_dir / "servers.yaml"
    self.version = Version()
    self.program_version_message = f"{self.version.name} {self.version.version}"
    self.parser = self.getArgParser()

getArgParser(description=None, version_msg=None)

Setup command line argument parser

Parameters:

Name Type Description Default
description(str)

the description

required
version_msg(str)

the version message

required

Returns:

Name Type Description
ArgumentParser ArgumentParser

the argument parser

Source code in omnigraph/omnigraph_cmd.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def getArgParser(self, description: str = None, version_msg=None) -> ArgumentParser:
    """
    Setup command line argument parser

    Args:
        description(str): the description
        version_msg(str): the version message

    Returns:
        ArgumentParser: the argument parser
    """
    # fall back to the version metadata when no explicit texts are given
    effective_description = self.version.description if description is None else description
    effective_version_msg = self.program_version_message if version_msg is None else version_msg

    arg_parser = ArgumentParser(
        description=effective_description,
        formatter_class=RawDescriptionHelpFormatter,
    )
    arg_parser.add_argument(
        "-a",
        "--about",
        help="show about info [default: %(default)s]",
        action="store_true",
    )
    arg_parser.add_argument(
        "-c",
        "--config",
        type=str,
        default=str(self.default_yaml_path),
        help="Path to server configuration YAML file [default: %(default)s]",
    )
    arg_parser.add_argument(
        "--cmd",
        nargs="+",
        help="commands to execute on servers: start, stop, status",
    )
    arg_parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="show debug info [default: %(default)s]",
    )
    arg_parser.add_argument(
        "-l",
        "--list-servers",
        action="store_true",
        help="List available servers [default: %(default)s]",
    )
    arg_parser.add_argument(
        "-s",
        "--servers",
        nargs="+",
        default=["blazegraph"],
        help="servers: servers to work with - all is an alias for all servers [default: %(default)s]",
    )
    arg_parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="show verbose output [default: %(default)s]",
    )
    arg_parser.add_argument("-V", "--version", action="version", version=effective_version_msg)
    return arg_parser

getServers()

Get the servers to work with.

Returns:

Type Description
Dict[str, SparqlServer]

Dictionary of active servers

Source code in omnigraph/omnigraph_cmd.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def getServers(self) -> Dict[str, SparqlServer]:
    """
    Determine the servers selected on the command line.

    Returns:
        Dictionary of active servers
    """
    requested = self.args.servers
    if "all" in requested:
        # "all" expands to every configured server
        requested = list(self.all_servers.keys())
    # keep only names that resolve to a configured server instance
    servers = {
        name: self.all_servers[name]
        for name in requested
        if self.all_servers.get(name)
    }
    return servers

handle_args(args)

Handle command line arguments.

Parameters:

Name Type Description Default
args Namespace

the parsed command line arguments

required

Returns:

Name Type Description
bool bool

True if arguments were handled, False otherwise

Source code in omnigraph/omnigraph_cmd.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def handle_args(self, args: Namespace) -> bool:
    """
    Handle command line arguments.

    Args:
        args: the parsed command line arguments

    Returns:
        bool: True if arguments were handled, False otherwise
    """
    self.args = args
    handled = False
    self.all_servers = {}
    if Path(self.args.config).exists():
        env = ServerEnv(debug=args.debug, verbose=args.verbose)
        omni_server = OmniServer(env=env)
        self.all_servers = omni_server.servers(args.config)
    else:
        print(f"Config file not found: {args.config}")

    # select the servers *before* any option reads self.servers
    # (fix: previously self.servers was only assigned after the
    # --about/--list-servers branches had already accessed it)
    self.servers = self.getServers()

    if args.about:
        print(self.program_version_message)
        print(f"{len(self.servers)} servers configured")
        print(f"see {self.version.doc_url}")
        webbrowser.open(self.version.doc_url)
        handled = True

    if self.args.list_servers:
        print("Available servers:")
        for name, server in self.servers.items():
            print(f"  {name}: {server.config.server}")
        handled = True

    for server in self.servers.values():
        # call run_cmds first, then combine
        # (fix: `handled or self.run_cmds(...)` short-circuited, skipping
        # command execution for all servers after the first handled one)
        cmds_handled = self.run_cmds(server, cmds=args.cmd)
        handled = handled or cmds_handled

    return handled

run_cmds(server, cmds)

Run commands on a server.

Parameters:

Name Type Description Default
server SparqlServer

Server instance

required
cmds List[str]

List of commands to execute

required

Returns:

Type Description
bool

True if any commands were handled

Source code in omnigraph/omnigraph_cmd.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def run_cmds(self, server: SparqlServer, cmds: List[str]) -> bool:
    """
    Execute the given commands on a server.

    Args:
        server: Server instance
        cmds: List of commands to execute

    Returns:
        True if any commands were handled
    """
    handled = False
    # map command names to their ServerCmd wrappers
    available = {
        "start": ServerCmd(title=f"start {server.name}", func=server.start),
        "stop": ServerCmd(title=f"stop {server.name}", func=server.stop),
        "status": ServerCmd(title=f"status {server.name}", func=server.status),
    }
    for cmd in cmds or []:
        server_cmd = available.get(cmd)
        if server_cmd is None:
            print(f"unsupported command {cmd}")
        else:
            server_cmd.run()
            handled = True
    return handled

ServerCmd dataclass

Command wrapper for server operations.

Source code in omnigraph/omnigraph_cmd.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@dataclass
class ServerCmd:
    """
    Command wrapper for server operations.

    Attributes:
        title: Description of the command, used as prefix when printing results
        func: Zero-argument callable to execute
    """

    # fix: the previous version combined @dataclass with a hand-written
    # __init__ and no declared fields, leaving the decorator inert;
    # declaring the fields lets the decorator generate __init__/__repr__/__eq__
    # with the same (title, func) signature
    title: str
    func: Callable

    def run(self, verbose: bool = True) -> object:
        """
        Execute the server command.

        Args:
            verbose: Whether to print result

        Returns:
            Result from function execution
        """
        # fix: return annotation was `any` (the builtin function, not a type)
        result = self.func()
        if verbose:
            print(f"{self.title}: {result}")
        return result

__init__(title, func)

Initialize server command.

Parameters:

Name Type Description Default
title str

Description of the command

required
func Callable

Function to execute

required
Source code in omnigraph/omnigraph_cmd.py
24
25
26
27
28
29
30
31
32
33
def __init__(self, title: str, func: Callable):
    """
    Initialize server command.

    Args:
        title: Description of the command (printed as prefix by run())
        func: Function to execute (called with no arguments by run())
    """
    self.title = title  # human-readable label for the command
    self.func = func  # zero-argument callable whose result run() returns

run(verbose=True)

Execute the server command.

Parameters:

Name Type Description Default
verbose bool

Whether to print result

True

Returns:

Type Description
any

Result from function execution

Source code in omnigraph/omnigraph_cmd.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def run(self, verbose: bool = True) -> object:  # annotation improved: builtin any() is not a type
    """
    Execute the server command.

    Args:
        verbose: Whether to print result

    Returns:
        Result from function execution (whatever self.func returns)
    """
    result = self.func()
    if verbose:
        # report as "<title>: <result>"
        print(f"{self.title}: {result}")
    return result

main()

Main entry point for command line interface.

Source code in omnigraph/omnigraph_cmd.py
210
211
212
213
214
215
216
def main():
    """
    Command line entry point: parse arguments and dispatch them.
    """
    cmd = OmnigraphCmd()
    parsed_args = cmd.parser.parse_args()
    cmd.handle_args(parsed_args)

omniserver

Created on 2025-05-28

@author: wf

OmniServer

Factory class for creating and managing SPARQL server instances.

Source code in omnigraph/omniserver.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class OmniServer:
    """
    Factory class for creating and managing SPARQL server instances.
    """

    def __init__(self, env: ServerEnv):
        """
        constructor

        Args:
            env: server environment handed to every created server
        """
        self.env = env

    def server4Config(self, config: ServerConfig) -> SparqlServer:
        """
        Create a SparqlServer instance based on server type in config.

        Args:
            config: ServerConfig with server type and settings

        Returns:
            SparqlServer instance of appropriate type
        """
        # server type -> (specialized config class, server class)
        registry = {
            "blazegraph": (BlazegraphConfig, Blazegraph),
            "qlever": (QLeverConfig, QLever),
            "jena": (JenaConfig, Jena),
        }
        server_instance = None
        entry = registry.get(config.server)
        if entry is not None:
            config_class, server_class = entry
            typed_config = config_class(**asdict(config))
            server_instance = server_class(config=typed_config, env=self.env)
        return server_instance

    def servers(self, yaml_path: Path) -> Dict[str, SparqlServer]:
        """
        Load active servers from YAML configuration.

        Args:
            yaml_path: Path to YAML configuration file

        Returns:
            Dictionary mapping server names to SparqlServer instances
        """
        server_configs = ServerConfigs.ofYaml(yaml_path)
        servers_dict = {}
        for server_name, config in server_configs.servers.items():
            if not config.active:
                continue
            server_instance = self.server4Config(config)
            if server_instance:
                servers_dict[server_name] = server_instance
        return servers_dict

__init__(env)

constructor

Source code in omnigraph/omniserver.py
22
23
24
25
26
def __init__(self,env:ServerEnv):
    """
    constructor

    Args:
        env: server environment (log, shell, debug, verbose) passed on to
            every server instance this factory creates
    """
    self.env=env  # shared environment for created servers

server4Config(config)

Create a SparqlServer instance based on server type in config.

Parameters:

Name Type Description Default
config ServerConfig

ServerConfig with server type and settings

required

Returns:

Type Description
SparqlServer

SparqlServer instance of appropriate type

Source code in omnigraph/omniserver.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def server4Config(self, config: ServerConfig) -> SparqlServer:
    """
    Create a SparqlServer instance based on server type in config.

    Args:
        config: ServerConfig with server type and settings

    Returns:
        SparqlServer instance of appropriate type, None for unknown types
    """
    # server type -> (specialized config class, server class)
    registry = {
        "blazegraph": (BlazegraphConfig, Blazegraph),
        "qlever": (QLeverConfig, QLever),
        "jena": (JenaConfig, Jena),
    }
    server_instance = None
    entry = registry.get(config.server)
    if entry is not None:
        config_class, server_class = entry
        typed_config = config_class(**asdict(config))
        server_instance = server_class(config=typed_config, env=self.env)
    return server_instance

servers(yaml_path)

Load active servers from YAML configuration.

Parameters:

Name Type Description Default
yaml_path Path

Path to YAML configuration file

required

Returns:

Type Description
Dict[str, SparqlServer]

Dictionary mapping server names to SparqlServer instances

Source code in omnigraph/omniserver.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def servers(self, yaml_path: Path) -> Dict[str, SparqlServer]:
    """
    Load active servers from YAML configuration.

    Args:
        yaml_path: Path to YAML configuration file

    Returns:
        Dictionary mapping server names to SparqlServer instances
    """
    server_configs = ServerConfigs.ofYaml(yaml_path)
    servers_dict = {}
    # keep only configurations marked active that yield a known server type
    for server_name, config in server_configs.servers.items():
        if not config.active:
            continue
        server_instance = self.server4Config(config)
        if server_instance:
            servers_dict[server_name] = server_instance
    return servers_dict

persistent_log

This is a redundant copy kept to avoid dependency hell; the original is at https://github.com/WolfgangFahl/nicegui_widgets/blob/main/ngwidgets/persistent_log.py

Created on 2024-10-04

@author: wf

Log

Wrapper for persistent logging.

Source code in omnigraph/persistent_log.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@lod_storable
class Log:
    """
    Wrapper for persistent logging.

    Only `entries` is a persisted field; all other attributes (flags,
    level maps, counters) are runtime-only state rebuilt in __post_init__.
    """

    # persisted log entries
    entries: List[LogEntry] = field(default_factory=list)

    def color_msg(self, color, msg):
        """Display a colored message"""
        # color is presumably a terminal escape prefix; END_COLOR resets it - TODO confirm
        print(f"{color}{msg}{END_COLOR}")

    def __post_init__(self):
        """
        Initializes the log with level mappings and updates the level counts.
        """
        self.do_log = True  # forward entries to the logging module
        self.do_print = False  # additionally print entries to stdout
        # icon -> logging module level used by log()
        self.levels = {"❌": logging.ERROR, "⚠️": logging.WARNING, "✅": logging.INFO}
        # logging module level -> short name used as key into level_counts
        self.level_names = {
            logging.ERROR: "error",
            logging.WARNING: "warn",
            logging.INFO: "info",
        }
        self.update_level_counts()

    def clear(self):
        """
        Clears all log entries.
        """
        # rebind rather than mutate, then rebuild the (now empty) counters
        self.entries = []
        self.update_level_counts()

    def update_level_counts(self):
        """
        Updates the counts for each log level based on the existing entries.
        """
        self.level_counts = {"error": Counter(), "warn": Counter(), "info": Counter()}
        for entry in self.entries:
            counter = self.get_counter(entry.level_name)
            # entries with an unknown level name are skipped
            if counter is not None:
                counter[entry.kind] += 1

    def get_counter(self, level: str) -> Counter:
        """
        Returns the counter for the specified log level.

        Returns None for unknown level names.
        """
        return self.level_counts.get(level)

    def get_level_summary(self, level: str, limit: int = 7) -> Tuple[int, str]:
        """
        Get a summary of the most common counts for the specified log level.

        Args:
            level (str): The log level name ('error', 'warn', 'info').
            limit (int): The maximum number of most common entries to include in the summary (default is 7).

        Returns:
            Tuple[int, str]: A tuple containing the count of log entries and a summary message.
        """
        counter = self.get_counter(level)
        if counter:
            count = sum(counter.values())
            most_common_entries = dict(counter.most_common(limit))  # Get the top 'limit' entries
            summary_msg = f"{level.capitalize()} entries: {most_common_entries}"
            return count, summary_msg
        return 0, f"No entries found for level: {level}"

    def log(self, icon: str, kind: str, msg: str):
        """
        Log a message with the specified icon and kind.

        Args:
            icon (str): The icon representing the log level ('❌', '⚠️', '✅').
            kind (str): The category or type of the log message.
            msg (str): The log message to record.
        """
        # unknown icons fall back to INFO
        level = self.levels.get(icon, logging.INFO)
        level_name = self.level_names[level]
        icon_msg = f"{icon}:{msg}"
        log_entry = LogEntry(msg=icon_msg, level_name=level_name, kind=kind)
        self.entries.append(log_entry)

        # Update level counts incrementally (no full rebuild needed)
        self.level_counts[level_name][kind] += 1

        if self.do_log:
            logging.log(level, icon_msg)
        if self.do_print:
            print(icon_msg)

__post_init__()

Initializes the log with level mappings and updates the level counts.

Source code in omnigraph/persistent_log.py
56
57
58
59
60
61
62
63
64
65
66
67
68
def __post_init__(self):
    """
    Initializes the log with level mappings and updates the level counts.
    """
    self.do_log = True  # forward entries to the logging module
    self.do_print = False  # additionally print entries to stdout
    # icon -> logging module level
    self.levels = {"❌": logging.ERROR, "⚠️": logging.WARNING, "✅": logging.INFO}
    # logging module level -> short level name used in level_counts
    self.level_names = {
        logging.ERROR: "error",
        logging.WARNING: "warn",
        logging.INFO: "info",
    }
    self.update_level_counts()

clear()

Clears all log entries.

Source code in omnigraph/persistent_log.py
70
71
72
73
74
75
def clear(self):
    """
    Remove every log entry and rebuild the (now empty) level counters.
    """
    # rebind rather than mutate in place
    self.entries = []
    self.update_level_counts()

color_msg(color, msg)

Display a colored message

Source code in omnigraph/persistent_log.py
52
53
54
def color_msg(self, color, msg):
    """Print msg wrapped in the given color prefix and the END_COLOR suffix."""
    colored = f"{color}{msg}{END_COLOR}"
    print(colored)

get_counter(level)

Returns the counter for the specified log level.

Source code in omnigraph/persistent_log.py
87
88
89
90
91
def get_counter(self, level: str) -> Counter:
    """
    Look up the Counter for the given log level name.

    Returns None for unknown level names.
    """
    counter = self.level_counts.get(level)
    return counter

get_level_summary(level, limit=7)

Get a summary of the most common counts for the specified log level.

Parameters:

Name Type Description Default
level str

The log level name ('error', 'warn', 'info').

required
limit int

The maximum number of most common entries to include in the summary (default is 7).

7

Returns:

Type Description
Tuple[int, str]

Tuple[int, str]: A tuple containing the count of log entries and a summary message.

Source code in omnigraph/persistent_log.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def get_level_summary(self, level: str, limit: int = 7) -> Tuple[int, str]:
    """
    Summarize the most common entry kinds for one log level.

    Args:
        level (str): The log level name ('error', 'warn', 'info').
        limit (int): maximum number of most common kinds in the summary (default 7).

    Returns:
        Tuple[int, str]: total number of entries and a summary message.
    """
    counter = self.get_counter(level)
    if not counter:
        # unknown level, or no entries recorded for it
        return 0, f"No entries found for level: {level}"
    count = sum(counter.values())
    top_entries = dict(counter.most_common(limit))
    summary_msg = f"{level.capitalize()} entries: {top_entries}"
    return count, summary_msg

log(icon, kind, msg)

Log a message with the specified icon and kind.

Parameters:

Name Type Description Default
icon str

The icon representing the log level ('❌', '⚠️', '✅').

required
kind str

The category or type of the log message.

required
msg str

The log message to record.

required
Source code in omnigraph/persistent_log.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def log(self, icon: str, kind: str, msg: str):
    """
    Log a message with the specified icon and kind.

    Args:
        icon (str): The icon representing the log level ('❌', '⚠️', '✅').
        kind (str): The category or type of the log message.
        msg (str): The log message to record.
    """
    # unknown icons fall back to INFO
    level = self.levels.get(icon, logging.INFO)
    level_name = self.level_names[level]
    icon_msg = f"{icon}:{msg}"
    log_entry = LogEntry(msg=icon_msg, level_name=level_name, kind=kind)
    self.entries.append(log_entry)

    # Update level counts incrementally (no full rebuild needed)
    self.level_counts[level_name][kind] += 1

    if self.do_log:
        logging.log(level, icon_msg)
    if self.do_print:
        print(icon_msg)

update_level_counts()

Updates the counts for each log level based on the existing entries.

Source code in omnigraph/persistent_log.py
77
78
79
80
81
82
83
84
85
def update_level_counts(self):
    """
    Rebuild the per-level kind counters from the current entries.
    """
    fresh = {name: Counter() for name in ("error", "warn", "info")}
    self.level_counts = fresh
    for entry in self.entries:
        bucket = self.get_counter(entry.level_name)
        # entries with an unknown level name are ignored
        if bucket is not None:
            bucket[entry.kind] += 1

LogEntry

Represents a log entry with a message, kind, and log level name.

Source code in omnigraph/persistent_log.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@lod_storable
class LogEntry:
    """
    Represents a log entry with a message, kind, and log level name.
    """

    msg: str  # message text (Log.log() stores it prefixed with the level icon)
    kind: str  # category/type of the message
    level_name: str  # 'error', 'warn' or 'info'
    timestamp: Optional[str] = None  # ISO timestamp string, auto-filled when omitted

    def __post_init__(self):
        # default to "now" (naive local time) when no timestamp was supplied
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()

qlever

Created on 2025-05-28

@author: wf

QLever

Bases: SparqlServer

Dockerized QLever SPARQL server

Source code in omnigraph/qlever.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class QLever(SparqlServer):
    """
    Dockerized QLever SPARQL server
    """

    def __init__(
        self,
        container_name: str = "qlever",
        image: str = "adfreiburg/qlever",
        data_dir: str = None,
        dataset: str = "olympics",
        port: int = 7001,
        log: Log = None,
        shell: Shell = None,
        debug: bool = False,
    ):
        """
        Initialize the QLever manager.

        Args:
            container_name: Docker container name
            image: Docker image to use
            port: Port for QLever web interface
            data_dir: where to keep the data
            dataset(str): a default dataset to be loaded
            log: Log instance for logging
            shell: Shell instance for Docker commands
            debug: Enable debug output

        Raises:
            ValueError: if no data_dir is specified
        """
        if not data_dir:
            raise ValueError("Data directory needs to be specified")
        self.data_dir = data_dir
        self.dataset = dataset
        # base-class positional order: (container_name, image, port, log, shell, debug)
        super().__init__(container_name, image, port, log, shell, debug)

    def start(self, show_progress: bool = True) -> bool:
        """
        Start QLever using proper workflow.
        """
        # Use base class start to get container running
        started = super().start(show_progress=show_progress)
        if started and self.dataset:
            # Run QLever setup workflow: configure, fetch data, build index, serve
            setup_cmd = f"docker exec {self.container_name} qlever setup-config {self.dataset}"
            self._run_shell_command(setup_cmd)

            get_data_cmd = f"docker exec {self.container_name} qlever get-data"
            self._run_shell_command(get_data_cmd)

            index_cmd = f"docker exec {self.container_name} qlever index"
            self._run_shell_command(index_cmd)

            start_cmd = f"docker exec {self.container_name} qlever start"
            self._run_shell_command(start_cmd)

        return started

    def status(self) -> dict:
        """
        Get QLever status information.

        Returns:
            Dictionary with status information, empty dict if error
        """
        status_dict = {}
        result = self._make_request("GET", self.status_url, timeout=2)

        if result["success"]:
            status_dict["status"] = "ready"
            try:
                # NOTE(review): import inside try - the except clause below
                # references json.JSONDecodeError, which only resolves because
                # this import precedes json.loads; consider hoisting it
                import json

                status_data = json.loads(result["content"])
                status_dict.update(status_data)
            except json.JSONDecodeError:
                # non-JSON body: keep it verbatim for diagnosis
                status_dict["raw_content"] = result["content"]
        else:
            if result.get("error"):
                status_dict["status"] = f"error: {result['error']}"
            else:
                status_dict["status"] = f"status_code: {result['status_code']}"

        return status_dict

    def load_file(self, filepath: str) -> bool:
        """
        Load a single RDF file into QLever.

        Args:
            filepath: Path to RDF file

        Returns:
            True if loaded successfully
        """
        load_success = False
        try:
            with open(filepath, "rb") as f:
                result = self._make_request(
                    "POST",
                    f"{self.base_url}/api/upload",
                    files={"file": f},
                    timeout=300,
                )

            if result["success"]:
                self.log.log("✅", self.container_name, f"Loaded {filepath}")
                load_success = True
            else:
                # prefer the server-provided error, fall back to the HTTP code
                error_msg = result.get("error", f"HTTP {result['status_code']}")
                self.log.log("❌", self.container_name, f"Failed to load {filepath}: {error_msg}")
                load_success = False

        except Exception as e:
            # covers I/O errors opening the file as well as request failures
            self.log.log("❌", self.container_name, f"Exception loading {filepath}: {e}")
            load_success = False

        return load_success

    def load_dump_files(self, file_pattern: str = "dump_*.ttl", use_bulk: bool = True) -> int:
        """
        Load all dump files matching pattern.

        Args:
            file_pattern: Glob pattern for dump files
            use_bulk: Use bulk loader if True, individual files if False
                (NOTE(review): currently unused - files are always loaded
                individually below)

        Returns:
            Number of files loaded successfully
        """
        files = sorted(glob.glob(file_pattern))
        loaded_count = 0

        if not files:
            self.log.log(
                "⚠️",
                self.container_name,
                f"No files found matching pattern: {file_pattern}",
            )
            loaded_count = 0
        else:
            self.log.log("✅", self.container_name, f"Found {len(files)} files to load")

            # QLever typically loads files individually
            loaded_count = 0
            for filepath in tqdm(files, desc="Loading files"):
                file_result = self.load_file(filepath)
                if file_result:
                    loaded_count += 1
                else:
                    self.log.log("❌", self.container_name, f"Failed to load: {filepath}")

        return loaded_count

__init__(container_name='qlever', image='adfreiburg/qlever', data_dir=None, dataset='olympics', port=7001, log=None, shell=None, debug=False)

Initialize the QLever manager.

Parameters:

Name Type Description Default
container_name str

Docker container name

'qlever'
image str

Docker image to use

'adfreiburg/qlever'
port int

Port for QLever web interface

7001
data_dir str

where to keep the data

None
dataset(str)

a default dataset to be loaded

required
log Log

Log instance for logging

None
shell Shell

Shell instance for Docker commands

None
debug bool

Enable debug output

False
Source code in omnigraph/qlever.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    container_name: str = "qlever",
    image: str = "adfreiburg/qlever",
    data_dir: str = None,
    dataset: str = "olympics",
    port: int = 7001,
    log: Log = None,
    shell: Shell = None,
    debug: bool = False,
):
    """
    Initialize the QLever manager.

    Args:
        container_name: Docker container name
        image: Docker image to use
        port: Port for QLever web interface
        data_dir: where to keep the data
        dataset(str): a default dataset to be loaded
        log: Log instance for logging
        shell: Shell instance for Docker commands
        debug: Enable debug output

    Raises:
        ValueError: if no data_dir is specified
    """
    if not data_dir:
        raise ValueError("Data directory needs to be specified")
    self.data_dir = data_dir
    self.dataset = dataset
    # base-class positional order: (container_name, image, port, log, shell, debug)
    super().__init__(container_name, image, port, log, shell, debug)

load_dump_files(file_pattern='dump_*.ttl', use_bulk=True)

Load all dump files matching pattern.

Parameters:

Name Type Description Default
file_pattern str

Glob pattern for dump files

'dump_*.ttl'
use_bulk bool

Use bulk loader if True, individual files if False

True

Returns:

Type Description
int

Number of files loaded successfully

Source code in omnigraph/qlever.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def load_dump_files(self, file_pattern: str = "dump_*.ttl", use_bulk: bool = True) -> int:
    """
    Load all dump files matching pattern.

    Args:
        file_pattern: Glob pattern for dump files
        use_bulk: Use bulk loader if True, individual files if False
            (currently ignored: QLever loads files individually either way;
            the parameter is kept for interface compatibility)

    Returns:
        Number of files loaded successfully
    """
    files = sorted(glob.glob(file_pattern))
    # fix: single initialization - the previous version re-assigned
    # loaded_count = 0 in both branches (dead stores)
    loaded_count = 0

    if not files:
        self.log.log(
            "⚠️",
            self.container_name,
            f"No files found matching pattern: {file_pattern}",
        )
    else:
        self.log.log("✅", self.container_name, f"Found {len(files)} files to load")

        # QLever typically loads files individually
        for filepath in tqdm(files, desc="Loading files"):
            if self.load_file(filepath):
                loaded_count += 1
            else:
                self.log.log("❌", self.container_name, f"Failed to load: {filepath}")

    return loaded_count

load_file(filepath)

Load a single RDF file into QLever.

Parameters:

Name Type Description Default
filepath str

Path to RDF file

required

Returns:

Type Description
bool

True if loaded successfully

Source code in omnigraph/qlever.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def load_file(self, filepath: str) -> bool:
    """
    Load a single RDF file into QLever.

    Args:
        filepath: Path to RDF file

    Returns:
        True if loaded successfully
    """
    load_success = False
    try:
        with open(filepath, "rb") as rdf_file:
            result = self._make_request(
                "POST",
                f"{self.base_url}/api/upload",
                files={"file": rdf_file},
                timeout=300,
            )

        if result["success"]:
            self.log.log("✅", self.container_name, f"Loaded {filepath}")
            load_success = True
        else:
            # prefer the server-provided error, fall back to the HTTP code
            error_msg = result.get("error", f"HTTP {result['status_code']}")
            self.log.log("❌", self.container_name, f"Failed to load {filepath}: {error_msg}")
    except Exception as e:
        # covers I/O errors opening the file as well as request failures
        self.log.log("❌", self.container_name, f"Exception loading {filepath}: {e}")
        load_success = False
    return load_success

start(show_progress=True)

Start QLever using proper workflow.

Source code in omnigraph/qlever.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def start(self, show_progress: bool = True) -> bool:
    """
    Start QLever using proper workflow.
    """
    # bring the container up via the base class first
    started = super().start(show_progress=show_progress)
    if started and self.dataset:
        # run the QLever workflow inside the container:
        # configure, fetch data, build index, start the engine
        for step in ("setup-config " + self.dataset, "get-data", "index", "start"):
            step_cmd = f"docker exec {self.container_name} qlever {step}"
            self._run_shell_command(step_cmd)
    return started

status()

Get QLever status information.

Returns:

Type Description
dict

Dictionary with status information, empty dict if error

Source code in omnigraph/qlever.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def status(self) -> dict:
    """
    Get QLever status information.

    Returns:
        Dictionary with status information, empty dict if error
    """
    # fix: import hoisted out of the try block - the except clause referenced
    # json.JSONDecodeError, which only resolved because the import happened
    # to precede json.loads inside the same try
    import json

    status_dict = {}
    result = self._make_request("GET", self.status_url, timeout=2)

    if result["success"]:
        status_dict["status"] = "ready"
        try:
            status_data = json.loads(result["content"])
            status_dict.update(status_data)
        except json.JSONDecodeError:
            # non-JSON body: keep it verbatim for diagnosis
            status_dict["raw_content"] = result["content"]
    elif result.get("error"):
        status_dict["status"] = f"error: {result['error']}"
    else:
        status_dict["status"] = f"status_code: {result['status_code']}"

    return status_dict

shell

This is a redundant copy kept to avoid dependency hell; the original is at https://github.com/WolfgangFahl/nicegui_widgets/blob/main/ngwidgets/shell.py

Created on 2025-05-14

@author: wf

Shell

Runs commands with environment from profile

Source code in omnigraph/shell.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class Shell:
    """
    Runs commands with environment from profile
    """

    def __init__(self, profile=None, shell_path: str = None):
        """
        Initialize shell with optional profile

        Args:
            profile: Path to profile file to source e.g. ~/.zprofile
            shell_path: the shell_path e.g. /bin/zsh
        """
        self.profile = profile
        self.shell_path = shell_path
        if self.shell_path is None:
            # default to the user's login shell, falling back to bash
            self.shell_path = os.environ.get("SHELL", "/bin/bash")
        self.shell_name = os.path.basename(self.shell_path)
        if self.profile is None:
            # auto-detect a profile file matching the shell
            self.profile = self.find_profile()

    def find_profile(self) -> str:
        """
        Find the appropriate profile file for the current shell

        Searches for the profile file corresponding to the shell_name
        in the user's home directory.

        Returns:
            str: Path to the profile file or None if not found
        """
        profile = None
        home = os.path.expanduser("~")
        # Try common profile files
        profiles = {"zsh": ".zprofile", "bash": ".bash_profile", "sh": ".profile"}
        if self.shell_name in profiles:
            profile_name = profiles[self.shell_name]
            path = os.path.join(home, profile_name)
            # only use the profile if it actually exists on disk
            if os.path.exists(path):
                profile = path
        return profile

    @classmethod
    def ofArgs(cls, args):
        """
        Create Shell from command line args

        Args:
            args: Arguments with optional profile

        Returns:
            Shell: Configured Shell
        """
        # Use explicit profile or detect; getattr guards against args
        # objects that have no profile attribute at all
        profile = getattr(args, "profile", None)
        shell = cls(profile=profile)
        return shell

    def run(self, cmd, text=True, debug=False, tee=False) -> subprocess.CompletedProcess:
        """
        Run command with profile, always capturing output and optionally teeing it.

        Args:
            cmd: Command to run
            text: Text mode for subprocess I/O
            debug: Print the command to be run
            tee: If True, also print output live while capturing

        Returns:
            subprocess.CompletedProcess
        """
        # source the profile first so its environment variables are available
        shell_cmd = f"source {self.profile} && {cmd}" if self.profile else cmd

        if debug:
            print(f"Running: {shell_cmd}")

        popen_process = subprocess.Popen(
            [self.shell_path, "-c", shell_cmd],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=text,
        )

        # capture (and optionally mirror) stdout/stderr while the process runs
        std_tee = StdTee.run(popen_process, tee=tee)
        returncode = popen_process.wait()

        process = subprocess.CompletedProcess(
            args=popen_process.args,
            returncode=returncode,
            stdout=std_tee.stdout_buffer.getvalue(),
            stderr=std_tee.stderr_buffer.getvalue(),
        )

        if process.returncode != 0:
            # failures are only reported when debug is on; callers inspect
            # the returned CompletedProcess themselves
            if debug:
                msg = f"""{process.args} failed:
  returncode: {process.returncode}
  stdout    : {process.stdout.strip()}
  stderr    : {process.stderr.strip()}
"""
                print(msg, file=sys.stderr)

        return process

    def proc_stats(
        self,
        title: str,
        procs: Dict[Path, subprocess.CompletedProcess],
        ignores: List[str] = None,
    ):
        """
        Show process statistics with checkmark/crossmark and success/failure summary.

        Args:
            title (str): A short title to label the output section.
            procs (Dict[Path, subprocess.CompletedProcess]): Mapping of input files to their process results.
            ignores (List[str], optional): List of substrings. If any is found in stderr, the error is ignored.
        """
        # avoid the mutable-default-argument pitfall: default is None, not []
        if ignores is None:
            ignores = []
        total = len(procs)
        failures = 0
        print(f"\n{total} {title}:")
        for idx, (path, result) in enumerate(procs.items(), start=1):
            stderr = result.stderr or ""
            stdout = result.stdout or ""
            # an error is ignored when any of the given substrings occurs in stderr
            ignored = any(ignore in stderr for ignore in ignores)
            has_error = (stderr and not ignored) or ("Error" in stdout)
            if has_error:
                symbol = "❌"
                failures += 1
            else:
                symbol = "✅"
            print(f"{symbol} {idx}/{total}: {path.name}")
        percent_ok = ((total - failures) / total) * 100 if total > 0 else 0
        print(f"\n{total - failures}/{total} ({percent_ok:.1f}%), ❌ {failures}/{total} ({100 - percent_ok:.1f}%)")

__init__(profile=None, shell_path=None)

Initialize shell with optional profile

Parameters:

Name Type Description Default
profile

Path to profile file to source e.g. ~/.zprofile

None
shell_path str

the shell_path e.g. /bin/zsh

None
Source code in omnigraph/shell.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def __init__(self, profile=None, shell_path: str = None):
    """
    Initialize shell with optional profile

    Args:
        profile: Path to profile file to source e.g. ~/.zprofile
        shell_path: the shell_path e.g. /bin/zsh
    """
    self.profile = profile
    self.shell_path = shell_path
    if self.shell_path is None:
        # default to the user's login shell, falling back to bash
        self.shell_path = os.environ.get("SHELL", "/bin/bash")
    self.shell_name = os.path.basename(self.shell_path)
    if self.profile is None:
        # auto-detect a profile file matching the shell
        self.profile = self.find_profile()

find_profile()

Find the appropriate profile file for the current shell

Searches for the profile file corresponding to the shell_name in the user's home directory.

Returns:

Name Type Description
str str

Path to the profile file or None if not found

Source code in omnigraph/shell.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def find_profile(self) -> str:
    """
    Find the appropriate profile file for the current shell

    Searches for the profile file corresponding to the shell_name
    in the user's home directory.

    Returns:
        str: Path to the profile file or None if not found
    """
    profile = None
    home = os.path.expanduser("~")
    # Try common profile files
    profiles = {"zsh": ".zprofile", "bash": ".bash_profile", "sh": ".profile"}
    if self.shell_name in profiles:
        profile_name = profiles[self.shell_name]
        path = os.path.join(home, profile_name)
        # only use the profile if it actually exists on disk
        if os.path.exists(path):
            profile = path
    return profile

ofArgs(args) classmethod

Create Shell from command line args

Parameters:

Name Type Description Default
args

Arguments with optional profile

required

Returns:

Name Type Description
Shell

Configured Shell

Source code in omnigraph/shell.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@classmethod
def ofArgs(cls, args):
    """
    Create Shell from command line args

    Args:
        args: Arguments with optional profile

    Returns:
        Shell: Configured Shell
    """
    # Use explicit profile or detect
    # getattr guards against args objects that lack a profile attribute
    profile = getattr(args, "profile", None)
    shell = cls(profile=profile)
    return shell

proc_stats(title, procs, ignores=[])

Show process statistics with checkmark/crossmark and success/failure summary.

Parameters:

Name Type Description Default
title str

A short title to label the output section.

required
procs Dict[Path, CompletedProcess]

Mapping of input files to their process results.

required
ignores List[str]

List of substrings. If any is found in stderr, the error is ignored.

[]
Source code in omnigraph/shell.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def proc_stats(
    self,
    title: str,
    procs: Dict[Path, subprocess.CompletedProcess],
    ignores: List[str] = [],
):
    """
    Show process statistics with checkmark/crossmark and success/failure summary.

    Args:
        title (str): A short title to label the output section.
        procs (Dict[Path, subprocess.CompletedProcess]): Mapping of input files to their process results.
        ignores (List[str], optional): List of substrings. If any is found in stderr, the error is ignored.
    """
    # NOTE(review): mutable default argument ([]) - harmless here since
    # ignores is only read, but a None default would be safer
    total = len(procs)
    failures = 0
    print(f"\n{total} {title}:")
    for idx, (path, result) in enumerate(procs.items(), start=1):
        stderr = result.stderr or ""
        stdout = result.stdout or ""
        # an error is ignored when any of the given substrings occurs in stderr
        ignored = any(ignore in stderr for ignore in ignores)
        has_error = (stderr and not ignored) or ("Error" in stdout)
        if has_error:
            symbol = "❌"
            failures += 1
        else:
            symbol = "✅"
        print(f"{symbol} {idx}/{total}: {path.name}")
    percent_ok = ((total - failures) / total) * 100 if total > 0 else 0
    print(f"\n{total - failures}/{total} ({percent_ok:.1f}%), ❌ {failures}/{total} ({100 - percent_ok:.1f}%)")

run(cmd, text=True, debug=False, tee=False)

Run command with profile, always capturing output and optionally teeing it.

Parameters:

Name Type Description Default
cmd

Command to run

required
text

Text mode for subprocess I/O

True
debug

Print the command to be run

False
tee

If True, also print output live while capturing

False

Returns:

Type Description
CompletedProcess

subprocess.CompletedProcess

Source code in omnigraph/shell.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
    def run(self, cmd, text=True, debug=False, tee=False) -> subprocess.CompletedProcess:
        """
        Run command with profile, always capturing output and optionally teeing it.

        Args:
            cmd: Command to run
            text: Text mode for subprocess I/O
            debug: Print the command to be run
            tee: If True, also print output live while capturing

        Returns:
            subprocess.CompletedProcess
        """
        # source the profile first so its environment variables are available
        shell_cmd = f"source {self.profile} && {cmd}" if self.profile else cmd

        if debug:
            print(f"Running: {shell_cmd}")

        popen_process = subprocess.Popen(
            [self.shell_path, "-c", shell_cmd],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=text,
        )

        # capture (and optionally mirror) stdout/stderr while the process runs
        std_tee = StdTee.run(popen_process, tee=tee)
        returncode = popen_process.wait()

        process = subprocess.CompletedProcess(
            args=popen_process.args,
            returncode=returncode,
            stdout=std_tee.stdout_buffer.getvalue(),
            stderr=std_tee.stderr_buffer.getvalue(),
        )

        if process.returncode != 0:
            # failures are only reported when debug is on; callers inspect
            # the returned CompletedProcess themselves
            if debug:
                msg = f"""{process.args} failed:
  returncode: {process.returncode}
  stdout    : {process.stdout.strip()}
  stderr    : {process.stderr.strip()}
"""
                print(msg, file=sys.stderr)
            pass

        return process

StdTee

Manages teeing for both stdout and stderr using StreamTee instances. Captures output in instance variables.

Source code in omnigraph/shell.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class StdTee:
    """
    Manages teeing for both stdout and stderr using StreamTee instances.
    Captures output in instance variables.
    """

    def __init__(self, process, tee=True):
        # one StringIO capture buffer and one StreamTee worker per stream
        self.stdout_buffer = io.StringIO()
        self.stderr_buffer = io.StringIO()
        self.out_tee = StreamTee(process.stdout, sys.stdout, self.stdout_buffer, tee)
        self.err_tee = StreamTee(process.stderr, sys.stderr, self.stderr_buffer, tee)

    def start(self):
        # launch both reader threads
        for worker in (self.out_tee, self.err_tee):
            worker.start()

    def join(self):
        # block until both streams are fully drained
        for worker in (self.out_tee, self.err_tee):
            worker.join()

    @classmethod
    def run(cls, process, tee=True):
        """
        Run teeing and capture for the given process.
        Returns a StdTee instance with stdout/stderr captured.
        """
        instance = cls(process, tee=tee)
        instance.start()
        instance.join()
        return instance

run(process, tee=True) classmethod

Run teeing and capture for the given process. Returns a StdTee instance with stdout/stderr captured.

Source code in omnigraph/shell.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
@classmethod
def run(cls, process, tee=True):
    """
    Run teeing and capture for the given process.
    Returns a StdTee instance with stdout/stderr captured.
    """
    # create the tee, start both reader threads, and wait for the
    # streams to drain before returning
    std_tee = cls(process, tee=tee)
    std_tee.start()
    std_tee.join()
    return std_tee

StreamTee

Tees a single input stream to both a mirror and a capture buffer.

Source code in omnigraph/shell.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class StreamTee:
    """
    Tees a single input stream to both a mirror and a capture buffer.
    """

    def __init__(self, source, mirror, buffer, tee=True):
        # the stream to read from, the stream to echo to, and the capture sink
        self.source = source
        self.mirror = mirror
        self.buffer = buffer
        self.tee = tee
        # daemon thread so a stuck stream never blocks interpreter exit
        self.thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # pump lines until EOF (readline returns "")
        while True:
            line = self.source.readline()
            if line == "":
                break
            if self.tee:
                self.mirror.write(line)
                self.mirror.flush()
            self.buffer.write(line)
        self.source.close()

    def start(self):
        self.thread.start()

    def join(self):
        self.thread.join()

SysTee

Tee sys.stdout and sys.stderr to a logfile while preserving original output.

Source code in omnigraph/shell.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class SysTee:
    """
    Tee sys.stdout and sys.stderr to a logfile while preserving original output.
    """

    def __init__(self, log_path: str):
        """
        Start teeing: redirect sys.stdout and sys.stderr to this object.

        Args:
            log_path: path of the logfile to append to
        """
        # NOTE(review): opened with the platform default encoding - consider encoding="utf-8"
        self.logfile = open(log_path, "a")
        self.original_stdout = sys.stdout
        self.original_stderr = sys.stderr
        # both streams are redirected here; close() restores them
        sys.stdout = self
        sys.stderr = self

    def write(self, data):
        # NOTE(review): data arriving via sys.stderr is also echoed to the
        # original stdout here - confirm this merge of streams is intended
        self.original_stdout.write(data)
        self.logfile.write(data)

    def flush(self):
        self.original_stdout.flush()
        self.logfile.flush()

    def close(self):
        # restore the original streams and release the logfile
        sys.stdout = self.original_stdout
        sys.stderr = self.original_stderr
        self.logfile.close()

sparql_server

Created on 2025-05-27

@author: wf

ServerConfigs

Collection of server configurations loaded from YAML.

Source code in omnigraph/sparql_server.py
67
68
69
70
71
72
73
74
75
76
77
@lod_storable
class ServerConfigs:
    """Collection of server configurations loaded from YAML."""

    # mapping: server name -> ServerConfig
    servers: Dict[str, ServerConfig] = field(default_factory=dict)

    @classmethod
    def ofYaml(cls, yaml_path: str) -> "ServerConfigs":
        """Load server configurations from YAML file."""
        # load_from_yaml_file is presumably provided by @lod_storable - confirm
        server_configs = cls.load_from_yaml_file(yaml_path)
        return server_configs

ofYaml(yaml_path) classmethod

Load server configurations from YAML file.

Source code in omnigraph/sparql_server.py
73
74
75
76
77
@classmethod
def ofYaml(cls, yaml_path: str) -> "ServerConfigs":
    """Load server configurations from YAML file."""
    # load_from_yaml_file is presumably provided by @lod_storable - confirm
    server_configs = cls.load_from_yaml_file(yaml_path)
    return server_configs

ServerEnv

Server environment configuration.

Source code in omnigraph/sparql_server.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class ServerEnv:
    """
    Server environment configuration.
    """

    def __init__(self, log: Log = None, shell: Shell = None, debug: bool = False, verbose: bool = False):
        """
        Initialize server environment.

        Args:
            log: Log instance for logging
            shell: Shell instance for command execution
            debug: Enable debug mode
            verbose: Enable verbose output
        """
        # fall back to freshly created collaborators when none are given
        self.log = log if log is not None else Log()
        self.shell = shell if shell is not None else Shell()
        self.debug = debug
        self.verbose = verbose

__init__(log=None, shell=None, debug=False, verbose=False)

Initialize server environment.

Parameters:

Name Type Description Default
log Log

Log instance for logging

None
shell Shell

Shell instance for command execution

None
debug bool

Enable debug mode

False
verbose bool

Enable verbose output

False
Source code in omnigraph/sparql_server.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, log: Log = None, shell: Shell = None, debug: bool = False, verbose: bool = False):
    """
    Initialize server environment.

    Args:
        log: Log instance for logging
        shell: Shell instance for command execution
        debug: Enable debug mode
        verbose: Enable verbose output
    """
    if log is None:
        # create a default Log when none is supplied
        log=Log()
    self.log = log
    if shell is None:
        # create a default Shell when none is supplied
        shell=Shell()
    self.shell = shell
    self.debug = debug
    self.verbose = verbose

SparqlServer

Base class for dockerized SPARQL servers

Source code in omnigraph/sparql_server.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
class SparqlServer:
    """
    Base class for dockerized SPARQL servers.

    Wraps the docker lifecycle (create/start/stop), readiness polling
    and basic SPARQL access for a single server container.
    """

    def __init__(
        self,
        config: ServerConfig,
        env: ServerEnv,
    ):
        """
        Initialize the SPARQL server manager.

        Args:
            config: Server configuration
            env: Server environment providing log, shell, debug and verbose settings
        """
        self.log = env.log
        self.config = config
        self.name = self.config.name
        self.debug = env.debug
        self.verbose = env.verbose
        self.shell = env.shell

        # Subclasses must set these URLs
        if self.config.sparql_url:
            self.sparql = SPARQL(self.config.sparql_url)

    def _make_request(self, method: str, url: str, timeout: int = 30, **kwargs) -> dict:
        """
        Helper function for making HTTP requests with consistent error handling.

        Args:
            method: HTTP method (GET, POST, etc.)
            url: Request URL
            timeout: Request timeout in seconds
            **kwargs: Additional arguments for requests

        Returns:
            Dictionary with 'success', 'status_code', 'content', and optional 'error'
        """
        try:
            response = requests.request(method, url, timeout=timeout, **kwargs)
            request_result = {
                # 200 (OK) and 204 (No Content) both count as success
                "success": response.status_code in (200, 204),
                "status_code": response.status_code,
                "content": response.text,
                "response": response,
            }
        except Exception as e:
            # connection errors, timeouts etc. are reported, not raised
            request_result = {
                "success": False,
                "status_code": None,
                "content": None,
                "error": str(e),
            }
        return request_result

    def run_shell_command(self, command: str, success_msg: str = None, error_msg: str = None) -> bool:
        """
        Helper function for running shell commands with consistent error handling.

        Args:
            command: Shell command to run
            success_msg: Message to log on success
            error_msg: Message to log on error

        Returns:
            True if command succeeded (returncode 0)
        """
        container_name = self.config.container_name
        command_success = False
        try:
            # tee output live only when verbose
            result = self.shell.run(command, debug=self.debug, tee=self.verbose)
            if result.returncode == 0:
                if success_msg:
                    self.log.log("✅", container_name, success_msg)
                command_success = True
            else:
                error_detail = error_msg or f"Command failed: {command}"
                if result.stderr:
                    error_detail += f" - {result.stderr}"
                self.log.log("❌", container_name, error_detail)
        except Exception as e:
            # report rather than raise so callers get a simple boolean
            self.log.log("❌", container_name, f"Exception running command '{command}': {e}")
        return command_success

    def start(self, show_progress: bool = True) -> bool:
        """
        Start SPARQL server in Docker container.

        Args:
            show_progress: Show progress bar while waiting

        Returns:
            True if started successfully
        """
        container_name = self.config.container_name
        server_name = self.config.name
        start_success = False
        try:
            if self.is_running():
                self.log.log(
                    "✅",
                    container_name,
                    f"Container {container_name} is already running",
                )
                start_success = self.wait_until_ready(show_progress=show_progress)
            else:
                # "restart existing" and "create new" only differ in the
                # command and messages - share the run-then-wait logic
                if self.exists():
                    self.log.log(
                        "✅",
                        container_name,
                        f"Container {container_name} exists, starting...",
                    )
                    command = f"docker start {container_name}"
                    error_msg = f"Failed to start container {container_name}"
                else:
                    self.log.log(
                        "✅",
                        container_name,
                        f"Creating new {server_name} container {container_name}...",
                    )
                    command = self.config.docker_run_command
                    error_msg = f"Failed to create container {container_name}"
                if self.run_shell_command(command, error_msg=error_msg):
                    start_success = self.wait_until_ready(show_progress=show_progress)
        except Exception as e:
            self.log.log(
                "❌",
                container_name,
                f"Error starting {server_name}: {e}",
            )
            start_success = False
        return start_success

    def count_triples(self) -> int:
        """
        Count total triples in the SPARQL server.

        Returns:
            Number of triples
        """
        count_query = "SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }"
        result = self.sparql.getValue(count_query, "count")
        # a missing/empty result counts as zero triples
        triple_count = int(result) if result else 0
        return triple_count

    def wait_until_ready(self, timeout: int = 30, show_progress: bool = False) -> bool:
        """
        Wait for server to be ready.

        Polls self.status() once per second until it reports "ready"
        or the timeout expires.

        Args:
            timeout: Maximum seconds to wait
            show_progress: Show progress bar while waiting

        Returns:
            True if ready within timeout
        """
        container_name = self.config.container_name
        server_name = self.config.name
        status_url = self.config.status_url
        base_url = self.config.base_url

        self.log.log(
            "✅",
            container_name,
            f"Waiting for {server_name} to start ... {status_url}",
        )

        pbar = None
        if show_progress:
            pbar = tqdm(total=timeout, desc=f"Waiting for {server_name}", unit="s")

        ready_status = False
        for _ in range(timeout):
            # status() is provided by the concrete server subclass
            status_dict = self.status()
            if status_dict.get("status") == "ready":
                if pbar:
                    pbar.close()
                self.log.log(
                    "✅",
                    container_name,
                    f"{server_name} ready at {base_url}",
                )
                ready_status = True
                break

            if pbar:
                pbar.update(1)
            time.sleep(1)

        if not ready_status:
            if pbar:
                pbar.close()
            self.log.log(
                "⚠️",
                container_name,
                f"Timeout waiting for {server_name} to start after {timeout}s",
            )

        return ready_status

    def is_running(self) -> bool:
        """
        Check if container is currently running.

        Returns:
            True if container is running
        """
        # docker ps without -a lists running containers only
        running_cmd = f'docker ps --filter "name={self.config.container_name}" --format "{{{{.Names}}}}"'
        result = self.shell.run(running_cmd, debug=self.debug)
        is_container_running = self.config.container_name in result.stdout
        return is_container_running

    def exists(self) -> bool:
        """
        Check if container exists (running or stopped).

        Returns:
            True if container exists
        """
        container_name = self.config.container_name
        # -a includes stopped containers
        check_cmd = f'docker ps -a --filter "name={container_name}" --format "{{{{.Names}}}}"'
        result = self.shell.run(check_cmd, debug=self.debug)
        if result.stderr:
            self.log.log("❌", container_name, result.stderr)
        container_exists = container_name in result.stdout
        return container_exists

    def stop(self) -> bool:
        """
        Stop the server container.

        Returns:
            True if stopped successfully
        """
        container_name = self.config.container_name
        stop_cmd = f"docker stop {container_name}"
        stop_success = self.run_shell_command(
            stop_cmd,
            success_msg=f"Stopped container {container_name}",
            error_msg=f"Failed to stop container {container_name}",
        )
        return stop_success

__init__(config, env)

Initialize the SPARQL server manager.

Source code in omnigraph/sparql_server.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def __init__(
    self,
    config: ServerConfig,
    env:ServerEnv
):
    """
    Initialize the SPARQL server manager.

    Args:
        config: Server configuration
        env: Server environment providing log, shell, debug and verbose settings
    """
    self.log = env.log
    self.config = config
    self.name = self.config.name
    self.debug = env.debug
    self.verbose=env.verbose
    self.shell = env.shell

    # Subclasses must set these URLs
    if self.config.sparql_url:
        self.sparql = SPARQL(self.config.sparql_url)

count_triples()

Count total triples in the SPARQL server.

Returns:

Type Description
int

Number of triples

Source code in omnigraph/sparql_server.py
227
228
229
230
231
232
233
234
235
236
237
def count_triples(self) -> int:
    """
    Count total triples in the SPARQL server.

    Returns:
        Number of triples
    """
    # aggregate over all triples in the store
    count_query = "SELECT (COUNT(*) AS ?count) WHERE { ?s ?p ?o }"
    result = self.sparql.getValue(count_query, "count")
    # a missing/empty result counts as zero triples
    triple_count = int(result) if result else 0
    return triple_count

exists()

Check if container exists (running or stopped).

Returns:

Type Description
bool

True if container exists

Source code in omnigraph/sparql_server.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def exists(self) -> bool:
    """
    Check if container exists (running or stopped).

    Returns:
        True if container exists
    """
    container_name = self.config.container_name
    # -a includes stopped containers; the format prints only the names
    check_cmd = f'docker ps -a --filter "name={container_name}" --format "{{{{.Names}}}}"'
    result = self.shell.run(check_cmd, debug=self.debug)
    if result.stderr:
        self.log.log("❌", container_name, result.stderr)
    # substring match: the docker name filter may return similar names
    container_exists = container_name in result.stdout
    return container_exists

is_running()

Check if container is currently running.

Returns:

Type Description
bool

True if container is running

Source code in omnigraph/sparql_server.py
294
295
296
297
298
299
300
301
302
303
304
def is_running(self) -> bool:
    """
    Check if container is currently running.

    Returns:
        True if container is running
    """
    # docker ps without -a lists running containers only
    running_cmd = f'docker ps --filter "name={self.config.container_name}" --format "{{{{.Names}}}}"'
    result = self.shell.run(running_cmd, debug=self.debug)
    is_container_running = self.config.container_name in result.stdout
    return is_container_running

run_shell_command(command, success_msg=None, error_msg=None)

Helper function for running shell commands with consistent error handling.

Parameters:

Name Type Description Default
command str

Shell command to run

required
success_msg str

Message to log on success

None
error_msg str

Message to log on error

None

Returns:

Type Description
bool

True if command succeeded (returncode 0)

Source code in omnigraph/sparql_server.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def run_shell_command(self, command: str, success_msg: str = None, error_msg: str = None) -> bool:
    """
    Helper function for running shell commands with consistent error handling.

    Args:
        command: Shell command to run
        success_msg: Message to log on success
        error_msg: Message to log on error

    Returns:
        True if command succeeded (returncode 0)
    """
    container_name = self.config.container_name
    command_success = False
    try:
        # tee output live only when verbose
        result = self.shell.run(command, debug=self.debug, tee=self.verbose)
        if result.returncode == 0:
            if success_msg:
                self.log.log("✅", container_name, success_msg)
            command_success = True
        else:
            error_detail = error_msg or f"Command failed: {command}"
            if result.stderr:
                error_detail += f" - {result.stderr}"
            self.log.log("❌", container_name, error_detail)
            command_success = False
    except Exception as e:
        # report rather than raise so callers get a simple boolean
        self.log.log("❌", container_name, f"Exception running command '{command}': {e}")
        command_success = False
    return command_success

start(show_progress=True)

Start SPARQL server in Docker container.

Parameters:

Name Type Description Default
show_progress bool

Show progress bar while waiting

True

Returns:

Type Description
bool

True if started successfully

Source code in omnigraph/sparql_server.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def start(self, show_progress: bool = True) -> bool:
    """
    Start SPARQL server in Docker container.

    Three cases: the container is already running, it exists but is
    stopped, or it must be created from scratch; in every successful
    case we then wait for the server to answer its status URL.

    Args:
        show_progress: Show progress bar while waiting

    Returns:
        True if started successfully
    """
    container_name = self.config.container_name
    server_name = self.config.name
    started = False
    try:
        if self.is_running():
            self.log.log(
                "✅",
                container_name,
                f"Container {container_name} is already running",
            )
            started = self.wait_until_ready(show_progress=show_progress)
        elif self.exists():
            self.log.log(
                "✅",
                container_name,
                f"Container {container_name} exists, starting...",
            )
            if self.run_shell_command(
                f"docker start {container_name}",
                error_msg=f"Failed to start container {container_name}",
            ):
                started = self.wait_until_ready(show_progress=show_progress)
        else:
            self.log.log(
                "✅",
                container_name,
                f"Creating new {server_name} container {container_name}...",
            )
            if self.run_shell_command(
                self.config.docker_run_command,
                error_msg=f"Failed to create container {container_name}",
            ):
                started = self.wait_until_ready(show_progress=show_progress)
    except Exception as ex:
        self.log.log(
            "❌",
            container_name,
            f"Error starting {server_name}: {ex}",
        )
        started = False
    return started

stop()

Stop the server container.

Returns:

Type Description
bool

True if stopped successfully

Source code in omnigraph/sparql_server.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def stop(self) -> bool:
    """
    Stop the server container.

    Delegates to run_shell_command so logging and error handling
    stay consistent with the other docker operations.

    Returns:
        True if stopped successfully
    """
    container_name = self.config.container_name
    return self.run_shell_command(
        f"docker stop {container_name}",
        success_msg=f"Stopped container {container_name}",
        error_msg=f"Failed to stop container {container_name}",
    )

wait_until_ready(timeout=30, show_progress=False)

Wait for server to be ready.

Parameters:

Name Type Description Default
timeout int

Maximum seconds to wait

30
show_progress bool

Show progress bar while waiting

False

Returns:

Type Description
bool

True if ready within timeout

Source code in omnigraph/sparql_server.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def wait_until_ready(self, timeout: int = 30, show_progress: bool = False) -> bool:
    """
    Wait for server to be ready.

    Polls self.status() once per second until it reports "ready"
    or the timeout is exhausted.

    Args:
        timeout: Maximum seconds to wait
        show_progress: Show progress bar while waiting

    Returns:
        True if ready within timeout
    """
    container_name = self.config.container_name
    server_name = self.config.name

    self.log.log(
        "✅",
        container_name,
        f"Waiting for {server_name} to start ... {self.config.status_url}",
    )

    # progress bar is only created (and only touched) when requested
    pbar = tqdm(total=timeout, desc=f"Waiting for {server_name}", unit="s") if show_progress else None

    is_ready = False
    for _second in range(timeout):
        if self.status().get("status") == "ready":
            if pbar:
                pbar.close()
            self.log.log(
                "✅",
                container_name,
                f"{server_name} ready at {self.config.base_url}",
            )
            is_ready = True
            break
        if pbar:
            pbar.update(1)
        time.sleep(1)

    if not is_ready:
        if pbar:
            pbar.close()
        self.log.log(
            "⚠️",
            container_name,
            f"Timeout waiting for {server_name} to start after {timeout}s",
        )

    return is_ready

version

Created on 2025-05-28

@author: wf

Version

Version handling for nicegui widgets

Source code in omnigraph/version.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@lod_storable
class Version:
    """
    Version handling for nicegui widgets
    """

    # published package name
    name = "omnigraph"
    # version string taken from the omnigraph package itself
    version = omnigraph.__version__
    # initial release date and last update (ISO dates)
    date = "2025-05-27"
    updated = "2025-05-28"
    # one-line project description used for packaging / UI
    description = "Unified Python interface for multiple graph databases"

    authors = "Wolfgang Fahl"

    # documentation, discussion and source-code-management URLs
    doc_url = "https://wiki.bitplan.com/index.php/pyomnigraph"
    chat_url = "https://github.com/WolfgangFahl/pyomnigraph/discussions"
    cm_url = "https://github.com/WolfgangFahl/pyomnigraph"

    # license blurb displayed in about dialogs — keep wording as-is
    license = f"""Copyright 2025 contributors. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""

    # multi-line summary; the f-string resolves the class-body names defined above
    longDescription = f"""{name} version {version}
{description}

  Created by {authors} on {date} last updated {updated}"""

yamlable

This is a redundant copy to avoid dependency hell; the original is at https://github.com/WolfgangFahl/nicegui_widgets/blob/main/ngwidgets/yamlable.py

Created on 2023-12-08, Extended on 2023-12-16 and 2024-01-25

@author: wf, ChatGPT

Prompts for the development and extension of the 'YamlAble' class within the 'yamable' module:

  1. Develop 'YamlAble' class in 'yamable' module. It should convert dataclass instances to/from YAML.
  2. Implement methods for YAML block scalar style and exclude None values in 'YamlAble' class.
  3. Add functionality to remove None values from dataclass instances before YAML conversion.
  4. Ensure 'YamlAble' processes only dataclass instances, with error handling for non-dataclass objects.
  5. Extend 'YamlAble' for JSON serialization and deserialization.
  6. Add methods for saving/loading dataclass instances to/from YAML and JSON files in 'YamlAble'.
  7. Implement loading of dataclass instances from URLs for both YAML and JSON in 'YamlAble'.
  8. Write tests for 'YamlAble' within the pyLodStorage context. Use 'samples 2' example from pyLoDStorage https://github.com/WolfgangFahl/pyLoDStorage/blob/master/lodstorage/sample2.py as a reference.
  9. Ensure tests cover YAML/JSON serialization, deserialization, and file I/O operations, using the sample-based approach.
  10. Use Google-style docstrings, comments, and type hints in 'YamlAble' class and tests.
  11. Adhere to instructions and seek clarification for any uncertainties.
  12. Add @lod_storable annotation support that will automatically YamlAble support and add @dataclass and @dataclass_json prerequisite behavior to a class

DateConvert

date converter

Source code in omnigraph/yamlable.py
80
81
82
83
84
85
86
87
88
class DateConvert:
    """
    date converter
    """

    @classmethod
    def iso_date_to_datetime(cls, iso_date: str) -> datetime.date:
        """Parse an ISO-8601 date string (YYYY-MM-DD); falsy input yields None."""
        if not iso_date:
            return None
        return datetime.strptime(iso_date, "%Y-%m-%d").date()

YamlAble

Bases: Generic[T]

An extended YAML handler class for converting dataclass objects to and from YAML format, and handling loading from and saving to files and URLs.

Source code in omnigraph/yamlable.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
class YamlAble(Generic[T]):
    """
    An extended YAML handler class for converting dataclass objects to and from YAML format,
    and handling loading from and saving to files and URLs.
    """

    def _yaml_setup(self):
        """
        Initializes the YamlAble handler, setting up custom representers and preparing it for various operations.

        Raises:
            ValueError: if self is not a dataclass instance
        """
        if not is_dataclass(self):
            raise ValueError("I must be a dataclass instance.")
        if not hasattr(self, "_yaml_dumper"):
            # NOTE(review): representers are registered on the shared yaml.Dumper
            # class, so they affect every yaml.dump using that Dumper process-wide
            # — confirm this global side effect is intended
            self._yaml_dumper = yaml.Dumper
            self._yaml_dumper.ignore_aliases = lambda *_args: True
            self._yaml_dumper.add_representer(type(None), self.represent_none)
            self._yaml_dumper.add_representer(str, self.represent_literal)

    def represent_none(self, dumper, _) -> yaml.Node:
        """
        Custom representer for ignoring None values in the YAML output.

        Bugfix: use the dumper instance PyYAML passes to the representer.
        The previous code called represent_scalar on the Dumper *class*
        (self._yaml_dumper), which raises a TypeError whenever a None value
        actually reaches this representer (it was latent only because
        remove_ignored_values strips None values before dumping).
        """
        return dumper.represent_scalar("tag:yaml.org,2002:null", "")

    def represent_literal(self, dumper: yaml.Dumper, data: str) -> yaml.Node:
        """
        Custom representer for block scalar style for strings.
        """
        # multiline strings are emitted in literal block style ("|")
        if "\n" in data:
            return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
        return dumper.represent_scalar("tag:yaml.org,2002:str", data)

    def to_yaml(
        self,
        ignore_none: bool = True,
        ignore_underscore: bool = True,
        allow_unicode: bool = True,
        sort_keys: bool = False,
    ) -> str:
        """
        Converts this dataclass object to a YAML string, with options to omit None values and/or underscore-prefixed variables,
        and using block scalar style for strings.

        Args:
            ignore_none: Flag to indicate whether None values should be removed from the YAML output.
            ignore_underscore: Flag to indicate whether attributes starting with an underscore should be excluded from the YAML output.
            allow_unicode: Flag to indicate whether to allow unicode characters in the output.
            sort_keys: Flag to indicate whether to sort the dictionary keys in the output.

        Returns:
            A string representation of the dataclass object in YAML format.
        """
        obj_dict = asdict(self)
        self._yaml_setup()
        clean_dict = self.remove_ignored_values(obj_dict, ignore_none, ignore_underscore)
        yaml_str = yaml.dump(
            clean_dict,
            Dumper=self._yaml_dumper,
            default_flow_style=False,
            allow_unicode=allow_unicode,
            sort_keys=sort_keys,
        )
        return yaml_str

    @classmethod
    def from_yaml(cls: Type[T], yaml_str: str) -> T:
        """
        Deserializes a YAML string to a dataclass instance.

        Args:
            yaml_str (str): A string containing YAML formatted data.

        Returns:
            T: An instance of the dataclass.
        """
        data: dict[str, Any] = yaml.safe_load(yaml_str)
        # from_dict is provided by the dataclass_json decorator applied via lod_storable
        instance: T = cls.from_dict(data)
        return instance

    @classmethod
    def load_from_yaml_file(cls: Type[T], filename: str) -> T:
        """
        Loads a dataclass instance from a YAML file.

        Args:
            filename (str): The path to the YAML file.

        Returns:
            T: An instance of the dataclass.
        """
        with open(filename, "r") as file:
            yaml_str: str = file.read()
        instance: T = cls.from_yaml(yaml_str)
        return instance

    @classmethod
    def load_from_yaml_url(cls: Type[T], url: str) -> T:
        """
        Loads a dataclass instance from a YAML string obtained from a URL.

        Args:
            url (str): The URL pointing to the YAML data.

        Returns:
            T: An instance of the dataclass.
        """
        yaml_str: str = cls.read_from_url(url)
        instance: T = cls.from_yaml(yaml_str)
        return instance

    def save_to_yaml_file(self, filename: str):
        """
        Saves the current dataclass instance to a YAML file.

        Args:
            filename (str): The path where the YAML file will be saved.
        """
        yaml_content: str = self.to_yaml()
        with open(filename, "w") as file:
            file.write(yaml_content)

    @classmethod
    def load_from_json_file(cls: Type[T], filename: str) -> T:
        """
        Loads a dataclass instance from a JSON file.

        Args:
            filename (str): The path to the JSON file.

        Returns:
            T: An instance of the dataclass.
        """
        with open(filename, "r") as file:
            json_str: str = file.read()
        instance: T = cls.from_json(json_str)
        return instance

    @classmethod
    def load_from_json_url(cls: Type[T], url: str) -> T:
        """
        Loads a dataclass instance from a JSON string obtained from a URL.

        Args:
            url (str): The URL pointing to the JSON data.

        Returns:
            T: An instance of the dataclass.
        """
        json_str: str = cls.read_from_url(url)
        instance: T = cls.from_json(json_str)
        return instance

    def save_to_json_file(self, filename: str, **kwargs):
        """
        Saves the current dataclass instance to a JSON file.

        Args:
            filename (str): The path where the JSON file will be saved.
            **kwargs: Additional keyword arguments for the `to_json` method.
        """
        json_content: str = self.to_json(**kwargs)
        with open(filename, "w") as file:
            file.write(json_content)

    @classmethod
    def read_from_url(cls, url: str) -> str:
        """
        Helper method to fetch content from a URL.
        """
        with urllib.request.urlopen(url) as response:
            if response.status == 200:
                return response.read().decode()
            else:
                raise Exception(f"Unable to load data from URL: {url}")

    @classmethod
    def remove_ignored_values(
        cls,
        value: Any,
        ignore_none: bool = True,
        ignore_underscore: bool = False,
        ignore_empty: bool = True,
    ) -> Any:
        """
        Recursively removes specified types of values from a dictionary or list.
        By default, it removes keys with None values. Optionally, it can also remove keys starting with an underscore.

        Args:
            value: The value to process (dictionary, list, or other).
            ignore_none: Flag to indicate whether None values should be removed.
            ignore_underscore: Flag to indicate whether keys starting with an underscore should be removed.
            ignore_empty: Flag to indicate whether empty collections should be removed.
        """

        def is_valid(v):
            """Check if the value is valid based on the specified flags."""
            if ignore_none and v is None:
                return False
            if ignore_empty:
                if isinstance(v, Mapping) and not v:
                    return False  # Empty dictionary
                if isinstance(v, Iterable) and not isinstance(v, (str, bytes)) and not v:
                    return False  # Empty list, set, tuple, etc., but not string or bytes
            return True

        if isinstance(value, Mapping):
            value = {
                k: YamlAble.remove_ignored_values(v, ignore_none, ignore_underscore, ignore_empty)
                for k, v in value.items()
                if is_valid(v) and (not ignore_underscore or not k.startswith("_"))
            }
        elif isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
            # NOTE: tuples and sets are rewritten as lists here — existing behavior
            value = [
                YamlAble.remove_ignored_values(v, ignore_none, ignore_underscore, ignore_empty)
                for v in value
                if is_valid(v)
            ]
        return value

    @classmethod
    def from_dict2(cls: Type[T], data: dict) -> T:
        """
        Creates an instance of a dataclass from a dictionary, typically used in deserialization.
        """
        if not data:
            return None
        instance = from_dict(data_class=cls, data=data)
        return instance

from_dict2(data) classmethod

Creates an instance of a dataclass from a dictionary, typically used in deserialization.

Source code in omnigraph/yamlable.py
310
311
312
313
314
315
316
317
318
@classmethod
def from_dict2(cls: Type[T], data: dict) -> T:
    """
    Creates an instance of a dataclass from a dictionary, typically used in deserialization.

    Empty or None input yields None instead of an instance.
    """
    instance = from_dict(data_class=cls, data=data) if data else None
    return instance

from_yaml(yaml_str) classmethod

Deserializes a YAML string to a dataclass instance.

Parameters:

Name Type Description Default
yaml_str str

A string containing YAML formatted data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in omnigraph/yamlable.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@classmethod
def from_yaml(cls: Type[T], yaml_str: str) -> T:
    """
    Deserializes a YAML string to a dataclass instance.

    Args:
        yaml_str (str): A string containing YAML formatted data.

    Returns:
        T: An instance of the dataclass.
    """
    parsed: dict[str, Any] = yaml.safe_load(yaml_str)
    return cls.from_dict(parsed)

load_from_json_file(filename) classmethod

Loads a dataclass instance from a JSON file.

Parameters:

Name Type Description Default
filename str

The path to the JSON file.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in omnigraph/yamlable.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@classmethod
def load_from_json_file(cls: Type[T], filename: str) -> T:
    """
    Loads a dataclass instance from a JSON file.

    Args:
        filename (str): The path to the JSON file.

    Returns:
        T: An instance of the dataclass.
    """
    with open(filename, "r") as json_file:
        json_text: str = json_file.read()
    return cls.from_json(json_text)

load_from_json_url(url) classmethod

Loads a dataclass instance from a JSON string obtained from a URL.

Parameters:

Name Type Description Default
url str

The URL pointing to the JSON data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in omnigraph/yamlable.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
@classmethod
def load_from_json_url(cls: Type[T], url: str) -> T:
    """
    Loads a dataclass instance from a JSON string obtained from a URL.

    Args:
        url (str): The URL pointing to the JSON data.

    Returns:
        T: An instance of the dataclass.
    """
    return cls.from_json(cls.read_from_url(url))

load_from_yaml_file(filename) classmethod

Loads a dataclass instance from a YAML file.

Parameters:

Name Type Description Default
filename str

The path to the YAML file.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in omnigraph/yamlable.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@classmethod
def load_from_yaml_file(cls: Type[T], filename: str) -> T:
    """
    Loads a dataclass instance from a YAML file.

    Args:
        filename (str): The path to the YAML file.

    Returns:
        T: An instance of the dataclass.
    """
    with open(filename, "r") as yaml_file:
        yaml_text: str = yaml_file.read()
    return cls.from_yaml(yaml_text)

load_from_yaml_url(url) classmethod

Loads a dataclass instance from a YAML string obtained from a URL.

Parameters:

Name Type Description Default
url str

The URL pointing to the YAML data.

required

Returns:

Name Type Description
T T

An instance of the dataclass.

Source code in omnigraph/yamlable.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@classmethod
def load_from_yaml_url(cls: Type[T], url: str) -> T:
    """
    Loads a dataclass instance from a YAML string obtained from a URL.

    Args:
        url (str): The URL pointing to the YAML data.

    Returns:
        T: An instance of the dataclass.
    """
    return cls.from_yaml(cls.read_from_url(url))

read_from_url(url) classmethod

Helper method to fetch content from a URL.

Source code in omnigraph/yamlable.py
255
256
257
258
259
260
261
262
263
264
@classmethod
def read_from_url(cls, url: str) -> str:
    """
    Helper method to fetch content from a URL.
    """
    with urllib.request.urlopen(url) as response:
        # anything but HTTP 200 is treated as a failure
        if response.status != 200:
            raise Exception(f"Unable to load data from URL: {url}")
        return response.read().decode()

remove_ignored_values(value, ignore_none=True, ignore_underscore=False, ignore_empty=True) classmethod

Recursively removes specified types of values from a dictionary or list. By default, it removes keys with None values. Optionally, it can also remove keys starting with an underscore.

Parameters:

Name Type Description Default
value Any

The value to process (dictionary, list, or other).

required
ignore_none bool

Flag to indicate whether None values should be removed.

True
ignore_underscore bool

Flag to indicate whether keys starting with an underscore should be removed.

False
ignore_empty bool

Flag to indicate whether empty collections should be removed.

True
Source code in omnigraph/yamlable.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
@classmethod
def remove_ignored_values(
    cls,
    value: Any,
    ignore_none: bool = True,
    ignore_underscore: bool = False,
    ignore_empty: bool = True,
) -> Any:
    """
    Recursively strip unwanted entries from nested dictionaries and lists.

    By default keys with None values are dropped; optionally also keys with a
    leading underscore and empty collections.

    Args:
        value: The value to process (dictionary, list, or other).
        ignore_none: Flag to indicate whether None values should be removed.
        ignore_underscore: Flag to indicate whether keys starting with an underscore should be removed.
        ignore_empty: Flag to indicate whether empty collections should be removed.
    """

    def keep(candidate):
        """Return False for values that the flags say should be dropped."""
        if ignore_none and candidate is None:
            return False
        if ignore_empty:
            if isinstance(candidate, Mapping) and not candidate:
                return False  # empty mapping
            if (
                isinstance(candidate, Iterable)
                and not isinstance(candidate, (str, bytes))
                and not candidate
            ):
                return False  # empty list/set/tuple — but never str or bytes
        return True

    if isinstance(value, Mapping):
        return {
            key: YamlAble.remove_ignored_values(item, ignore_none, ignore_underscore, ignore_empty)
            for key, item in value.items()
            if keep(item) and (not ignore_underscore or not key.startswith("_"))
        }
    if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
        return [
            YamlAble.remove_ignored_values(item, ignore_none, ignore_underscore, ignore_empty)
            for item in value
            if keep(item)
        ]
    return value

represent_literal(dumper, data)

Custom representer for block scalar style for strings.

Source code in omnigraph/yamlable.py
115
116
117
118
119
120
121
def represent_literal(self, dumper: yaml.Dumper, data: str) -> yaml.Node:
    """
    Custom representer for block scalar style for strings.

    Multiline strings are emitted as literal block scalars ("|");
    single-line strings use the default style.
    """
    if "\n" not in data:
        return dumper.represent_scalar("tag:yaml.org,2002:str", data)
    return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")

represent_none(_, __)

Custom representer for ignoring None values in the YAML output.

Source code in omnigraph/yamlable.py
109
110
111
112
113
def represent_none(self, dumper, _) -> yaml.Node:
    """
    Custom representer for ignoring None values in the YAML output.

    Bugfix: use the dumper instance PyYAML passes to the representer.
    The previous code called represent_scalar on the Dumper *class*
    (self._yaml_dumper), which raises a TypeError whenever a None value
    actually reaches this representer.
    """
    return dumper.represent_scalar("tag:yaml.org,2002:null", "")

save_to_json_file(filename, **kwargs)

Saves the current dataclass instance to a JSON file.

Parameters:

Name Type Description Default
filename str

The path where the JSON file will be saved.

required
**kwargs

Additional keyword arguments for the to_json method.

{}
Source code in omnigraph/yamlable.py
243
244
245
246
247
248
249
250
251
252
253
def save_to_json_file(self, filename: str, **kwargs):
    """
    Saves the current dataclass instance to a JSON file.

    Args:
        filename (str): The path where the JSON file will be saved.
        **kwargs: Additional keyword arguments for the `to_json` method.
    """
    # serialize first so a failing to_json never leaves a truncated file
    payload: str = self.to_json(**kwargs)
    with open(filename, "w") as json_file:
        json_file.write(payload)

save_to_yaml_file(filename)

Saves the current dataclass instance to a YAML file.

Parameters:

Name Type Description Default
filename str

The path where the YAML file will be saved.

required
Source code in omnigraph/yamlable.py
201
202
203
204
205
206
207
208
209
210
def save_to_yaml_file(self, filename: str):
    """
    Saves the current dataclass instance to a YAML file.

    Args:
        filename (str): The path where the YAML file will be saved.
    """
    # serialize first so a failing to_yaml never leaves a truncated file
    payload: str = self.to_yaml()
    with open(filename, "w") as yaml_file:
        yaml_file.write(payload)

to_yaml(ignore_none=True, ignore_underscore=True, allow_unicode=True, sort_keys=False)

Converts this dataclass object to a YAML string, with options to omit None values and/or underscore-prefixed variables, and using block scalar style for strings.

Parameters:

Name Type Description Default
ignore_none bool

Flag to indicate whether None values should be removed from the YAML output.

True
ignore_underscore bool

Flag to indicate whether attributes starting with an underscore should be excluded from the YAML output.

True
allow_unicode bool

Flag to indicate whether to allow unicode characters in the output.

True
sort_keys bool

Flag to indicate whether to sort the dictionary keys in the output.

False

Returns:

Type Description
str

A string representation of the dataclass object in YAML format.

Source code in omnigraph/yamlable.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def to_yaml(
    self,
    ignore_none: bool = True,
    ignore_underscore: bool = True,
    allow_unicode: bool = True,
    sort_keys: bool = False,
) -> str:
    """
    Render this dataclass instance as a YAML string.

    None values and/or underscore-prefixed attributes can be omitted, and
    multiline strings are emitted in block scalar style.

    Args:
        ignore_none: Flag to indicate whether None values should be removed from the YAML output.
        ignore_underscore: Flag to indicate whether attributes starting with an underscore should be excluded from the YAML output.
        allow_unicode: Flag to indicate whether to allow unicode characters in the output.
        sort_keys: Flag to indicate whether to sort the dictionary keys in the output.

    Returns:
        A string representation of the dataclass object in YAML format.
    """
    raw_dict = asdict(self)
    self._yaml_setup()
    cleaned = self.remove_ignored_values(raw_dict, ignore_none, ignore_underscore)
    return yaml.dump(
        cleaned,
        Dumper=self._yaml_dumper,
        default_flow_style=False,
        allow_unicode=allow_unicode,
        sort_keys=sort_keys,
    )

lod_storable(cls)

Decorator to make a class LoDStorable by inheriting from YamlAble. This decorator also ensures the class is a dataclass and has JSON serialization/deserialization capabilities.

Source code in omnigraph/yamlable.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def lod_storable(cls):
    """
    Decorator to make a class LoDStorable by
    inheriting from YamlAble.
    This decorator also ensures the class is a
    dataclass and has JSON serialization/deserialization
    capabilities.
    """
    # apply @dataclass, then @dataclass_json, exactly as stacking the decorators would
    decorated = dataclass_json(dataclass(cls))

    class LoDStorable(YamlAble, decorated):
        """
        decorator class
        """

        __qualname__ = decorated.__qualname__

    # present the wrapper under the original class identity
    LoDStorable.__name__ = decorated.__name__
    LoDStorable.__doc__ = decorated.__doc__

    return LoDStorable