Skip to content

nicescholia API Documentation

cmd

Command line entry point

ScholiaCmd

Bases: WebserverCmd

Command Line Interface

Source code in nscholia/cmd.py
13
14
15
16
17
18
19
20
class ScholiaCmd(WebserverCmd):
    """
    Command Line Interface
    """

    def getArgParser(self, description: str, version_msg) -> ArgumentParser:
        parser = super().getArgParser(description, version_msg)
        return parser

dashboard

Dashboard

UI for monitoring endpoints using ListOfDictsGrid.

Source code in nscholia/dashboard.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class Dashboard:
    """
    UI for monitoring endpoints using ListOfDictsGrid.
    """

    def __init__(self, solution):
        self.solution = solution
        self.webserver = solution.webserver
        self.grid = None  # Will hold the ListOfDictsGrid instance

        # Initialize the endpoints provider
        self.endpoints_provider = Endpoints()

    async def check_all(self):
        """Run checks for all endpoints in the grid"""
        if not self.grid:
            return

        ui.notify("Checking endpoints...")

        # Access the List of Dicts (LOD) directly from the wrapper
        rows = self.grid.lod

        for row in rows:
            # Visual update for checking state
            row["status"] = "Checking..."
            row["color"] = "#f0f0f0"  # Light gray
            row["triples"] = 0
            row["timestamp"] = ""

            # Update the grid view to show 'Checking...' state immediately
            self.grid.update()

            # Async check
            try:
                url = row["url"]
                # First check if endpoint is online
                result = await Monitor.check(url)

                # Update basic availability result
                if result.is_online:
                    row["status"] = f"Online ({result.status_code})"
                    row["latency"] = result.latency
                    row["color"] = "#d1fae5"  # light green

                    # Now get update state information (triples & timestamp)
                    ep_key = row["endpoint_key"]
                    endpoints_data = self.endpoints_provider.get_endpoints()

                    if ep_key in endpoints_data:
                        ep = endpoints_data[ep_key]
                        # Run update state query in executor to avoid blocking
                        update_state = await asyncio.get_event_loop().run_in_executor(
                            None, UpdateState.from_endpoint, self.endpoints_provider, ep
                        )

                        if update_state.success:
                            row["triples"] = update_state.triples or 0
                            row["timestamp"] = update_state.timestamp or ""
                        else:
                            row["triples"] = 0
                            row["timestamp"] = update_state.error or "N/A"
                            # Optionally adjust status if update query failed
                            if update_state.error:
                                row[
                                    "status"
                                ] += f" (Update query: {update_state.error})"
                else:
                    row["status"] = result.error or f"Error {result.status_code}"
                    row["latency"] = 0
                    row["triples"] = 0
                    row["timestamp"] = ""
                    row["color"] = "#fee2e2"  # light red

            except Exception as ex:
                row["status"] = str(ex)
                row["latency"] = 0
                row["triples"] = 0
                row["timestamp"] = ""
                row["color"] = "#fee2e2"

        # Final update to show results
        self.grid.update()
        ui.notify("Status check complete")

    def setup_ui(self):
        """
        Render the dashboard
        """
        with ui.row().classes("w-full items-center mb-4"):
            ui.label("Endpoint Monitor").classes("text-2xl font-bold")
            ui.button("Refresh", icon="refresh", on_click=self.check_all)

        # 1. Fetch data
        endpoints_data = self.endpoints_provider.get_endpoints()

        rows = []
        for key, ep in endpoints_data.items():
            # Prefer checking the website URL over the SPARQL endpoint
            check_url = getattr(ep, "website", None)
            if not check_url:
                check_url = getattr(ep, "endpoint", getattr(ep, "url", ""))

            ep_url = getattr(ep, "endpoint", getattr(ep, "url", ""))
            ep_name = getattr(ep, "name", key)
            ep_group = getattr(ep, "group", "General")

            link_html = Link.create(
                check_url if hasattr(ep, "website") else ep_url, "Link"
            )

            rows.append(
                {
                    "group": ep_group,
                    "name": ep_name,
                    "url": check_url,  # URL to check for availability
                    "endpoint_url": ep_url,  # Original SPARQL endpoint
                    "endpoint_key": key,  # Store the key for later lookup
                    "link": link_html,
                    "status": "Pending",
                    "latency": 0.0,
                    "triples": 0,
                    "timestamp": "",
                    "color": "#ffffff",
                }
            )

        column_defs = [
            {"headerName": "Group", "field": "group", "rowGroup": True, "hide": True},
            {
                "headerName": "Service",
                "field": "name",
                "sortable": True,
                "filter": True,
                "flex": 2,
            },
            {
                "headerName": "URL",
                "field": "link",
                "width": 70,
            },
            {
                "headerName": "Status",
                "field": "status",
                "sortable": True,
                "flex": 2,
            },
            {
                "headerName": "Latency (s)",
                "field": "latency",
                "sortable": True,
                "width": 120,
                "type": "numericColumn",
                "valueFormatter": "params.value ? params.value.toFixed(3) : '0.000'",
            },
            {
                "headerName": "Triples",
                "field": "triples",
                "sortable": True,
                "width": 130,
                "type": "numericColumn",
                "valueFormatter": "params.value ? params.value.toLocaleString() : '0'",
            },
            {
                "headerName": "Last Update",
                "field": "timestamp",
                "sortable": True,
                "width": 200,
            },
        ]

        grid_options = {
            "rowSelection": "single",
            "animateRows": True,
            ":getRowStyle": """(params) => {
                return { background: params.data.color };
            }""",
        }

        config = GridConfig(
            column_defs=column_defs,
            key_col="url",
            options=grid_options,
            html_columns=[2],
            auto_size_columns=True,
            theme="balham",
        )

        self.grid = ListOfDictsGrid(lod=rows, config=config)
        ui.timer(0.5, self.check_all, once=True)

check_all() async

Run checks for all endpoints in the grid

Source code in nscholia/dashboard.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def check_all(self):
    """Run checks for all endpoints in the grid"""
    if not self.grid:
        return

    ui.notify("Checking endpoints...")

    # Access the List of Dicts (LOD) directly from the wrapper
    rows = self.grid.lod

    for row in rows:
        # Visual update for checking state
        row["status"] = "Checking..."
        row["color"] = "#f0f0f0"  # Light gray
        row["triples"] = 0
        row["timestamp"] = ""

        # Update the grid view to show 'Checking...' state immediately
        self.grid.update()

        # Async check
        try:
            url = row["url"]
            # First check if endpoint is online
            result = await Monitor.check(url)

            # Update basic availability result
            if result.is_online:
                row["status"] = f"Online ({result.status_code})"
                row["latency"] = result.latency
                row["color"] = "#d1fae5"  # light green

                # Now get update state information (triples & timestamp)
                ep_key = row["endpoint_key"]
                endpoints_data = self.endpoints_provider.get_endpoints()

                if ep_key in endpoints_data:
                    ep = endpoints_data[ep_key]
                    # Run update state query in executor to avoid blocking
                    update_state = await asyncio.get_event_loop().run_in_executor(
                        None, UpdateState.from_endpoint, self.endpoints_provider, ep
                    )

                    if update_state.success:
                        row["triples"] = update_state.triples or 0
                        row["timestamp"] = update_state.timestamp or ""
                    else:
                        row["triples"] = 0
                        row["timestamp"] = update_state.error or "N/A"
                        # Optionally adjust status if update query failed
                        if update_state.error:
                            row[
                                "status"
                            ] += f" (Update query: {update_state.error})"
            else:
                row["status"] = result.error or f"Error {result.status_code}"
                row["latency"] = 0
                row["triples"] = 0
                row["timestamp"] = ""
                row["color"] = "#fee2e2"  # light red

        except Exception as ex:
            row["status"] = str(ex)
            row["latency"] = 0
            row["triples"] = 0
            row["timestamp"] = ""
            row["color"] = "#fee2e2"

    # Final update to show results
    self.grid.update()
    ui.notify("Status check complete")

setup_ui()

Render the dashboard

Source code in nscholia/dashboard.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def setup_ui(self):
    """
    Render the dashboard
    """
    with ui.row().classes("w-full items-center mb-4"):
        ui.label("Endpoint Monitor").classes("text-2xl font-bold")
        ui.button("Refresh", icon="refresh", on_click=self.check_all)

    # 1. Fetch data
    endpoints_data = self.endpoints_provider.get_endpoints()

    rows = []
    for key, ep in endpoints_data.items():
        # Prefer checking the website URL over the SPARQL endpoint
        check_url = getattr(ep, "website", None)
        if not check_url:
            check_url = getattr(ep, "endpoint", getattr(ep, "url", ""))

        ep_url = getattr(ep, "endpoint", getattr(ep, "url", ""))
        ep_name = getattr(ep, "name", key)
        ep_group = getattr(ep, "group", "General")

        link_html = Link.create(
            check_url if hasattr(ep, "website") else ep_url, "Link"
        )

        rows.append(
            {
                "group": ep_group,
                "name": ep_name,
                "url": check_url,  # URL to check for availability
                "endpoint_url": ep_url,  # Original SPARQL endpoint
                "endpoint_key": key,  # Store the key for later lookup
                "link": link_html,
                "status": "Pending",
                "latency": 0.0,
                "triples": 0,
                "timestamp": "",
                "color": "#ffffff",
            }
        )

    column_defs = [
        {"headerName": "Group", "field": "group", "rowGroup": True, "hide": True},
        {
            "headerName": "Service",
            "field": "name",
            "sortable": True,
            "filter": True,
            "flex": 2,
        },
        {
            "headerName": "URL",
            "field": "link",
            "width": 70,
        },
        {
            "headerName": "Status",
            "field": "status",
            "sortable": True,
            "flex": 2,
        },
        {
            "headerName": "Latency (s)",
            "field": "latency",
            "sortable": True,
            "width": 120,
            "type": "numericColumn",
            "valueFormatter": "params.value ? params.value.toFixed(3) : '0.000'",
        },
        {
            "headerName": "Triples",
            "field": "triples",
            "sortable": True,
            "width": 130,
            "type": "numericColumn",
            "valueFormatter": "params.value ? params.value.toLocaleString() : '0'",
        },
        {
            "headerName": "Last Update",
            "field": "timestamp",
            "sortable": True,
            "width": 200,
        },
    ]

    grid_options = {
        "rowSelection": "single",
        "animateRows": True,
        ":getRowStyle": """(params) => {
            return { background: params.data.color };
        }""",
    }

    config = GridConfig(
        column_defs=column_defs,
        key_col="url",
        options=grid_options,
        html_columns=[2],
        auto_size_columns=True,
        theme="balham",
    )

    self.grid = ListOfDictsGrid(lod=rows, config=config)
    ui.timer(0.5, self.check_all, once=True)

endpoints

created 2025-12-17 author wf

Endpoints

endpoints access

Source code in nscholia/endpoints.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class Endpoints:
    """
    endpoints access
    """

    def __init__(self):
        self.nqm = NamedQueryManager.from_samples()
        # Initialize QueryManager with the specific YAML path for dashboard queries
        yaml_path = (
            Path(__file__).parent.parent
            / "nscholia_examples"
            / "dashboard_queries.yaml"
        )
        if not os.path.exists(yaml_path):
            raise FileNotFoundError(f"Query YAML file not found: {yaml_path}")
        self.qm = QueryManager(
            lang="sparql", queriesPath=yaml_path, with_default=False, debug=False
        )

    def get_endpoints(self) -> Dict[str, Any]:
        """
        list all endpoints
        """
        endpoints = self.nqm.endpoints
        return endpoints

    def runQuery(self, query: Query) -> Optional[List[Dict[str, Any]]]:
        """
        Run a SPARQL query and return results as list of dicts

        Args:
            query: Query object to execute

        Returns:
            List of dictionaries containing query results, or None if error
        """
        endpoint = SPARQL(query.endpoint)
        if query.params.has_params:
            query.apply_default_params()
        qlod = endpoint.queryAsListOfDicts(
            query.query, param_dict=query.params.params_dict
        )
        return qlod

    def update_state_query_for_endpoint(self, ep: Endpoint) -> Query:
        """
        get the update state query for the given endpoint
        """
        query = None
        if ep.database == "blazegraph" and "wikidata" in ep.name.lower():
            query_name = "WikidataUpdateState"
        elif ep.database == "qlever":
            query_name = "QLeverUpdateState"
        else:
            query_name = "TripleCount"
        if query_name in self.qm.queriesByName:
            query = self.qm.queriesByName.get(query_name)
            query.endpoint = ep.endpoint
        return query

get_endpoints()

list all endpoints

Source code in nscholia/endpoints.py
36
37
38
39
40
41
def get_endpoints(self) -> Dict[str, Any]:
    """
    list all endpoints
    """
    endpoints = self.nqm.endpoints
    return endpoints

runQuery(query)

Run a SPARQL query and return results as list of dicts

Parameters:

Name Type Description Default
query Query

Query object to execute

required

Returns:

Type Description
Optional[List[Dict[str, Any]]]

List of dictionaries containing query results, or None if error

Source code in nscholia/endpoints.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def runQuery(self, query: Query) -> Optional[List[Dict[str, Any]]]:
    """
    Run a SPARQL query and return results as list of dicts

    Args:
        query: Query object to execute

    Returns:
        List of dictionaries containing query results, or None if error
    """
    endpoint = SPARQL(query.endpoint)
    if query.params.has_params:
        query.apply_default_params()
    qlod = endpoint.queryAsListOfDicts(
        query.query, param_dict=query.params.params_dict
    )
    return qlod

update_state_query_for_endpoint(ep)

get the update state query for the given endpoint

Source code in nscholia/endpoints.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def update_state_query_for_endpoint(self, ep: Endpoint) -> Query:
    """
    get the update state query for the given endpoint
    """
    query = None
    if ep.database == "blazegraph" and "wikidata" in ep.name.lower():
        query_name = "WikidataUpdateState"
    elif ep.database == "qlever":
        query_name = "QLeverUpdateState"
    else:
        query_name = "TripleCount"
    if query_name in self.qm.queriesByName:
        query = self.qm.queriesByName.get(query_name)
        query.endpoint = ep.endpoint
    return query

UpdateState dataclass

the update state of and endpoint

Source code in nscholia/endpoints.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@dataclass
class UpdateState:
    """
    the update state of and endpoint
    """

    endpoint_name: str
    triples: Optional[int] = None
    timestamp: Optional[str] = None
    success: bool = False
    error: Optional[str] = None

    @classmethod
    def from_endpoint(cls, em: Endpoints, ep: Endpoint):
        update_state = cls(triples=0, timestamp=ep.data_seeded, endpoint_name=ep.name)
        try:
            query = em.update_state_query_for_endpoint(ep)
            qlod = em.runQuery(query)
            success = qlod and len(qlod) > 0
            if success:
                update_state.success = True
                record = qlod[0]
                if "tripleCount" in record:
                    update_state.triples = int(record.get("tripleCount"))
                for var_name in ["timestamp", "updates_complete_until"]:
                    if var_name in record:
                        timestamp = record.get(var_name)
                        if isinstance(timestamp, datetime):
                            timestamp = timestamp.isoformat()
                        update_state.timestamp = timestamp
                        break
            else:
                update_state.error = "query failed"
        except Exception as ex:
            update_state.error = str(ex)
        return update_state

monitor

Availability monitoring logic

Monitor

Checks endpoint availability

Source code in nscholia/monitor.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class Monitor:
    """
    Checks endpoint availability
    """

    # Default User-Agent to avoid being blocked by servers
    DEFAULT_USER_AGENT = (
        "nscholia-monitor/1.0 (https://github.com/WolfgangFahl/nscholia)"
    )

    @staticmethod
    async def check(
        url: str, timeout: float = 5.0, user_agent: str = None
    ) -> StatusResult:
        """
        Check if an endpoint is available.

        Args:
            url: URL to check
            timeout: Request timeout in seconds
            user_agent: Custom user agent string
        """
        if user_agent is None:
            user_agent = Monitor.DEFAULT_USER_AGENT

        headers = {"User-Agent": user_agent}
        start_time = time.time()

        try:
            async with httpx.AsyncClient(follow_redirects=True) as client:
                response = await client.get(url, headers=headers, timeout=timeout)
                duration = time.time() - start_time
                return StatusResult(
                    endpoint_name="",  # Filled by caller
                    url=url,
                    status_code=response.status_code,
                    latency=round(duration, 3),
                )
        except httpx.TimeoutException:
            return StatusResult(endpoint_name="", url=url, error="Timeout")
        except Exception as e:
            return StatusResult(endpoint_name="", url=url, error=str(e))

check(url, timeout=5.0, user_agent=None) async staticmethod

Check if an endpoint is available.

Parameters:

Name Type Description Default
url str

URL to check

required
timeout float

Request timeout in seconds

5.0
user_agent str

Custom user agent string

None
Source code in nscholia/monitor.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@staticmethod
async def check(
    url: str, timeout: float = 5.0, user_agent: str = None
) -> StatusResult:
    """
    Check if an endpoint is available.

    Args:
        url: URL to check
        timeout: Request timeout in seconds
        user_agent: Custom user agent string
    """
    if user_agent is None:
        user_agent = Monitor.DEFAULT_USER_AGENT

    headers = {"User-Agent": user_agent}
    start_time = time.time()

    try:
        async with httpx.AsyncClient(follow_redirects=True) as client:
            response = await client.get(url, headers=headers, timeout=timeout)
            duration = time.time() - start_time
            return StatusResult(
                endpoint_name="",  # Filled by caller
                url=url,
                status_code=response.status_code,
                latency=round(duration, 3),
            )
    except httpx.TimeoutException:
        return StatusResult(endpoint_name="", url=url, error="Timeout")
    except Exception as e:
        return StatusResult(endpoint_name="", url=url, error=str(e))

version

Version dataclass

Version handling for nicescholia

Source code in nscholia/version.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@dataclass
class Version:
    """
    Version handling for nicescholia
    """

    name = "nicescholia"
    version = nscholia.__version__
    date = "2025-12-17"
    updated = "2025-12-17"
    description = "nicegui based scholia"
    authors = "Wolfgang Fahl"
    doc_url = "https://wiki.bitplan.com/index.php/nicescholia"
    chat_url = "https://github.com/WolfgangFahl/nicescholia/discussions"
    cm_url = "https://github.com/WolfgangFahl/nicescholia"
    license = """Copyright 2025 contributors. All rights reserved.

  Licensed under the Apache License 2.0
  http://www.apache.org/licenses/LICENSE-2.0

  Distributed on an "AS IS" basis without warranties
  or conditions of any kind, either express or implied."""

    longDescription = f"""{name} version {version}
{description}

  Created by {authors} on {date} last updated {updated}"""

webserver

Webserver definition

ScholiaSolution

Bases: InputWebSolution

Handling specific page requests for a client session.

Source code in nscholia/webserver.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class ScholiaSolution(InputWebSolution):
    """
    Handling specific page requests for a client session.
    """

    def __init__(self, webserver, client: client):
        super().__init__(webserver, client)

    def setup_menu(self, detailed: bool = True):
        """
        Configure the navigation menu
        """
        # Call safe setup from parent
        super().setup_menu(detailed=detailed)

        # Add custom links
        with self.header:
            self.link_button("Dashboard", "/", "dashboard")
            # Example of external link
            self.link_button(
                "GitHub",
                "https://github.com/WolfgangFahl/nicescholia",
                "code",
                new_tab=True,
            )

    async def home(self):
        """
        The main page content
        """

        def show():
            # Instantiate the View Component
            self.dashboard = Dashboard(self)
            self.dashboard.setup_ui()

        await self.setup_content_div(show)

home() async

The main page content

Source code in nscholia/webserver.py
60
61
62
63
64
65
66
67
68
69
70
async def home(self):
    """
    The main page content
    """

    def show():
        # Instantiate the View Component
        self.dashboard = Dashboard(self)
        self.dashboard.setup_ui()

    await self.setup_content_div(show)

setup_menu(detailed=True)

Configure the navigation menu

Source code in nscholia/webserver.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def setup_menu(self, detailed: bool = True):
    """
    Configure the navigation menu
    """
    # Call safe setup from parent
    super().setup_menu(detailed=detailed)

    # Add custom links
    with self.header:
        self.link_button("Dashboard", "/", "dashboard")
        # Example of external link
        self.link_button(
            "GitHub",
            "https://github.com/WolfgangFahl/nicescholia",
            "code",
            new_tab=True,
        )

ScholiaWebserver

Bases: InputWebserver

The main webserver class

Source code in nscholia/webserver.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ScholiaWebserver(InputWebserver):
    """
    The main webserver class
    """

    @classmethod
    def get_config(cls) -> WebserverConfig:
        config = WebserverConfig(
            short_name="nicescholia",
            timeout=6.0,
            copy_right="(c) 2025 Wolfgang Fahl",
            version=Version(),
            default_port=9000,
        )
        server_config = WebserverConfig.get(config)
        server_config.solution_class = ScholiaSolution
        return server_config

    def __init__(self):
        super().__init__(config=ScholiaWebserver.get_config())