Skip to content

pyOpenSourceProjects API Documentation

check_project

Created on 2024-08-28.

@author: wf

CheckProject

Checker for an individual open source project.

Source code in osprojects/check_project.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
class CheckProject:
    """Checker for an individual open source project."""

    def __init__(self, parent, project, args):
        self.parent = parent
        self.project = project
        self.args = args
        self.checks: List[Check] = []
        self.project_path = project.folder
        self.project_name = None
        self.requires_python = None
        self.min_python_version_minor = None
        self.max_python_version_minor = 13  # python 3.13 is max version

    @property
    def total(self) -> int:
        return len(self.checks)

    @property
    def ok_checks(self) -> List[Check]:
        ok_checks = [check for check in self.checks if check.ok]
        return ok_checks

    @property
    def failed_checks(self) -> List[Check]:
        failed_checks = [check for check in self.checks if not check.ok]
        return failed_checks

    def add_error(self, ex, path: str):
        self.parent.handle_exception(ex)
        self.add_check(False, msg=f"{str(ex)}", path=path)

    def add_check(
        self, ok, msg: str = "", path: str = None, negative: bool = False
    ) -> Check:
        if not path:
            raise ValueError("path parameter missing")
        marker = ""
        if negative:
            ok = not ok
            marker = "⚠ ️"
        check = Check(ok=ok, path=path, msg=f"{marker}{msg}{path}")
        self.checks.append(check)
        return check

    def add_content_check(
        self, content: str, needle: str, path: str, negative: bool = False
    ) -> Check:
        ok = needle in content
        check = self.add_check(ok, msg=f"{needle} in ", path=path, negative=negative)
        return check

    def add_path_check(self, path) -> Check:
        # Check if path exists
        path_exists = Check.file_exists(path)
        self.checks.append(path_exists)
        return path_exists

    def check_local(self) -> Check:
        local = Check.file_exists(self.project_path)
        return local

    def check_github_workflows(self):
        """Check the github workflow files."""
        workflows_path = os.path.join(self.project_path, ".github", "workflows")
        workflows_exist = self.add_path_check(workflows_path)

        if workflows_exist.ok:
            required_files = ["build.yml", "upload-to-pypi.yml"]
            for file in required_files:
                file_path = os.path.join(workflows_path, file)
                file_exists = self.add_path_check(file_path)

                if file_exists.ok:
                    content = file_exists.content

                    if file == "build.yml":
                        min_python_version_minor = int(
                            self.requires_python.split(".")[-1]
                        )
                        self.add_check(
                            min_python_version_minor == self.min_python_version_minor,
                            msg=f"{min_python_version_minor} (build.yml)!={self.min_python_version_minor} (pyprojec.toml)",
                            path=file_path,
                        )
                        python_versions = f"""python-version: [ {', '.join([f"'3.{i}'" for i in range(self.min_python_version_minor, self.max_python_version_minor+1)])} ]"""
                        self.add_content_check(
                            content,
                            python_versions,
                            file_path,
                        )
                        self.add_content_check(
                            content,
                            "os: [ubuntu-latest, macos-latest, windows-latest]",
                            file_path,
                        )
                        self.add_content_check(
                            content, "uses: actions/checkout@v4", file_path
                        )
                        self.add_content_check(
                            content,
                            "uses: actions/setup-python@v5",
                            file_path,
                        )

                        self.add_content_check(
                            content, "sphinx", file_path, negative=True
                        )
                        scripts_ok = (
                            "scripts/install" in content
                            and "scripts/test" in content
                            or "scripts/installAndTest" in content
                        )
                        self.add_check(scripts_ok, "install and test", file_path)

                    elif file == "upload-to-pypi.yml":
                        self.add_content_check(content, "id-token: write", file_path)
                        self.add_content_check(
                            content, "uses: actions/checkout@v4", file_path
                        )
                        self.add_content_check(
                            content,
                            "uses: actions/setup-python@v5",
                            file_path,
                        )
                        self.add_content_check(
                            content,
                            "uses: pypa/gh-action-pypi-publish@release/v1",
                            file_path,
                        )

    def check_scripts(self):
        scripts_path = os.path.join(self.project_path, "scripts")
        scripts_exist = self.add_path_check(scripts_path)
        if scripts_exist.ok:
            required_files = ["blackisort", "test", "install", "doc", "release"]
            for file in required_files:
                file_path = os.path.join(scripts_path, file)
                file_exists = self.add_path_check(file_path)
                if file_exists.ok:
                    content = file_exists.content
                    if file == "doc":
                        self.add_content_check(
                            content, "sphinx", file_path, negative=True
                        )
                        self.add_content_check(
                            content, "WF 2024-07-30 - updated", file_path
                        )
                    if file == "test":
                        self.add_content_check(content, "WF 2024-08-03", file_path)
                    if file == "release":
                        self.add_content_check(content, "scripts/doc -d", file_path)

    def check_readme(self):
        readme_path = os.path.join(self.project_path, "README.md")
        readme_exists = self.add_path_check(readme_path)
        if not hasattr(self, "project_name"):
            self.add_check(
                False,
                "project_name from pyproject.toml needed for README.md check",
                self.project_path,
            )
            return
        if readme_exists.ok:
            readme_content = readme_exists.content
            badge_lines = [
                "[![pypi](https://img.shields.io/pypi/pyversions/{self.project_name})](https://pypi.org/project/{self.project_name}/)",
                "[![Github Actions Build](https://github.com/{self.project.fqid}/actions/workflows/build.yml/badge.svg)](https://github.com/{self.project.fqid}/actions/workflows/build.yml)",
                "[![PyPI Status](https://img.shields.io/pypi/v/{self.project_name}.svg)](https://pypi.python.org/pypi/{self.project_name}/)",
                "[![GitHub issues](https://img.shields.io/github/issues/{self.project.fqid}.svg)](https://github.com/{self.project.fqid}/issues)",
                "[![GitHub closed issues](https://img.shields.io/github/issues-closed/{self.project.fqid}.svg)](https://github.com/{self.project.fqid}/issues/?q=is%3Aissue+is%3Aclosed)",
                "[![API Docs](https://img.shields.io/badge/API-Documentation-blue)](https://{self.project.owner}.github.io/{self.project.project_id}/)",
                "[![License](https://img.shields.io/github/license/{self.project.fqid}.svg)](https://www.apache.org/licenses/LICENSE-2.0)",
            ]
            for line in badge_lines:
                formatted_line = line.format(self=self)
                self.add_content_check(
                    content=readme_content,
                    needle=formatted_line,
                    path=readme_path,
                )
            self.add_content_check(
                readme_content, "readthedocs", readme_path, negative=True
            )

    def check_pyproject_toml(self) -> bool:
        """pyproject.toml."""
        toml_path = os.path.join(self.project_path, "pyproject.toml")
        toml_exists = self.add_path_check(toml_path)
        if toml_exists.ok:
            content = toml_exists.content
            toml_dict = tomllib.loads(content)
            project_check = self.add_check(
                "project" in toml_dict, "[project]", toml_path
            )
            if project_check.ok:
                self.project_name = toml_dict["project"]["name"]
                requires_python_check = self.add_check(
                    "requires-python" in toml_dict["project"],
                    "requires-python",
                    toml_path,
                )
                if requires_python_check.ok:
                    self.requires_python = toml_dict["project"]["requires-python"]
                    min_python_version = version.parse(
                        self.requires_python.replace(">=", "")
                    )
                    min_version_needed = "3.9"
                    version_ok = min_python_version >= version.parse(min_version_needed)
                    self.add_check(
                        version_ok, f"requires-python>={min_version_needed}", toml_path
                    )
                    self.min_python_version_minor = int(
                        str(min_python_version).split(".")[-1]
                    )
                    for minor_version in range(
                        self.min_python_version_minor, self.max_python_version_minor + 1
                    ):
                        needle = f"Programming Language :: Python :: 3.{minor_version}"
                        self.add_content_check(content, needle, toml_path)
            self.add_content_check(content, "hatchling", toml_path)
            self.add_content_check(
                content, "[tool.hatch.build.targets.wheel.sources]", toml_path
            )
        return toml_exists.ok

    def check_git(self) -> bool:
        """Check git repository information using GitHub class.

        Returns:
            bool: True if git owner matches project owner and the repo is not a fork
        """
        owner_match = False
        is_fork = False
        try:
            local_owner = self.project.owner
            remote_owner = self.project.repo_info["owner"]["login"]
            is_fork = self.project.repo_info["fork"]
            owner_match = local_owner.lower() == remote_owner.lower() and not is_fork
            self.add_check(
                owner_match,
                f"Git owner ({remote_owner}) matches project owner ({local_owner}) and is not a fork",
                self.project_path,
            )

            local_project_id = self.project.project_id
            remote_repo_name = self.project.repo_info["name"]
            repo_match = local_project_id.lower() == remote_repo_name.lower()
            self.add_check(
                repo_match,
                f"Git repo name ({remote_repo_name}) matches project id ({local_project_id})",
                self.project_path,
            )

            # Check if there are uncommitted changes (this still requires local git access)
            local_repo = Repo(self.project_path)
            self.add_check(
                not local_repo.is_dirty(), "uncomitted changes for", self.project_path
            )

            # Check latest GitHub Actions workflow run
            latest_run = GitHubAction.get_latest_workflow_run(self.project)
            if latest_run:
                self.add_check(
                    latest_run["conclusion"] == "success",
                    f"Latest GitHub Actions run: {latest_run['conclusion']}",
                    latest_run["html_url"],
                )
            else:
                self.add_check(
                    False,
                    "No GitHub Actions runs found",
                    self.project.repo.ticketUrl(),
                )

        except InvalidGitRepositoryError:
            self.add_check(False, "Not a valid git repository", self.project_path)
        except NoSuchPathError:
            self.add_check(
                False, "Git repository path does not exist", self.project_path
            )
        except Exception as ex:
            self.add_error(ex, self.project_path)

        return owner_match and not is_fork

    def check(self, title: str):
        """Check the given project and print results."""
        self.check_local()
        self.check_git()
        if self.check_pyproject_toml():
            self.check_github_workflows()
            self.check_readme()
            self.check_scripts()

        # ok_count=len(ok_checks)
        failed_count = len(self.failed_checks)
        summary = (
            f"❌ {failed_count:2}/{self.total:2}"
            if failed_count > 0
            else f"✅ {self.total:2}/{self.total:2}"
        )
        print(f"{title}{summary}:{self.project}{self.project.url}")
        if failed_count > 0:
            # Sort checks by path
            sorted_checks = sorted(self.checks, key=lambda c: c.path or "")

            # Group checks by path
            checks_by_path = {}
            for check in sorted_checks:
                if check.path not in checks_by_path:
                    checks_by_path[check.path] = []
                checks_by_path[check.path].append(check)

            # Display results
            for path, path_checks in checks_by_path.items():
                path_failed = sum(1 for c in path_checks if not c.ok)
                if path_failed > 0 or self.args.debug:
                    print(f"❌ {path}: {path_failed}")
                    i = 0
                    for check in path_checks:
                        show = not check.ok or self.args.debug
                        if show:
                            i += 1
                            print(f"    {i:3}{check.marker}:{check.msg}")

                    if self.args.editor and path_failed > 0:
                        if os.path.isfile(path):
                            # @TODO Make editor configurable
                            Editor.open(path, default_editor_cmd="/usr/local/bin/atom")
                        else:
                            Editor.open_filepath(path)

check(title)

Check the given project and print results.

Source code in osprojects/check_project.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
def check(self, title: str):
    """Check the given project and print results."""
    self.check_local()
    self.check_git()
    if self.check_pyproject_toml():
        self.check_github_workflows()
        self.check_readme()
        self.check_scripts()

    # ok_count=len(ok_checks)
    failed_count = len(self.failed_checks)
    summary = (
        f"❌ {failed_count:2}/{self.total:2}"
        if failed_count > 0
        else f"✅ {self.total:2}/{self.total:2}"
    )
    print(f"{title}{summary}:{self.project}{self.project.url}")
    if failed_count > 0:
        # Sort checks by path
        sorted_checks = sorted(self.checks, key=lambda c: c.path or "")

        # Group checks by path
        checks_by_path = {}
        for check in sorted_checks:
            if check.path not in checks_by_path:
                checks_by_path[check.path] = []
            checks_by_path[check.path].append(check)

        # Display results
        for path, path_checks in checks_by_path.items():
            path_failed = sum(1 for c in path_checks if not c.ok)
            if path_failed > 0 or self.args.debug:
                print(f"❌ {path}: {path_failed}")
                i = 0
                for check in path_checks:
                    show = not check.ok or self.args.debug
                    if show:
                        i += 1
                        print(f"    {i:3}{check.marker}:{check.msg}")

                if self.args.editor and path_failed > 0:
                    if os.path.isfile(path):
                        # @TODO Make editor configurable
                        Editor.open(path, default_editor_cmd="/usr/local/bin/atom")
                    else:
                        Editor.open_filepath(path)

check_git()

Check git repository information using GitHub class.

Returns:

Name Type Description
bool bool

True if git owner matches project owner and the repo is not a fork

Source code in osprojects/check_project.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def check_git(self) -> bool:
    """Check git repository information using GitHub class.

    Returns:
        bool: True if git owner matches project owner and the repo is not a fork
    """
    owner_match = False
    is_fork = False
    try:
        local_owner = self.project.owner
        remote_owner = self.project.repo_info["owner"]["login"]
        is_fork = self.project.repo_info["fork"]
        owner_match = local_owner.lower() == remote_owner.lower() and not is_fork
        self.add_check(
            owner_match,
            f"Git owner ({remote_owner}) matches project owner ({local_owner}) and is not a fork",
            self.project_path,
        )

        local_project_id = self.project.project_id
        remote_repo_name = self.project.repo_info["name"]
        repo_match = local_project_id.lower() == remote_repo_name.lower()
        self.add_check(
            repo_match,
            f"Git repo name ({remote_repo_name}) matches project id ({local_project_id})",
            self.project_path,
        )

        # Check if there are uncommitted changes (this still requires local git access)
        local_repo = Repo(self.project_path)
        self.add_check(
            not local_repo.is_dirty(), "uncomitted changes for", self.project_path
        )

        # Check latest GitHub Actions workflow run
        latest_run = GitHubAction.get_latest_workflow_run(self.project)
        if latest_run:
            self.add_check(
                latest_run["conclusion"] == "success",
                f"Latest GitHub Actions run: {latest_run['conclusion']}",
                latest_run["html_url"],
            )
        else:
            self.add_check(
                False,
                "No GitHub Actions runs found",
                self.project.repo.ticketUrl(),
            )

    except InvalidGitRepositoryError:
        self.add_check(False, "Not a valid git repository", self.project_path)
    except NoSuchPathError:
        self.add_check(
            False, "Git repository path does not exist", self.project_path
        )
    except Exception as ex:
        self.add_error(ex, self.project_path)

    return owner_match and not is_fork

check_github_workflows()

Check the github workflow files.

Source code in osprojects/check_project.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def check_github_workflows(self):
    """Check the github workflow files."""
    workflows_path = os.path.join(self.project_path, ".github", "workflows")
    workflows_exist = self.add_path_check(workflows_path)

    if workflows_exist.ok:
        required_files = ["build.yml", "upload-to-pypi.yml"]
        for file in required_files:
            file_path = os.path.join(workflows_path, file)
            file_exists = self.add_path_check(file_path)

            if file_exists.ok:
                content = file_exists.content

                if file == "build.yml":
                    min_python_version_minor = int(
                        self.requires_python.split(".")[-1]
                    )
                    self.add_check(
                        min_python_version_minor == self.min_python_version_minor,
                        msg=f"{min_python_version_minor} (build.yml)!={self.min_python_version_minor} (pyprojec.toml)",
                        path=file_path,
                    )
                    python_versions = f"""python-version: [ {', '.join([f"'3.{i}'" for i in range(self.min_python_version_minor, self.max_python_version_minor+1)])} ]"""
                    self.add_content_check(
                        content,
                        python_versions,
                        file_path,
                    )
                    self.add_content_check(
                        content,
                        "os: [ubuntu-latest, macos-latest, windows-latest]",
                        file_path,
                    )
                    self.add_content_check(
                        content, "uses: actions/checkout@v4", file_path
                    )
                    self.add_content_check(
                        content,
                        "uses: actions/setup-python@v5",
                        file_path,
                    )

                    self.add_content_check(
                        content, "sphinx", file_path, negative=True
                    )
                    scripts_ok = (
                        "scripts/install" in content
                        and "scripts/test" in content
                        or "scripts/installAndTest" in content
                    )
                    self.add_check(scripts_ok, "install and test", file_path)

                elif file == "upload-to-pypi.yml":
                    self.add_content_check(content, "id-token: write", file_path)
                    self.add_content_check(
                        content, "uses: actions/checkout@v4", file_path
                    )
                    self.add_content_check(
                        content,
                        "uses: actions/setup-python@v5",
                        file_path,
                    )
                    self.add_content_check(
                        content,
                        "uses: pypa/gh-action-pypi-publish@release/v1",
                        file_path,
                    )

check_pyproject_toml()

pyproject.toml.

Source code in osprojects/check_project.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def check_pyproject_toml(self) -> bool:
    """pyproject.toml."""
    toml_path = os.path.join(self.project_path, "pyproject.toml")
    toml_exists = self.add_path_check(toml_path)
    if toml_exists.ok:
        content = toml_exists.content
        toml_dict = tomllib.loads(content)
        project_check = self.add_check(
            "project" in toml_dict, "[project]", toml_path
        )
        if project_check.ok:
            self.project_name = toml_dict["project"]["name"]
            requires_python_check = self.add_check(
                "requires-python" in toml_dict["project"],
                "requires-python",
                toml_path,
            )
            if requires_python_check.ok:
                self.requires_python = toml_dict["project"]["requires-python"]
                min_python_version = version.parse(
                    self.requires_python.replace(">=", "")
                )
                min_version_needed = "3.9"
                version_ok = min_python_version >= version.parse(min_version_needed)
                self.add_check(
                    version_ok, f"requires-python>={min_version_needed}", toml_path
                )
                self.min_python_version_minor = int(
                    str(min_python_version).split(".")[-1]
                )
                for minor_version in range(
                    self.min_python_version_minor, self.max_python_version_minor + 1
                ):
                    needle = f"Programming Language :: Python :: 3.{minor_version}"
                    self.add_content_check(content, needle, toml_path)
        self.add_content_check(content, "hatchling", toml_path)
        self.add_content_check(
            content, "[tool.hatch.build.targets.wheel.sources]", toml_path
        )
    return toml_exists.ok

checkos

Created on 2024-07-30.

@author: wf

CheckOS

Checker for a set of open source projects.

Source code in osprojects/checkos.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class CheckOS:
    """Checker for a set of open source projects."""

    def __init__(
        self, args: Namespace, osprojects: OsProjects, max_python_version_minor=12
    ):
        self.args = args
        self.verbose = args.verbose
        self.workspace = args.workspace
        self.osprojects = osprojects
        self.checks = []
        # python 3.12 is max version
        self.max_python_version_minor = max_python_version_minor

    @classmethod
    def from_args(cls, args: Namespace):
        osprojects = OsProjects.from_folder(args.workspace, with_progress=True)
        return cls(args, osprojects)

    def select_projects(self):
        try:
            if self.args.project:
                if self.args.owners:
                    return self.osprojects.select_projects(
                        owners=self.args.owners, project_id=self.args.project
                    )
                elif self.args.local:
                    return self.osprojects.select_projects(
                        project_id=self.args.project, local_only=True
                    )
                else:
                    raise ValueError("--local or --owner needed with --project")
            elif self.args.owners:
                return self.osprojects.select_projects(owners=self.args.owners)
            elif self.args.local:
                return self.osprojects.select_projects(local_only=True)
            else:
                raise ValueError(
                    "Please provide --owner and --project, or use --local option."
                )
        except ValueError as e:
            print(f"Error: {str(e)}")
            return []

    def filter_projects(self):
        if self.args.language:
            self.osprojects.filter_projects(language=self.args.language)
        if self.args.local:
            self.osprojects.filter_projects(local_only=True)

    def check_projects(self):
        """Select, filter, and check all projects based on the provided
        arguments."""
        self.select_projects()
        self.filter_projects()

        for i, (_url, project) in enumerate(
            self.osprojects.selected_projects.items(), 1
        ):
            checker = CheckProject(self, project, self.args)
            checker.check(f"{i:3}:")

    def handle_exception(self, ex: Exception):
        CheckOS.show_exception(ex, self.args.debug)

    @staticmethod
    def show_exception(ex: Exception, debug: bool = False):
        err_msg = f"Error: {str(ex)}"
        logging.error(err_msg)
        if debug:
            print(traceback.format_exc())

check_projects()

Select, filter, and check all projects based on the provided arguments.

Source code in osprojects/checkos.py
66
67
68
69
70
71
72
73
74
75
76
def check_projects(self):
    """Select, filter, and check all projects based on the provided
    arguments."""
    self.select_projects()
    self.filter_projects()

    for i, (_url, project) in enumerate(
        self.osprojects.selected_projects.items(), 1
    ):
        checker = CheckProject(self, project, self.args)
        checker.check(f"{i:3}:")

main(_argv=None)

Main command line entry point.

Source code in osprojects/checkos.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def main(_argv=None):
    """Main command line entry point."""
    parser = argparse.ArgumentParser(description="Check open source projects")
    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="add debug output",
    )
    parser.add_argument(
        "-e",
        "--editor",
        action="store_true",
        help="open default editor on failed files",
    )
    parser.add_argument("-o", "--owners", nargs="+", help="project owners")
    parser.add_argument("-p", "--project", help="name of the project")
    parser.add_argument("-l", "--language", help="filter projects by language")
    parser.add_argument(
        "--local", action="store_true", help="check only locally available projects"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="show verbose output"
    )
    parser.add_argument(
        "-ws",
        "--workspace",
        help="(Eclipse) workspace directory",
        default=os.path.expanduser("~/py-workspace"),
    )

    args = parser.parse_args(args=_argv)

    try:
        checker = CheckOS.from_args(args)
        checker.check_projects()
    except Exception as ex:
        CheckOS.show_exception(ex, debug=args.debug)
        raise ex

editor

Created on 2022-11-27.

@author: wf

Editor

Helper class to open the system defined editor.

see https://stackoverflow.com/questions/1442841/lauch-default-editor-like-webbrowser-module

Source code in osprojects/editor.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class Editor:
    """Helper class to open the system defined editor.

    see
    https://stackoverflow.com/questions/1442841/lauch-default-editor-like-webbrowser-module
    """

    @classmethod
    def open_filepath(cls, filepath: str):
        if platform.system() == "Darwin":  # macOS
            subprocess.call(("open", filepath))
        elif platform.system() == "Windows":  # Windows
            os.startfile(filepath, "open")
        else:  # linux variants
            subprocess.call(("xdg-open", filepath))

    @classmethod
    def extract_text(cls, html_text: str) -> str:
        """Extract the text from the given html_text.

        Args:
            html_text(str): the input for the html text

        Returns:
            str: the plain text
        """
        soup = BeautifulSoup(html_text, features="html.parser")

        # kill all script and style elements
        for script in soup(["script", "style"]):
            script.extract()  # rip it out

        # get text
        text = soup.get_text()

        # break into lines and remove leading and trailing space on each
        lines = (line.strip() for line in text.splitlines())
        # break multi-headlines into a line each
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        # drop blank lines
        text = "\n".join(chunk for chunk in chunks if chunk)
        return text

    @classmethod
    def open(
        cls,
        file_source: str,
        extract_text: bool = True,
        default_editor_cmd: str = "/usr/local/bin/atom",
    ) -> str:
        """Open an editor for the given file_source.

        Args:
            file_source(str): the path to the file
            extract_text(bool): if True extract the text from html sources

        Returns:
            str: the path to the file e.g. a temporary file if the file_source points to an url
        """
        # handle urls
        # https://stackoverflow.com/a/45886824/1497139
        if file_source.startswith("http"):
            url_source = urlopen(file_source)
            # https://stackoverflow.com/a/19156107/1497139
            charset = url_source.headers.get_content_charset()
            # if charset fails here you might want to set it to utf-8 as a default!
            text = url_source.read().decode(charset)
            if extract_text:
                # https://stackoverflow.com/a/24618186/1497139
                text = cls.extract_text(text)

            return cls.open_tmp_text(text)

        editor_cmd = None
        editor_env = os.getenv("EDITOR")
        if editor_env:
            editor_cmd = editor_env
        if platform.system() == "Darwin":
            if not editor_env:
                # https://stackoverflow.com/questions/22390709/how-can-i-open-the-atom-editor-from-the-command-line-in-os-x
                editor_cmd = default_editor_cmd
        if editor_cmd:
            os_cmd = f"{editor_cmd} {file_source}"
            os.system(os_cmd)
        return file_source

    @classmethod
    def open_tmp_text(cls, text: str, file_name: str = None) -> str:
        """Open an editor for the given text in a newly created temporary file.

        Args:
            text(str): the text to write to a temporary file and then open
            file_name(str): the name to use for the file

        Returns:
            str: the path to the temp file
        """
        # see https://stackoverflow.com/a/8577226/1497139
        # https://stackoverflow.com/a/3924253/1497139
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            with open(tmp.name, "w") as tmp_file:
                tmp_file.write(text)
                tmp_file.close()
            if file_name is None:
                file_path = tmp.name
            else:
                # https://stackoverflow.com/questions/3167154/how-to-split-a-dos-path-into-its-components-in-python
                path = Path(tmp.name)
                # https://stackoverflow.com/a/49798311/1497139
                file_path = path.parent / file_name
                os.rename(tmp.name, file_path)

            return cls.open(str(file_path))

extract_text(html_text) classmethod

Extract the text from the given html_text.

Parameters:

Name Type Description Default
html_text(str)

the input for the html text

required

Returns:

Name Type Description
str str

the plain text

Source code in osprojects/editor.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@classmethod
def extract_text(cls, html_text: str) -> str:
    """Extract the text from the given html_text.

    Args:
        html_text(str): the input for the html text

    Returns:
        str: the plain text
    """
    soup = BeautifulSoup(html_text, features="html.parser")

    # kill all script and style elements
    for script in soup(["script", "style"]):
        script.extract()  # rip it out

    # get text
    text = soup.get_text()

    # break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # break multi-headlines into a line each
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # drop blank lines
    text = "\n".join(chunk for chunk in chunks if chunk)
    return text

open(file_source, extract_text=True, default_editor_cmd='/usr/local/bin/atom') classmethod

Open an editor for the given file_source.

Parameters:

Name Type Description Default
file_source(str)

the path to the file

required
extract_text(bool)

if True extract the text from html sources

required

Returns:

Name Type Description
str str

the path to the file e.g. a temporary file if the file_source points to an url

Source code in osprojects/editor.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@classmethod
def open(
    cls,
    file_source: str,
    extract_text: bool = True,
    default_editor_cmd: str = "/usr/local/bin/atom",
) -> str:
    """Open an editor for the given file_source.

    Args:
        file_source(str): the path to the file
        extract_text(bool): if True extract the text from html sources

    Returns:
        str: the path to the file e.g. a temporary file if the file_source points to an url
    """
    # handle urls
    # https://stackoverflow.com/a/45886824/1497139
    if file_source.startswith("http"):
        url_source = urlopen(file_source)
        # https://stackoverflow.com/a/19156107/1497139
        charset = url_source.headers.get_content_charset()
        # if charset fails here you might want to set it to utf-8 as a default!
        text = url_source.read().decode(charset)
        if extract_text:
            # https://stackoverflow.com/a/24618186/1497139
            text = cls.extract_text(text)

        return cls.open_tmp_text(text)

    editor_cmd = None
    editor_env = os.getenv("EDITOR")
    if editor_env:
        editor_cmd = editor_env
    if platform.system() == "Darwin":
        if not editor_env:
            # https://stackoverflow.com/questions/22390709/how-can-i-open-the-atom-editor-from-the-command-line-in-os-x
            editor_cmd = default_editor_cmd
    if editor_cmd:
        os_cmd = f"{editor_cmd} {file_source}"
        os.system(os_cmd)
    return file_source

open_tmp_text(text, file_name=None) classmethod

Open an editor for the given text in a newly created temporary file.

Parameters:

Name Type Description Default
text(str)

the text to write to a temporary file and then open

required
file_name(str)

the name to use for the file

required

Returns:

Name Type Description
str str

the path to the temp file

Source code in osprojects/editor.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@classmethod
def open_tmp_text(cls, text: str, file_name: str = None) -> str:
    """Open an editor for the given text in a newly created temporary file.

    Args:
        text(str): the text to write to a temporary file and then open
        file_name(str): the name to use for the file

    Returns:
        str: the path to the temp file
    """
    # see https://stackoverflow.com/a/8577226/1497139
    # https://stackoverflow.com/a/3924253/1497139
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        with open(tmp.name, "w") as tmp_file:
            tmp_file.write(text)
            tmp_file.close()
        if file_name is None:
            file_path = tmp.name
        else:
            # https://stackoverflow.com/questions/3167154/how-to-split-a-dos-path-into-its-components-in-python
            path = Path(tmp.name)
            # https://stackoverflow.com/a/49798311/1497139
            file_path = path.parent / file_name
            os.rename(tmp.name, file_path)

        return cls.open(str(file_path))

github_api

Created on 2024-08-27.

@author: wf

GitHubAction dataclass

Represents a GitHub Action with its identifying information and log content.

Attributes:

Name Type Description
repo GitHubRepo

The repository associated with this action.

run_id int

The ID of the workflow run.

job_id int

The ID of the job within the run.

log_content str

The log content of the action.

Source code in osprojects/github_api.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
@dataclass
class GitHubAction:
    """Represents a GitHub Action with its identifying information and log
    content.

    Attributes:
        repo (GitHubRepo): The repository associated with this action.
        run_id (int): The ID of the workflow run.
        job_id (int): The ID of the job within the run.
        log_content (str): The log content of the action.
    """

    repo: GitHubRepo
    run_id: int
    job_id: int
    log_content: str = field(default=None, repr=False)
    do_cache: bool = True

    def __post_init__(self):
        self.log_id = (
            f"{self.repo.owner}_{self.repo.project_id}_{self.run_id}_{self.job_id}"
        )
        self.log_file = os.path.join(
            self.repo.github.log_dir, f"action_log_{self.log_id}.txt"
        )
        # If log file exists, read the content
        if os.path.exists(self.log_file):
            with open(self.log_file, "r", encoding="utf-8") as f:
                self.log_content = f.read()

    @classmethod
    def from_url(cls, url: str) -> "GitHubAction":
        """Create a GitHubAction instance from a GitHub Actions URL and fetch
        its logs.

        Args:
            url (str): The GitHub Actions URL.

        Returns:
            GitHubAction: An instance of GitHubAction containing parsed information.

        Raises:
            ValueError: If the URL format is invalid or missing required components.
        """
        parsed_url = urlparse(url)
        path_parts = parsed_url.path.split("/")

        if len(path_parts) < 8 or path_parts[3] != "actions" or path_parts[4] != "runs":
            raise ValueError("Invalid GitHub Actions URL format")

        try:
            repo = GitHubRepo(owner=path_parts[1], project_id=path_parts[2])
            return cls(repo=repo, run_id=int(path_parts[5]), job_id=int(path_parts[7]))
        except (IndexError, ValueError) as e:
            raise ValueError(f"Failed to parse GitHub Actions URL: {e}")

    @classmethod
    def get_latest_workflow_run(cls, project):
        """Get the latest GitHub Actions workflow run for a given project.

        Args:
            project (OsProject): The project to check for the latest workflow run.

        Returns:
            dict: Information about the latest workflow run, or None if not found.
        """
        url = f"https://api.github.com/repos/{project.owner}/{project.project_id}/actions/runs"
        response = project.repo.github.get_response("fetch latest workflow run", url)
        runs = response.json().get("workflow_runs", [])
        run = None
        if runs:
            run = runs[0]  # Return the latest run
        return run

    def fetch_logs(self):
        """Fetch the logs for this GitHub Action."""
        if self.log_content is None:
            api_url = f"https://api.github.com/repos/{self.repo.owner}/{self.repo.project_id}/actions/jobs/{self.job_id}/logs"
            log_response = self.repo.github.get_response(
                "fetch job logs", api_url, allow_redirects=True
            )
            self.log_content = log_response.content.decode("utf-8-sig")
            if self.do_cache:
                self.save_logs()

    def save_logs(self):
        """Save the log content to a local file."""
        if self.log_content is None:
            raise ValueError("No log content to save. Make sure to fetch logs first.")
        with open(self.log_file, "w", encoding="utf-8") as f:
            f.write(self.log_content)

fetch_logs()

Fetch the logs for this GitHub Action.

Source code in osprojects/github_api.py
322
323
324
325
326
327
328
329
330
331
def fetch_logs(self):
    """Fetch the logs for this GitHub Action."""
    if self.log_content is None:
        api_url = f"https://api.github.com/repos/{self.repo.owner}/{self.repo.project_id}/actions/jobs/{self.job_id}/logs"
        log_response = self.repo.github.get_response(
            "fetch job logs", api_url, allow_redirects=True
        )
        self.log_content = log_response.content.decode("utf-8-sig")
        if self.do_cache:
            self.save_logs()

from_url(url) classmethod

Create a GitHubAction instance from a GitHub Actions URL and fetch its logs.

Parameters:

Name Type Description Default
url str

The GitHub Actions URL.

required

Returns:

Name Type Description
GitHubAction GitHubAction

An instance of GitHubAction containing parsed information.

Raises:

Type Description
ValueError

If the URL format is invalid or missing required components.

Source code in osprojects/github_api.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
@classmethod
def from_url(cls, url: str) -> "GitHubAction":
    """Create a GitHubAction instance from a GitHub Actions URL and fetch
    its logs.

    Args:
        url (str): The GitHub Actions URL.

    Returns:
        GitHubAction: An instance of GitHubAction containing parsed information.

    Raises:
        ValueError: If the URL format is invalid or missing required components.
    """
    parsed_url = urlparse(url)
    path_parts = parsed_url.path.split("/")

    if len(path_parts) < 8 or path_parts[3] != "actions" or path_parts[4] != "runs":
        raise ValueError("Invalid GitHub Actions URL format")

    try:
        repo = GitHubRepo(owner=path_parts[1], project_id=path_parts[2])
        return cls(repo=repo, run_id=int(path_parts[5]), job_id=int(path_parts[7]))
    except (IndexError, ValueError) as e:
        raise ValueError(f"Failed to parse GitHub Actions URL: {e}")

get_latest_workflow_run(project) classmethod

Get the latest GitHub Actions workflow run for a given project.

Parameters:

Name Type Description Default
project OsProject

The project to check for the latest workflow run.

required

Returns:

Name Type Description
dict

Information about the latest workflow run, or None if not found.

Source code in osprojects/github_api.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
@classmethod
def get_latest_workflow_run(cls, project):
    """Get the latest GitHub Actions workflow run for a given project.

    Args:
        project (OsProject): The project to check for the latest workflow run.

    Returns:
        dict: Information about the latest workflow run, or None if not found.
    """
    url = f"https://api.github.com/repos/{project.owner}/{project.project_id}/actions/runs"
    response = project.repo.github.get_response("fetch latest workflow run", url)
    runs = response.json().get("workflow_runs", [])
    run = None
    if runs:
        run = runs[0]  # Return the latest run
    return run

save_logs()

Save the log content to a local file.

Source code in osprojects/github_api.py
333
334
335
336
337
338
def save_logs(self):
    """Save the log content to a local file."""
    if self.log_content is None:
        raise ValueError("No log content to save. Make sure to fetch logs first.")
    with open(self.log_file, "w", encoding="utf-8") as f:
        f.write(self.log_content)

GitHubApi

access to GitHubApi - needed for rate limit handling avoidance via access token

Source code in osprojects/github_api.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class GitHubApi:
    """
    access to GitHubApi - needed for rate limit handling avoidance
    via access token
    """

    githubapi_instance: "GitHubApi" = None

    @classmethod
    def get_instance(cls) -> "GitHubApi":
        """Singleton access."""
        if cls.githubapi_instance is None:
            cls.githubapi_instance = cls()
        return cls.githubapi_instance

    def __init__(self):
        """constructor."""
        home_dir = os.path.expanduser("~")
        self.base_dir = os.path.join(home_dir, ".github")
        os.makedirs(self.base_dir, exist_ok=True)
        self.cache_dir = os.path.join(self.base_dir, "cache")
        os.makedirs(self.cache_dir, exist_ok=True)
        self.log_dir = os.path.join(self.base_dir, "log")
        os.makedirs(self.log_dir, exist_ok=True)
        self.access_token = self.load_access_token()
        self.headers = (
            {"Authorization": f"token {self.access_token}"} if self.access_token else {}
        )
        self.api_url = "https://api.github.com"

    def load_access_token(self) -> str:
        """If $HOME/.github/access_token.json exists read the token from
        there."""
        # Specify the path to the access token file
        token_file_path = os.path.join(self.base_dir, "access_token.json")

        # Check if the file exists and read the token
        if os.path.exists(token_file_path):
            with open(token_file_path, "r") as token_file:
                token_data = json.load(token_file)
                return token_data.get("access_token")

        # Return None if no token file is found
        return None

    def get_response(self, title: str, url: str, params={}, allow_redirects=True):
        """Get response from GitHub API or Google Docs API.

        Args:
            title (str): Description of the request
            url (str): URL to send the request to
            params (dict): Query parameters for the request
            allow_redirects (bool): Whether to follow redirects

        Returns:
            requests.Response: The response object
        """
        response = requests.get(
            url, headers=self.headers, params=params, allow_redirects=allow_redirects
        )

        if response.status_code == 302 and not allow_redirects:
            # Return the redirect URL if we're not following redirects
            return response.headers["Location"]

        if response.status_code not in [200, 302]:
            err_msg = (
                f"Failed to {title} for {url}: {response.status_code} - {response.text}"
            )
            raise Exception(err_msg)

        return response

    def repos_for_owner(self, owner: str, cache_expiry: int = 300) -> list[dict]:
        """Retrieve all repositories for the given owner, using cache if
        available and valid, or via API otherwise.

        This method first checks if the repository data is available in the cache. If not, it fetches the
        data from the GitHub API and caches it for future use.

        Args:
            owner (str): The username of the owner whose repositories are being retrieved.
            cache_expiry (int, optional): The cache expiry time in seconds. Defaults to 60 seconds (1 minute).

        Returns:
            list[dict]: A list of dictionaries representing repositories.
        """
        # Attempt to retrieve from cache
        cache_file, cache_content, cache_age = self.repos_for_owner_from_cache(owner)

        # Use cache if it exists and is not expired
        if cache_content is not None and (
            cache_age is None or cache_age < cache_expiry
        ):
            return cache_content

        # If cache is not available or expired, retrieve from API
        repos = self.repos_for_owner_via_api(owner)

        # Cache the result
        with open(cache_file, "w") as f:
            json.dump(repos, f)

        return repos

    def repos_for_owner_from_cache(
        self, owner: str
    ) -> tuple[str, list[dict] | None, float | None]:
        """Retrieve repositories for the given owner from the cache.

        Args:
            owner (str): The username of the owner whose repositories are being retrieved.

        Returns:
            tuple[str, list[dict] | None, float | None]: A tuple containing:
                - cache_file (str): The path to the cache file.
                - cache_content (list[dict] | None): A list of dictionaries representing repositories if cached data exists, None otherwise.
                - cache_age (float | None): The age of the cache in seconds if cached data exists, None otherwise.
        """
        cache_file = os.path.join(self.cache_dir, f"{owner}_repos.json")
        cache_content = None
        cache_age = None

        # Check if cached data exists and is still valid
        if os.path.exists(cache_file):
            cache_age = time.time() - os.path.getmtime(cache_file)
            with open(cache_file, "r") as f:
                cache_content = json.load(f)

        return cache_file, cache_content, cache_age

    def repos_for_owner_via_api(self, owner: str) -> list[dict]:
        """Retrieve all repositories for the given owner directly from the
        GitHub API.

        Args:
            owner (str): The username of the owner whose repositories are being retrieved.

        Returns:
            list[dict]: A list of dictionaries representing repositories retrieved from the GitHub API.
        """
        url = f"{self.api_url}/users/{owner}/repos"
        params = {
            "type": "all",
            "per_page": 100,
        }  # Include all repo types, 100 per page
        all_repos = []
        page = 1

        while True:
            params["page"] = page
            response = self.get_response("fetch repositories", url, params)
            repos = response.json()
            if not repos:
                break  # No more repositories to fetch

            all_repos.extend(repos)
            page += 1

        repos = all_repos
        return repos

__init__()

constructor.

Source code in osprojects/github_api.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(self):
    """constructor."""
    home_dir = os.path.expanduser("~")
    self.base_dir = os.path.join(home_dir, ".github")
    os.makedirs(self.base_dir, exist_ok=True)
    self.cache_dir = os.path.join(self.base_dir, "cache")
    os.makedirs(self.cache_dir, exist_ok=True)
    self.log_dir = os.path.join(self.base_dir, "log")
    os.makedirs(self.log_dir, exist_ok=True)
    self.access_token = self.load_access_token()
    self.headers = (
        {"Authorization": f"token {self.access_token}"} if self.access_token else {}
    )
    self.api_url = "https://api.github.com"

get_instance() classmethod

Singleton access.

Source code in osprojects/github_api.py
25
26
27
28
29
30
@classmethod
def get_instance(cls) -> "GitHubApi":
    """Singleton access."""
    if cls.githubapi_instance is None:
        cls.githubapi_instance = cls()
    return cls.githubapi_instance

get_response(title, url, params={}, allow_redirects=True)

Get response from GitHub API or Google Docs API.

Parameters:

Name Type Description Default
title str

Description of the request

required
url str

URL to send the request to

required
params dict

Query parameters for the request

{}
allow_redirects bool

Whether to follow redirects

True

Returns:

Type Description

requests.Response: The response object

Source code in osprojects/github_api.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def get_response(self, title: str, url: str, params={}, allow_redirects=True):
    """Get response from GitHub API or Google Docs API.

    Args:
        title (str): Description of the request
        url (str): URL to send the request to
        params (dict): Query parameters for the request
        allow_redirects (bool): Whether to follow redirects

    Returns:
        requests.Response: The response object
    """
    response = requests.get(
        url, headers=self.headers, params=params, allow_redirects=allow_redirects
    )

    if response.status_code == 302 and not allow_redirects:
        # Return the redirect URL if we're not following redirects
        return response.headers["Location"]

    if response.status_code not in [200, 302]:
        err_msg = (
            f"Failed to {title} for {url}: {response.status_code} - {response.text}"
        )
        raise Exception(err_msg)

    return response

load_access_token()

If $HOME/.github/access_token.json exists read the token from there.

Source code in osprojects/github_api.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def load_access_token(self) -> str:
    """If $HOME/.github/access_token.json exists read the token from
    there."""
    # Specify the path to the access token file
    token_file_path = os.path.join(self.base_dir, "access_token.json")

    # Check if the file exists and read the token
    if os.path.exists(token_file_path):
        with open(token_file_path, "r") as token_file:
            token_data = json.load(token_file)
            return token_data.get("access_token")

    # Return None if no token file is found
    return None

repos_for_owner(owner, cache_expiry=300)

Retrieve all repositories for the given owner, using cache if available and valid, or via API otherwise.

This method first checks if the repository data is available in the cache. If not, it fetches the data from the GitHub API and caches it for future use.

Parameters:

Name Type Description Default
owner str

The username of the owner whose repositories are being retrieved.

required
cache_expiry int

The cache expiry time in seconds. Defaults to 60 seconds (1 minute).

300

Returns:

Type Description
list[dict]

list[dict]: A list of dictionaries representing repositories.

Source code in osprojects/github_api.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def repos_for_owner(self, owner: str, cache_expiry: int = 300) -> list[dict]:
    """Retrieve all repositories for the given owner, using cache if
    available and valid, or via API otherwise.

    This method first checks if the repository data is available in the cache. If not, it fetches the
    data from the GitHub API and caches it for future use.

    Args:
        owner (str): The username of the owner whose repositories are being retrieved.
        cache_expiry (int, optional): The cache expiry time in seconds. Defaults to 60 seconds (1 minute).

    Returns:
        list[dict]: A list of dictionaries representing repositories.
    """
    # Attempt to retrieve from cache
    cache_file, cache_content, cache_age = self.repos_for_owner_from_cache(owner)

    # Use cache if it exists and is not expired
    if cache_content is not None and (
        cache_age is None or cache_age < cache_expiry
    ):
        return cache_content

    # If cache is not available or expired, retrieve from API
    repos = self.repos_for_owner_via_api(owner)

    # Cache the result
    with open(cache_file, "w") as f:
        json.dump(repos, f)

    return repos

repos_for_owner_from_cache(owner)

Retrieve repositories for the given owner from the cache.

Parameters:

Name Type Description Default
owner str

The username of the owner whose repositories are being retrieved.

required

Returns:

Type Description
tuple[str, list[dict] | None, float | None]

tuple[str, list[dict] | None, float | None]: A tuple containing: - cache_file (str): The path to the cache file. - cache_content (list[dict] | None): A list of dictionaries representing repositories if cached data exists, None otherwise. - cache_age (float | None): The age of the cache in seconds if cached data exists, None otherwise.

Source code in osprojects/github_api.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def repos_for_owner_from_cache(
    self, owner: str
) -> tuple[str, list[dict] | None, float | None]:
    """Retrieve repositories for the given owner from the cache.

    Args:
        owner (str): The username of the owner whose repositories are being retrieved.

    Returns:
        tuple[str, list[dict] | None, float | None]: A tuple containing:
            - cache_file (str): The path to the cache file.
            - cache_content (list[dict] | None): A list of dictionaries representing repositories if cached data exists, None otherwise.
            - cache_age (float | None): The age of the cache in seconds if cached data exists, None otherwise.
    """
    cache_file = os.path.join(self.cache_dir, f"{owner}_repos.json")
    cache_content = None
    cache_age = None

    # Check if cached data exists and is still valid
    if os.path.exists(cache_file):
        cache_age = time.time() - os.path.getmtime(cache_file)
        with open(cache_file, "r") as f:
            cache_content = json.load(f)

    return cache_file, cache_content, cache_age

repos_for_owner_via_api(owner)

Retrieve all repositories for the given owner directly from the GitHub API.

Parameters:

Name Type Description Default
owner str

The username of the owner whose repositories are being retrieved.

required

Returns:

Type Description
list[dict]

list[dict]: A list of dictionaries representing repositories retrieved from the GitHub API.

Source code in osprojects/github_api.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def repos_for_owner_via_api(self, owner: str) -> list[dict]:
    """Retrieve all repositories for the given owner directly from the
    GitHub API.

    Args:
        owner (str): The username of the owner whose repositories are being retrieved.

    Returns:
        list[dict]: A list of dictionaries representing repositories retrieved from the GitHub API.
    """
    url = f"{self.api_url}/users/{owner}/repos"
    params = {
        "type": "all",
        "per_page": 100,
    }  # Include all repo types, 100 per page
    all_repos = []
    page = 1

    while True:
        params["page"] = page
        response = self.get_response("fetch repositories", url, params)
        repos = response.json()
        if not repos:
            break  # No more repositories to fetch

        all_repos.extend(repos)
        page += 1

    repos = all_repos
    return repos

GitHubRepo dataclass

Represents a GitHub Repository.

Attributes:

Name Type Description
owner str

The owner of the repository.

project_id str

The name/id of the repository.

Source code in osprojects/github_api.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@dataclass
class GitHubRepo:
    """Represents a GitHub Repository.

    Attributes:
        owner (str): The owner of the repository.
        project_id (str): The name/id of the repository.
    """

    owner: str
    project_id: str

    def __post_init__(self):
        self.github = GitHubApi.get_instance()

    @classmethod
    def from_url(cls, url: str) -> (str, str):
        """Resolve project url to owner and project name.

        Returns:
            (owner, project)
        """
        # https://www.rfc-editor.org/rfc/rfc3986#appendix-B
        pattern = r"((https?:\/\/github\.com\/)|(git@github\.com:))(?P<owner>[^/?#]+)\/(?P<project_id>[^\./?#]+)(\.git)?"
        match = re.match(pattern=pattern, string=url)
        repo = None
        if match:
            owner = match.group("owner")
            project_id = match.group("project_id")
            if owner and project_id:
                repo = cls(owner=owner, project_id=project_id)
            else:
                pass
        else:
            pass
        return repo

    def ticketUrl(self):
        return f"{self.github.api_url}/repos/{self.owner}/{self.project_id}/issues"

    def projectUrl(self):
        return f"https://github.com/{self.owner}/{self.project_id}"

    def getIssueRecords(self, limit: int = None, **params) -> List[Dict]:
        all_issues_records = []
        nextResults = True
        params["per_page"] = 100
        params["page"] = 1
        fetched_count = 0  # Counter to track the number of issues fetched
        while nextResults:
            response = self.github.get_response(
                "fetch tickets", self.ticketUrl(), params
            )
            issue_records = json.loads(response.text)
            all_issues_records.extend(issue_records)
            fetched_count += 1
            # Check if we have reached the limit
            if limit is not None and fetched_count >= limit:
                nextResults = False
                break

            if len(issue_records) < 100:
                nextResults = False
            else:
                params["page"] += 1
        return all_issues_records

from_url(url) classmethod

Resolve project url to owner and project name.

Returns:

Type Description
(str, str)

(owner, project)

Source code in osprojects/github_api.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@classmethod
def from_url(cls, url: str) -> (str, str):
    """Resolve project url to owner and project name.

    Returns:
        (owner, project)
    """
    # https://www.rfc-editor.org/rfc/rfc3986#appendix-B
    pattern = r"((https?:\/\/github\.com\/)|(git@github\.com:))(?P<owner>[^/?#]+)\/(?P<project_id>[^\./?#]+)(\.git)?"
    match = re.match(pattern=pattern, string=url)
    repo = None
    if match:
        owner = match.group("owner")
        project_id = match.group("project_id")
        if owner and project_id:
            repo = cls(owner=owner, project_id=project_id)
        else:
            pass
    else:
        pass
    return repo

osproject

Created on 2022-01-24.

@author: wf

Commit

Bases: object

A commit.

Source code in osprojects/osproject.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Commit(object):
    """A commit."""

    @staticmethod
    def getSamples():
        samples = [
            {
                "host": "https://github.com/WolfgangFahl/pyOpenSourceProjects",
                "path": "",
                "project": "pyOpenSourceProjects",
                "subject": "Initial commit",
                "name": "GitHub",  # TicketSystem
                "date": datetime.datetime.fromisoformat("2022-01-24 07:02:55+01:00"),
                "hash": "106254f",
            }
        ]
        return samples

    def toWikiMarkup(self):
        """Returns Commit as wiki markup."""
        params = [
            f"{attr}={getattr(self, attr, '')}" for attr in self.getSamples()[0].keys()
        ]
        markup = f"{{{{commit|{'|'.join(params)}|storemode=subobject|viewmode=line}}}}"
        return markup

toWikiMarkup()

Returns Commit as wiki markup.

Source code in osprojects/osproject.py
82
83
84
85
86
87
88
def toWikiMarkup(self):
    """Returns Commit as wiki markup."""
    params = [
        f"{attr}={getattr(self, attr, '')}" for attr in self.getSamples()[0].keys()
    ]
    markup = f"{{{{commit|{'|'.join(params)}|storemode=subobject|viewmode=line}}}}"
    return markup

OsProject

A GitHub based opens source project.

Source code in osprojects/osproject.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
class OsProject:
    """A GitHub based opens source project."""

    def __init__(self, owner: str = None, project_id: str = None):
        self.repo_info = None  # might be fetched
        self.folder = None  # set for local projects
        if owner and project_id:
            self.repo = GitHubRepo(owner=owner, project_id=project_id)

    @classmethod
    def fromUrl(cls, url: str) -> "OsProject":
        """Init OsProject from given url."""
        if "github.com" in url:
            os_project = cls()
            os_project.repo = GitHubRepo.from_url(url)
        else:
            raise Exception(f"url '{url}' is not a github.com url ")
        return os_project

    @classmethod
    def fromRepo(cls):
        """Init OsProject from repo in current working directory."""
        url = subprocess.check_output(["git", "config", "--get", "remote.origin.url"])
        url = url.decode().strip("\n")
        repo = cls.fromUrl(url)
        return repo

    def getIssues(self, limit: int = None, **params) -> List[Ticket]:

        # Fetch the raw issue records using the new getIssueRecords method
        issue_records = self.repo.getIssueRecords(limit=limit, **params)

        issues = []
        for record in issue_records:
            tr = {
                "project": self.repo.project_id,
                "title": record.get("title"),
                "body": record.get("body", ""),
                "createdAt": (
                    parse(record.get("created_at")) if record.get("created_at") else ""
                ),
                "closedAt": (
                    parse(record.get("closed_at")) if record.get("closed_at") else ""
                ),
                "state": record.get("state"),
                "number": record.get("number"),
                "url": f"{self.projectUrl()}/issues/{record.get('number')}",
            }
            issues.append(Ticket.init_from_dict(**tr))

            # Check if we have reached the limit
            if limit is not None and len(issues) >= limit:
                break

        return issues

    def getAllTickets(
        self, limit: int = None, with_sort: bool = True
    ) -> Dict[int, Ticket]:
        """
        Get all Tickets of the project - closed and open ones

        Args:
            limit(int): if set, limit the number of tickets retrieved
            with_sort(bool): if True, sort the tickets by number in descending order

        Returns:
            Dict[int, Ticket]: A dictionary of tickets keyed by their number
        """
        tickets = self.getIssues(state="all", limit=limit)

        # Sort the tickets if with_sort is True
        if with_sort:
            tickets.sort(key=lambda r: getattr(r, "number"), reverse=True)

        # Convert the list of tickets into a dictionary keyed by the ticket number
        tickets_dict = {ticket.number: ticket for ticket in tickets}

        return tickets_dict

    def getComments(self, issue_number: int) -> List[dict]:
        """Fetch all comments for a specific issue number from GitHub."""
        comments_url = self.commentUrl(issue_number)
        response = self.get_response("fetch comments", comments_url)
        return response.json()

    def projectUrl(self):
        return self.repo.projectUrl()

    def commitUrl(self, commit_id: str):
        return f"{self.projectUrl()}/commit/{commit_id}"

    def commentUrl(self, issue_number: int):
        """Construct the URL for accessing comments of a specific issue."""
        return f"{self.repo.github.api_url}/repos/{self.repo.owner}/{self.repo.project_id}/issues/{issue_number}/comments"

    @property
    def project_id(self):
        return self.repo.project_id

    @property
    def owner(self):
        return self.repo.owner

    @property
    def title(self):
        return self.repo_info.get("name") or self.project_id

    @property
    def url(self):
        return (
            self.repo_info.get("html_url")
            or f"https://github.com/{self.repo.owner}/{self.project_id}"
        )

    @property
    def description(self):
        return self.repo_info.get("description") or ""

    @property
    def language(self):
        return self.repo_info.get("language") or "python"

    @property
    def created_at(self):
        created_at = self.repo_info.get("created_at")
        return (
            datetime.datetime.fromisoformat(created_at.rstrip("Z"))
            if created_at
            else None
        )

    @property
    def updated_at(self):
        updated_at = self.repo_info.get("updated_at")
        return (
            datetime.datetime.fromisoformat(updated_at.rstrip("Z"))
            if updated_at
            else None
        )

    @property
    def stars(self):
        return self.repo_info.get("stargazers_count")

    @property
    def forks(self):
        return self.repo_info.get("forks_count")

    @property
    def fqid(self):
        return f"{self.repo.owner}/{self.repo.project_id}"

    def __str__(self):
        return self.fqid

    @staticmethod
    def getSamples():
        samples = [
            {
                "project_id": "pyOpenSourceProjects",
                "owner": "WolfgangFahl",
                "title": "pyOpenSourceProjects",
                "url": "https://github.com/WolfgangFahl/pyOpenSourceProjects",
                "description": "Helper Library to organize open source Projects",
                "language": "Python",
                "created_at": datetime.datetime(year=2022, month=1, day=24),
                "updated_at": datetime.datetime(year=2022, month=1, day=24),
                "stars": 5,
                "forks": 2,
            }
        ]
        return samples

    def getCommits(self) -> List[Commit]:
        commits = []
        gitlogCmd = [
            "git",
            "--no-pager",
            "log",
            "--reverse",
            r'--pretty=format:{"name":"%cn","date":"%cI","hash":"%h"}',
        ]
        gitLogCommitSubject = ["git", "log", "--format=%s", "-n", "1"]
        rawCommitLogs = subprocess.check_output(gitlogCmd).decode()
        for rawLog in rawCommitLogs.split("\n"):
            log = json.loads(rawLog)
            if log.get("date", None) is not None:
                log["date"] = datetime.datetime.fromisoformat(log["date"])
            log["project"] = self.project_id
            log["host"] = self.projectUrl()
            log["path"] = ""
            log["subject"] = subprocess.check_output(
                [*gitLogCommitSubject, log["hash"]]
            )[
                :-1
            ].decode()  # seperate query to avoid json escaping issues
            commit = Commit()
            for k, v in log.items():
                setattr(commit, k, v)
            commits.append(commit)
        return commits

commentUrl(issue_number)

Construct the URL for accessing comments of a specific issue.

Source code in osprojects/osproject.py
388
389
390
def commentUrl(self, issue_number: int):
    """Construct the URL for accessing comments of a specific issue."""
    return f"{self.repo.github.api_url}/repos/{self.repo.owner}/{self.repo.project_id}/issues/{issue_number}/comments"

fromRepo() classmethod

Init OsProject from repo in current working directory.

Source code in osprojects/osproject.py
315
316
317
318
319
320
321
@classmethod
def fromRepo(cls):
    """Init OsProject from repo in current working directory."""
    url = subprocess.check_output(["git", "config", "--get", "remote.origin.url"])
    url = url.decode().strip("\n")
    repo = cls.fromUrl(url)
    return repo

fromUrl(url) classmethod

Init OsProject from given url.

Source code in osprojects/osproject.py
305
306
307
308
309
310
311
312
313
@classmethod
def fromUrl(cls, url: str) -> "OsProject":
    """Init OsProject from given url."""
    if "github.com" in url:
        os_project = cls()
        os_project.repo = GitHubRepo.from_url(url)
    else:
        raise Exception(f"url '{url}' is not a github.com url ")
    return os_project

getAllTickets(limit=None, with_sort=True)

Get all Tickets of the project - closed and open ones

Parameters:

Name Type Description Default
limit(int)

if set, limit the number of tickets retrieved

required
with_sort(bool)

if True, sort the tickets by number in descending order

required

Returns:

Type Description
Dict[int, Ticket]

Dict[int, Ticket]: A dictionary of tickets keyed by their number

Source code in osprojects/osproject.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def getAllTickets(
    self, limit: int = None, with_sort: bool = True
) -> Dict[int, Ticket]:
    """
    Get all Tickets of the project - closed and open ones

    Args:
        limit(int): if set, limit the number of tickets retrieved
        with_sort(bool): if True, sort the tickets by number in descending order

    Returns:
        Dict[int, Ticket]: A dictionary of tickets keyed by their number
    """
    tickets = self.getIssues(state="all", limit=limit)

    # Sort the tickets if with_sort is True
    if with_sort:
        tickets.sort(key=lambda r: getattr(r, "number"), reverse=True)

    # Convert the list of tickets into a dictionary keyed by the ticket number
    tickets_dict = {ticket.number: ticket for ticket in tickets}

    return tickets_dict

getComments(issue_number)

Fetch all comments for a specific issue number from GitHub.

Source code in osprojects/osproject.py
376
377
378
379
380
def getComments(self, issue_number: int) -> List[dict]:
    """Fetch all comments for a specific issue number from GitHub."""
    comments_url = self.commentUrl(issue_number)
    response = self.get_response("fetch comments", comments_url)
    return response.json()

OsProjects

A set of open source projects.

Source code in osprojects/osproject.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class OsProjects:
    """A set of open source projects."""

    def __init__(self):
        """constructor."""
        self.projects = {}
        self.projects_by_url = {}
        self.local_projects = {}
        self.selected_projects = {}
        self.owners = []
        self.github = GitHubApi.get_instance()

    def clear_selection(self):
        self.selected_projects = {}

    def add_selection(self, project):
        is_fork = project.repo_info["fork"]
        if not is_fork:
            self.selected_projects[project.projectUrl()] = project

    def select_projects(self, owners=None, project_id=None, local_only=False):
        """Select projects based on given criteria.

        Args:
            owners (Optional[list[str]]): The owners of the projects to select.
            project_id (Optional[str]): The ID of a specific project to select.
            local_only (bool): Whether to select only local projects.

        Returns:
            Dict[str, OsProject]: A dictionary of selected projects.

        Raises:
            ValueError: If owner or local_only flag is not specified with project_id.
        """
        if project_id:
            if owners:
                for owner in owners:
                    key = f"https://github.com/{owner}/{project_id}"
                    project = self.projects_by_url.get(key)
                    if project:
                        self.add_selection(project)
            elif local_only:
                for _url, project in self.local_projects.items():
                    if project.project_id == project_id:
                        self.add_selection(project)
            else:
                raise ValueError(
                    "Owner or local_only flag must be specified with project_id"
                )

        elif owners:
            for owner in owners:
                if owner in self.projects:
                    for project in self.projects[owner].values():
                        self.add_selection(project)
        elif local_only:
            for project in self.local_projects.values():
                self.add_selection(project)
        else:
            for project in self.projects_by_url.values():
                self.add_selection(project)

        return self.selected_projects

    def filter_projects(self, language=None, local_only=False):
        """Filter the selected projects based on language and locality.

        Args:
            language (str, optional): The programming language to filter by.
            local_only (bool, optional): If True, only return local projects.

        Returns:
            Dict[str, OsProject]: The filtered projects.
        """
        filtered_projects = {}

        for url, project in self.selected_projects.items():
            include_project = True

            if language and project.language != language:
                include_project = False

            if local_only and not project.folder:
                include_project = False

            if include_project:
                filtered_projects[url] = project

        self.selected_projects = filtered_projects
        return self.selected_projects

    def add_projects_of_owner(self, owner: str, cache_expiry: int = 300):
        """Add the projects of the given owner."""
        if not owner in self.projects:
            self.projects[owner] = {}
            repo_infos = self.github.repos_for_owner(owner, cache_expiry)
            for repo_info in repo_infos:
                project_id = repo_info["name"]
                os_project = OsProject(owner=owner, project_id=project_id)
                os_project.repo_info = repo_info
                self.projects[owner][project_id] = os_project
                self.projects_by_url[os_project.projectUrl()] = os_project
        else:
            # owner already known
            pass

    @classmethod
    def from_owners(cls, owners: list[str]):
        osp = cls()
        for owner in owners:
            osp.add_projects_of_owner(owner)
        return osp

    @classmethod
    def get_project_url_from_git_config(cls, project_path: str) -> Optional[str]:
        """Get the project URL from the git config file.

        Args:
            project_path (str): The path to the project directory.

        Returns:
            Optional[str]: The project URL if found, None otherwise.
        """
        config_path = os.path.join(project_path, ".git", "config")
        if not os.path.exists(config_path):
            return None

        config = configparser.ConfigParser()
        config.read(config_path)

        if 'remote "origin"' not in config:
            return None

        url = config['remote "origin"']["url"]
        # remove trailing / if any
        url = url.rstrip("/")
        return url

    @classmethod
    def from_folder(cls, folder_path: str, with_progress: bool = False) -> "OsProjects":
        """Collect all github projects from the given folders.

        Args:
            folder_path (str): The path to the folder containing projects.
            with_progress (bool): Whether to display a progress bar. Defaults to True.

        Returns:
            OsProjects: An instance of OsProjects with collected projects.
        """
        osp = cls()
        owners, repos_by_folder = cls.github_repos_of_folder(folder_path)

        def process_owners(owners_iterable: Iterable[str]):
            for owner in owners_iterable:
                osp.add_projects_of_owner(owner)

        if with_progress:
            process_owners(tqdm(owners, desc="Processing owners"))
        else:
            process_owners(owners)

        for folder, repo in repos_by_folder.items():
            project_url = repo.projectUrl()
            if project_url not in osp.projects_by_url:
                logging.warning(f"{project_url} not found in projects_by_url")
            else:
                local_project = osp.projects_by_url[project_url]
                local_project.folder = folder
                osp.local_projects[project_url] = local_project

        return osp

    @classmethod
    def github_repos_of_folder(
        cls, folder_path: str
    ) -> Tuple[Set[str], Dict[str, GitHubRepo]]:
        """Collect GitHub repositories from a given folder.

        Args:
            folder_path (str): The path to the folder to search for repositories.

        Returns:
            Tuple[Set[str], Dict[str, GitHubRepo]]: A tuple containing a set of owners
            and a dictionary of repositories keyed by folder path.
        """
        all_folders = []
        repos_by_folder: Dict[str, GitHubRepo] = {}
        owners: Set[str] = set()

        for d in os.listdir(folder_path):
            sub_folder = os.path.join(folder_path, d)
            if os.path.isdir(sub_folder):
                all_folders.append(sub_folder)

        for folder in all_folders:
            project_url = cls.get_project_url_from_git_config(folder)
            if project_url:
                github_repo = GitHubRepo.from_url(project_url)
                if github_repo:
                    repos_by_folder[folder] = github_repo
                    owners.add(github_repo.owner)

        return owners, repos_by_folder

__init__()

constructor.

Source code in osprojects/osproject.py
 94
 95
 96
 97
 98
 99
100
101
def __init__(self):
    """constructor."""
    self.projects = {}
    self.projects_by_url = {}
    self.local_projects = {}
    self.selected_projects = {}
    self.owners = []
    self.github = GitHubApi.get_instance()

add_projects_of_owner(owner, cache_expiry=300)

Add the projects of the given owner.

Source code in osprojects/osproject.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def add_projects_of_owner(self, owner: str, cache_expiry: int = 300):
    """Add the projects of the given owner."""
    if not owner in self.projects:
        self.projects[owner] = {}
        repo_infos = self.github.repos_for_owner(owner, cache_expiry)
        for repo_info in repo_infos:
            project_id = repo_info["name"]
            os_project = OsProject(owner=owner, project_id=project_id)
            os_project.repo_info = repo_info
            self.projects[owner][project_id] = os_project
            self.projects_by_url[os_project.projectUrl()] = os_project
    else:
        # owner already known
        pass

filter_projects(language=None, local_only=False)

Filter the selected projects based on language and locality.

Parameters:

Name Type Description Default
language str

The programming language to filter by.

None
local_only bool

If True, only return local projects.

False

Returns:

Type Description

Dict[str, OsProject]: The filtered projects.

Source code in osprojects/osproject.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def filter_projects(self, language=None, local_only=False):
    """Filter the selected projects based on language and locality.

    Args:
        language (str, optional): The programming language to filter by.
        local_only (bool, optional): If True, only return local projects.

    Returns:
        Dict[str, OsProject]: The filtered projects.
    """
    filtered_projects = {}

    for url, project in self.selected_projects.items():
        include_project = True

        if language and project.language != language:
            include_project = False

        if local_only and not project.folder:
            include_project = False

        if include_project:
            filtered_projects[url] = project

    self.selected_projects = filtered_projects
    return self.selected_projects

from_folder(folder_path, with_progress=False) classmethod

Collect all github projects from the given folders.

Parameters:

Name Type Description Default
folder_path str

The path to the folder containing projects.

required
with_progress bool

Whether to display a progress bar. Defaults to True.

False

Returns:

Name Type Description
OsProjects OsProjects

An instance of OsProjects with collected projects.

Source code in osprojects/osproject.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
@classmethod
def from_folder(cls, folder_path: str, with_progress: bool = False) -> "OsProjects":
    """Collect all github projects from the given folders.

    Args:
        folder_path (str): The path to the folder containing projects.
        with_progress (bool): Whether to display a progress bar. Defaults to True.

    Returns:
        OsProjects: An instance of OsProjects with collected projects.
    """
    osp = cls()
    owners, repos_by_folder = cls.github_repos_of_folder(folder_path)

    def process_owners(owners_iterable: Iterable[str]):
        for owner in owners_iterable:
            osp.add_projects_of_owner(owner)

    if with_progress:
        process_owners(tqdm(owners, desc="Processing owners"))
    else:
        process_owners(owners)

    for folder, repo in repos_by_folder.items():
        project_url = repo.projectUrl()
        if project_url not in osp.projects_by_url:
            logging.warning(f"{project_url} not found in projects_by_url")
        else:
            local_project = osp.projects_by_url[project_url]
            local_project.folder = folder
            osp.local_projects[project_url] = local_project

    return osp

get_project_url_from_git_config(project_path) classmethod

Get the project URL from the git config file.

Parameters:

Name Type Description Default
project_path str

The path to the project directory.

required

Returns:

Type Description
Optional[str]

Optional[str]: The project URL if found, None otherwise.

Source code in osprojects/osproject.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@classmethod
def get_project_url_from_git_config(cls, project_path: str) -> Optional[str]:
    """Get the project URL from the git config file.

    Args:
        project_path (str): The path to the project directory.

    Returns:
        Optional[str]: The project URL if found, None otherwise.
    """
    config_path = os.path.join(project_path, ".git", "config")
    if not os.path.exists(config_path):
        return None

    config = configparser.ConfigParser()
    config.read(config_path)

    if 'remote "origin"' not in config:
        return None

    url = config['remote "origin"']["url"]
    # remove trailing / if any
    url = url.rstrip("/")
    return url

github_repos_of_folder(folder_path) classmethod

Collect GitHub repositories from a given folder.

Parameters:

Name Type Description Default
folder_path str

The path to the folder to search for repositories.

required

Returns:

Type Description
Set[str]

Tuple[Set[str], Dict[str, GitHubRepo]]: A tuple containing a set of owners

Dict[str, GitHubRepo]

and a dictionary of repositories keyed by folder path.

Source code in osprojects/osproject.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
@classmethod
def github_repos_of_folder(
    cls, folder_path: str
) -> Tuple[Set[str], Dict[str, GitHubRepo]]:
    """Collect GitHub repositories from a given folder.

    Args:
        folder_path (str): The path to the folder to search for repositories.

    Returns:
        Tuple[Set[str], Dict[str, GitHubRepo]]: A tuple containing a set of owners
        and a dictionary of repositories keyed by folder path.
    """
    all_folders = []
    repos_by_folder: Dict[str, GitHubRepo] = {}
    owners: Set[str] = set()

    for d in os.listdir(folder_path):
        sub_folder = os.path.join(folder_path, d)
        if os.path.isdir(sub_folder):
            all_folders.append(sub_folder)

    for folder in all_folders:
        project_url = cls.get_project_url_from_git_config(folder)
        if project_url:
            github_repo = GitHubRepo.from_url(project_url)
            if github_repo:
                repos_by_folder[folder] = github_repo
                owners.add(github_repo.owner)

    return owners, repos_by_folder

select_projects(owners=None, project_id=None, local_only=False)

Select projects based on given criteria.

Parameters:

Name Type Description Default
owners Optional[list[str]]

The owners of the projects to select.

None
project_id Optional[str]

The ID of a specific project to select.

None
local_only bool

Whether to select only local projects.

False

Returns:

Type Description

Dict[str, OsProject]: A dictionary of selected projects.

Raises:

Type Description
ValueError

If owner or local_only flag is not specified with project_id.

Source code in osprojects/osproject.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def select_projects(self, owners=None, project_id=None, local_only=False):
    """Select projects based on given criteria.

    Args:
        owners (Optional[list[str]]): The owners of the projects to select.
        project_id (Optional[str]): The ID of a specific project to select.
        local_only (bool): Whether to select only local projects.

    Returns:
        Dict[str, OsProject]: A dictionary of selected projects.

    Raises:
        ValueError: If owner or local_only flag is not specified with project_id.
    """
    if project_id:
        if owners:
            for owner in owners:
                key = f"https://github.com/{owner}/{project_id}"
                project = self.projects_by_url.get(key)
                if project:
                    self.add_selection(project)
        elif local_only:
            for _url, project in self.local_projects.items():
                if project.project_id == project_id:
                    self.add_selection(project)
        else:
            raise ValueError(
                "Owner or local_only flag must be specified with project_id"
            )

    elif owners:
        for owner in owners:
            if owner in self.projects:
                for project in self.projects[owner].values():
                    self.add_selection(project)
    elif local_only:
        for project in self.local_projects.values():
            self.add_selection(project)
    else:
        for project in self.projects_by_url.values():
            self.add_selection(project)

    return self.selected_projects

Ticket

Bases: object

A Ticket.

Source code in osprojects/osproject.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class Ticket(object):
    """A Ticket."""

    @staticmethod
    def getSamples():
        samples = [
            {
                "number": 2,
                "title": "Get Tickets in Wiki notation from github API",
                "createdAt": datetime.datetime.fromisoformat(
                    "2022-01-24 07:41:29+00:00"
                ),
                "closedAt": datetime.datetime.fromisoformat(
                    "2022-01-25 07:43:04+00:00"
                ),
                "url": "https://github.com/WolfgangFahl/pyOpenSourceProjects/issues/2",
                "project": "pyOpenSourceProjects",
                "state": "closed",
            }
        ]
        return samples

    @classmethod
    def init_from_dict(cls, **records):
        """Inits Ticket from given args."""
        issue = Ticket()
        for k, v in records.items():
            setattr(issue, k, v)
        return issue

    def toWikiMarkup(self) -> str:
        """Returns Ticket in wiki markup."""
        return f"""# {{{{Ticket
|number={self.number}
|title={self.title}
|project={self.project}
|createdAt={self.createdAt if self.createdAt else ""}
|closedAt={self.closedAt if self.closedAt else ""}
|state={self.state}
}}}}"""

init_from_dict(**records) classmethod

Inits Ticket from given args.

Source code in osprojects/osproject.py
44
45
46
47
48
49
50
@classmethod
def init_from_dict(cls, **records):
    """Inits Ticket from given args."""
    issue = Ticket()
    for k, v in records.items():
        setattr(issue, k, v)
    return issue

toWikiMarkup()

Returns Ticket in wiki markup.

Source code in osprojects/osproject.py
52
53
54
55
56
57
58
59
60
61
    def toWikiMarkup(self) -> str:
        """Returns Ticket in wiki markup."""
        return f"""# {{{{Ticket
|number={self.number}
|title={self.title}
|project={self.project}
|createdAt={self.createdAt if self.createdAt else ""}
|closedAt={self.closedAt if self.closedAt else ""}
|state={self.state}
}}}}"""

gitlog2wiki(_argv=None)

Cmdline interface to get gitlog entries in wiki markup.

Source code in osprojects/osproject.py
500
501
502
503
504
505
506
507
508
def gitlog2wiki(_argv=None):
    """Cmdline interface to get gitlog entries in wiki markup."""
    parser = argparse.ArgumentParser(description="gitlog2wiki")
    if _argv:
        _args = parser.parse_args(args=_argv)

    osProject = OsProject.fromRepo()
    commits = osProject.getCommits()
    print("\n".join([c.toWikiMarkup() for c in commits]))

main(_argv=None)

Main command line entry point.

Source code in osprojects/osproject.py
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
def main(_argv=None):
    """Main command line entry point."""
    parser = argparse.ArgumentParser(description="Issue2ticket")
    parser.add_argument("-o", "--owner", help="project owner")
    parser.add_argument("-p", "--project", help="name of the project")
    parser.add_argument(
        "--repo",
        action="store_true",
        help="get needed information form repository of current location",
    )
    parser.add_argument(
        "-s",
        "--state",
        choices=["open", "closed", "all"],
        default="all",
        help="only issues with the given state",
    )
    parser.add_argument("-V", "--version", action="version", version="gitlog2wiki 0.1")

    args = parser.parse_args(args=_argv)
    if args.project and args.owner:
        osProject = OsProject(
            owner=args.owner,
            project_id=args.project,
        )
    else:
        osProject = OsProject.fromRepo()
    tickets = osProject.getIssues(state=args.state)
    print("\n".join([t.toWikiMarkup() for t in tickets]))